stream: refactor to support change in framing events
Remove EndOfEmptySnapshot, add NewSnapshotToFollow.
commit 0769f54fe1 (parent a442fcd6cd)
@@ -20,22 +20,22 @@ type Event struct {
 }
 
 // IsEndOfSnapshot returns true if this is a framing event that indicates the
-// snapshot has completed. Future events from Subscription.Next will be
-// change events.
+// snapshot has completed. Subsequent events from Subscription.Next will be
+// streamed as they occur.
 func (e Event) IsEndOfSnapshot() bool {
 	return e.Payload == endOfSnapshot{}
 }
 
-// IsEndOfEmptySnapshot returns true if this is a framing event that indicates
-// there is no snapshot. Future events from Subscription.Next will be
-// change events.
-func (e Event) IsEndOfEmptySnapshot() bool {
-	return e.Payload == endOfEmptySnapshot{}
+// IsNewSnapshotToFollow returns true if this is a framing event that indicates
+// that the client's view is stale and must be reset. Subsequent events from
+// Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
+func (e Event) IsNewSnapshotToFollow() bool {
+	return e.Payload == newSnapshotToFollow{}
 }
 
 type endOfSnapshot struct{}
 
-type endOfEmptySnapshot struct{}
+type newSnapshotToFollow struct{}
 
 type closeSubscriptionPayload struct {
 	tokensSecretIDs []string
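For illustration, a minimal sketch of how a subscriber might branch on the two framing events. The Next signature and the clientView type with its reset/markReady/apply methods are assumptions for the sketch, not part of this package:

	// consume drives a subscription, assuming Next(ctx) returns (Event, error)
	// and clientView is whatever materialized view the subscriber maintains.
	func consume(ctx context.Context, sub *Subscription, view *clientView) error {
		for {
			event, err := sub.Next(ctx)
			if err != nil {
				return err
			}
			switch {
			case event.IsNewSnapshotToFollow():
				// The view is stale: drop it and rebuild from the
				// snapshot events that follow.
				view.reset()
			case event.IsEndOfSnapshot():
				// Snapshot complete; everything after this is a change
				// event applied to a now-consistent view.
				view.markReady()
			default:
				view.apply(event)
			}
		}
	}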
@@ -216,3 +216,9 @@ func (i *bufferItem) NextLink() *bufferItem {
 	}
 	return next
 }
+
+// HasEventIndex returns true if index matches the Event.Index of this item. Returns
+// false if there are no events stored in the item, or the index does not match.
+func (i *bufferItem) HasEventIndex(index uint64) bool {
+	return len(i.Events) > 0 && i.Events[0].Index == index
+}
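This helper condenses the inline resume check that Subscribe previously spelled out (see the Subscribe hunk below), roughly:

	// before: req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index
	// after:  req.Index > 0 && topicHead.HasEventIndex(req.Index)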
@@ -157,8 +157,7 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
 // When the caller is finished with the subscription for any reason, it must
 // call Subscription.Unsubscribe to free ACL tracking resources.
 func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
-	// Ensure we know how to make a snapshot for this topic
-	_, ok := e.snapshotHandlers[req.Topic]
+	handler, ok := e.snapshotHandlers[req.Topic]
 	if !ok || req.Topic == nil {
 		return nil, fmt.Errorf("unknown topic %v", req.Topic)
 	}
@@ -166,47 +165,48 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
 	// Ensure there is a topic buffer for that topic so we start capturing any
 	// future published events.
-	buf := e.getTopicBuffer(req.Topic)
+	topicHead := e.getTopicBuffer(req.Topic).Head()
 
-	// See if we need a snapshot
-	topicHead := buf.Head()
-	var sub *Subscription
-	if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
-		// No need for a snapshot, send the "end of empty snapshot" message to signal to
-		// client its cache is still good, then follow along from here in the topic.
+	// If the client view is fresh, resume the stream.
+	if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
 		buf := newEventBuffer()
-
-		// Store the head of that buffer before we append to it to give as the
-		// starting point for the subscription.
-		subHead := buf.Head()
-
-		buf.Append([]Event{{
-			Index:   req.Index,
-			Topic:   req.Topic,
-			Key:     req.Key,
-			Payload: endOfEmptySnapshot{},
-		}})
-
-		// Now splice the rest of the topic buffer on so the subscription will
-		// continue to see future updates in the topic buffer.
+		subscriptionHead := buf.Head()
+		// splice the rest of the topic buffer onto the subscription buffer so
+		// the subscription will receive new events.
 		buf.AppendItem(topicHead.NextLink())
-
-		sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
-	} else {
-		snap, err := e.getSnapshotLocked(req, topicHead)
-		if err != nil {
-			return nil, err
-		}
-		sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
+		return e.subscriptions.add(req, subscriptionHead), nil
 	}
 
-	e.subscriptions.add(req, sub)
-	return sub, nil
+	snapFromCache := e.getCachedSnapshotLocked(req)
+	if req.Index == 0 && snapFromCache != nil {
+		return e.subscriptions.add(req, snapFromCache.First), nil
+	}
+	snap := newEventSnapshot()
+
+	// TODO: testcase for this case, especially the from-cache-splice case
+	// if the request has an Index the client view is stale and must be reset
+	// with a NewSnapshotToFollow event.
+	if req.Index > 0 {
+		snap.buffer.Append([]Event{{
+			Topic:   req.Topic,
+			Key:     req.Key,
+			Payload: newSnapshotToFollow{},
+		}})
+
+		if snapFromCache != nil {
+			snap.buffer.AppendItem(snapFromCache.First)
+			return e.subscriptions.add(req, snap.First), nil
+		}
+	}
+
+	snap.appendAndSplice(*req, handler, topicHead)
+	e.setCachedSnapshotLocked(req, snap)
+	return e.subscriptions.add(req, snap.First), nil
 }
 
-func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
+func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
+	sub := newSubscription(*req, head, s.unsubscribe(req))
+
 	s.lock.Lock()
 	defer s.lock.Unlock()
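Read together, Subscribe now has three outcomes: a fresh client (req.Index matches the topic head) resumes the live stream with no snapshot at all; a new client (req.Index == 0) receives a snapshot, served from the cache when one is available; and a stale client (req.Index > 0 with no matching head) first receives newSnapshotToFollow, then a full snapshot (cached or freshly built), then live events.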
@@ -216,6 +216,7 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
 		s.byToken[req.Token] = subsByToken
 	}
 	subsByToken[req] = sub
+	return sub
 }
 
 func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
@@ -263,7 +264,8 @@ func (s *subscriptions) closeAll() {
 	}
 }
 
-func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
+// EventPublisher.lock must be held to call this method.
+func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
 	topicSnaps, ok := e.snapCache[req.Topic]
 	if !ok {
 		topicSnaps = make(map[string]*eventSnapshot)
@@ -272,25 +274,22 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
 
 	snap, ok := topicSnaps[req.Key]
 	if ok && snap.err() == nil {
-		return snap, nil
+		return snap
 	}
-
-	handler, ok := e.snapshotHandlers[req.Topic]
-	if !ok {
-		return nil, fmt.Errorf("unknown topic %v", req.Topic)
-	}
-
-	snap = newEventSnapshot(req, topicHead, handler)
-	if e.snapCacheTTL > 0 {
-		topicSnaps[req.Key] = snap
-
-		// Trigger a clearout after TTL
-		time.AfterFunc(e.snapCacheTTL, func() {
-			e.lock.Lock()
-			defer e.lock.Unlock()
-			delete(topicSnaps, req.Key)
-		})
-	}
-
-	return snap, nil
+	return nil
 }
+
+// EventPublisher.lock must be held to call this method.
+func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *eventSnapshot) {
+	if e.snapCacheTTL == 0 {
+		return
+	}
+	e.snapCache[req.Topic][req.Key] = snap
+
+	// Setup a cache eviction
+	time.AfterFunc(e.snapCacheTTL, func() {
+		e.lock.Lock()
+		defer e.lock.Unlock()
+		delete(e.snapCache[req.Topic], req.Key)
+	})
+}
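The eviction in setCachedSnapshotLocked follows a reusable pattern: write under the lock, then schedule a time.AfterFunc callback that re-acquires the lock to delete the entry. A self-contained sketch of the same pattern, with all names illustrative rather than taken from this package:

	package cache

	import (
		"sync"
		"time"
	)

	// ttlCache is an illustrative stand-in for the snapshot cache: entries
	// are written under a lock and evicted by a timer after a fixed TTL.
	type ttlCache struct {
		lock    sync.Mutex
		ttl     time.Duration
		entries map[string]string
	}

	func newTTLCache(ttl time.Duration) *ttlCache {
		return &ttlCache{ttl: ttl, entries: make(map[string]string)}
	}

	func (c *ttlCache) set(key, value string) {
		if c.ttl == 0 {
			return // caching disabled, mirroring the snapCacheTTL == 0 guard
		}
		c.lock.Lock()
		defer c.lock.Unlock()
		c.entries[key] = value

		// The callback runs on its own goroutine, so it must re-acquire
		// the lock before touching the map.
		time.AfterFunc(c.ttl, func() {
			c.lock.Lock()
			defer c.lock.Unlock()
			delete(c.entries, key)
		})
	}

As in the original, each write arms its own timer, so a re-written key can be evicted early by an older timer; that is tolerable for a cache of snapshots that can always be rebuilt.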
@@ -9,50 +9,45 @@ package stream
 // collected automatically by Go's runtime. This simplifies snapshot and buffer
 // management dramatically.
 type eventSnapshot struct {
-	// Head is the first item in the buffer containing the snapshot. Once the
-	// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
-	// so that subscribers receive all the events from the same buffer.
-	Head *bufferItem
+	// First item in the buffer. Used as the Head of a subscription, or to
+	// splice this snapshot onto another one.
+	First *bufferItem
 
-	// snapBuffer is the Head of the snapshot buffer the fn should write to.
-	snapBuffer *eventBuffer
+	// buffer is the Head of the snapshot buffer the fn should write to.
+	buffer *eventBuffer
 }
 
-// newEventSnapshot creates a snapshot buffer based on the subscription request.
-// The current buffer head for the topic requested is passed so that once the
-// snapshot is complete and has been delivered into the buffer, any events
-// published during snapshotting can be immediately appended and won't be
-// missed. Once the snapshot is delivered the topic buffer is spliced onto the
-// snapshot buffer so that subscribers will naturally follow from the snapshot
-// to wait for any subsequent updates.
-func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
-	buf := newEventBuffer()
-	s := &eventSnapshot{
-		Head:       buf.Head(),
-		snapBuffer: buf,
+// newEventSnapshot creates an empty snapshot buffer.
+func newEventSnapshot() *eventSnapshot {
+	snapBuffer := newEventBuffer()
+	return &eventSnapshot{
+		First:  snapBuffer.Head(),
+		buffer: snapBuffer,
 	}
-
-	go func() {
-		idx, err := fn(*req, s.snapBuffer)
-		if err != nil {
-			s.snapBuffer.AppendItem(&bufferItem{Err: err})
-			return
-		}
-		// We wrote the snapshot events to the buffer, send the "end of snapshot" event
-		s.snapBuffer.Append([]Event{{
-			Topic:   req.Topic,
-			Key:     req.Key,
-			Index:   idx,
-			Payload: endOfSnapshot{},
-		}})
-		s.spliceFromTopicBuffer(topicBufferHead, idx)
-	}()
-	return s
 }
 
+// appendAndSplice populates the snapshot buffer by calling the SnapshotFunc,
+// then adding an endOfSnapshot framing event, and finally by splicing in
+// events from the topicBuffer.
+func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, topicBufferHead *bufferItem) {
+	idx, err := fn(req, s.buffer)
+	if err != nil {
+		s.buffer.AppendItem(&bufferItem{Err: err})
+		return
+	}
+	s.buffer.Append([]Event{{
+		Topic:   req.Topic,
+		Key:     req.Key,
+		Index:   idx,
+		Payload: endOfSnapshot{},
+	}})
+	s.spliceFromTopicBuffer(topicBufferHead, idx)
+}
+
+// spliceFromTopicBuffer traverses the topicBuffer looking for the last item
+// in the buffer, or the first item where the index is greater than idx. Once
+// the item is found it is appended to the snapshot buffer.
 func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
-	// Now splice on the topic buffer. We need to iterate through the buffer to
-	// find the first event after the current snapshot.
 	item := topicBufferHead
 	for {
 		next := item.NextNoBlock()
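One design shift worth noting: the old constructor both built the buffer and ran the SnapshotFunc on its own goroutine, while the new split makes snapshot construction synchronous and leaves any concurrency to the caller. A usage sketch based on the calls visible elsewhere in this diff (topic, key, snapFn, and topicHead are placeholders):

	snap := newEventSnapshot()
	snap.appendAndSplice(SubscribeRequest{Topic: topic, Key: key}, snapFn, topicHead)
	// Readers start at snap.First; after the endOfSnapshot framing event
	// they continue into the spliced topic buffer.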
@@ -62,7 +57,7 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
 			// the snapshot completed). We don't want any of the events (if any) in
 			// the snapshot buffer as they came before the snapshot but we do need to
 			// wait for the next update.
-			s.snapBuffer.AppendItem(item.NextLink())
+			s.buffer.AppendItem(item.NextLink())
 			return
 
 		case next.Err != nil:
@@ -71,14 +66,14 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
 			// buffer which does not contain a snapshot.
 			// Handle this case anyway in case errors can come from other places
 			// in the future.
-			s.snapBuffer.AppendItem(next)
+			s.buffer.AppendItem(next)
 			return
 
 		case len(next.Events) > 0 && next.Events[0].Index > idx:
 			// We've found an update in the topic buffer that happened after our
 			// snapshot was taken, splice it into the snapshot buffer so subscribers
 			// can continue to read this and others after it.
-			s.snapBuffer.AppendItem(next)
+			s.buffer.AppendItem(next)
 			return
 		}
 
@@ -93,6 +88,6 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
 func (s *eventSnapshot) err() error {
 	// Fetch the head of the buffer, this is atomic. If the snapshot func errored
 	// then the last event will be an error.
-	head := s.snapBuffer.Head()
+	head := s.buffer.Head()
 	return head.Err
 }
@@ -87,9 +87,8 @@ func TestEventSnapshot(t *testing.T) {
 		tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
 	}
 
-	// Create eventSnapshot, (will call snFn in another goroutine). The
-	// Request is ignored by the snapFunc so doesn't matter for now.
-	es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
+	es := newEventSnapshot()
+	es.appendAndSplice(SubscribeRequest{}, snFn, tbHead)
 
 	// Deliver any post-snapshot events simulating updates that occur
 	// logically after snapshot. It doesn't matter that these might actually
@@ -112,7 +111,7 @@
 			snapIDs := make([]string, 0, tc.snapshotSize)
 			updateIDs := make([]string, 0, tc.updatesAfterSnap)
 			snapDone := false
-			curItem := es.Head
+			curItem := es.First
 			var err error
 		RECV:
 			for {
@@ -11,7 +11,7 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
 	require.True(t, e.IsEndOfSnapshot())
 
 	t.Run("not EndOfSnapshot", func(t *testing.T) {
-		e := Event{Payload: endOfEmptySnapshot{}}
+		e := Event{Payload: newSnapshotToFollow{}}
 		require.False(t, e.IsEndOfSnapshot())
 	})
 }
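The commit updates this test's negative case but does not show a companion test for the new predicate; a sketch of one, following the same shape (hypothetical, not part of the diff):

	func TestEvent_IsNewSnapshotToFollow(t *testing.T) {
		e := Event{Payload: newSnapshotToFollow{}}
		require.True(t, e.IsNewSnapshotToFollow())

		t.Run("not NewSnapshotToFollow", func(t *testing.T) {
			e := Event{Payload: endOfSnapshot{}}
			require.False(t, e.IsNewSnapshotToFollow())
		})
	}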
@@ -28,7 +28,7 @@ type Subscription struct {
 	state uint32
 
 	// req is the request that we are responding to
-	req *SubscribeRequest
+	req SubscribeRequest
 
 	// currentItem stores the current snapshot or topic buffer item we are on. It
 	// is mutated by calls to Next.
@@ -56,7 +56,7 @@ type SubscribeRequest struct {
 
 // newSubscription returns a new subscription. The caller is responsible for
 // calling Unsubscribe when it is done with the subscription, to free resources.
-func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
+func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
 	return &Subscription{
 		forceClosed: make(chan struct{}),
 		req:         req,
@@ -23,8 +23,7 @@ func TestSubscription(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	// Create a subscription
-	req := &SubscribeRequest{
+	req := SubscribeRequest{
 		Topic: testTopic,
 		Key:   "test",
 	}
@@ -103,8 +102,7 @@ func TestSubscription_Close(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	// Create a subscription
-	req := &SubscribeRequest{
+	req := SubscribeRequest{
 		Topic: testTopic,
 		Key:   "test",
 	}