diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 417962bb7..22bf7e245 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -1,7 +1,6 @@ package state import ( - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" @@ -11,7 +10,7 @@ import ( // are used to unsubscribe any subscriptions which match the tokens from the events. // // These are special events that will never be returned to a subscriber. -func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { +func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, error) { var secretIDs []string for _, change := range changes.Changes { diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index 550d7cace..ab8b776e5 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -128,7 +127,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()}) + events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: 100, Changes: tx.Changes()}) require.NoError(t, err) require.Len(t, events, 1) diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index 5a1dbe713..a3e19bc2d 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -5,7 +5,6 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) @@ -290,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -356,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/db/txn.go b/agent/consul/state/db/txn.go deleted file mode 100644 index a568bc43f..000000000 --- a/agent/consul/state/db/txn.go +++ /dev/null @@ -1,17 +0,0 @@ -package db - -import "github.com/hashicorp/go-memdb" - -// ReadTxn is implemented by memdb.Txn to perform read operations. -type ReadTxn interface { - Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) - Abort() -} - -// Changes wraps a memdb.Changes to include the index at which these changes -// were made. -type Changes struct { - // Index is the latest index at the time these changes were committed. - Index uint64 - Changes memdb.Changes -} diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index d607e9bc2..b0dc21717 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -1,20 +1,37 @@ package state import ( - "github.com/hashicorp/consul/agent/consul/state/db" + "fmt" + + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/go-memdb" ) +// ReadTxn is implemented by memdb.Txn to perform read operations. +type ReadTxn interface { + Get(table, index string, args ...interface{}) (memdb.ResultIterator, error) + Abort() +} + +// Changes wraps a memdb.Changes to include the index at which these changes +// were made. +type Changes struct { + // Index is the latest index at the time these changes were committed. + Index uint64 + Changes memdb.Changes +} + // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are // sent to the eventPublisher which will create and emit change events. type changeTrackerDB struct { - db *memdb.MemDB - publisher changePublisher + db *memdb.MemDB + publisher eventPublisher + processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -type changePublisher interface { - PublishChanges(tx db.ReadTxn, changes db.Changes) error +type eventPublisher interface { + PublishEvents(events []stream.Event) } // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting @@ -50,19 +67,26 @@ func (c *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), - Index: idx, - publish: func(changes db.Changes) error { - // publish provides a new read-only Txn to PublishChanges so that - // events can be constructed from the current state at the time of - // Commit. - return c.publisher.PublishChanges(c.db.Txn(false), changes) - }, + Txn: c.db.Txn(true), + Index: idx, + publish: c.publish, } t.Txn.TrackChanges() return t } +func (c *changeTrackerDB) publish(changes Changes) error { + readOnlyTx := c.db.Txn(false) + defer readOnlyTx.Abort() + + events, err := c.processChanges(readOnlyTx, changes) + if err != nil { + return fmt.Errorf("failed generating events from changes: %v", err) + } + c.publisher.PublishEvents(events) + return nil +} + // WriteTxnRestore returns a wrapped RW transaction that does NOT have change // tracking enabled. This should only be used in Restore where we need to // replace the entire contents of the Store without a need to track the changes. @@ -89,7 +113,7 @@ type txn struct { // Index is stored so that it may be passed along to any subscribers as part // of a change event. Index uint64 - publish func(changes db.Changes) error + publish func(changes Changes) error } // Commit first pushes changes to EventPublisher, then calls Commit on the @@ -103,7 +127,7 @@ func (tx *txn) Commit() error { // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := db.Changes{ + changes := Changes{ Index: tx.Index, Changes: tx.Txn.Changes(), } @@ -115,3 +139,12 @@ func (tx *txn) Commit() error { tx.Txn.Commit() return nil } + +func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + // TODO: add other table handlers here. + return aclChangeUnsubscribeEvent(tx, changes) +} + +func newSnapshotHandlers() stream.SnapshotHandlers { + return stream.SnapshotHandlers{} +} diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index a39c30bcf..3692deb9d 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -167,8 +167,9 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), db: &changeTrackerDB{ - db: db, - publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second), + db: db, + publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second), + processChanges: processDBChanges, }, stopEventPublisher: cancel, } diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 641d85370..857a3a1e1 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state/db" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" @@ -29,7 +28,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -110,7 +109,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -224,7 +223,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) @@ -372,44 +371,24 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { var topicService stream.Topic = 901 -func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { - return map[stream.Topic]stream.TopicHandler{ - topicService: { - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) { - var events []stream.Event - for _, change := range changes.Changes { - if change.Table == "services" { - service := change.After.(*structs.ServiceNode) - events = append(events, stream.Event{ - Topic: topicService, - Key: service.ServiceName, - Index: changes.Index, - Payload: service, - }) - } - } - return events, nil - }, - Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { - idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) - if err != nil { - return idx, err - } +func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { + return stream.SnapshotHandlers{ + topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { + idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + if err != nil { + return idx, err + } - for _, node := range nodes { - event := stream.Event{ - Topic: req.Topic, - Key: req.Key, - Index: node.ModifyIndex, - Payload: node, - } - snap.Append([]stream.Event{event}) + for _, node := range nodes { + event := stream.Event{ + Topic: req.Topic, + Key: req.Key, + Index: node.ModifyIndex, + Payload: node, } - return idx, nil - }, - }, - stream.TopicInternal: { - ProcessChanges: aclChangeUnsubscribeEvent, + snap.Append([]stream.Event{event}) + } + return idx, nil }, } } @@ -445,7 +424,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0) + publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) s.db.publisher = publisher sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go deleted file mode 100644 index effcaa8c5..000000000 --- a/agent/consul/state/stream_topics.go +++ /dev/null @@ -1,14 +0,0 @@ -package state - -import ( - "github.com/hashicorp/consul/agent/consul/stream" -) - -// newTopicHandlers returns the default handlers for state change events. -func newTopicHandlers() map[stream.Topic]stream.TopicHandler { - return map[stream.Topic]stream.TopicHandler{ - // TopicInternal is a special case for processors that handle events that are - // not for subscribers. They are used by the stream package. - stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent}, - } -} diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 06f40f1f3..2bd11fd5a 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -8,9 +8,8 @@ type Topic int32 // TODO: remove underscores // TODO: type string instead of int? -// TODO: define non-internal topics in state package? +// TODO: move topics to state package? const ( - TopicInternal Topic = 0 Topic_ServiceHealth Topic = 1 Topic_ServiceHealthConnect Topic = 2 ) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index fee2f2923..bc83ce8eb 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -5,8 +5,6 @@ import ( "fmt" "sync" "time" - - "github.com/hashicorp/consul/agent/consul/state/db" ) // EventPublisher receives changes events from Publish, and sends them to all @@ -46,7 +44,7 @@ type EventPublisher struct { // the Commit call in the FSM hot path. publishCh chan changeEvents - handlers map[Topic]TopicHandler + snapshotHandlers SnapshotHandlers } type subscriptions struct { @@ -66,15 +64,9 @@ type changeEvents struct { events []Event } -// TopicHandler provides functions which create stream.Events for a topic. -type TopicHandler struct { - // Snapshot creates the necessary events to reproduce the current state and - // appends them to the eventBuffer. - Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) - // ProcessChanges accepts a slice of Changes, and builds a slice of events for - // those changes. - ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error) -} +// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot +// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. +type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) // SnapshotAppender appends groups of events to create a Snapshot of state. type SnapshotAppender interface { @@ -88,7 +80,7 @@ type SnapshotAppender interface { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[Topic]*eventBuffer), @@ -97,7 +89,7 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, - handlers: handlers, + snapshotHandlers: handlers, } go e.handleUpdates(ctx) @@ -105,25 +97,13 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna return e } -// PublishChanges to all subscribers. tx is a read-only transaction that captures +// PublishEvents to all subscribers. tx is a read-only transaction that captures // the state at the time the change happened. The caller must never use the tx once // it has been passed to PublishChanged. -func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error { - defer tx.Abort() - - var events []Event - for topic, handler := range e.handlers { - if handler.ProcessChanges != nil { - es, err := handler.ProcessChanges(tx, changes) - if err != nil { - return fmt.Errorf("failed generating events for topic %q: %s", topic, err) - } - events = append(events, es...) - } +func (e *EventPublisher) PublishEvents(events []Event) { + if len(events) > 0 { + e.publishCh <- changeEvents{events: events} } - - e.publishCh <- changeEvents{events: events} - return nil } func (e *EventPublisher) handleUpdates(ctx context.Context) { @@ -150,9 +130,6 @@ func (e *EventPublisher) sendEvents(update changeEvents) { eventsByTopic := make(map[Topic][]Event) for _, event := range update.events { - if event.Topic == TopicInternal { - continue - } eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) } @@ -190,8 +167,8 @@ func (e *EventPublisher) Subscribe( req *SubscribeRequest, ) (*Subscription, error) { // Ensure we know how to make a snapshot for this topic - _, ok := e.handlers[req.Topic] - if !ok || req.Topic == TopicInternal { + _, ok := e.snapshotHandlers[req.Topic] + if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } @@ -296,7 +273,6 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) { } func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { - // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { topicSnaps = make(map[string]*eventSnapshot) @@ -308,13 +284,12 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf return snap, nil } - // No snap or errored snap in cache, create a new one - handler, ok := e.handlers[req.Topic] + handler, ok := e.snapshotHandlers[req.Topic] if !ok { return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = newEventSnapshot(req, topicHead, handler.Snapshot) + snap = newEventSnapshot(req, topicHead, handler) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index cdefc4f9d..8988562f1 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/consul/state/db" - "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" ) @@ -21,7 +19,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0) + publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) eventCh := consumeSubscription(sub) @@ -38,8 +36,12 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { // Now subscriber should block waiting for updates assertNoResult(t, eventCh) - err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{}) - require.NoError(t, err) + events := []Event{{ + Topic: testTopic, + Key: "sub-key", + Payload: "the-published-event-payload", + }} + publisher.PublishEvents(events) // Subscriber should see the published event result = nextResult(t, eventCh) @@ -48,24 +50,14 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { require.Equal(t, expected, result.Events) } -func newTestTopicHandlers() map[Topic]TopicHandler { - return map[Topic]TopicHandler{ - testTopic: { - Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { - if req.Topic != testTopic { - return 0, fmt.Errorf("unexpected topic: %v", req.Topic) - } - buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) - return 1, nil - }, - ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) { - events := []Event{{ - Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", - }} - return events, nil - }, +func newTestSnapshotHandlers() SnapshotHandlers { + return SnapshotHandlers{ + testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { + if req.Topic != testTopic { + return 0, fmt.Errorf("unexpected topic: %v", req.Topic) + } + buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}) + return 1, nil }, } }