state: Move change processing out of EventPublisher

EventPublisher was receiving TopicHandlers, which had a couple of
problems:

- ChangeProcessors were being grouped by Topic, but they completely
  ignored the topic and were performed on every change
- ChangeProcessors required EventPublisher to be aware of database
  changes

By moving ChangeProcesors out of EventPublisher, and having Publish
accept events instead of changes, EventPublisher no longer needs to
be aware of these things.

Handlers is now only SnapshotHandlers, which are still mapped by Topic.

Also allows us to remove the small 'db' package that had only two types.
They can now be unexported types in state.
This commit is contained in:
Daniel Nephin 2020-07-06 18:44:51 -04:00
parent 23a940daad
commit aa571bd0ce
11 changed files with 106 additions and 161 deletions

View File

@ -1,7 +1,6 @@
package state
import (
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
@ -11,7 +10,7 @@ import (
// are used to unsubscribe any subscriptions which match the tokens from the events.
//
// These are special events that will never be returned to a subscriber.
func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
func aclChangeUnsubscribeEvent(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var secretIDs []string
for _, change := range changes.Changes {

View File

@ -5,7 +5,6 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
@ -128,7 +127,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
// Note we call the func under test directly rather than publishChanges so
// we can test this in isolation.
events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()})
events, err := aclChangeUnsubscribeEvent(tx, Changes{Index: 100, Changes: tx.Changes()})
require.NoError(t, err)
require.Len(t, events, 1)

View File

@ -5,7 +5,6 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
@ -290,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re
return tx.Get("acl-tokens", "local", false)
}
func aclTokenListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "policies", policy)
}
func aclTokenListByRole(tx db.ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclTokenListByRole(tx ReadTxn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-tokens", "roles", role)
}
@ -356,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte
return tx.Get("acl-roles", "id")
}
func aclRoleListByPolicy(tx db.ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
func aclRoleListByPolicy(tx ReadTxn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("acl-roles", "policies", policy)
}

View File

@ -1,17 +0,0 @@
package db
import "github.com/hashicorp/go-memdb"
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}

View File

@ -1,20 +1,37 @@
package state
import (
"github.com/hashicorp/consul/agent/consul/state/db"
"fmt"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/go-memdb"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
type ReadTxn interface {
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
Abort()
}
// Changes wraps a memdb.Changes to include the index at which these changes
// were made.
type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
}
// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
// all write transactions. When the transaction is committed the changes are
// sent to the eventPublisher which will create and emit change events.
type changeTrackerDB struct {
db *memdb.MemDB
publisher changePublisher
publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
type changePublisher interface {
PublishChanges(tx db.ReadTxn, changes db.Changes) error
type eventPublisher interface {
PublishEvents(events []stream.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@ -52,17 +69,24 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
t := &txn{
Txn: c.db.Txn(true),
Index: idx,
publish: func(changes db.Changes) error {
// publish provides a new read-only Txn to PublishChanges so that
// events can be constructed from the current state at the time of
// Commit.
return c.publisher.PublishChanges(c.db.Txn(false), changes)
},
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}
func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
events, err := c.processChanges(readOnlyTx, changes)
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.PublishEvents(events)
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
@ -89,7 +113,7 @@ type txn struct {
// Index is stored so that it may be passed along to any subscribers as part
// of a change event.
Index uint64
publish func(changes db.Changes) error
publish func(changes Changes) error
}
// Commit first pushes changes to EventPublisher, then calls Commit on the
@ -103,7 +127,7 @@ func (tx *txn) Commit() error {
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
changes := db.Changes{
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
@ -115,3 +139,12 @@ func (tx *txn) Commit() error {
tx.Txn.Commit()
return nil
}
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add other table handlers here.
return aclChangeUnsubscribeEvent(tx, changes)
}
func newSnapshotHandlers() stream.SnapshotHandlers {
return stream.SnapshotHandlers{}
}

View File

@ -168,7 +168,8 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
lockDelay: NewDelay(),
db: &changeTrackerDB{
db: db,
publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second),
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
processChanges: processDBChanges,
},
stopEventPublisher: cancel,
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
@ -29,7 +28,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -110,7 +109,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -224,7 +223,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -372,25 +371,9 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
topicService: {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
@ -407,10 +390,6 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
}
return idx, nil
},
},
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
},
}
}
@ -445,7 +424,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)

View File

@ -1,14 +0,0 @@
package state
import (
"github.com/hashicorp/consul/agent/consul/stream"
)
// newTopicHandlers returns the default handlers for state change events.
func newTopicHandlers() map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
// TopicInternal is a special case for processors that handle events that are
// not for subscribers. They are used by the stream package.
stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent},
}
}

View File

@ -8,9 +8,8 @@ type Topic int32
// TODO: remove underscores
// TODO: type string instead of int?
// TODO: define non-internal topics in state package?
// TODO: move topics to state package?
const (
TopicInternal Topic = 0
Topic_ServiceHealth Topic = 1
Topic_ServiceHealthConnect Topic = 2
)

View File

@ -5,8 +5,6 @@ import (
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/agent/consul/state/db"
)
// EventPublisher receives changes events from Publish, and sends them to all
@ -46,7 +44,7 @@ type EventPublisher struct {
// the Commit call in the FSM hot path.
publishCh chan changeEvents
handlers map[Topic]TopicHandler
snapshotHandlers SnapshotHandlers
}
type subscriptions struct {
@ -66,15 +64,9 @@ type changeEvents struct {
events []Event
}
// TopicHandler provides functions which create stream.Events for a topic.
type TopicHandler struct {
// Snapshot creates the necessary events to reproduce the current state and
// appends them to the eventBuffer.
Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes.
ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface {
@ -88,7 +80,7 @@ type SnapshotAppender interface {
// A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
@ -97,7 +89,7 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
handlers: handlers,
snapshotHandlers: handlers,
}
go e.handleUpdates(ctx)
@ -105,25 +97,13 @@ func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, sna
return e
}
// PublishChanges to all subscribers. tx is a read-only transaction that captures
// PublishEvents to all subscribers. tx is a read-only transaction that captures
// the state at the time the change happened. The caller must never use the tx once
// it has been passed to PublishChanged.
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error {
defer tx.Abort()
var events []Event
for topic, handler := range e.handlers {
if handler.ProcessChanges != nil {
es, err := handler.ProcessChanges(tx, changes)
if err != nil {
return fmt.Errorf("failed generating events for topic %q: %s", topic, err)
}
events = append(events, es...)
}
}
func (e *EventPublisher) PublishEvents(events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
return nil
}
}
func (e *EventPublisher) handleUpdates(ctx context.Context) {
@ -150,9 +130,6 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
if event.Topic == TopicInternal {
continue
}
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
}
@ -190,8 +167,8 @@ func (e *EventPublisher) Subscribe(
req *SubscribeRequest,
) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := e.handlers[req.Topic]
if !ok || req.Topic == TopicInternal {
_, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
@ -296,7 +273,6 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
}
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
// See if there is a cached snapshot
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*eventSnapshot)
@ -308,13 +284,12 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
return snap, nil
}
// No snap or errored snap in cache, create a new one
handler, ok := e.handlers[req.Topic]
handler, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
snap = newEventSnapshot(req, topicHead, handler.Snapshot)
snap = newEventSnapshot(req, topicHead, handler)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap

View File

@ -6,8 +6,6 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
@ -21,7 +19,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0)
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := consumeSubscription(sub)
@ -38,8 +36,12 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{})
require.NoError(t, err)
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
publisher.PublishEvents(events)
// Subscriber should see the published event
result = nextResult(t, eventCh)
@ -48,25 +50,15 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
require.Equal(t, expected, result.Events)
}
func newTestTopicHandlers() map[Topic]TopicHandler {
return map[Topic]TopicHandler{
testTopic: {
Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{
testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
return 1, nil
},
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
return events, nil
},
},
}
}