stream: Move EventPublisher to stream package

The EventPublisher is the central hub of the PubSub system. It is toughly coupled with much of
stream. Some stream internals were exported exclusively for EventPublisher.

The two Subscribe cases (with or without index) were also awkwardly split between two packages. By
moving EventPublisher into stream they are now both in the same package (although still in different files).
This commit is contained in:
Daniel Nephin 2020-07-06 16:15:13 -04:00
parent 6e87e83d77
commit 9e37894778
7 changed files with 338 additions and 267 deletions

View File

@ -55,8 +55,7 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
publish: func(changes db.Changes) error {
// publish provides a new read-only Txn to PublishChanges so that
// events can be constructed from the current state at the time of
// Commit, and so that operations can be performed in a goroutine
// after this WriteTxn is committed.
// Commit.
return c.publisher.PublishChanges(c.db.Txn(false), changes)
},
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"time"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
@ -166,7 +167,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
lockDelay: NewDelay(),
db: &changeTrackerDB{
db: db,
publisher: NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second),
publisher: stream.NewEventPublisher(ctx, newTopicHandlers(), 10*time.Second),
},
}
return s, nil

View File

@ -12,225 +12,7 @@ import (
"github.com/stretchr/testify/require"
)
type nextResult struct {
Events []stream.Event
Err error
}
func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- nextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return nil
}
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
t.Helper()
select {
case next := <-eventCh:
require.Error(t, next.Err)
return next.Err
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
return nil
}
// assertReset checks that a ResetStream event is send to the subscription
// within 100ms. If allowEOS is true it will ignore any intermediate events that
// come before the reset provided they are EndOfSnapshot events because in many
// cases it's non-deterministic whether the snapshot will complete before the
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
timeoutCh := time.After(100 * time.Millisecond)
for {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
continue
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
return
case <-timeoutCh:
t.Fatalf("no err after 100ms")
}
}
}
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler {
return map[stream.Topic]TopicHandler{
topicService: {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
}
return idx, nil
},
},
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
{ID: testRoleID_B},
},
}
token.SetHash(false)
// If we subscribe immediately after we create a token we race with the
// publisher that is publishing the ACL token event for the token we just
// created. That means that the subscription we create right after will often
// be immediately reset. The most reliable way to avoid that without just
// sleeping for some arbitrary time is to pre-subscribe using the token before
// it actually exists (which works because the publisher doesn't check tokens
// it assumes something lower down did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := testRunSub(sub)
// Create the ACL token to be used in the subscription.
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
// Wait for the pre-subscription to be reset
assertReset(t, eventCh, true)
return token
}
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
t.Parallel()
require := require.New(t)
store, err := NewStateStore(nil)
require.NoError(err)
reg := structs.TestRegisterRequest(t)
reg.Service.ID = "web1"
require.NoError(store.EnsureRegistration(1, reg))
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: reg.Service.Service,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(store), 0)
store.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
eventCh := testRunSub(sub)
// Stream should get the instance and then EndOfSnapshot
e := assertEvent(t, eventCh)
srv := e.Payload.(*structs.ServiceNode)
require.Equal(srv.ServiceID, "web1")
e = assertEvent(t, eventCh)
require.True(e.IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoEvent(t, eventCh)
// Add a new instance of service on a different node
reg.Node = "node2"
require.NoError(store.EnsureRegistration(1, reg))
// Subscriber should see registration
e = assertEvent(t, eventCh)
srv = e.Payload.(*structs.ServiceNode)
require.Equal(srv.Node, "node2")
}
func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -247,7 +29,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -311,7 +93,7 @@ func TestEventPublisher_Publish_ACLTokenUpdate(t *testing.T) {
require.Equal(stream.ErrSubscriptionReload, err)
}
func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -328,7 +110,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -425,7 +207,7 @@ func TestEventPublisher_Publish_ACLPolicyUpdate(t *testing.T) {
assertReset(t, eventCh, true)
}
func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
t.Parallel()
require := require.New(t)
s := testACLTokensStateStore(t)
@ -442,7 +224,7 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(err)
@ -504,3 +286,177 @@ func TestEventPublisher_Publish_ACLRoleUpdate(t *testing.T) {
// Ensure the reload event was sent.
assertReset(t, eventCh, false)
}
type nextResult struct {
Events []stream.Event
Err error
}
func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- nextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return nil
}
func assertErr(t *testing.T, eventCh <-chan nextResult) error {
t.Helper()
select {
case next := <-eventCh:
require.Error(t, next.Err)
return next.Err
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
return nil
}
// assertReset checks that a ResetStream event is send to the subscription
// within 100ms. If allowEOS is true it will ignore any intermediate events that
// come before the reset provided they are EndOfSnapshot events because in many
// cases it's non-deterministic whether the snapshot will complete before the
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
timeoutCh := time.After(100 * time.Millisecond)
for {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
continue
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
return
case <-timeoutCh:
t.Fatalf("no err after 100ms")
}
}
}
var topicService stream.Topic = 901
func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
topicService: {
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
if change.Table == "services" {
service := change.After.(*structs.ServiceNode)
events = append(events, stream.Event{
Topic: topicService,
Key: service.ServiceName,
Index: changes.Index,
Payload: service,
})
}
}
return events, nil
},
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil {
return idx, err
}
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
}
buffer.Append([]stream.Event{event})
}
return idx, nil
},
},
stream.TopicInternal: {
ProcessChanges: aclChangeUnsubscribeEvent,
},
}
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
SecretID: "4268ce0d-d7ae-4718-8613-42eba9036020",
Description: "something",
Policies: []structs.ACLTokenPolicyLink{
{ID: testPolicyID_A},
},
Roles: []structs.ACLTokenRoleLink{
{ID: testRoleID_B},
},
}
token.SetHash(false)
// If we subscribe immediately after we create a token we race with the
// publisher that is publishing the ACL token event for the token we just
// created. That means that the subscription we create right after will often
// be immediately reset. The most reliable way to avoid that without just
// sleeping for some arbitrary time is to pre-subscribe using the token before
// it actually exists (which works because the publisher doesn't check tokens
// it assumes something lower down did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestTopicHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := testRunSub(sub)
// Create the ACL token to be used in the subscription.
require.NoError(t, s.ACLTokenSet(2, token.Clone(), false))
// Wait for the pre-subscription to be reset
assertReset(t, eventCh, true)
return token
}

View File

@ -5,8 +5,8 @@ import (
)
// newTopicHandlers returns the default handlers for state change events.
func newTopicHandlers() map[stream.Topic]TopicHandler {
return map[stream.Topic]TopicHandler{
func newTopicHandlers() map[stream.Topic]stream.TopicHandler {
return map[stream.Topic]stream.TopicHandler{
// TopicInternal is a special case for processors that handle events that are
// not for subscribers. They are used by the stream package.
stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent},

View File

@ -4,6 +4,7 @@ type Topic int32
// TODO: remove underscores
// TODO: type string instead of int?
// TODO: define non-internal topics in state package?
const (
TopicInternal Topic = 0
Topic_ServiceHealth Topic = 1

View File

@ -1,4 +1,4 @@
package state
package stream
import (
"context"
@ -7,7 +7,6 @@ import (
"time"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/consul/agent/consul/stream"
)
// EventPublisher receives changes events from Publish, and sends them to all
@ -34,11 +33,11 @@ type EventPublisher struct {
// topicBuffers stores the head of the linked-list buffer to publish events to
// for a topic.
topicBuffers map[stream.Topic]*stream.EventBuffer
topicBuffers map[Topic]*EventBuffer
// snapCache if a cache of EventSnapshots indexed by topic and key.
// TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot
snapCache map[Topic]map[string]*EventSnapshot
subscriptions *subscriptions
@ -47,7 +46,7 @@ type EventPublisher struct {
// the Commit call in the FSM hot path.
publishCh chan changeEvents
handlers map[stream.Topic]TopicHandler
handlers map[Topic]TopicHandler
}
type subscriptions struct {
@ -60,21 +59,21 @@ type subscriptions struct {
// When the token is modified all subscriptions under that token will be
// reloaded.
// A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
byToken map[string]map[*SubscribeRequest]*Subscription
}
type changeEvents struct {
events []stream.Event
events []Event
}
// TopicHandler provides functions which create stream.Events for a topic.
type TopicHandler struct {
// Snapshot creates the necessary events to reproduce the current state and
// appends them to the EventBuffer.
Snapshot func(*stream.SubscribeRequest, *stream.EventBuffer) (index uint64, err error)
Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error)
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
// those changes.
ProcessChanges func(db.ReadTxn, db.Changes) ([]stream.Event, error)
ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
}
// NewEventPublisher returns an EventPublisher for publishing change events.
@ -82,14 +81,14 @@ type TopicHandler struct {
// A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
topicBuffers: make(map[Topic]*EventBuffer),
snapCache: make(map[Topic]map[string]*EventSnapshot),
publishCh: make(chan changeEvents, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
handlers: handlers,
}
@ -99,13 +98,13 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl
return e
}
// PublishChanges to all subscribers. tx is a read-only transaction that may be
// used from a goroutine. The caller should never use the tx once it has been
// passed to PublishChanged.
// PublishChanges to all subscribers. tx is a read-only transaction that captures
// the state at the time the change happened. The caller must never use the tx once
// it has been passed to PublishChanged.
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error {
defer tx.Abort()
var events []stream.Event
var events []Event
for topic, handler := range e.handlers {
if handler.ProcessChanges != nil {
es, err := handler.ProcessChanges(tx, changes)
@ -137,14 +136,14 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
// as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) {
for _, event := range update.events {
if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok {
if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs)
}
}
eventsByTopic := make(map[stream.Topic][]stream.Event)
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
if event.Topic == stream.TopicInternal {
if event.Topic == TopicInternal {
continue
}
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
@ -161,10 +160,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer {
func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer {
buf, ok := e.topicBuffers[topic]
if !ok {
buf = stream.NewEventBuffer()
buf = NewEventBuffer()
e.topicBuffers[topic] = buf
}
return buf
@ -181,11 +180,11 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(
ctx context.Context,
req *stream.SubscribeRequest,
) (*stream.Subscription, error) {
req *SubscribeRequest,
) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := e.handlers[req.Topic]
if !ok || req.Topic == stream.TopicInternal {
if !ok || req.Topic == TopicInternal {
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
@ -198,26 +197,26 @@ func (e *EventPublisher) Subscribe(
// See if we need a snapshot
topicHead := buf.Head()
var sub *stream.Subscription
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "resume stream" message to signal to
// client it's cache is still good. (note that this can be distinguished
// from a legitimate empty snapshot due to the index matching the one the
// client sent), then follow along from here in the topic.
e := stream.Event{
e := Event{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: stream.ResumeStream{},
Payload: ResumeStream{},
}
// Make a new buffer to send to the client containing the resume.
buf := stream.NewEventBuffer()
buf := NewEventBuffer()
// Store the head of that buffer before we append to it to give as the
// starting point for the subscription.
subHead := buf.Head()
buf.Append([]stream.Event{e})
buf.Append([]Event{e})
// Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer.
@ -227,13 +226,13 @@ func (e *EventPublisher) Subscribe(
}
buf.AppendBuffer(follow)
sub = stream.NewSubscription(ctx, req, subHead)
sub = NewSubscription(ctx, req, subHead)
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = stream.NewSubscription(ctx, req, snap.Snap)
sub = NewSubscription(ctx, req, snap.Snap)
}
e.subscriptions.add(req, sub)
@ -246,13 +245,13 @@ func (e *EventPublisher) Subscribe(
return sub, nil
}
func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) {
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription)
subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
@ -275,7 +274,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
// subscription to free resources monitoring changes in it's ACL token.
//
// req MUST be the same pointer that was used to register the subscription.
func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) {
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
s.lock.Lock()
defer s.lock.Unlock()
@ -289,11 +288,11 @@ func (s *subscriptions) unsubscribe(req *stream.SubscribeRequest) {
}
}
func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) {
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) {
// See if there is a cached snapshot
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*stream.EventSnapshot)
topicSnaps = make(map[string]*EventSnapshot)
e.snapCache[req.Topic] = topicSnaps
}
@ -308,7 +307,7 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe
return nil, fmt.Errorf("unknown topic %d", req.Topic)
}
snap = stream.NewEventSnapshot(req, topicHead, handler.Snapshot)
snap = NewEventSnapshot(req, topicHead, handler.Snapshot)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap

View File

@ -0,0 +1,115 @@
package stream
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state/db"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
var testTopic Topic = 999
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestTopicHandlers(), 0)
sub, err := publisher.Subscribe(ctx, subscription)
require.NoError(t, err)
eventCh := consumeSubscription(sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
require.Equal(t, expected, result.Events)
result = nextResult(t, eventCh)
require.Len(t, result.Events, 1)
require.True(t, result.Events[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
err = publisher.PublishChanges(&memdb.Txn{}, db.Changes{})
require.NoError(t, err)
// Subscriber should see the published event
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events)
}
func newTestTopicHandlers() map[Topic]TopicHandler {
return map[Topic]TopicHandler{
testTopic: {
Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
return 1, nil
},
ProcessChanges: func(tx db.ReadTxn, changes db.Changes) ([]Event, error) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
}}
return events, nil
},
},
}
}
func consumeSubscription(sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next()
eventCh <- subNextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
type subNextResult struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
t.Helper()
select {
case next := <-eventCh:
return next
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}