package state import ( "context" "testing" "time" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" ) func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) // Setup token and wait for good state token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) defer sub.Unsubscribe() eventCh := testRunSub(sub) // Stream should get EndOfSnapshot e := assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot()) // Update an unrelated token. token2 := &structs.ACLToken{ AccessorID: "a7bbf480-8440-4f55-acfc-6fdca25cb13e", SecretID: "72e81982-7a0f-491f-a60e-c9c802ac1402", } token2.SetHash(false) require.NoError(s.ACLTokenSet(3, token2.Clone())) // Ensure there's no reset event. assertNoEvent(t, eventCh) // Now update the token used in the subscriber. token3 := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", Description: "something else", } token3.SetHash(false) require.NoError(s.ACLTokenSet(4, token3.Clone())) // Ensure the reset event was sent. err = assertErr(t, eventCh) require.Equal(stream.ErrSubForceClosed, err) // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } sub2, err := publisher.Subscribe(subscription2) require.NoError(err) defer sub2.Unsubscribe() eventCh2 := testRunSub(sub2) // Expect initial EoS e = assertEvent(t, eventCh2) require.True(e.IsEndOfSnapshot()) // Delete the unrelated token. require.NoError(s.ACLTokenDeleteByAccessor(5, token2.AccessorID, nil)) // Ensure there's no reset event. assertNoEvent(t, eventCh2) // Delete the token used by the subscriber. require.NoError(s.ACLTokenDeleteByAccessor(6, token.AccessorID, nil)) // Ensure the reset event was sent. err = assertErr(t, eventCh2) require.Equal(stream.ErrSubForceClosed, err) } func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) // Create token and wait for good state token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) defer sub.Unsubscribe() eventCh := testRunSub(sub) // Ignore the end of snapshot event e := assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Update an unrelated policy. policy2 := structs.ACLPolicy{ ID: testPolicyID_C, Name: "foo-read", Rules: `node "foo" { policy = "read" }`, Syntax: acl.SyntaxCurrent, Datacenters: []string{"dc1"}, } policy2.SetHash(false) require.NoError(s.ACLPolicySet(3, &policy2)) // Ensure there's no reset event. assertNoEvent(t, eventCh) // Now update the policy used in the subscriber. policy3 := structs.ACLPolicy{ ID: testPolicyID_A, Name: "node-read", Rules: `node_prefix "" { policy = "write" }`, Syntax: acl.SyntaxCurrent, Datacenters: []string{"dc1"}, } policy3.SetHash(false) require.NoError(s.ACLPolicySet(4, &policy3)) // Ensure the reset event was sent. assertReset(t, eventCh, true) // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) require.NoError(err) defer sub.Unsubscribe() eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Delete the unrelated policy. require.NoError(s.ACLPolicyDeleteByID(5, testPolicyID_C, nil)) // Ensure there's no reload event. assertNoEvent(t, eventCh) // Delete the policy used by the subscriber. require.NoError(s.ACLPolicyDeleteByID(6, testPolicyID_A, nil)) // Ensure the reload event was sent. err = assertErr(t, eventCh) require.Equal(stream.ErrSubForceClosed, err) // Register another subscription. subscription3 := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } sub, err = publisher.Subscribe(subscription3) require.NoError(err) defer sub.Unsubscribe() eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Now update the policy used in role B, but not directly in the token. policy4 := structs.ACLPolicy{ ID: testPolicyID_B, Name: "node-read", Rules: `node_prefix "foo" { policy = "read" }`, Syntax: acl.SyntaxCurrent, Datacenters: []string{"dc1"}, } policy4.SetHash(false) require.NoError(s.ACLPolicySet(7, &policy4)) // Ensure the reset event was sent. assertReset(t, eventCh, true) } func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() require := require.New(t) s := testACLTokensStateStore(t) // Create token and wait for good state token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) eventCh := testRunSub(sub) // Stream should get EndOfSnapshot e := assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot()) // Update an unrelated role (the token has role testRoleID_B). role := structs.ACLRole{ ID: testRoleID_A, Name: "unrelated-role", Description: "test", } role.SetHash(false) require.NoError(s.ACLRoleSet(3, &role)) // Ensure there's no reload event. assertNoEvent(t, eventCh) // Now update the role used by the token in the subscriber. role2 := structs.ACLRole{ ID: testRoleID_B, Name: "my-new-role", Description: "changed", } role2.SetHash(false) require.NoError(s.ACLRoleSet(4, &role2)) // Ensure the reload event was sent. assertReset(t, eventCh, false) // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) require.NoError(err) eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Delete the unrelated policy. require.NoError(s.ACLRoleDeleteByID(5, testRoleID_A, nil)) // Ensure there's no reload event. assertNoEvent(t, eventCh) // Delete the policy used by the subscriber. require.NoError(s.ACLRoleDeleteByID(6, testRoleID_B, nil)) // Ensure the reload event was sent. assertReset(t, eventCh, false) } type nextResult struct { Event stream.Event Err error } func testRunSub(sub *stream.Subscription) <-chan nextResult { eventCh := make(chan nextResult, 1) go func() { for { es, err := sub.Next(context.TODO()) eventCh <- nextResult{ Event: es, Err: err, } if err != nil { return } } }() return eventCh } func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { t.Helper() select { case next := <-eventCh: require.NoError(t, next.Err) require.Len(t, next.Event, 1) t.Fatalf("got unwanted event: %#v", next.Event.Payload) case <-time.After(100 * time.Millisecond): } } func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { t.Helper() select { case next := <-eventCh: require.NoError(t, next.Err) return &next.Event case <-time.After(100 * time.Millisecond): t.Fatalf("no event after 100ms") } return nil } func assertErr(t *testing.T, eventCh <-chan nextResult) error { t.Helper() select { case next := <-eventCh: require.Error(t, next.Err) return next.Err case <-time.After(100 * time.Millisecond): t.Fatalf("no err after 100ms") } return nil } // assertReset checks that a ResetStream event is send to the subscription // within 100ms. If allowEOS is true it will ignore any intermediate events that // come before the reset provided they are EndOfSnapshot events because in many // cases it's non-deterministic whether the snapshot will complete before the // acl reset is handled. func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { t.Helper() for { select { case next := <-eventCh: if allowEOS { if next.Err == nil && next.Event.IsEndOfSnapshot() { continue } } require.Error(t, next.Err) require.Equal(t, stream.ErrSubForceClosed, next.Err) return case <-time.After(100 * time.Millisecond): t.Fatalf("no err after 100ms") } } } type topic string func (t topic) String() string { return string(t) } var topicService topic = "test-topic-service" func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) if err != nil { return idx, err } for _, node := range nodes { event := stream.Event{ Topic: req.Topic, Index: node.ModifyIndex, Payload: nodePayload{node: node, key: req.Key}, } snap.Append([]stream.Event{event}) } return idx, nil }, } } type nodePayload struct { key string node *structs.ServiceNode } func (p nodePayload) MatchesKey(key, _, partition string) bool { if key == "" && partition == "" { return true } if p.node == nil { return false } if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() { return false } return p.key == key } func (p nodePayload) HasReadPermission(acl.Authorizer) bool { return true } func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020", Description: "something", Policies: []structs.ACLTokenPolicyLink{ {ID: testPolicyID_A}, }, Roles: []structs.ACLTokenRoleLink{ {ID: testRoleID_B}, }, } token.SetHash(false) // If we subscribe immediately after we create a token we race with the // publisher that is publishing the ACL token event for the token we just // created. That means that the subscription we create right after will often // be immediately reset. The most reliable way to avoid that without just // sleeping for some arbitrary time is to pre-subscribe using the token before // it actually exists (which works because the publisher doesn't check tokens // it assumes something lower down did that) and then wait for it to be reset // so we know the initial token write event has been sent out before // continuing... req := &stream.SubscribeRequest{ Topic: topicService, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(req) require.NoError(t, err) defer sub.Unsubscribe() eventCh := testRunSub(sub) // Create the ACL token to be used in the subscription. require.NoError(t, s.ACLTokenSet(2, token.Clone())) // Wait for the pre-subscription to be reset assertReset(t, eventCh, true) return token }