stream: unexport identifiers
Now that EventPublisher is part of stream a lot of the internals can be hidden
This commit is contained in:
parent
9e37894778
commit
e1305fe80c
|
@ -52,7 +52,7 @@ func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Even
|
|||
}
|
||||
}
|
||||
// TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent
|
||||
return []stream.Event{stream.NewUnsubscribeEvent(secretIDs)}, nil
|
||||
return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil
|
||||
}
|
||||
|
||||
// changeObject returns the object before it was deleted if the change was a delete,
|
||||
|
|
|
@ -23,7 +23,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
Mutate: func(s *Store, tx *txn) error {
|
||||
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "token update",
|
||||
|
@ -37,7 +37,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
|
||||
return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "token delete",
|
||||
|
@ -48,13 +48,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
token := newACLToken(1)
|
||||
return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "policy create",
|
||||
Mutate: newACLPolicyWithSingleToken,
|
||||
// two identical tokens, because Mutate has two changes
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
|
||||
},
|
||||
{
|
||||
Name: "policy update",
|
||||
|
@ -64,7 +64,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
policy.Rules = `operator = "write"`
|
||||
return s.aclPolicySetTxn(tx, tx.Index, policy)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "policy delete",
|
||||
|
@ -73,13 +73,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
policy := newACLPolicy(1)
|
||||
return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "role create",
|
||||
Mutate: newACLRoleWithSingleToken,
|
||||
// Two tokens with the same ID, because there are two changes in Mutate
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)),
|
||||
},
|
||||
{
|
||||
Name: "role update",
|
||||
|
@ -93,7 +93,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
})
|
||||
return s.aclRoleSetTxn(tx, tx.Index, role, true)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
{
|
||||
Name: "role delete",
|
||||
|
@ -102,7 +102,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
|||
role := newACLRole(1, newACLRolePolicyLink(1))
|
||||
return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil)
|
||||
},
|
||||
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||
expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
|
|||
|
||||
// Ensure the reset event was sent.
|
||||
err = assertErr(t, eventCh)
|
||||
require.Equal(stream.ErrSubscriptionReload, err)
|
||||
require.Equal(stream.ErrSubscriptionClosed, err)
|
||||
|
||||
// Register another subscription.
|
||||
subscription2 := &stream.SubscribeRequest{
|
||||
|
@ -90,7 +90,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
|
|||
|
||||
// Ensure the reset event was sent.
|
||||
err = assertErr(t, eventCh2)
|
||||
require.Equal(stream.ErrSubscriptionReload, err)
|
||||
require.Equal(stream.ErrSubscriptionClosed, err)
|
||||
}
|
||||
|
||||
func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
||||
|
@ -175,7 +175,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
|
|||
|
||||
// Ensure the reload event was sent.
|
||||
err = assertErr(t, eventCh)
|
||||
require.Equal(stream.ErrSubscriptionReload, err)
|
||||
require.Equal(stream.ErrSubscriptionClosed, err)
|
||||
|
||||
// Register another subscription.
|
||||
subscription3 := &stream.SubscribeRequest{
|
||||
|
@ -362,7 +362,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
|
|||
}
|
||||
}
|
||||
require.Error(t, next.Err)
|
||||
require.Equal(t, stream.ErrSubscriptionReload, next.Err)
|
||||
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
|
||||
return
|
||||
case <-timeoutCh:
|
||||
t.Fatalf("no err after 100ms")
|
||||
|
@ -390,7 +390,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
|
|||
}
|
||||
return events, nil
|
||||
},
|
||||
Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) {
|
||||
Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
|
||||
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
|
||||
if err != nil {
|
||||
return idx, err
|
||||
|
@ -403,7 +403,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler {
|
|||
Index: node.ModifyIndex,
|
||||
Payload: node,
|
||||
}
|
||||
buffer.Append([]stream.Event{event})
|
||||
snap.Append([]stream.Event{event})
|
||||
}
|
||||
return idx, nil
|
||||
},
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
/*
|
||||
Package stream provides a publish/subscribe system for events produced by changes
|
||||
to the state store.
|
||||
*/
|
||||
package stream
|
||||
|
||||
type Topic int32
|
||||
|
@ -24,21 +28,21 @@ func (e Event) IsEndOfSnapshot() bool {
|
|||
}
|
||||
|
||||
func (e Event) IsResumeStream() bool {
|
||||
return e.Payload == ResumeStream{}
|
||||
return e.Payload == resumeStream{}
|
||||
}
|
||||
|
||||
type endOfSnapshot struct{}
|
||||
|
||||
type ResumeStream struct{}
|
||||
type resumeStream struct{}
|
||||
|
||||
// TODO: unexport once EventPublisher is in stream package
|
||||
type UnsubscribePayload struct {
|
||||
TokensSecretIDs []string
|
||||
type closeSubscriptionPayload struct {
|
||||
tokensSecretIDs []string
|
||||
}
|
||||
|
||||
// NewUnsubscribeEvent returns a special Event that is handled by the
|
||||
// stream package, and is never sent to subscribers. It results in any subscriptions
|
||||
// which match any of the TokenSecretIDs to be unsubscribed.
|
||||
func NewUnsubscribeEvent(tokenSecretIDs []string) Event {
|
||||
return Event{Payload: UnsubscribePayload{TokensSecretIDs: tokenSecretIDs}}
|
||||
// NewCloseSubscriptionEvent returns a special Event that is handled by the
|
||||
// stream package, and is never sent to subscribers. EventProcessor handles
|
||||
// these events, and closes any subscriptions which were created using a token
|
||||
// which matches any of the tokenSecretIDs.
|
||||
func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event {
|
||||
return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"sync/atomic"
|
||||
)
|
||||
|
||||
// EventBuffer is a single-writer, multiple-reader, unlimited length concurrent
|
||||
// eventBuffer is a single-writer, multiple-reader, unlimited length concurrent
|
||||
// buffer of events that have been published on a topic. The buffer is
|
||||
// effectively just the head of an atomically updated single-linked list. Atomic
|
||||
// accesses are usually to be suspected as premature optimization but this
|
||||
|
@ -27,7 +27,7 @@ import (
|
|||
//
|
||||
// The buffer is used to deliver all messages broadcast toa topic for active
|
||||
// subscribers to consume, but it is also an effective way to both deliver and
|
||||
// optionally cache snapshots per topic and key. byt using an EventBuffer,
|
||||
// optionally cache snapshots per topic and key. byt using an eventBuffer,
|
||||
// snapshot functions don't have to read the whole snapshot into memory before
|
||||
// delivery - they can stream from memdb. However simply by storing a pointer to
|
||||
// the first event in the buffer, we can cache the buffered events for future
|
||||
|
@ -46,26 +46,26 @@ import (
|
|||
// automatically keep the events we need to make that work for exactly the
|
||||
// optimal amount of time and no longer.
|
||||
//
|
||||
// A new buffer is constructed with a sentinel "empty" BufferItem that has a nil
|
||||
// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
|
||||
// Events array. This enables subscribers to start watching for the next update
|
||||
// immediately.
|
||||
//
|
||||
// The zero value EventBuffer is _not_ a usable type since it has not been
|
||||
// The zero value eventBuffer is _not_ a usable type since it has not been
|
||||
// initialized with an empty bufferItem so can't be used to wait for the first
|
||||
// published event. Call NewEventBuffer to construct a new buffer.
|
||||
// published event. Call newEventBuffer to construct a new buffer.
|
||||
//
|
||||
// Calls to Append or AppendBuffer that mutate the head must be externally
|
||||
// synchronized. This allows systems that already serialize writes to append
|
||||
// without lock overhead (e.g. a snapshot goroutine appending thousands of
|
||||
// events).
|
||||
type EventBuffer struct {
|
||||
type eventBuffer struct {
|
||||
head atomic.Value
|
||||
}
|
||||
|
||||
// NewEventBuffer creates an EventBuffer ready for use.
|
||||
func NewEventBuffer() *EventBuffer {
|
||||
b := &EventBuffer{}
|
||||
b.head.Store(NewBufferItem())
|
||||
// newEventBuffer creates an eventBuffer ready for use.
|
||||
func newEventBuffer() *eventBuffer {
|
||||
b := &eventBuffer{}
|
||||
b.head.Store(newBufferItem())
|
||||
return b
|
||||
}
|
||||
|
||||
|
@ -76,9 +76,9 @@ func NewEventBuffer() *EventBuffer {
|
|||
// mutations to the events as they may have been exposed to subscribers in other
|
||||
// goroutines. Append only supports a single concurrent caller and must be
|
||||
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||
func (b *EventBuffer) Append(events []Event) {
|
||||
func (b *eventBuffer) Append(events []Event) {
|
||||
// Push events to the head
|
||||
it := NewBufferItem()
|
||||
it := newBufferItem()
|
||||
it.Events = events
|
||||
b.AppendBuffer(it)
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ func (b *EventBuffer) Append(events []Event) {
|
|||
//
|
||||
// AppendBuffer only supports a single concurrent caller and must be externally
|
||||
// synchronized with other Append, AppendBuffer or AppendErr calls.
|
||||
func (b *EventBuffer) AppendBuffer(item *BufferItem) {
|
||||
func (b *eventBuffer) AppendBuffer(item *bufferItem) {
|
||||
// First store it as the next node for the old head this ensures once it's
|
||||
// visible to new searchers the linked list is already valid. Not sure it
|
||||
// matters but this seems nicer.
|
||||
|
@ -110,20 +110,20 @@ func (b *EventBuffer) AppendBuffer(item *BufferItem) {
|
|||
// streaming subscription and return the error. AppendErr only supports a
|
||||
// single concurrent caller and must be externally synchronized with other
|
||||
// Append, AppendBuffer or AppendErr calls.
|
||||
func (b *EventBuffer) AppendErr(err error) {
|
||||
b.AppendBuffer(&BufferItem{Err: err})
|
||||
func (b *eventBuffer) AppendErr(err error) {
|
||||
b.AppendBuffer(&bufferItem{Err: err})
|
||||
}
|
||||
|
||||
// Head returns the current head of the buffer. It will always exist but it may
|
||||
// be a "sentinel" empty item with a nil Events slice to allow consumers to
|
||||
// watch for the next update. Consumers should always check for empty Events and
|
||||
// treat them as no-ops. Will panic if EventBuffer was not initialized correctly
|
||||
// with EventBuffer.
|
||||
func (b *EventBuffer) Head() *BufferItem {
|
||||
return b.head.Load().(*BufferItem)
|
||||
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
|
||||
// with eventBuffer.
|
||||
func (b *eventBuffer) Head() *bufferItem {
|
||||
return b.head.Load().(*bufferItem)
|
||||
}
|
||||
|
||||
// BufferItem represents a set of events published by a single raft operation.
|
||||
// bufferItem represents a set of events published by a single raft operation.
|
||||
// The first item returned by a newly constructed buffer will have nil Events.
|
||||
// It is a sentinel value which is used to wait on the next events via Next.
|
||||
//
|
||||
|
@ -135,9 +135,9 @@ func (b *EventBuffer) Head() *BufferItem {
|
|||
// they have been delivered except where it's intentional to maintain a cache or
|
||||
// trailing store of events for performance reasons.
|
||||
//
|
||||
// Subscribers must not mutate the BufferItem or the Events or Encoded payloads
|
||||
// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
|
||||
// inside as these are shared between all readers.
|
||||
type BufferItem struct {
|
||||
type bufferItem struct {
|
||||
// Events is the set of events published at one raft index. This may be nil as
|
||||
// a sentinel value to allow watching for the first event in a buffer. Callers
|
||||
// should check and skip nil Events at any point in the buffer. It will also
|
||||
|
@ -170,10 +170,10 @@ type bufferLink struct {
|
|||
ch chan struct{}
|
||||
}
|
||||
|
||||
// NewBufferItem returns a blank buffer item with a link and chan ready to have
|
||||
// newBufferItem returns a blank buffer item with a link and chan ready to have
|
||||
// the fields set and be appended to a buffer.
|
||||
func NewBufferItem() *BufferItem {
|
||||
return &BufferItem{
|
||||
func newBufferItem() *bufferItem {
|
||||
return &bufferItem{
|
||||
link: &bufferLink{
|
||||
ch: make(chan struct{}),
|
||||
},
|
||||
|
@ -182,7 +182,7 @@ func NewBufferItem() *BufferItem {
|
|||
|
||||
// Next return the next buffer item in the buffer. It may block until ctx is
|
||||
// cancelled or until the next item is published.
|
||||
func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
|
||||
func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) {
|
||||
// See if there is already a next value, block if so. Note we don't rely on
|
||||
// state change (chan nil) as that's not threadsafe but detecting close is.
|
||||
select {
|
||||
|
@ -197,7 +197,7 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
|
|||
// shouldn't be possible
|
||||
return nil, errors.New("invalid next item")
|
||||
}
|
||||
next := nextRaw.(*BufferItem)
|
||||
next := nextRaw.(*bufferItem)
|
||||
if next.Err != nil {
|
||||
return nil, next.Err
|
||||
}
|
||||
|
@ -210,12 +210,12 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) {
|
|||
|
||||
// NextNoBlock returns the next item in the buffer without blocking. If it
|
||||
// reaches the most recent item it will return nil and no error.
|
||||
func (i *BufferItem) NextNoBlock() (*BufferItem, error) {
|
||||
func (i *bufferItem) NextNoBlock() (*bufferItem, error) {
|
||||
nextRaw := i.link.next.Load()
|
||||
if nextRaw == nil {
|
||||
return nil, nil
|
||||
}
|
||||
next := nextRaw.(*BufferItem)
|
||||
next := nextRaw.(*bufferItem)
|
||||
if next.Err != nil {
|
||||
return nil, next.Err
|
||||
}
|
||||
|
@ -230,14 +230,14 @@ func (i *BufferItem) NextNoBlock() (*BufferItem, error) {
|
|||
// one, or if not it returns an empty item (that will be ignored by subscribers)
|
||||
// that has the same link as the current buffer so that it will be notified of
|
||||
// future updates in the buffer without including the current item.
|
||||
func (i *BufferItem) FollowAfter() (*BufferItem, error) {
|
||||
func (i *bufferItem) FollowAfter() (*bufferItem, error) {
|
||||
next, err := i.NextNoBlock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if next == nil {
|
||||
// Return an empty item that can be followed to the next item published.
|
||||
item := &BufferItem{}
|
||||
item := &bufferItem{}
|
||||
item.link = i.link
|
||||
return item, nil
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestEventBufferFuzz(t *testing.T) {
|
|||
nReaders := 1000
|
||||
nMessages := 1000
|
||||
|
||||
b := NewEventBuffer()
|
||||
b := newEventBuffer()
|
||||
|
||||
// Start a write goroutine that will publish 10000 messages with sequential
|
||||
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
||||
|
|
|
@ -33,11 +33,11 @@ type EventPublisher struct {
|
|||
|
||||
// topicBuffers stores the head of the linked-list buffer to publish events to
|
||||
// for a topic.
|
||||
topicBuffers map[Topic]*EventBuffer
|
||||
topicBuffers map[Topic]*eventBuffer
|
||||
|
||||
// snapCache if a cache of EventSnapshots indexed by topic and key.
|
||||
// TODO: new struct for snapCache and snapFns and snapCacheTTL
|
||||
snapCache map[Topic]map[string]*EventSnapshot
|
||||
snapCache map[Topic]map[string]*eventSnapshot
|
||||
|
||||
subscriptions *subscriptions
|
||||
|
||||
|
@ -69,13 +69,20 @@ type changeEvents struct {
|
|||
// TopicHandler provides functions which create stream.Events for a topic.
|
||||
type TopicHandler struct {
|
||||
// Snapshot creates the necessary events to reproduce the current state and
|
||||
// appends them to the EventBuffer.
|
||||
Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error)
|
||||
// appends them to the eventBuffer.
|
||||
Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
||||
// ProcessChanges accepts a slice of Changes, and builds a slice of events for
|
||||
// those changes.
|
||||
ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error)
|
||||
}
|
||||
|
||||
// SnapshotAppender appends groups of events to create a Snapshot of state.
|
||||
type SnapshotAppender interface {
|
||||
// Append events to the snapshot.
|
||||
// TODO: document why parameter is a slice instead of a single Event
|
||||
Append(events []Event)
|
||||
}
|
||||
|
||||
// NewEventPublisher returns an EventPublisher for publishing change events.
|
||||
// Handlers are used to convert the memDB changes into events.
|
||||
// A goroutine is run in the background to publish events to all subscribes.
|
||||
|
@ -84,8 +91,8 @@ type TopicHandler struct {
|
|||
func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher {
|
||||
e := &EventPublisher{
|
||||
snapCacheTTL: snapCacheTTL,
|
||||
topicBuffers: make(map[Topic]*EventBuffer),
|
||||
snapCache: make(map[Topic]map[string]*EventSnapshot),
|
||||
topicBuffers: make(map[Topic]*eventBuffer),
|
||||
snapCache: make(map[Topic]map[string]*eventSnapshot),
|
||||
publishCh: make(chan changeEvents, 64),
|
||||
subscriptions: &subscriptions{
|
||||
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
||||
|
@ -136,8 +143,8 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
|
|||
// as any ACL update events to cause affected listeners to reset their stream.
|
||||
func (e *EventPublisher) sendEvents(update changeEvents) {
|
||||
for _, event := range update.events {
|
||||
if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok {
|
||||
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs)
|
||||
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
|
||||
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,10 +167,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) {
|
|||
// already exist.
|
||||
//
|
||||
// EventPublisher.lock must be held to call this method.
|
||||
func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer {
|
||||
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
|
||||
buf, ok := e.topicBuffers[topic]
|
||||
if !ok {
|
||||
buf = NewEventBuffer()
|
||||
buf = newEventBuffer()
|
||||
e.topicBuffers[topic] = buf
|
||||
}
|
||||
return buf
|
||||
|
@ -207,10 +214,10 @@ func (e *EventPublisher) Subscribe(
|
|||
Index: req.Index,
|
||||
Topic: req.Topic,
|
||||
Key: req.Key,
|
||||
Payload: ResumeStream{},
|
||||
Payload: resumeStream{},
|
||||
}
|
||||
// Make a new buffer to send to the client containing the resume.
|
||||
buf := NewEventBuffer()
|
||||
buf := newEventBuffer()
|
||||
|
||||
// Store the head of that buffer before we append to it to give as the
|
||||
// starting point for the subscription.
|
||||
|
@ -226,13 +233,13 @@ func (e *EventPublisher) Subscribe(
|
|||
}
|
||||
buf.AppendBuffer(follow)
|
||||
|
||||
sub = NewSubscription(ctx, req, subHead)
|
||||
sub = newSubscription(ctx, req, subHead)
|
||||
} else {
|
||||
snap, err := e.getSnapshotLocked(req, topicHead)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sub = NewSubscription(ctx, req, snap.Snap)
|
||||
sub = newSubscription(ctx, req, snap.Snap)
|
||||
}
|
||||
|
||||
e.subscriptions.add(req, sub)
|
||||
|
@ -288,16 +295,16 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) {
|
||||
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
|
||||
// See if there is a cached snapshot
|
||||
topicSnaps, ok := e.snapCache[req.Topic]
|
||||
if !ok {
|
||||
topicSnaps = make(map[string]*EventSnapshot)
|
||||
topicSnaps = make(map[string]*eventSnapshot)
|
||||
e.snapCache[req.Topic] = topicSnaps
|
||||
}
|
||||
|
||||
snap, ok := topicSnaps[req.Key]
|
||||
if ok && snap.Err() == nil {
|
||||
if ok && snap.err() == nil {
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
|
@ -307,7 +314,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *Buf
|
|||
return nil, fmt.Errorf("unknown topic %d", req.Topic)
|
||||
}
|
||||
|
||||
snap = NewEventSnapshot(req, topicHead, handler.Snapshot)
|
||||
snap = newEventSnapshot(req, topicHead, handler.Snapshot)
|
||||
if e.snapCacheTTL > 0 {
|
||||
topicSnaps[req.Key] = snap
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
|
|||
func newTestTopicHandlers() map[Topic]TopicHandler {
|
||||
return map[Topic]TopicHandler{
|
||||
testTopic: {
|
||||
Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
|
||||
Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
if req.Topic != testTopic {
|
||||
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
|
||||
}
|
||||
|
|
|
@ -1,37 +1,35 @@
|
|||
package stream
|
||||
|
||||
// EventSnapshot represents the state of memdb for a given topic and key at some
|
||||
// eventSnapshot represents the state of memdb for a given topic and key at some
|
||||
// point in time. It is modelled as a buffer of events so that snapshots can be
|
||||
// streamed to possibly multiple subscribers concurrently, and can be trivially
|
||||
// cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot
|
||||
// cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot
|
||||
// is dropped from memory, any subscribers still reading from it may do so by following
|
||||
// their pointers. When the last subscribe unsubscribes the snapshot is garbage
|
||||
// collected automatically by Go's runtime. This simplifies snapshot and buffer
|
||||
// management dramatically.
|
||||
type EventSnapshot struct {
|
||||
type eventSnapshot struct {
|
||||
// Snap is the first item in the buffer containing the snapshot. Once the
|
||||
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
|
||||
// so that subscribers receive all the events from the same buffer.
|
||||
Snap *BufferItem
|
||||
Snap *bufferItem
|
||||
|
||||
// snapBuffer is the Head of the snapshot buffer the fn should write to.
|
||||
snapBuffer *EventBuffer
|
||||
snapBuffer *eventBuffer
|
||||
}
|
||||
|
||||
// SnapFn is the type of function needed to generate a snapshot for a topic and
|
||||
// key.
|
||||
type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error)
|
||||
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
|
||||
|
||||
// NewEventSnapshot creates a snapshot buffer based on the subscription request.
|
||||
// newEventSnapshot creates a snapshot buffer based on the subscription request.
|
||||
// The current buffer head for the topic in question is passed so that once the
|
||||
// snapshot is complete and has been delivered into the buffer, any events
|
||||
// published during snapshotting can be immediately appended and won't be
|
||||
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
|
||||
// snapshot buffer so that subscribers will naturally follow from the snapshot
|
||||
// to wait for any subsequent updates.
|
||||
func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
|
||||
buf := NewEventBuffer()
|
||||
s := &EventSnapshot{
|
||||
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
|
||||
buf := newEventBuffer()
|
||||
s := &eventSnapshot{
|
||||
Snap: buf.Head(),
|
||||
snapBuffer: buf,
|
||||
}
|
||||
|
@ -54,7 +52,7 @@ func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn Sna
|
|||
return s
|
||||
}
|
||||
|
||||
func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx uint64) {
|
||||
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
|
||||
// Now splice on the topic buffer. We need to iterate through the buffer to
|
||||
// find the first event after the current snapshot.
|
||||
item := topicBufferHead
|
||||
|
@ -102,10 +100,10 @@ func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx u
|
|||
}
|
||||
}
|
||||
|
||||
// Err returns an error if the snapshot func has failed with an error or nil
|
||||
// err returns an error if the snapshot func has failed with an error or nil
|
||||
// otherwise. Nil doesn't necessarily mean there won't be an error but there
|
||||
// hasn't been one yet.
|
||||
func (s *EventSnapshot) Err() error {
|
||||
func (s *eventSnapshot) err() error {
|
||||
// Fetch the head of the buffer, this is atomic. If the snapshot func errored
|
||||
// then the last event will be an error.
|
||||
head := s.snapBuffer.Head()
|
||||
|
|
|
@ -70,10 +70,10 @@ func TestEventSnapshot(t *testing.T) {
|
|||
snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
|
||||
|
||||
// Create a topic buffer for updates
|
||||
tb := NewEventBuffer()
|
||||
tb := newEventBuffer()
|
||||
|
||||
// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
|
||||
// and are seen by the EventSnapshot once it completes the snap.
|
||||
// and are seen by the eventSnapshot once it completes the snap.
|
||||
tbHead := tb.Head()
|
||||
|
||||
// Deliver any pre-snapshot events simulating updates that occur after the
|
||||
|
@ -87,9 +87,9 @@ func TestEventSnapshot(t *testing.T) {
|
|||
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
|
||||
}
|
||||
|
||||
// Create EventSnapshot, (will call snFn in another goroutine). The
|
||||
// Request is ignored by the SnapFn so doesn't matter for now.
|
||||
es := NewEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
|
||||
// Create eventSnapshot, (will call snFn in another goroutine). The
|
||||
// Request is ignored by the snapFunc so doesn't matter for now.
|
||||
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
|
||||
|
||||
// Deliver any post-snapshot events simulating updates that occur
|
||||
// logically after snapshot. It doesn't matter that these might actually
|
||||
|
@ -155,8 +155,8 @@ func genSequentialIDs(start, end int) []string {
|
|||
return ids
|
||||
}
|
||||
|
||||
func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn {
|
||||
return func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
|
||||
func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
|
||||
return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
for i := 0; i < size; i++ {
|
||||
// Event content is arbitrary we are just using Health because it's the
|
||||
// first type defined. We just want a set of things with consecutive
|
||||
|
|
|
@ -17,11 +17,9 @@ const (
|
|||
subscriptionStateClosed uint32 = 1
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrSubscriptionReload is a error signalling reload event should be sent to
|
||||
// the client and the server should close.
|
||||
ErrSubscriptionReload = errors.New("subscription closed by server, client should retry")
|
||||
)
|
||||
// ErrSubscriptionClosed is a error signalling the subscription has been
|
||||
// closed. The client should Unsubscribe, then re-Subscribe.
|
||||
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry")
|
||||
|
||||
// Subscription holds state about a single Subscribe call. Subscribe clients
|
||||
// access their next event by calling Next(). This may initially include the
|
||||
|
@ -35,7 +33,7 @@ type Subscription struct {
|
|||
|
||||
// currentItem stores the current snapshot or topic buffer item we are on. It
|
||||
// is mutated by calls to Next.
|
||||
currentItem *BufferItem
|
||||
currentItem *bufferItem
|
||||
|
||||
// ctx is the Subscription context that wraps the context of the streaming RPC
|
||||
// handler call.
|
||||
|
@ -59,9 +57,9 @@ type SubscribeRequest struct {
|
|||
Index uint64
|
||||
}
|
||||
|
||||
// NewSubscription return a new subscription. The caller is responsible for
|
||||
// newSubscription return a new subscription. The caller is responsible for
|
||||
// calling Unsubscribe when it is done with the subscription, to free resources.
|
||||
func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription {
|
||||
func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription {
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
return &Subscription{
|
||||
ctx: subCtx,
|
||||
|
@ -75,7 +73,7 @@ func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferIte
|
|||
// single goroutine concurrently as it mutates the Subscription.
|
||||
func (s *Subscription) Next() ([]Event, error) {
|
||||
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
||||
return nil, ErrSubscriptionReload
|
||||
return nil, ErrSubscriptionClosed
|
||||
}
|
||||
|
||||
for {
|
||||
|
@ -83,7 +81,7 @@ func (s *Subscription) Next() ([]Event, error) {
|
|||
if err != nil {
|
||||
// Check we didn't return because of a state change cancelling the context
|
||||
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
||||
return nil, ErrSubscriptionReload
|
||||
return nil, ErrSubscriptionClosed
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
)
|
||||
|
||||
func TestSubscription(t *testing.T) {
|
||||
eb := NewEventBuffer()
|
||||
eb := newEventBuffer()
|
||||
|
||||
index := uint64(100)
|
||||
|
||||
|
@ -26,7 +26,7 @@ func TestSubscription(t *testing.T) {
|
|||
Topic: Topic_ServiceHealth,
|
||||
Key: "test",
|
||||
}
|
||||
sub := NewSubscription(ctx, req, startHead)
|
||||
sub := newSubscription(ctx, req, startHead)
|
||||
|
||||
// First call to sub.Next should return our published event immediately
|
||||
start := time.Now()
|
||||
|
@ -89,7 +89,7 @@ func TestSubscription(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSubscription_Close(t *testing.T) {
|
||||
eb := NewEventBuffer()
|
||||
eb := newEventBuffer()
|
||||
|
||||
index := uint64(100)
|
||||
|
||||
|
@ -106,7 +106,7 @@ func TestSubscription_Close(t *testing.T) {
|
|||
Topic: Topic_ServiceHealth,
|
||||
Key: "test",
|
||||
}
|
||||
sub := NewSubscription(ctx, req, startHead)
|
||||
sub := newSubscription(ctx, req, startHead)
|
||||
|
||||
// First call to sub.Next should return our published event immediately
|
||||
start := time.Now()
|
||||
|
@ -128,14 +128,14 @@ func TestSubscription_Close(t *testing.T) {
|
|||
_, err = sub.Next()
|
||||
elapsed = time.Since(start)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, ErrSubscriptionReload, err)
|
||||
require.Equal(t, ErrSubscriptionClosed, err)
|
||||
require.True(t, elapsed > 200*time.Millisecond,
|
||||
"Reload should have happened after blocking 200ms, took %s", elapsed)
|
||||
require.True(t, elapsed < 2*time.Second,
|
||||
"Reload should have been delivered after short time, took %s", elapsed)
|
||||
}
|
||||
|
||||
func publishTestEvent(index uint64, b *EventBuffer, key string) {
|
||||
func publishTestEvent(index uint64, b *eventBuffer, key string) {
|
||||
// Don't care about the event payload for now just the semantics of publishing
|
||||
// something. This is not a valid stream in the end-to-end streaming protocol
|
||||
// but enough to test subscription mechanics.
|
||||
|
|
Loading…
Reference in New Issue