From aa571bd0cef05dc92b9ee9ff555ecd44a9c9e14d Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 6 Jul 2020 18:44:51 -0400 Subject: [PATCH] state: Move change processing out of EventPublisher EventPublisher was receiving TopicHandlers, which had a couple of problems: - ChangeProcessors were being grouped by Topic, but they completely ignored the topic and were performed on every change - ChangeProcessors required EventPublisher to be aware of database changes By moving ChangeProcesors out of EventPublisher, and having Publish accept events instead of changes, EventPublisher no longer needs to be aware of these things. Handlers is now only SnapshotHandlers, which are still mapped by Topic. Also allows us to remove the small 'db' package that had only two types. They can now be unexported types in state. --- agent/consul/state/acl_events.go | 3 +- agent/consul/state/acl_events_test.go | 3 +- agent/consul/state/acl_oss.go | 7 +-- agent/consul/state/db/txn.go | 17 ------ agent/consul/state/memdb.go | 63 +++++++++++++++----- agent/consul/state/state_store.go | 5 +- agent/consul/state/store_integration_test.go | 61 +++++++------------ agent/consul/state/stream_topics.go | 14 ----- agent/consul/stream/event.go | 3 +- agent/consul/stream/event_publisher.go | 53 +++++----------- agent/consul/stream/event_publisher_test.go | 38 +++++------- 11 files changed, 106 insertions(+), 161 deletions(-) delete mode 100644 agent/consul/state/db/txn.go delete mode 100644 agent/consul/state/stream_topics.go diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 417962bb7..22bf7e245 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -1,7 +1,6 @@ package state import ( - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -11,7 +10,7 @@ import ( // are used to unsubscribe any subscriptions which match the tokens from the events. // // These are special events that will never be returned to a subscriber. -func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { +func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, error) { var secretIDs []string for _, change := range changes.Changes { diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index 550d7cace..ab8b776e5 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -128,7 +127,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()}) + events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: 100, Changes: tx.Changes()}) require.NoError(t, err) require.Len(t, events, 1) diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index 5a1dbe713..a3e19bc2d 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -5,7 +5,6 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -290,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -356,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/db/txn.go b/agent/consul/state/db/txn.go deleted file mode 100644 index a568bc43f..000000000 --- a/agent/consul/state/db/txn.go +++ /dev/null @@ -1,17 +0,0 @@ -package db - -import "github.com/hashicorp/go-memdb" - -// ReadTxn is implemented by memdb.Txn to perform read operations. -type ReadTxn interface { - Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) - Abort() -} - -// Changes wraps a memdb.Changes to include the index at which these changes -// were made. -type Changes struct { - // Index is the latest index at the time these changes were committed. - Index uint64 - Changes memdb.Changes -} diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index d607e9bc2..b0dc21717 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -1,20 +1,37 @@ package state import ( - "github.com/hashicorp/consul/agent/consul/state/db" + "fmt" + + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/go-memdb" ) +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} + // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are // sent to the eventPublisher which will create and emit change events. type changeTrackerDB struct { - db *memdb.MemDB - publisher changePublisher + db *memdb.MemDB + publisher eventPublisher + processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -type changePublisher interface { - PublishChanges(tx db.ReadTxn, changes db.Changes) error +type eventPublisher interface { + PublishEvents(events []stream.Event) } // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting @@ -50,19 +67,26 @@ func (c *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), - Index: idx, - publish: func(changes db.Changes) error { - // publish provides a new read-only Txn to PublishChanges so that - // events can be constructed from the current state at the time of - // Commit. - return c.publisher.PublishChanges(c.db.Txn(false), changes) - }, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, } t.Txn.TrackChanges() return t } +func (c *changeTrackerDB) publish(changes Changes) error { + readOnlyTx := c.db.Txn(false) + defer readOnlyTx.Abort() + + events, err := c.processChanges(readOnlyTx, changes) + if err != nil { + return fmt.Errorf("failed generating events from changes: %v", err) + } + c.publisher.PublishEvents(events) + return nil +} + // WriteTxnRestore returns a wrapped RW transaction that does NOT have change // tracking enabled. This should only be used in Restore where we need to // replace the entire contents of the Store without a need to track the changes. @@ -89,7 +113,7 @@ type txn struct { // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 - publish func(changes db.Changes) error + publish func(changes Changes) error } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -103,7 +127,7 @@ func (tx *txn) Commit() error { // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := db.Changes{ + changes := Changes{ Index: tx.Index, Changes: tx.Txn.Changes(), } @@ -115,3 +139,12 @@ func (tx *txn) Commit() error { tx.Txn.Commit() return nil } + +func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + // TODO: add other table handlers here. + return aclChangeUnsubscribeEvent(tx, changes) +} + +func newSnapshotHandlers() stream.SnapshotHandlers { + return stream.SnapshotHandlers{} +} diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index a39c30bcf..3692deb9d 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -167,8 +167,9 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), db: &changeTrackerDB{ - db: db, - publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), + db: db, + publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second), + processChanges: processDBChanges, }, stopEventPublisher: cancel, } diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 641d85370..857a3a1e1 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -29,7 +28,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -110,7 +109,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -224,7 +223,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -372,44 +371,24 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { var topicService stream.Topic = 901 -func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { - return map[stream.Topic]stream.TopicHandler{ - topicService: { - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { - var events []stream.Event - for _, change := range changes.Changes { - if change.Table == "services" { - service := change.After.(*structs.ServiceNode) - events = append(events, stream.Event{ - Topic: topicService, - Key: service.ServiceName, - Index: changes.Index, - Payload: service, - }) - } - } - return events, nil - }, - Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { - idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) - if err != nil { - return idx, err - } +func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { + return stream.SnapshotHandlers{ + topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { + idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + if err != nil { + return idx, err + } - for _, node := range nodes { - event := stream.Event{ - Topic: req.Topic, - Key: req.Key, - Index: node.ModifyIndex, - Payload: node, - } - snap.Append([]stream.Event{event}) + for _, node := range nodes { + event := stream.Event{ + Topic: req.Topic, + Key: req.Key, + Index: node.ModifyIndex, + Payload: node, } - return idx, nil - }, - }, - stream.TopicInternal: { - ProcessChanges: aclChangeUnsubscribeEvent, + snap.Append([]stream.Event{event}) + } + return idx, nil }, } } @@ -445,7 +424,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go deleted file mode 100644 index effcaa8c5..000000000 --- a/agent/consul/state/stream_topics.go +++ /dev/null @@ -1,14 +0,0 @@ -package state - -import ( - "github.com/hashicorp/consul/agent/consul/stream" -) - -// newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]stream.TopicHandler { - return map[stream.Topic]stream.TopicHandler{ - // TopicInternal is a special case for processors that handle events that are - // not for subscribers. They are used by the stream package. - stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, - } -} diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 06f40f1f3..2bd11fd5a 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -8,9 +8,8 @@ type Topic int32 // TODO: remove underscores // TODO: type string instead of int? -// TODO: define non-internal topics in state package? +// TODO: move topics to state package? const ( - TopicInternal Topic = 0 Topic_ServiceHealth Topic = 1 Topic_ServiceHealthConnect Topic = 2 ) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index fee2f2923..bc83ce8eb 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -5,8 +5,6 @@ import ( "fmt" "sync" "time" - - "github.com/hashicorp/consul/agent/consul/state/db" ) // EventPublisher receives changes events from Publish, and sends them to all @@ -46,7 +44,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan changeEvents - handlers map[Topic]TopicHandler + snapshotHandlers SnapshotHandlers } type subscriptions struct { @@ -66,15 +64,9 @@ type changeEvents struct { events []Event } -// TopicHandler provides functions which create stream.Events for a topic. -type TopicHandler struct { - // Snapshot creates the necessary events to reproduce the current state and - // appends them to the eventBuffer. - Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) - // ProcessChanges accepts a slice of Changes, and builds a slice of events for - // those changes. - ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error) -} +// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot +// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. +type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) // SnapshotAppender appends groups of events to create a Snapshot of state. type SnapshotAppender interface { @@ -88,7 +80,7 @@ type SnapshotAppender interface { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[Topic]*eventBuffer), @@ -97,7 +89,7 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, - handlers: handlers, + snapshotHandlers: handlers, } go e.handleUpdates(ctx) @@ -105,25 +97,13 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna return e } -// PublishChanges to all subscribers. tx is a read-only transaction that captures +// PublishEvents to all subscribers. tx is a read-only transaction that captures // the state at the time the change happened. The caller must never use the tx once // it has been passed to PublishChanged. -func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { - defer tx.Abort() - - var events []Event - for topic, handler := range e.handlers { - if handler.ProcessChanges != nil { - es, err := handler.ProcessChanges(tx, changes) - if err != nil { - return fmt.Errorf("failed generating events for topic %q: %s", topic, err) - } - events = append(events, es...) - } +func (e *EventPublisher) PublishEvents(events []Event) { + if len(events) > 0 { + e.publishCh <- changeEvents{events: events} } - - e.publishCh <- changeEvents{events: events} - return nil } func (e *EventPublisher) handleUpdates(ctx context.Context) { @@ -150,9 +130,6 @@ func (e *EventPublisher) sendEvents(update changeEvents) { eventsByTopic := make(map[Topic][]Event) for _, event := range update.events { - if event.Topic == TopicInternal { - continue - } eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } @@ -190,8 +167,8 @@ func (e *EventPublisher) Subscribe( req *SubscribeRequest, ) (*Subscription, error) { // Ensure we know how to make a snapshot for this topic - _, ok := e.handlers[req.Topic] - if !ok || req.Topic == TopicInternal { + _, ok := e.snapshotHandlers[req.Topic] + if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -296,7 +273,6 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) { } func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { - // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { topicSnaps = make(map[string]*eventSnapshot) @@ -308,13 +284,12 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf return snap, nil } - // No snap or errored snap in cache, create a new one - handler, ok := e.handlers[req.Topic] + handler, ok := e.snapshotHandlers[req.Topic] if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = newEventSnapshot(req, topicHead, handler.Snapshot) + snap = newEventSnapshot(req, topicHead, handler) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index cdefc4f9d..8988562f1 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/consul/state/db" - "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -21,7 +19,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0) + publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) eventCh := consumeSubscription(sub) @@ -38,8 +36,12 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { // Now subscriber should block waiting for updates assertNoResult(t, eventCh) - err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{}) - require.NoError(t, err) + events := []Event{{ + Topic: testTopic, + Key: "sub-key", + Payload: "the-published-event-payload", + }} + publisher.PublishEvents(events) // Subscriber should see the published event result = nextResult(t, eventCh) @@ -48,24 +50,14 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { require.Equal(t, expected, result.Events) } -func newTestTopicHandlers() map[Topic]TopicHandler { - return map[Topic]TopicHandler{ - testTopic: { - Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { - if req.Topic != testTopic { - return 0, fmt.Errorf("unexpected topic: %v", req.Topic) - } - buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) - return 1, nil - }, - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) { - events := []Event{{ - Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", - }} - return events, nil - }, +func newTestSnapshotHandlers() SnapshotHandlers { + return SnapshotHandlers{ + testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) + return 1, nil }, } }