stream: full test coverage for EventPublisher.Subscribe
This commit is contained in:
parent
754a1bbe24
commit
9c5181c897
|
@ -36,7 +36,7 @@ type EventPublisher struct {
|
|||
// publishCh is used to send messages from an active txn to a goroutine which
|
||||
// publishes events, so that publishing can happen asynchronously from
|
||||
// the Commit call in the FSM hot path.
|
||||
publishCh chan changeEvents
|
||||
publishCh chan []Event
|
||||
|
||||
snapshotHandlers SnapshotHandlers
|
||||
}
|
||||
|
@ -54,10 +54,6 @@ type subscriptions struct {
|
|||
byToken map[string]map[*SubscribeRequest]*Subscription
|
||||
}
|
||||
|
||||
type changeEvents struct {
|
||||
events []Event
|
||||
}
|
||||
|
||||
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
||||
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
||||
// The nil Topic is reserved and should not be used.
|
||||
|
@ -84,7 +80,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
|
|||
snapCacheTTL: snapCacheTTL,
|
||||
topicBuffers: make(map[Topic]*eventBuffer),
|
||||
snapCache: make(map[Topic]map[string]*eventSnapshot),
|
||||
publishCh: make(chan changeEvents, 64),
|
||||
publishCh: make(chan []Event, 64),
|
||||
subscriptions: &subscriptions{
|
||||
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
||||
},
|
||||
|
@ -97,7 +93,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
|
|||
// Publish events to all subscribers of the event Topic.
|
||||
func (e *EventPublisher) Publish(events []Event) {
|
||||
if len(events) > 0 {
|
||||
e.publishCh <- changeEvents{events: events}
|
||||
e.publishCh <- events
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,16 +106,16 @@ func (e *EventPublisher) Run(ctx context.Context) {
|
|||
e.subscriptions.closeAll()
|
||||
return
|
||||
case update := <-e.publishCh:
|
||||
e.sendEvents(update)
|
||||
e.publishEvent(update)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendEvents sends the given events to any applicable topic listeners, as well
|
||||
// as any ACL update events to cause affected listeners to reset their stream.
|
||||
func (e *EventPublisher) sendEvents(update changeEvents) {
|
||||
// publishEvent appends the events to any applicable topic buffers. It handles
|
||||
// any closeSubscriptionPayload events by closing associated subscriptions.
|
||||
func (e *EventPublisher) publishEvent(events []Event) {
|
||||
eventsByTopic := make(map[Topic][]Event)
|
||||
for _, event := range update.events {
|
||||
for _, event := range events {
|
||||
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
|
||||
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
|
||||
continue
|
||||
|
@ -183,7 +179,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
|||
}
|
||||
snap := newEventSnapshot()
|
||||
|
||||
// TODO: testcase for this case, especially the from-cache-splice case
|
||||
// if the request has an Index the client view is stale and must be reset
|
||||
// with a NewSnapshotToFollow event.
|
||||
if req.Index > 0 {
|
||||
|
|
|
@ -17,8 +17,8 @@ func (i intTopic) String() string {
|
|||
|
||||
var testTopic Topic = intTopic(999)
|
||||
|
||||
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
||||
subscription := &SubscribeRequest{
|
||||
func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
}
|
||||
|
@ -28,20 +28,18 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
|||
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
|
||||
go publisher.Run(ctx)
|
||||
|
||||
sub, err := publisher.Subscribe(subscription)
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
eventCh := consumeSubscription(ctx, sub)
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
|
||||
result := nextResult(t, eventCh)
|
||||
require.NoError(t, result.Err)
|
||||
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
|
||||
require.Equal(t, expected, result.Events)
|
||||
next := getNextEvents(t, eventCh)
|
||||
expected := []Event{testSnapshotEvent}
|
||||
require.Equal(t, expected, next)
|
||||
|
||||
result = nextResult(t, eventCh)
|
||||
require.Len(t, result.Events, 1)
|
||||
require.True(t, result.Events[0].IsEndOfSnapshot())
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Len(t, next, 1)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
|
||||
// Now subscriber should block waiting for updates
|
||||
assertNoResult(t, eventCh)
|
||||
|
||||
events := []Event{{
|
||||
|
@ -52,10 +50,16 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
|||
publisher.Publish(events)
|
||||
|
||||
// Subscriber should see the published event
|
||||
result = nextResult(t, eventCh)
|
||||
require.NoError(t, result.Err)
|
||||
next = getNextEvents(t, eventCh)
|
||||
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
|
||||
require.Equal(t, expected, result.Events)
|
||||
require.Equal(t, expected, next)
|
||||
}
|
||||
|
||||
var testSnapshotEvent = Event{
|
||||
Topic: testTopic,
|
||||
Payload: "snapshot-event-payload",
|
||||
Key: "sub-key",
|
||||
Index: 1,
|
||||
}
|
||||
|
||||
func newTestSnapshotHandlers() SnapshotHandlers {
|
||||
|
@ -64,18 +68,18 @@ func newTestSnapshotHandlers() SnapshotHandlers {
|
|||
if req.Topic != testTopic {
|
||||
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
|
||||
}
|
||||
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
|
||||
buf.Append([]Event{testSnapshotEvent})
|
||||
return 1, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
|
||||
eventCh := make(chan subNextResult, 1)
|
||||
func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
|
||||
eventCh := make(chan eventOrErr, 1)
|
||||
go func() {
|
||||
for {
|
||||
es, err := sub.Next(ctx)
|
||||
eventCh <- subNextResult{
|
||||
eventCh <- eventOrErr{
|
||||
Events: es,
|
||||
Err: err,
|
||||
}
|
||||
|
@ -87,30 +91,31 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR
|
|||
return eventCh
|
||||
}
|
||||
|
||||
type subNextResult struct {
|
||||
type eventOrErr struct {
|
||||
Events []Event
|
||||
Err error
|
||||
}
|
||||
|
||||
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
|
||||
func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event {
|
||||
t.Helper()
|
||||
select {
|
||||
case next := <-eventCh:
|
||||
return next
|
||||
require.NoError(t, next.Err)
|
||||
return next.Events
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("no event after 100ms")
|
||||
t.Fatalf("timeout waiting for event from subscription")
|
||||
return nil
|
||||
}
|
||||
return subNextResult{}
|
||||
}
|
||||
|
||||
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
|
||||
func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) {
|
||||
t.Helper()
|
||||
select {
|
||||
case next := <-eventCh:
|
||||
require.NoError(t, next.Err)
|
||||
require.Len(t, next.Events, 1)
|
||||
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
case <-time.After(25 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,3 +161,232 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||
go publisher.Run(ctx)
|
||||
_, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
||||
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
||||
}
|
||||
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
next := getNextEvents(t, eventCh)
|
||||
expected := []Event{testSnapshotEvent}
|
||||
require.Equal(t, expected, next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Len(t, next, 1)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
|
||||
// Now subscriber should block waiting for updates
|
||||
assertNoResult(t, eventCh)
|
||||
|
||||
events := []Event{{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
Payload: "the-published-event-payload",
|
||||
Index: 3,
|
||||
}}
|
||||
publisher.Publish(events)
|
||||
|
||||
// Subscriber should see the published event
|
||||
next = getNextEvents(t, eventCh)
|
||||
expected = []Event{events[0]}
|
||||
require.Equal(t, expected, next)
|
||||
}
|
||||
|
||||
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||
go publisher.Run(ctx)
|
||||
// Include the same event in the topicBuffer
|
||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||
|
||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
|
||||
next := getNextEvents(t, eventCh)
|
||||
expected := []Event{testSnapshotEvent}
|
||||
require.Equal(t, expected, next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Len(t, next, 1)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
require.Equal(t, uint64(1), next[0].Index)
|
||||
})
|
||||
|
||||
runStep(t, "resume the subscription", func(t *testing.T) {
|
||||
newReq := *req
|
||||
newReq.Index = 1
|
||||
sub, err := publisher.Subscribe(&newReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
assertNoResult(t, eventCh)
|
||||
|
||||
expected := Event{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
Index: 3,
|
||||
Payload: "event-3",
|
||||
}
|
||||
publisher.publishEvent([]Event{expected})
|
||||
|
||||
next := getNextEvents(t, eventCh)
|
||||
require.Equal(t, []Event{expected}, next)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
|
||||
go publisher.Run(ctx)
|
||||
// Include the same event in the topicBuffer
|
||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||
|
||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
|
||||
next := getNextEvents(t, eventCh)
|
||||
expected := []Event{testSnapshotEvent}
|
||||
require.Equal(t, expected, next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Len(t, next, 1)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
require.Equal(t, uint64(1), next[0].Index)
|
||||
})
|
||||
|
||||
nextEvent := Event{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
Index: 3,
|
||||
Payload: "event-3",
|
||||
}
|
||||
|
||||
runStep(t, "publish an event while unsubed", func(t *testing.T) {
|
||||
publisher.publishEvent([]Event{nextEvent})
|
||||
})
|
||||
|
||||
runStep(t, "resume the subscription", func(t *testing.T) {
|
||||
newReq := *req
|
||||
newReq.Index = 1
|
||||
sub, err := publisher.Subscribe(&newReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
next := getNextEvents(t, eventCh)
|
||||
require.True(t, next[0].IsNewSnapshotToFollow(), next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Equal(t, testSnapshotEvent, next[0])
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
})
|
||||
}
|
||||
|
||||
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
|
||||
req := &SubscribeRequest{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||
go publisher.Run(ctx)
|
||||
// Include the same event in the topicBuffer
|
||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||
|
||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||
sub, err := publisher.Subscribe(req)
|
||||
require.NoError(t, err)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
|
||||
next := getNextEvents(t, eventCh)
|
||||
expected := []Event{testSnapshotEvent}
|
||||
require.Equal(t, expected, next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Len(t, next, 1)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
require.Equal(t, uint64(1), next[0].Index)
|
||||
})
|
||||
|
||||
nextEvent := Event{
|
||||
Topic: testTopic,
|
||||
Key: "sub-key",
|
||||
Index: 3,
|
||||
Payload: "event-3",
|
||||
}
|
||||
|
||||
runStep(t, "publish an event while unsubed", func(t *testing.T) {
|
||||
publisher.publishEvent([]Event{nextEvent})
|
||||
})
|
||||
|
||||
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
||||
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
||||
}
|
||||
|
||||
runStep(t, "resume the subscription", func(t *testing.T) {
|
||||
newReq := *req
|
||||
newReq.Index = 1
|
||||
sub, err := publisher.Subscribe(&newReq)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventCh := runSubscription(ctx, sub)
|
||||
next := getNextEvents(t, eventCh)
|
||||
require.True(t, next[0].IsNewSnapshotToFollow(), next)
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Equal(t, testSnapshotEvent, next[0])
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.True(t, next[0].IsEndOfSnapshot())
|
||||
|
||||
next = getNextEvents(t, eventCh)
|
||||
require.Equal(t, nextEvent, next[0])
|
||||
})
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
if !t.Run(name, fn) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue