subscribe: update to use NewSnapshotToFollow event

This commit is contained in:
Daniel Nephin 2020-10-02 13:55:41 -04:00
parent fa115c6249
commit 3183b9ebb3
3 changed files with 6 additions and 5 deletions

View File

@ -10,7 +10,7 @@ import (
// event is allowed to be sent to this client or not.
func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision {
switch {
case e.IsEndOfSnapshot(), e.IsEndOfEmptySnapshot():
case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow():
return acl.Allow
}

View File

@ -59,10 +59,11 @@ func (l *eventLogger) Trace(e []stream.Event) {
first := e[0]
switch {
case first.IsEndOfSnapshot() || first.IsEndOfEmptySnapshot():
case first.IsEndOfSnapshot():
l.snapshotDone = true
l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count)
case first.IsNewSnapshotToFollow():
return
case l.snapshotDone:
l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e))
}

View File

@ -169,8 +169,8 @@ func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream
case event.IsEndOfSnapshot():
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
return e
case event.IsEndOfEmptySnapshot():
e.Payload = &pbsubscribe.Event_EndOfEmptySnapshot{EndOfEmptySnapshot: true}
case event.IsNewSnapshotToFollow():
e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
return e
}