From 6e87e83d778fee0c0d697d2f245a8393d7d40a95 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 6 Jul 2020 14:34:58 -0400 Subject: [PATCH] state: Make handleACLUpdate async once again So that we keep as much as possible out of the FSM commit hot path. --- agent/consul/state/acl_events.go | 70 +++++++----- agent/consul/state/acl_events_test.go | 126 ++++++++------------- agent/consul/state/event_publisher.go | 120 +++++--------------- agent/consul/state/event_publisher_test.go | 4 +- agent/consul/state/stream_topics.go | 14 +-- agent/consul/stream/event.go | 20 +++- 6 files changed, 139 insertions(+), 215 deletions(-) diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index f26a05b09..696864357 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -7,42 +7,52 @@ import ( memdb "github.com/hashicorp/go-memdb" ) -// ACLEventsFromChanges returns all the ACL token, policy or role events that -// should be emitted given a set of changes to the state store. -// TODO: Add OpDelete/OpUpdate to the event or payload? -func aclEventsFromChanges(_ db.ReadTxn, changes db.Changes) ([]stream.Event, error) { - var events []stream.Event +// aclChangeUnsubscribeEvent creates and returns stream.UnsubscribeEvents that +// are used to unsubscribe any subscriptions which match the tokens from the events. +// +// These are special events that will never be returned to a subscriber. +func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { + var secretIDs []string - // TODO: mapping of table->topic? for _, change := range changes.Changes { switch change.Table { case "acl-tokens": token := changeObject(change).(*structs.ACLToken) - e := stream.Event{ - Topic: stream.Topic_ACLTokens, - Index: changes.Index, - Payload: token, - } - events = append(events, e) - case "acl-policies": - policy := changeObject(change).(*structs.ACLPolicy) - e := stream.Event{ - Topic: stream.Topic_ACLPolicies, - Index: changes.Index, - Payload: policy, - } - events = append(events, e) + secretIDs = append(secretIDs, token.SecretID) + case "acl-roles": role := changeObject(change).(*structs.ACLRole) - e := stream.Event{ - Topic: stream.Topic_ACLRoles, - Index: changes.Index, - Payload: role, + tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta) + if err != nil { + return nil, err + } + secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens) + + case "acl-policies": + policy := changeObject(change).(*structs.ACLPolicy) + tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) + if err != nil { + return nil, err + } + secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens) + + roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) + if err != nil { + return nil, err + } + for role := roles.Next(); role != nil; role = roles.Next() { + role := role.(*structs.ACLRole) + + tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta) + if err != nil { + return nil, err + } + secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens) } - events = append(events, e) } } - return events, nil + // TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent + return []stream.Event{stream.NewUnsubscribeEvent(secretIDs)}, nil } // changeObject returns the object before it was deleted if the change was a delete, @@ -53,3 +63,11 @@ func changeObject(change memdb.Change) interface{} { } return change.After } + +func appendSecretIDsFromTokenIterator(seq []string, tokens memdb.ResultIterator) []string { + for token := tokens.Next(); token != nil; token = tokens.Next() { + token := token.(*structs.ACLToken) + seq = append(seq, token.SecretID) + } + return seq +} diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index 0b8acbd4c..c2c7e1843 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestACLEventsFromChanges(t *testing.T) { +func TestACLChangeUnsubscribeEvent(t *testing.T) { cases := []struct { Name string Setup func(s *Store, tx *txn) error @@ -23,7 +23,7 @@ func TestACLEventsFromChanges(t *testing.T) { Mutate: func(s *Store, tx *txn) error { return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false) }, - expected: newACLTokenEvent(100, 1), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { Name: "token update", @@ -37,7 +37,7 @@ func TestACLEventsFromChanges(t *testing.T) { token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}} return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false) }, - expected: newACLTokenEvent(100, 1, structs.ACLTokenPolicyLink{ID: "33333333-1111-1111-1111-111111111111"}), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { Name: "token delete", @@ -48,58 +48,42 @@ func TestACLEventsFromChanges(t *testing.T) { token := newACLToken(1) return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil) }, - expected: newACLTokenEvent(100, 1), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { - Name: "policy create", - Mutate: func(s *Store, tx *txn) error { - return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) - }, - expected: newACLPolicyEvent(100, 1), + Name: "policy create", + Mutate: newACLPolicyWithSingleToken, + // two identical tokens, because Mutate has two changes + expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)), }, { - Name: "policy update", - Setup: func(s *Store, tx *txn) error { - return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) - }, + Name: "policy update", + Setup: newACLPolicyWithSingleToken, Mutate: func(s *Store, tx *txn) error { policy := newACLPolicy(1) policy.Rules = `operator = "write"` return s.aclPolicySetTxn(tx, tx.Index, policy) }, - expected: stream.Event{ - Topic: stream.Topic_ACLPolicies, - Index: 100, - Payload: &structs.ACLPolicy{ - ID: "22222222-1111-1111-1111-111111111111", - Name: "test_policy_1", - Rules: `operator = "write"`, - }, - }, + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { - Name: "policy delete", - Setup: func(s *Store, tx *txn) error { - return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) - }, + Name: "policy delete", + Setup: newACLPolicyWithSingleToken, Mutate: func(s *Store, tx *txn) error { policy := newACLPolicy(1) return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil) }, - expected: newACLPolicyEvent(100, 1), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { - Name: "role create", - Mutate: func(s *Store, tx *txn) error { - return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) - }, - expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)), + Name: "role create", + Mutate: newACLRoleWithSingleToken, + // Two tokens with the same ID, because there are two changes in Mutate + expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)), }, { - Name: "role update", - Setup: func(s *Store, tx *txn) error { - return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) - }, + Name: "role update", + Setup: newACLRoleWithSingleToken, Mutate: func(s *Store, tx *txn) error { role := newACLRole(1, newACLRolePolicyLink(1)) policy2 := newACLPolicy(2) @@ -109,18 +93,16 @@ func TestACLEventsFromChanges(t *testing.T) { }) return s.aclRoleSetTxn(tx, tx.Index, role, true) }, - expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1), newACLRolePolicyLink(2)), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, { - Name: "role delete", - Setup: func(s *Store, tx *txn) error { - return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) - }, + Name: "role delete", + Setup: newACLRoleWithSingleToken, Mutate: func(s *Store, tx *txn) error { role := newACLRole(1, newACLRolePolicyLink(1)) return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil) }, - expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)), + expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), }, } @@ -146,61 +128,43 @@ func TestACLEventsFromChanges(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - events, err := aclEventsFromChanges(tx, db.Changes{Index: 100, Changes: tx.Changes()}) + events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()}) require.NoError(t, err) require.Len(t, events, 1) actual := events[0] - // ignore modified and created index because we don't set them in our expected values - // TODO: gotest.tools/assert would make this easier - normalizePayload(&actual) require.Equal(t, tc.expected, actual) }) } } -func normalizePayload(s *stream.Event) { - switch s := s.Payload.(type) { - case *structs.ACLToken: - s.ModifyIndex = 0 - s.CreateIndex = 0 - s.Hash = nil - case *structs.ACLPolicy: - s.ModifyIndex = 0 - s.CreateIndex = 0 - case *structs.ACLRole: - s.ModifyIndex = 0 - s.CreateIndex = 0 +func newACLRoleWithSingleToken(s *Store, tx *txn) error { + role := newACLRole(1, newACLRolePolicyLink(1)) + if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil { + return err } + token := newACLToken(1) + token.Roles = append(token.Roles, structs.ACLTokenRoleLink{ID: role.ID}) + return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false) } -func newACLTokenEvent(idx uint64, n int, policies ...structs.ACLTokenPolicyLink) stream.Event { - uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(n)) - return stream.Event{ - Topic: stream.Topic_ACLTokens, - Index: idx, - Payload: &structs.ACLToken{ - AccessorID: uuid, - SecretID: uuid, - Policies: policies, - }, +func newACLPolicyWithSingleToken(s *Store, tx *txn) error { + policy := newACLPolicy(1) + if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil { + return err } + token := newACLToken(1) + token.Policies = append(token.Policies, structs.ACLTokenPolicyLink{ID: policy.ID}) + return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false) } -func newACLPolicyEvent(idx uint64, n int) stream.Event { - return stream.Event{ - Topic: stream.Topic_ACLPolicies, - Index: idx, - Payload: newACLPolicy(n), - } -} - -func newACLRoleEvent(idx uint64, n int, policies ...structs.ACLRolePolicyLink) stream.Event { - return stream.Event{ - Topic: stream.Topic_ACLRoles, - Index: idx, - Payload: newACLRole(n, policies...), +func newSecretIDs(ids ...int) []string { + result := make([]string, 0, len(ids)) + for _, id := range ids { + uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(id)) + result = append(result, uuid) } + return result } func newACLToken(n int) *structs.ACLToken { diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index c6177169c..b1fb22960 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -8,8 +8,6 @@ import ( "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" ) // EventPublisher receives changes events from Publish, and sends them to all @@ -47,7 +45,7 @@ type EventPublisher struct { // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. - publishCh chan commitUpdate + publishCh chan changeEvents handlers map[stream.Topic]TopicHandler } @@ -65,8 +63,7 @@ type subscriptions struct { byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription } -// TODO: rename -type commitUpdate struct { +type changeEvents struct { events []stream.Event } @@ -90,7 +87,7 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl snapCacheTTL: snapCacheTTL, topicBuffers: make(map[stream.Topic]*stream.EventBuffer), snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), - publishCh: make(chan commitUpdate, 64), + publishCh: make(chan changeEvents, 64), subscriptions: &subscriptions{ byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), }, @@ -106,6 +103,8 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl // used from a goroutine. The caller should never use the tx once it has been // passed to PublishChanged. func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { + defer tx.Abort() + var events []stream.Event for topic, handler := range e.handlers { if handler.ProcessChanges != nil { @@ -117,29 +116,7 @@ func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error } } - // TODO: call tx.Abort when this is done with tx. - for _, event := range events { - // If the event is an ACL update, treat it as a special case. Currently - // ACL update events are only used internally to recognize when a subscriber - // should reload its subscription. - if event.Topic == stream.Topic_ACLTokens || - event.Topic == stream.Topic_ACLPolicies || - event.Topic == stream.Topic_ACLRoles { - - if err := e.subscriptions.handleACLUpdate(tx, event); err != nil { - // This seems pretty drastic? What would be better. It's not super safe - // to continue since we might have missed some ACL update and so leak - // data to unauthorized clients but crashing whole server also seems - // bad. I wonder if we could send a "reset" to all subscribers instead - // and effectively re-start all subscriptions to be on the safe side - // without just crashing? - // TODO(banks): reset all instead of panic? - panic(err) - } - } - } - - e.publishCh <- commitUpdate{events: events} + e.publishCh <- changeEvents{events: events} return nil } @@ -158,9 +135,18 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { // sendEvents sends the given events to any applicable topic listeners, as well // as any ACL update events to cause affected listeners to reset their stream. -func (e *EventPublisher) sendEvents(update commitUpdate) { +func (e *EventPublisher) sendEvents(update changeEvents) { + for _, event := range update.events { + if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok { + e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs) + } + } + eventsByTopic := make(map[stream.Topic][]stream.Event) for _, event := range update.events { + if event.Topic == stream.TopicInternal { + continue + } eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } @@ -184,65 +170,6 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer return buf } -// handleACLUpdate handles an ACL token/policy/role update. -func (s *subscriptions) handleACLUpdate(tx db.ReadTxn, event stream.Event) error { - s.lock.RLock() - defer s.lock.RUnlock() - - switch event.Topic { - case stream.Topic_ACLTokens: - token := event.Payload.(*structs.ACLToken) - for _, sub := range s.byToken[token.SecretID] { - sub.Close() - } - - case stream.Topic_ACLPolicies: - policy := event.Payload.(*structs.ACLPolicy) - tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) - if err != nil { - return err - } - s.closeSubscriptionsForTokens(tokens) - - // Find any roles using this policy so tokens with those roles can be reloaded. - roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) - if err != nil { - return err - } - for role := roles.Next(); role != nil; role = roles.Next() { - role := role.(*structs.ACLRole) - - tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta) - if err != nil { - return err - } - s.closeSubscriptionsForTokens(tokens) - } - - case stream.Topic_ACLRoles: - role := event.Payload.(*structs.ACLRole) - tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta) - if err != nil { - return err - } - s.closeSubscriptionsForTokens(tokens) - } - - return nil -} - -// This method requires the subscriptions.lock.RLock is held (the read-only lock) -func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator) { - for token := tokens.Next(); token != nil; token = tokens.Next() { - token := token.(*structs.ACLToken) - if subs, ok := s.byToken[token.SecretID]; ok { - for _, sub := range subs { - sub.Close() - } - } - } -} - // Subscribe returns a new stream.Subscription for the given request. A // subscription will stream an initial snapshot of events matching the request // if required and then block until new events that modify the request occur, or @@ -258,7 +185,7 @@ func (e *EventPublisher) Subscribe( ) (*stream.Subscription, error) { // Ensure we know how to make a snapshot for this topic _, ok := e.handlers[req.Topic] - if !ok { + if !ok || req.Topic == stream.TopicInternal { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -331,6 +258,19 @@ func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscripti subsByToken[req] = sub } +func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { + s.lock.RLock() + defer s.lock.RUnlock() + + for _, secretID := range tokenSecretIDs { + if subs, ok := s.byToken[secretID]; ok { + for _, sub := range subs { + sub.Close() + } + } + } +} + // unsubscribe must be called when a client is no longer interested in a // subscription to free resources monitoring changes in it's ACL token. // diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/event_publisher_test.go index 5c4108966..d685b6fb6 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/event_publisher_test.go @@ -133,8 +133,8 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler { return idx, nil }, }, - stream.Topic_ACLTokens: { - ProcessChanges: aclEventsFromChanges, + stream.TopicInternal: { + ProcessChanges: aclChangeUnsubscribeEvent, }, } } diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index 0d7f30708..d7d84fcbb 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -7,16 +7,8 @@ import ( // newTopicHandlers returns the default handlers for state change events. func newTopicHandlers() map[stream.Topic]TopicHandler { return map[stream.Topic]TopicHandler{ - // For now we don't actually support subscribing to ACL* topics externally - // so these have no Snapshot methods yet. We do need to have a - // ProcessChanges func to publish the partial events on ACL changes though - // so that we can invalidate other subscriptions if their effective ACL - // permissions change. - stream.Topic_ACLTokens: { - ProcessChanges: aclEventsFromChanges, - }, - // Note no ACLPolicies/ACLRoles defined yet because we publish all events - // from one handler to save on iterating/filtering and duplicating code and - // there are no snapshots for these yet per comment above. + // TopicInternal is a special case for processors that handle events that are + // not for subscribers. They are used by the stream package. + stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, } } diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 29e99ee1a..d05537488 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -5,11 +5,9 @@ type Topic int32 // TODO: remove underscores // TODO: type string instead of int? const ( - Topic_ServiceHealth Topic = 0 - Topic_ServiceHealthConnect Topic = 1 - Topic_ACLTokens Topic = 2 - Topic_ACLPolicies Topic = 3 - Topic_ACLRoles Topic = 4 + TopicInternal Topic = 0 + Topic_ServiceHealth Topic = 1 + Topic_ServiceHealthConnect Topic = 2 ) // TODO: @@ -31,3 +29,15 @@ func (e Event) IsResumeStream() bool { type endOfSnapshot struct{} type ResumeStream struct{} + +// TODO: unexport once EventPublisher is in stream package +type UnsubscribePayload struct { + TokensSecretIDs []string +} + +// NewUnsubscribeEvent returns a special Event that is handled by the +// stream package, and is never sent to subscribers. It results in any subscriptions +// which match any of the TokenSecretIDs to be unsubscribed. +func NewUnsubscribeEvent(tokenSecretIDs []string) Event { + return Event{Payload: UnsubscribePayload{TokensSecretIDs: tokenSecretIDs}} +}