stream: close all subs when EventProcessor is shutdown.

This commit is contained in:
Daniel Nephin 2020-07-16 15:26:26 -04:00
parent e802689bbe
commit decba06b7d
2 changed files with 54 additions and 2 deletions

View File

@ -103,8 +103,7 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
// TODO: also close all subscriptions so the subscribers are moved
// to the new publisher?
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.sendEvents(update)
@ -249,6 +248,17 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
}
}
func (s *subscriptions) closeAll() {
s.lock.Lock()
defer s.lock.Unlock()
for _, byRequest := range s.byToken {
for _, sub := range byRequest {
sub.forceClose()
}
}
}
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {

View File

@ -111,3 +111,45 @@ func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
case <-time.After(100 * time.Millisecond):
}
}
func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
handlers := newTestSnapshotHandlers()
fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
return 0, nil
}
handlers[intTopic(22)] = fn
handlers[intTopic(33)] = fn
publisher := NewEventPublisher(ctx, handlers, time.Second)
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
require.NoError(t, err)
defer sub1.Unsubscribe()
sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33)})
require.NoError(t, err)
defer sub2.Unsubscribe()
cancel() // Shutdown
err = consumeSub(context.Background(), sub1)
require.Equal(t, err, ErrSubscriptionClosed)
_, err = sub2.Next(context.Background())
require.Equal(t, err, ErrSubscriptionClosed)
}
func consumeSub(ctx context.Context, sub *Subscription) error {
for {
events, err := sub.Next(ctx)
switch {
case err != nil:
return err
case len(events) == 1 && events[0].IsEndOfSnapshot():
continue
}
}
}