stream: Add forceClose and refactor subscription filtering

Move the subscription context to Next. context.Context should generally
never be stored in a struct because it makes that struct only valid
while the context is valid. This is rarely obvious from the caller.
Adds a forceClosed channel in place of the old context, and uses the new
context as a way for the caller to stop the Subscription blocking.

Remove some recursion out of bufferImte.Next. The caller is already looping so we can continue
in that loop instead of recursing. This ensures currentItem is updated immediately (which probably
does not matter in practice), and also removes the chance that we overflow the stack.

NextNoBlock and FollowAfter do not need to handle bufferItem.Err, the caller already
handles it.

Moves filter to a method to simplify Next, and more explicitly separate filtering from looping.

Also improve some godoc

Only unwrap itemBuffer.Err when necessary
This commit is contained in:
Daniel Nephin 2020-07-08 00:31:22 -04:00
parent 2595436f62
commit a2f8605c66
11 changed files with 204 additions and 229 deletions

View File

@ -31,7 +31,7 @@ type changeTrackerDB struct {
}
type eventPublisher interface {
PublishEvents(events []stream.Event)
Publish(events []stream.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
@ -83,7 +83,7 @@ func (c *changeTrackerDB) publish(changes Changes) error {
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.PublishEvents(events)
c.publisher.Publish(events)
return nil
}

View File

@ -30,8 +30,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)
@ -69,8 +70,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
Key: "nope",
Token: token.SecretID,
}
sub2, err := publisher.Subscribe(ctx, subscription2)
sub2, err := publisher.Subscribe(subscription2)
require.NoError(err)
defer sub2.Unsubscribe()
eventCh2 := testRunSub(sub2)
@ -111,8 +113,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)
@ -154,7 +157,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(ctx, subscription2)
sub, err = publisher.Subscribe(subscription2)
require.NoError(err)
eventCh = testRunSub(sub)
@ -182,8 +185,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(ctx, subscription3)
sub, err = publisher.Subscribe(subscription3)
require.NoError(err)
defer sub.Unsubscribe()
eventCh = testRunSub(sub)
@ -225,7 +229,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
eventCh := testRunSub(sub)
@ -264,7 +268,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
Key: "nope",
Token: token.SecretID,
}
sub, err = publisher.Subscribe(ctx, subscription2)
sub, err = publisher.Subscribe(subscription2)
require.NoError(err)
eventCh = testRunSub(sub)
@ -295,7 +299,7 @@ func testRunSub(sub *stream.Subscription) <-chan nextResult {
eventCh := make(chan nextResult, 1)
go func() {
for {
es, err := sub.Next()
es, err := sub.Next(context.TODO())
eventCh <- nextResult{
Events: es,
Err: err,
@ -351,7 +355,6 @@ func assertErr(t *testing.T, eventCh <-chan nextResult) error {
// acl reset is handled.
func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
t.Helper()
timeoutCh := time.After(100 * time.Millisecond)
for {
select {
case next := <-eventCh:
@ -363,7 +366,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
return
case <-timeoutCh:
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
}
}
@ -416,7 +419,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
// it assumes something lower down did that) and then wait for it to be reset
// so we know the initial token write event has been sent out before
// continuing...
subscription := &stream.SubscribeRequest{
req := &stream.SubscribeRequest{
Topic: topicService,
Key: "nope",
Token: token.SecretID,
@ -426,8 +429,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
s.db.publisher = publisher
sub, err := publisher.Subscribe(ctx, subscription)
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := testRunSub(sub)

View File

@ -3,6 +3,7 @@ package stream
import (
"context"
"errors"
"fmt"
"sync/atomic"
)
@ -13,17 +14,17 @@ import (
// specific design has several important features that significantly simplify a
// lot of our PubSub machinery.
//
// The Buffer itself only ever tracks the most recent set of events published so
// if there are no consumers older events are automatically garbage collected.
// Notification of new events is done by closing a channel on the previous head
// The eventBuffer only tracks the most recent set of published events, so
// if there are no consumers, older events are automatically garbage collected.
// Consumers are notified of new events by closing a channel on the previous head
// allowing efficient broadcast to many watchers without having to run multiple
// goroutines or deliver to O(N) separate channels.
//
// Because it's a linked list with atomically updated pointers, readers don't
// Because eventBuffer is a linked list with atomically updated pointers, readers don't
// have to take a lock and can consume at their own pace. We also don't need a
// fixed limit on the number of items which avoids needing to configure
// buffer length to balance wasting lots of memory all the time against being able to
// tolerate occasional slow readers.
// fixed limit on the number of items, which avoids needing to configure
// buffer length to balance wasting memory, against being able to tolerate
// occasionally slow readers.
//
// The buffer is used to deliver all messages broadcast to a topic for active
// subscribers to consume, but it is also an effective way to both deliver and
@ -50,8 +51,8 @@ import (
// Events array. This enables subscribers to start watching for the next update
// immediately.
//
// The zero value eventBuffer is _not_ a usable type since it has not been
// initialized with an empty bufferItem so can't be used to wait for the first
// The zero value eventBuffer is _not_ usable, as it has not been
// initialized with an empty bufferItem so can not be used to wait for the first
// published event. Call newEventBuffer to construct a new buffer.
//
// Calls to Append or AppendBuffer that mutate the head must be externally
@ -65,7 +66,7 @@ type eventBuffer struct {
// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer() *eventBuffer {
b := &eventBuffer{}
b.head.Store(newBufferItem())
b.head.Store(newBufferItem(nil))
return b
}
@ -77,10 +78,7 @@ func newEventBuffer() *eventBuffer {
// goroutines. Append only supports a single concurrent caller and must be
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) Append(events []Event) {
// Push events to the head
it := newBufferItem()
it.Events = events
b.AppendBuffer(it)
b.AppendItem(newBufferItem(events))
}
// AppendBuffer joins another buffer which may be the tail of a separate buffer
@ -92,7 +90,7 @@ func (b *eventBuffer) Append(events []Event) {
//
// AppendBuffer only supports a single concurrent caller and must be externally
// synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) AppendBuffer(item *bufferItem) {
func (b *eventBuffer) AppendItem(item *bufferItem) {
// First store it as the next node for the old head this ensures once it's
// visible to new searchers the linked list is already valid. Not sure it
// matters but this seems nicer.
@ -105,15 +103,6 @@ func (b *eventBuffer) AppendBuffer(item *bufferItem) {
// don't set chan to nil since that will race with readers accessing it.
}
// AppendErr publishes an error result to the end of the buffer. This is
// considered terminal and will cause all subscribers to end their current
// streaming subscription and return the error. AppendErr only supports a
// single concurrent caller and must be externally synchronized with other
// Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) AppendErr(err error) {
b.AppendBuffer(&bufferItem{Err: err})
}
// Head returns the current head of the buffer. It will always exist but it may
// be a "sentinel" empty item with a nil Events slice to allow consumers to
// watch for the next update. Consumers should always check for empty Events and
@ -172,22 +161,23 @@ type bufferLink struct {
// newBufferItem returns a blank buffer item with a link and chan ready to have
// the fields set and be appended to a buffer.
func newBufferItem() *bufferItem {
func newBufferItem(events []Event) *bufferItem {
return &bufferItem{
link: &bufferLink{
ch: make(chan struct{}),
},
link: &bufferLink{ch: make(chan struct{})},
Events: events,
}
}
// Next return the next buffer item in the buffer. It may block until ctx is
// cancelled or until the next item is published.
func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
// See if there is already a next value, block if so. Note we don't rely on
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-forceClose:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:
}
@ -201,45 +191,28 @@ func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
if next.Err != nil {
return nil, next.Err
}
if len(next.Events) == 0 {
// Skip this event
return next.Next(ctx)
}
return next, nil
}
// NextNoBlock returns the next item in the buffer without blocking. If it
// reaches the most recent item it will return nil and no error.
func (i *bufferItem) NextNoBlock() (*bufferItem, error) {
// reaches the most recent item it will return nil.
func (i *bufferItem) NextNoBlock() *bufferItem {
nextRaw := i.link.next.Load()
if nextRaw == nil {
return nil, nil
return nil
}
next := nextRaw.(*bufferItem)
if next.Err != nil {
return nil, next.Err
}
if len(next.Events) == 0 {
// Skip this event
return next.NextNoBlock()
}
return next, nil
return nextRaw.(*bufferItem)
}
// FollowAfter returns either the next item in the buffer if there is already
// one, or if not it returns an empty item (that will be ignored by subscribers)
// that has the same link as the current buffer so that it will be notified of
// future updates in the buffer without including the current item.
func (i *bufferItem) FollowAfter() (*bufferItem, error) {
next, err := i.NextNoBlock()
if err != nil {
return nil, err
}
// NextLink returns either the next item in the buffer if there is one, or
// an empty item (that will be ignored by subscribers) that has a pointer to
// the same link as this bufferItem (but none of the bufferItem content).
// When the link.ch is closed, subscriptions will be notified of the next item.
func (i *bufferItem) NextLink() *bufferItem {
next := i.NextNoBlock()
if next == nil {
// Return an empty item that can be followed to the next item published.
item := &bufferItem{}
item.link = i.link
return item, nil
return &bufferItem{link: i.link}
}
return next, nil
return next
}

View File

@ -60,7 +60,7 @@ func TestEventBufferFuzz(t *testing.T) {
item := head
var err error
for {
item, err = item.Next(context.Background())
item, err = item.Next(context.Background(), nil)
if err != nil {
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
expect, err)

View File

@ -7,8 +7,8 @@ import (
"time"
)
// EventPublisher receives changes events from Publish, and sends them to all
// registered subscribers.
// EventPublisher receives change events from Publish, and sends the events to
// all subscribers of the event Topic.
type EventPublisher struct {
// snapCacheTTL controls how long we keep snapshots in our cache before
// allowing them to be garbage collected and a new one made for subsequent
@ -60,6 +60,7 @@ type changeEvents struct {
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
// SnapshotAppender appends groups of events to create a Snapshot of state.
@ -91,10 +92,8 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
return e
}
// PublishEvents to all subscribers. tx is a read-only transaction that captures
// the state at the time the change happened. The caller must never use the tx once
// it has been passed to PublishChanged.
func (e *EventPublisher) PublishEvents(events []Event) {
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
}
@ -146,22 +145,18 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
return buf
}
// Subscribe returns a new stream.Subscription for the given request. A
// subscription will stream an initial snapshot of events matching the request
// if required and then block until new events that modify the request occur, or
// the context is cancelled. Subscriptions may be forced to reset if the server
// decides it can no longer maintain correct operation for example if ACL
// policies changed or the state store was restored.
// Subscribe returns a new Subscription for the given request. A subscription
// will receive an initial snapshot of events matching the request if req.Index > 0.
// After the snapshot, events will be streamed as they are created.
// Subscriptions may be closed, forcing the client to resubscribe (for example if
// ACL policies changed or the state store is abandoned).
//
// When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(
ctx context.Context,
req *SubscribeRequest,
) (*Subscription, error) {
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := e.snapshotHandlers[req.Topic]
if !ok {
if !ok || req.Topic == nil {
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
@ -176,47 +171,35 @@ func (e *EventPublisher) Subscribe(
topicHead := buf.Head()
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "resume stream" message to signal to
// No need for a snapshot, send the "end of empty snapshot" message to signal to
// client its cache is still good, then follow along from here in the topic.
e := Event{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: endOfEmptySnapshot{},
}
// Make a new buffer to send to the client containing the resume.
buf := newEventBuffer()
// Store the head of that buffer before we append to it to give as the
// starting point for the subscription.
subHead := buf.Head()
buf.Append([]Event{e})
buf.Append([]Event{{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: endOfEmptySnapshot{},
}})
// Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer.
follow, err := topicHead.FollowAfter()
if err != nil {
return nil, err
}
buf.AppendBuffer(follow)
buf.AppendItem(topicHead.NextLink())
sub = newSubscription(ctx, req, subHead)
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = newSubscription(ctx, req, snap.Snap)
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
}
e.subscriptions.add(req, sub)
// Set unsubscribe so that the caller doesn't need to keep track of the
// SubscriptionRequest, and can not accidentally call unsubscribe with the
// wrong value.
sub.Unsubscribe = func() {
e.subscriptions.unsubscribe(req)
}
return sub, nil
}
@ -239,27 +222,30 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.Close()
sub.forceClose()
}
}
}
}
// unsubscribe must be called when a client is no longer interested in a
// subscription to free resources monitoring changes in it's ACL token.
//
// req MUST be the same pointer that was used to register the subscription.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
s.lock.Lock()
defer s.lock.Unlock()
// unsubscribe returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the
// wrong pointer.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
return func() {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
}

View File

@ -26,9 +26,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
defer cancel()
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
sub, err := publisher.Subscribe(ctx, subscription)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(sub)
eventCh := consumeSubscription(ctx, sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
@ -47,7 +47,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
Key: "sub-key",
Payload: "the-published-event-payload",
}}
publisher.PublishEvents(events)
publisher.Publish(events)
// Subscriber should see the published event
result = nextResult(t, eventCh)
@ -68,11 +68,11 @@ func newTestSnapshotHandlers() SnapshotHandlers {
}
}
func consumeSubscription(sub *Subscription) <-chan subNextResult {
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next()
es, err := sub.Next(ctx)
eventCh <- subNextResult{
Events: es,
Err: err,

View File

@ -9,10 +9,10 @@ package stream
// collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type eventSnapshot struct {
// Snap is the first item in the buffer containing the snapshot. Once the
// Head is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// so that subscribers receive all the events from the same buffer.
Snap *bufferItem
Head *bufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *eventBuffer
@ -30,14 +30,14 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Snap: buf.Head(),
Head: buf.Head(),
snapBuffer: buf,
}
go func() {
idx, err := fn(req, s.snapBuffer)
if err != nil {
s.snapBuffer.AppendErr(err)
s.snapBuffer.AppendItem(&bufferItem{Err: err})
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
@ -57,44 +57,33 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// find the first event after the current snapshot.
item := topicBufferHead
for {
// Find the next item that we should include.
next, err := item.NextNoBlock()
if err != nil {
// Append an error result to signal to subscribers that this snapshot is
// no good.
s.snapBuffer.AppendErr(err)
return
}
if next == nil {
next := item.NextNoBlock()
switch {
case next == nil:
// This is the head of the topic buffer (or was just now which is after
// the snapshot completed). We don't want any of the events (if any) in
// the snapshot buffer as they came before the snapshot but we do need to
// wait for the next update.
follow, err := item.FollowAfter()
if err != nil {
s.snapBuffer.AppendErr(err)
return
}
s.snapBuffer.AppendBuffer(follow)
// We are done, subscribers will now follow future updates to the topic
// after reading the snapshot events.
s.snapBuffer.AppendItem(item.NextLink())
return
}
if next.Err != nil {
s.snapBuffer.AppendErr(next.Err)
case next.Err != nil:
// This case is not currently possible because errors can only come
// from a snapshot func, and this is consuming events from a topic
// buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places
// in the future.
s.snapBuffer.AppendItem(next)
return
}
if len(next.Events) > 0 && next.Events[0].Index > idx {
case len(next.Events) > 0 && next.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it.
s.snapBuffer.AppendBuffer(next)
s.snapBuffer.AppendItem(next)
return
}
// We don't need this item, continue to next
item = next
}

View File

@ -112,11 +112,11 @@ func TestEventSnapshot(t *testing.T) {
snapIDs := make([]string, 0, tc.snapshotSize)
updateIDs := make([]string, 0, tc.updatesAfterSnap)
snapDone := false
curItem := es.Snap
curItem := es.Head
var err error
RECV:
for {
curItem, err = curItem.Next(ctx)
curItem, err = curItem.Next(ctx, nil)
// This error is typically timeout so dump the state to aid debugging.
require.NoError(t, err,
"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,

View File

@ -0,0 +1,17 @@
package stream
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEvent_IsEndOfSnapshot(t *testing.T) {
e := Event{Payload: endOfSnapshot{}}
require.True(t, e.IsEndOfSnapshot())
t.Run("not EndOfSnapshot", func(t *testing.T) {
e := Event{Payload: endOfEmptySnapshot{}}
require.False(t, e.IsEndOfSnapshot())
})
}

View File

@ -19,11 +19,10 @@ const (
// ErrSubscriptionClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry")
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
// Subscription holds state about a single Subscribe call. Subscribe clients
// access their next event by calling Next(). This may initially include the
// snapshot events to catch them up if they are new or behind.
// Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct {
// state is accessed atomically 0 means open, 1 means closed with reload
state uint32
@ -35,17 +34,15 @@ type Subscription struct {
// is mutated by calls to Next.
currentItem *bufferItem
// ctx is the Subscription context that wraps the context of the streaming RPC
// handler call.
ctx context.Context
// forceClosed is closed when forceClose is called. It is used by
// EventPublisher to cancel Next().
forceClosed chan struct{}
// cancelFn stores the context cancel function that will wake up the
// in-progress Next call on a server-initiated state change e.g. Reload.
cancelFn func()
// Unsubscribe is a function set by EventPublisher that is called to
// free resources when the subscription is no longer needed.
Unsubscribe func()
// unsub is a function set by EventPublisher that is called to free resources
// when the subscription is no longer needed.
// It must be safe to call the function from multiple goroutines and the function
// must be idempotent.
unsub func()
}
// SubscribeRequest identifies the types of events the subscriber would like to
@ -59,74 +56,81 @@ type SubscribeRequest struct {
// newSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription {
subCtx, cancel := context.WithCancel(ctx)
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
ctx: subCtx,
cancelFn: cancel,
forceClosed: make(chan struct{}),
req: req,
currentItem: item,
unsub: unsub,
}
}
// Next returns the next set of events to deliver. It must only be called from a
// single goroutine concurrently as it mutates the Subscription.
func (s *Subscription) Next() ([]Event, error) {
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
}
for {
next, err := s.currentItem.Next(s.ctx)
if err != nil {
// Check we didn't return because of a state change cancelling the context
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
}
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return nil, ErrSubscriptionClosed
case err != nil:
return nil, err
}
// Advance our cursor for next loop or next call
s.currentItem = next
// Assume happy path where all events (or none) are relevant.
allMatch := true
// If there is a specific key, see if we need to filter any events
if s.req.Key != "" {
for _, e := range next.Events {
if s.req.Key != e.Key {
allMatch = false
break
}
}
events := s.filter(next.Events)
if len(events) == 0 {
continue
}
// Only if we need to filter events should we bother allocating a new slice
// as this is a hot loop.
events := next.Events
if !allMatch {
events = make([]Event, 0, len(next.Events))
for _, e := range next.Events {
// Only return it if the key matches.
if s.req.Key == "" || s.req.Key == e.Key {
events = append(events, e)
}
}
}
if len(events) > 0 {
return events, nil
}
// Keep looping until we find some events we are interested in.
return events, nil
}
}
// TODO: test cases for this method
func (s *Subscription) filter(events []Event) []Event {
if s.req.Key == "" || len(events) == 0 {
return events
}
allMatch := true
for _, e := range events {
if s.req.Key != e.Key {
allMatch = false
break
}
}
// Only allocate a new slice if some events need to be filtered out
if allMatch {
return events
}
// FIXME: this will over-allocate. We could get a count from the previous range
// over events.
events = make([]Event, 0, len(events))
for _, e := range events {
if s.req.Key == e.Key {
events = append(events, e)
}
}
return events
}
// Close the subscription. Subscribers will receive an error when they call Next,
// and will need to perform a new Subscribe request.
// It is safe to call from any goroutine.
func (s *Subscription) Close() {
func (s *Subscription) forceClose() {
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
if swapped {
s.cancelFn()
close(s.forceClosed)
}
}
// Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() {
s.unsub()
}

View File

@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/require"
)
func noopUnSub() {}
func TestSubscription(t *testing.T) {
eb := newEventBuffer()
@ -26,11 +28,11 @@ func TestSubscription(t *testing.T) {
Topic: testTopic,
Key: "test",
}
sub := newSubscription(ctx, req, startHead)
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
@ -46,7 +48,7 @@ func TestSubscription(t *testing.T) {
})
// Next call should block until event is delivered
got, err = sub.Next()
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed > 200*time.Millisecond,
@ -64,7 +66,7 @@ func TestSubscription(t *testing.T) {
publishTestEvent(index, eb, "test")
start = time.Now()
got, err = sub.Next()
got, err = sub.Next(ctx)
elapsed = time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
@ -79,7 +81,7 @@ func TestSubscription(t *testing.T) {
cancel()
})
_, err = sub.Next()
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.True(t, elapsed > 200*time.Millisecond,
@ -106,11 +108,11 @@ func TestSubscription_Close(t *testing.T) {
Topic: testTopic,
Key: "test",
}
sub := newSubscription(ctx, req, startHead)
sub := newSubscription(req, startHead, noopUnSub)
// First call to sub.Next should return our published event immediately
start := time.Now()
got, err := sub.Next()
got, err := sub.Next(ctx)
elapsed := time.Since(start)
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
@ -122,10 +124,10 @@ func TestSubscription_Close(t *testing.T) {
// needs to reset (e.g. on ACL perm change).
start = time.Now()
time.AfterFunc(200*time.Millisecond, func() {
sub.Close()
sub.forceClose()
})
_, err = sub.Next()
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.Equal(t, ErrSubscriptionClosed, err)