stream: move goroutine out of New

This change will make it easier to manage goroutine lifecycle from the caller.

Also expose EventPublisher from state.Store
This commit is contained in:
Daniel Nephin 2020-09-09 16:26:11 -04:00
parent e345c8d8a6
commit d192b0a080
6 changed files with 29 additions and 17 deletions

View File

@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) {
// this outside the Server to avoid exposing this outside the package. // this outside the Server to avoid exposing this outside the package.
type FSM struct { type FSM struct {
logger hclog.Logger logger hclog.Logger
path string
// apply is built off the commands global and is used to route apply // apply is built off the commands global and is used to route apply
// operations to their appropriate handlers. // operations to their appropriate handlers.

View File

@ -37,14 +37,10 @@ type Changes struct {
// 2. Sent to the eventPublisher which will create and emit change events // 2. Sent to the eventPublisher which will create and emit change events
type changeTrackerDB struct { type changeTrackerDB struct {
db *memdb.MemDB db *memdb.MemDB
publisher eventPublisher publisher *stream.EventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error) processChanges func(ReadTxn, Changes) ([]stream.Event, error)
} }
type eventPublisher interface {
Publish(events []stream.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
// code may use it to create a read-only transaction, but it will panic if called // code may use it to create a read-only transaction, but it will panic if called
// with write=true. // with write=true.

View File

@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
lockDelay: NewDelay(), lockDelay: NewDelay(),
stopEventPublisher: cancel, stopEventPublisher: cancel,
} }
pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second)
s.db = &changeTrackerDB{ s.db = &changeTrackerDB{
db: db, db: db,
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second), publisher: pub,
processChanges: processDBChanges, processChanges: processDBChanges,
} }
go pub.Run(ctx)
return s, nil return s, nil
} }
// EventPublisher returns the stream.EventPublisher used by the Store to
// publish events.
func (s *Store) EventPublisher() *stream.EventPublisher {
return s.db.publisher
}
// Snapshot is used to create a point-in-time snapshot of the entire db. // Snapshot is used to create a point-in-time snapshot of the entire db.
func (s *Store) Snapshot() *Snapshot { func (s *Store) Snapshot() *Snapshot {
tx := s.db.Txn(false) tx := s.db.Txn(false)

View File

@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription) sub, err := publisher.Subscribe(subscription)
require.NoError(err) require.NoError(err)
@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription) sub, err := publisher.Subscribe(subscription)
require.NoError(err) require.NoError(err)
@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription) sub, err := publisher.Subscribe(subscription)
require.NoError(err) require.NoError(err)
@ -433,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher s.db.publisher = publisher
sub, err := publisher.Subscribe(req) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)

View File

@ -79,7 +79,7 @@ type SnapshotAppender interface {
// A goroutine is run in the background to publish events to all subscribes. // A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources, // Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing. // and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{ e := &EventPublisher{
snapCacheTTL: snapCacheTTL, snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer), topicBuffers: make(map[Topic]*eventBuffer),
@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
snapshotHandlers: handlers, snapshotHandlers: handlers,
} }
go e.handleUpdates(ctx)
return e return e
} }
@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) {
} }
} }
func (e *EventPublisher) handleUpdates(ctx context.Context) { // Run the event publisher until ctx is cancelled. Run should be called from a
// goroutine to forward events from Publish to all the appropriate subscribers.
func (e *EventPublisher) Run(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
sub, err := publisher.Subscribe(subscription) sub, err := publisher.Subscribe(subscription)
require.NoError(t, err) require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub) eventCh := consumeSubscription(ctx, sub)
@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
handlers[intTopic(22)] = fn handlers[intTopic(22)] = fn
handlers[intTopic(33)] = fn handlers[intTopic(33)] = fn
publisher := NewEventPublisher(ctx, handlers, time.Second) publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx)
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
require.NoError(t, err) require.NoError(t, err)