Merge pull request #8799 from hashicorp/streaming/rename-framing-events

stream: remove EndOfEmptySnapshot, add NewSnapshotToFollow
This commit is contained in:
Daniel Nephin 2020-10-06 12:42:58 -04:00 committed by GitHub
commit ae433947a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 487 additions and 243 deletions

View File

@ -20,22 +20,22 @@ type Event struct {
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Future events from Subscription.Next will be
// change events.
// snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur.
func (e Event) IsEndOfSnapshot() bool {
return e.Payload == endOfSnapshot{}
}
// IsEndOfEmptySnapshot returns true if this is a framing event that indicates
// there is no snapshot. Future events from Subscription.Next will be
// change events.
func (e Event) IsEndOfEmptySnapshot() bool {
return e.Payload == endOfEmptySnapshot{}
// IsNewSnapshotToFollow returns true if this is a framing event that indicates
// that the clients view is stale, and must be reset. Subsequent events from
// Subscription.Next will be a new snapshot, followed by an EndOfSnapshot event.
func (e Event) IsNewSnapshotToFollow() bool {
return e.Payload == newSnapshotToFollow{}
}
type endOfSnapshot struct{}
type endOfEmptySnapshot struct{}
type newSnapshotToFollow struct{}
type closeSubscriptionPayload struct {
tokensSecretIDs []string

View File

@ -216,3 +216,9 @@ func (i *bufferItem) NextLink() *bufferItem {
}
return next
}
// HasEventIndex returns true if index matches the Event.Index of this item. Returns
// false if there are no events stored in the item, or the index does not match.
func (i *bufferItem) HasEventIndex(index uint64) bool {
return len(i.Events) > 0 && i.Events[0].Index == index
}

View File

@ -36,7 +36,7 @@ type EventPublisher struct {
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan changeEvents
publishCh chan []Event
snapshotHandlers SnapshotHandlers
}
@ -54,10 +54,6 @@ type subscriptions struct {
byToken map[string]map[*SubscribeRequest]*Subscription
}
type changeEvents struct {
events []Event
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
@ -84,7 +80,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot),
publishCh: make(chan changeEvents, 64),
publishCh: make(chan []Event, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
@ -97,7 +93,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
e.publishCh <- events
}
}
@ -110,16 +106,16 @@ func (e *EventPublisher) Run(ctx context.Context) {
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.sendEvents(update)
e.publishEvent(update)
}
}
}
// sendEvents sends the given events to any applicable topic listeners, as well
// as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) {
// publishEvent appends the events to any applicable topic buffers. It handles
// any closeSubscriptionPayload events by closing associated subscriptions.
func (e *EventPublisher) publishEvent(events []Event) {
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
for _, event := range events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
continue
@ -157,8 +153,7 @@ func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
// When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
// Ensure we know how to make a snapshot for this topic
_, ok := e.snapshotHandlers[req.Topic]
handler, ok := e.snapshotHandlers[req.Topic]
if !ok || req.Topic == nil {
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
@ -166,47 +161,47 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
e.lock.Lock()
defer e.lock.Unlock()
// Ensure there is a topic buffer for that topic so we start capturing any
// future published events.
buf := e.getTopicBuffer(req.Topic)
topicHead := e.getTopicBuffer(req.Topic).Head()
// See if we need a snapshot
topicHead := buf.Head()
var sub *Subscription
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
// No need for a snapshot, send the "end of empty snapshot" message to signal to
// client its cache is still good, then follow along from here in the topic.
// If the client view is fresh, resume the stream.
if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
buf := newEventBuffer()
// Store the head of that buffer before we append to it to give as the
// starting point for the subscription.
subHead := buf.Head()
buf.Append([]Event{{
Index: req.Index,
Topic: req.Topic,
Key: req.Key,
Payload: endOfEmptySnapshot{},
}})
// Now splice the rest of the topic buffer on so the subscription will
// continue to see future updates in the topic buffer.
subscriptionHead := buf.Head()
// splice the rest of the topic buffer onto the subscription buffer so
// the subscription will receive new events.
buf.AppendItem(topicHead.NextLink())
sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
} else {
snap, err := e.getSnapshotLocked(req, topicHead)
if err != nil {
return nil, err
}
sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
return e.subscriptions.add(req, subscriptionHead), nil
}
e.subscriptions.add(req, sub)
return sub, nil
snapFromCache := e.getCachedSnapshotLocked(req)
if req.Index == 0 && snapFromCache != nil {
return e.subscriptions.add(req, snapFromCache.First), nil
}
snap := newEventSnapshot()
// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Payload: newSnapshotToFollow{},
}})
if snapFromCache != nil {
snap.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, snap.First), nil
}
}
snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
return e.subscriptions.add(req, snap.First), nil
}
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
sub := newSubscription(*req, head, s.unsubscribe(req))
s.lock.Lock()
defer s.lock.Unlock()
@ -216,6 +211,7 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
return sub
}
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
@ -263,7 +259,8 @@ func (s *subscriptions) closeAll() {
}
}
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*eventSnapshot)
@ -272,25 +269,22 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
snap, ok := topicSnaps[req.Key]
if ok && snap.err() == nil {
return snap, nil
return snap
}
handler, ok := e.snapshotHandlers[req.Topic]
if !ok {
return nil, fmt.Errorf("unknown topic %v", req.Topic)
}
snap = newEventSnapshot(req, topicHead, handler)
if e.snapCacheTTL > 0 {
topicSnaps[req.Key] = snap
// Trigger a clearout after TTL
time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock()
defer e.lock.Unlock()
delete(topicSnaps, req.Key)
})
}
return snap, nil
return nil
}
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *eventSnapshot) {
if e.snapCacheTTL == 0 {
return
}
e.snapCache[req.Topic][req.Key] = snap
// Setup a cache eviction
time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock()
defer e.lock.Unlock()
delete(e.snapCache[req.Topic], req.Key)
})
}

View File

@ -17,8 +17,8 @@ func (i intTopic) String() string {
var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{
func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
@ -28,20 +28,18 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
sub, err := publisher.Subscribe(subscription)
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
eventCh := runSubscription(ctx, sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
require.Equal(t, expected, result.Events)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
result = nextResult(t, eventCh)
require.Len(t, result.Events, 1)
require.True(t, result.Events[0].IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
@ -52,10 +50,16 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher.Publish(events)
// Subscriber should see the published event
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
next = getNextEvents(t, eventCh)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events)
require.Equal(t, expected, next)
}
var testSnapshotEvent = Event{
Topic: testTopic,
Payload: "snapshot-event-payload",
Key: "sub-key",
Index: 1,
}
func newTestSnapshotHandlers() SnapshotHandlers {
@ -64,18 +68,18 @@ func newTestSnapshotHandlers() SnapshotHandlers {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
buf.Append([]Event{testSnapshotEvent})
return 1, nil
},
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
eventCh := make(chan eventOrErr, 1)
go func() {
for {
es, err := sub.Next(ctx)
eventCh <- subNextResult{
eventCh <- eventOrErr{
Events: es,
Err: err,
}
@ -87,30 +91,31 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR
return eventCh
}
type subNextResult struct {
type eventOrErr struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event {
t.Helper()
select {
case next := <-eventCh:
return next
require.NoError(t, next.Err)
return next.Events
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
t.Fatalf("timeout waiting for event from subscription")
return nil
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
case <-time.After(25 * time.Millisecond):
}
}
@ -156,3 +161,232 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
}
}
}
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
_, err := publisher.Subscribe(req)
require.NoError(t, err)
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Index: 3,
}}
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvents(t, eventCh)
expected = []Event{events[0]}
require.Equal(t, expected, next)
}
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
assertNoResult(t, eventCh)
expected := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
publisher.publishEvent([]Event{expected})
next := getNextEvents(t, eventCh)
require.Equal(t, []Event{expected}, next)
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Equal(t, nextEvent, next[0])
})
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
if !t.Run(name, fn) {
t.FailNow()
}
}

View File

@ -9,50 +9,45 @@ package stream
// collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type eventSnapshot struct {
// Head is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// so that subscribers receive all the events from the same buffer.
Head *bufferItem
// First item in the buffer. Used as the Head of a subscription, or to
// splice this snapshot onto another one.
First *bufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *eventBuffer
// buffer is the Head of the snapshot buffer the fn should write to.
buffer *eventBuffer
}
// newEventSnapshot creates a snapshot buffer based on the subscription request.
// The current buffer head for the topic requested is passed so that once the
// snapshot is complete and has been delivered into the buffer, any events
// published during snapshotting can be immediately appended and won't be
// missed. Once the snapshot is delivered the topic buffer is spliced onto the
// snapshot buffer so that subscribers will naturally follow from the snapshot
// to wait for any subsequent updates.
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
buf := newEventBuffer()
s := &eventSnapshot{
Head: buf.Head(),
snapBuffer: buf,
// newEventSnapshot creates an empty snapshot buffer.
func newEventSnapshot() *eventSnapshot {
snapBuffer := newEventBuffer()
return &eventSnapshot{
First: snapBuffer.Head(),
buffer: snapBuffer,
}
go func() {
idx, err := fn(*req, s.snapBuffer)
if err != nil {
s.snapBuffer.AppendItem(&bufferItem{Err: err})
return
}
// We wrote the snapshot events to the buffer, send the "end of snapshot" event
s.snapBuffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}()
return s
}
// appendAndSlice populates the snapshot buffer by calling the SnapshotFunc,
// then adding an endOfSnapshot framing event, and finally by splicing in
// events from the topicBuffer.
func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, topicBufferHead *bufferItem) {
idx, err := fn(req, s.buffer)
if err != nil {
s.buffer.AppendItem(&bufferItem{Err: err})
return
}
s.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}
// spliceFromTopicBuffer traverses the topicBuffer looking for the last item
// in the buffer, or the first item where the index is greater than idx. Once
// the item is found it is appended to the snapshot buffer.
func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot.
item := topicBufferHead
for {
next := item.NextNoBlock()
@ -62,7 +57,7 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// the snapshot completed). We don't want any of the events (if any) in
// the snapshot buffer as they came before the snapshot but we do need to
// wait for the next update.
s.snapBuffer.AppendItem(item.NextLink())
s.buffer.AppendItem(item.NextLink())
return
case next.Err != nil:
@ -71,14 +66,14 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
// buffer which does not contain a snapshot.
// Handle this case anyway in case errors can come from other places
// in the future.
s.snapBuffer.AppendItem(next)
s.buffer.AppendItem(next)
return
case len(next.Events) > 0 && next.Events[0].Index > idx:
// We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it.
s.snapBuffer.AppendItem(next)
s.buffer.AppendItem(next)
return
}
@ -93,6 +88,6 @@ func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx u
func (s *eventSnapshot) err() error {
// Fetch the head of the buffer, this is atomic. If the snapshot func errored
// then the last event will be an error.
head := s.snapBuffer.Head()
head := s.buffer.Head()
return head.Err
}

View File

@ -87,9 +87,8 @@ func TestEventSnapshot(t *testing.T) {
tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
}
// Create eventSnapshot, (will call snFn in another goroutine). The
// Request is ignored by the snapFunc so doesn't matter for now.
es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn)
es := newEventSnapshot()
es.appendAndSplice(SubscribeRequest{}, snFn, tbHead)
// Deliver any post-snapshot events simulating updates that occur
// logically after snapshot. It doesn't matter that these might actually
@ -112,7 +111,7 @@ func TestEventSnapshot(t *testing.T) {
snapIDs := make([]string, 0, tc.snapshotSize)
updateIDs := make([]string, 0, tc.updatesAfterSnap)
snapDone := false
curItem := es.Head
curItem := es.First
var err error
RECV:
for {

View File

@ -11,7 +11,7 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
require.True(t, e.IsEndOfSnapshot())
t.Run("not EndOfSnapshot", func(t *testing.T) {
e := Event{Payload: endOfEmptySnapshot{}}
e := Event{Payload: newSnapshotToFollow{}}
require.False(t, e.IsEndOfSnapshot())
})
}

View File

@ -28,7 +28,7 @@ type Subscription struct {
state uint32
// req is the requests that we are responding to
req *SubscribeRequest
req SubscribeRequest
// currentItem stores the current snapshot or topic buffer item we are on. It
// is mutated by calls to Next.
@ -56,7 +56,7 @@ type SubscribeRequest struct {
// newSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
forceClosed: make(chan struct{}),
req: req,

View File

@ -23,8 +23,7 @@ func TestSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a subscription
req := &SubscribeRequest{
req := SubscribeRequest{
Topic: testTopic,
Key: "test",
}
@ -103,8 +102,7 @@ func TestSubscription_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a subscription
req := &SubscribeRequest{
req := SubscribeRequest{
Topic: testTopic,
Key: "test",
}

View File

@ -200,7 +200,7 @@ type Event struct {
//
// Types that are valid to be assigned to Payload:
// *Event_EndOfSnapshot
// *Event_EndOfEmptySnapshot
// *Event_NewSnapshotToFollow
// *Event_EventBatch
// *Event_ServiceHealth
Payload isEvent_Payload `protobuf_oneof:"Payload"`
@ -251,8 +251,8 @@ type isEvent_Payload interface {
type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"`
}
type Event_EndOfEmptySnapshot struct {
EndOfEmptySnapshot bool `protobuf:"varint,6,opt,name=EndOfEmptySnapshot,proto3,oneof"`
type Event_NewSnapshotToFollow struct {
NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"`
}
type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"`
@ -261,10 +261,10 @@ type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
}
func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_EndOfEmptySnapshot) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {}
func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_NewSnapshotToFollow) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {}
func (m *Event) GetPayload() isEvent_Payload {
if m != nil {
@ -301,9 +301,9 @@ func (m *Event) GetEndOfSnapshot() bool {
return false
}
func (m *Event) GetEndOfEmptySnapshot() bool {
if x, ok := m.GetPayload().(*Event_EndOfEmptySnapshot); ok {
return x.EndOfEmptySnapshot
func (m *Event) GetNewSnapshotToFollow() bool {
if x, ok := m.GetPayload().(*Event_NewSnapshotToFollow); ok {
return x.NewSnapshotToFollow
}
return false
}
@ -326,7 +326,7 @@ func (m *Event) GetServiceHealth() *ServiceHealthUpdate {
func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{
(*Event_EndOfSnapshot)(nil),
(*Event_EndOfEmptySnapshot)(nil),
(*Event_NewSnapshotToFollow)(nil),
(*Event_EventBatch)(nil),
(*Event_ServiceHealth)(nil),
}
@ -343,9 +343,9 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
}
_ = b.EncodeVarint(5<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_EndOfEmptySnapshot:
case *Event_NewSnapshotToFollow:
t := uint64(0)
if x.EndOfEmptySnapshot {
if x.NewSnapshotToFollow {
t = 1
}
_ = b.EncodeVarint(6<<3 | proto.WireVarint)
@ -377,12 +377,12 @@ func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer)
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err
case 6: // Payload.EndOfEmptySnapshot
case 6: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfEmptySnapshot{x != 0}
m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err
case 7: // Payload.EventBatch
if wire != proto.WireBytes {
@ -412,7 +412,7 @@ func _Event_OneofSizer(msg proto.Message) (n int) {
case *Event_EndOfSnapshot:
n += 1 // tag and wire
n += 1
case *Event_EndOfEmptySnapshot:
case *Event_NewSnapshotToFollow:
n += 1 // tag and wire
n += 1
case *Event_EventBatch:
@ -546,40 +546,40 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{
// 521 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x4f, 0x8f, 0xd2, 0x40,
0x14, 0xef, 0xc0, 0x02, 0xcb, 0xc3, 0xdd, 0xd4, 0x11, 0x63, 0xc3, 0x26, 0x0d, 0x12, 0xb3, 0xa9,
0x9b, 0x48, 0x37, 0x98, 0xe8, 0x4d, 0x23, 0x2c, 0x8a, 0x31, 0x11, 0x53, 0xdc, 0x83, 0xde, 0x86,
0xf6, 0x49, 0x1b, 0xd8, 0x99, 0xb1, 0x1d, 0x56, 0xb9, 0xfb, 0x21, 0xf6, 0xcb, 0x78, 0xf7, 0xe8,
0x47, 0x30, 0xf8, 0x45, 0x0c, 0x43, 0xb7, 0x5b, 0x60, 0x6f, 0xde, 0xfa, 0x7e, 0x7f, 0xe6, 0xfd,
0xf2, 0x5e, 0x1f, 0x3c, 0x94, 0xb1, 0x50, 0xc2, 0x95, 0xe3, 0x64, 0x3e, 0x4e, 0xfc, 0x38, 0x1a,
0xa3, 0x9b, 0x7d, 0xb5, 0x35, 0x47, 0xab, 0x19, 0xd0, 0x68, 0x64, 0x6a, 0x8c, 0x2f, 0x23, 0x1f,
0x5d, 0x2e, 0x82, 0x54, 0xd6, 0xba, 0x22, 0x60, 0x8e, 0xae, 0x95, 0x1e, 0x7e, 0x9d, 0x63, 0xa2,
0xe8, 0x31, 0x94, 0x3e, 0x0a, 0x19, 0xf9, 0x16, 0x69, 0x12, 0xe7, 0xb0, 0x63, 0xb6, 0x6f, 0x1e,
0xd7, 0xb8, 0xb7, 0xa6, 0xa9, 0x09, 0xc5, 0x77, 0xb8, 0xb0, 0x0a, 0x4d, 0xe2, 0x54, 0xbd, 0xd5,
0x27, 0xad, 0xaf, 0x9c, 0x53, 0xe4, 0x56, 0x51, 0x63, 0xeb, 0x62, 0x85, 0xbe, 0xe5, 0x01, 0x7e,
0xb7, 0xf6, 0x9a, 0xc4, 0xd9, 0xf3, 0xd6, 0x05, 0xb5, 0x01, 0xce, 0x98, 0x62, 0x3e, 0x72, 0x85,
0xb1, 0x55, 0xd2, 0x86, 0x1c, 0xd2, 0xfa, 0x59, 0x80, 0x52, 0xff, 0x12, 0xf9, 0x7f, 0xe6, 0x59,
0x77, 0x2e, 0xe6, 0x3b, 0x1f, 0xc3, 0x41, 0x9f, 0x07, 0xc3, 0x2f, 0x23, 0xce, 0x64, 0x12, 0x0a,
0xa5, 0x9b, 0xef, 0x0f, 0x0c, 0x6f, 0x13, 0xa6, 0xa7, 0x40, 0x35, 0xd0, 0xbf, 0x90, 0x6a, 0x91,
0x89, 0xcb, 0xa9, 0xf8, 0x16, 0x8e, 0x3e, 0x07, 0xd0, 0x91, 0xbb, 0x4c, 0xf9, 0xa1, 0x55, 0x69,
0x12, 0xa7, 0xd6, 0xb9, 0x9f, 0x8b, 0x7b, 0x43, 0x0e, 0x0c, 0x2f, 0x27, 0xa5, 0xaf, 0xe1, 0x60,
0xb4, 0xde, 0xce, 0x00, 0xd9, 0x4c, 0x85, 0x16, 0x68, 0xaf, 0x9d, 0xf3, 0x6e, 0xf0, 0xe7, 0x32,
0x60, 0x0a, 0x57, 0x91, 0x37, 0xe0, 0x6e, 0x15, 0x2a, 0x1f, 0xd8, 0x62, 0x26, 0x58, 0xd0, 0x7a,
0x96, 0xcf, 0x42, 0x1d, 0x28, 0xeb, 0x2a, 0xb1, 0x48, 0xb3, 0xe8, 0xd4, 0x36, 0x86, 0xa8, 0x09,
0x2f, 0xe5, 0x5b, 0x3f, 0x08, 0xdc, 0xbb, 0xa5, 0x17, 0x7d, 0x04, 0x85, 0xa1, 0x4c, 0x57, 0x50,
0xcf, 0xb9, 0x7b, 0x4c, 0xb1, 0x99, 0x98, 0x0c, 0xa5, 0x57, 0x18, 0x4a, 0xfa, 0x06, 0xcc, 0x5e,
0x88, 0xfe, 0x34, 0x7d, 0xe1, 0xbd, 0x08, 0x50, 0x2f, 0xa4, 0xd6, 0x39, 0x6a, 0x67, 0x7f, 0x60,
0x7b, 0x5b, 0xe2, 0xed, 0x98, 0x4e, 0x5e, 0xa5, 0x4b, 0xa7, 0x35, 0xa8, 0x9c, 0xf3, 0x29, 0x17,
0xdf, 0xb8, 0x69, 0xd0, 0xbb, 0x5b, 0x73, 0x32, 0x09, 0xb5, 0xa0, 0xbe, 0x01, 0xf5, 0x04, 0xe7,
0xe8, 0x2b, 0xb3, 0x70, 0xf2, 0x18, 0xaa, 0x59, 0x38, 0x7a, 0x07, 0xf6, 0x3d, 0x9c, 0x44, 0x89,
0xc2, 0xd8, 0x34, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xd7, 0x35, 0xe9, 0x7c, 0x82, 0x07, 0x23, 0xc5,
0x14, 0xf6, 0x42, 0xc6, 0x27, 0x98, 0x5e, 0x84, 0x54, 0x91, 0xe0, 0xf4, 0x05, 0x54, 0xb3, 0x0b,
0xa1, 0x47, 0xf9, 0x85, 0x6c, 0xdd, 0x4d, 0x63, 0x67, 0xa6, 0x2d, 0xe3, 0x94, 0x74, 0x5f, 0xfe,
0x5a, 0xda, 0xe4, 0xf7, 0xd2, 0x26, 0x7f, 0x96, 0x36, 0xb9, 0xfa, 0x6b, 0x1b, 0x9f, 0x9f, 0x4c,
0x22, 0x15, 0xce, 0xc7, 0x6d, 0x5f, 0x5c, 0xb8, 0x21, 0x4b, 0xc2, 0xc8, 0x17, 0xb1, 0x74, 0x7d,
0xc1, 0x93, 0xf9, 0xcc, 0xdd, 0x39, 0xed, 0x71, 0x59, 0x43, 0x4f, 0xff, 0x05, 0x00, 0x00, 0xff,
0xff, 0x7d, 0xf7, 0xca, 0x01, 0xf6, 0x03, 0x00, 0x00,
// 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75,
0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47,
0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf,
0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e,
0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0,
0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c,
0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed,
0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67,
0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87,
0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c,
0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79,
0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2,
0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc,
0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee,
0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c,
0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99,
0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1,
0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1,
0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a,
0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30,
0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad,
0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b,
0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50,
0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00,
0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a,
0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70,
0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96,
0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9,
0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8,
0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e,
0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -595,18 +595,24 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StateChangeSubscriptionClient interface {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error)
}
@ -653,18 +659,24 @@ func (x *stateChangeSubscriptionSubscribeClient) Recv() (*Event, error) {
// StateChangeSubscriptionServer is the server API for StateChangeSubscription service.
type StateChangeSubscriptionServer interface {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error
}
@ -842,14 +854,14 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0x28
return len(dAtA) - i, nil
}
func (m *Event_EndOfEmptySnapshot) MarshalTo(dAtA []byte) (int, error) {
func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
return m.MarshalToSizedBuffer(dAtA[:m.Size()])
}
func (m *Event_EndOfEmptySnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
i--
if m.EndOfEmptySnapshot {
if m.NewSnapshotToFollow {
dAtA[i] = 1
} else {
dAtA[i] = 0
@ -1058,7 +1070,7 @@ func (m *Event_EndOfSnapshot) Size() (n int) {
n += 2
return n
}
func (m *Event_EndOfEmptySnapshot) Size() (n int) {
func (m *Event_NewSnapshotToFollow) Size() (n int) {
if m == nil {
return 0
}
@ -1444,7 +1456,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
m.Payload = &Event_EndOfSnapshot{b}
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfEmptySnapshot", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
@ -1462,7 +1474,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
}
b := bool(v != 0)
m.Payload = &Event_EndOfEmptySnapshot{b}
m.Payload = &Event_NewSnapshotToFollow{b}
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)

View File

@ -13,18 +13,24 @@ import "proto/pbservice/node.proto";
// state change events. Events are streamed as they happen.
service StateChangeSubscription {
// Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
//
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
//
// Subscribe may return an ABORTED status error to indicate the client must
// re-start the Subscribe call.
// If SubscribeRequest.Index is > 0 it is assumed the client already has a
// snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed
// and the server doesn't know which previously delivered events should
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
// stream, for example because the ACL permissions for the token changed, or
// because the server state was restored from a snapshot.
rpc Subscribe(SubscribeRequest) returns (stream Event) {}
}
@ -92,11 +98,11 @@ message Event {
// ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5;
// EndOfEmptySnapshot indicates that the client is still up-to-date.
// The snapshot has ended, and was empty. The rest of the stream will be
// individual update events. It distinguishes between "up to date, no snapshot"
// and "snapshot contains zero events but you should reset any old state to be blank".
bool EndOfEmptySnapshot = 6;
// NewSnapshotToFollow indicates that the client view is stale. The client
// must reset its view before handing any more events. Subsequent events
// in the stream will be for a new snapshot until an EndOfSnapshot event
// is received.
bool NewSnapshotToFollow = 6;
// EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft