stream: fix a snapshot cache bug
Previously a snapshot created as part of a resumse-stream request could have incorrectly cached the newSnapshotToFollow event. This would cause clients to error because they received an unexpected framing event.
This commit is contained in:
parent
2726c65fbe
commit
a29b848e3b
|
@ -0,0 +1,4 @@
|
||||||
|
```release-note:bug
|
||||||
|
streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients
|
||||||
|
to error until the cache expired.
|
||||||
|
```
|
|
@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapFromCache := e.getCachedSnapshotLocked(req)
|
snapFromCache := e.getCachedSnapshotLocked(req)
|
||||||
if req.Index == 0 && snapFromCache != nil {
|
if snapFromCache == nil {
|
||||||
|
snap := newEventSnapshot()
|
||||||
|
snap.appendAndSplice(*req, handler, topicHead)
|
||||||
|
e.setCachedSnapshotLocked(req, snap)
|
||||||
|
snapFromCache = snap
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the request.Index is 0 the client has no view, send a full snapshot.
|
||||||
|
if req.Index == 0 {
|
||||||
return e.subscriptions.add(req, snapFromCache.First), nil
|
return e.subscriptions.add(req, snapFromCache.First), nil
|
||||||
}
|
}
|
||||||
snap := newEventSnapshot()
|
|
||||||
|
|
||||||
// if the request has an Index the client view is stale and must be reset
|
// otherwise the request has an Index, the client view is stale and must be reset
|
||||||
// with a NewSnapshotToFollow event.
|
// with a NewSnapshotToFollow event.
|
||||||
if req.Index > 0 {
|
result := newEventSnapshot()
|
||||||
snap.buffer.Append([]Event{{
|
result.buffer.Append([]Event{{
|
||||||
Topic: req.Topic,
|
Topic: req.Topic,
|
||||||
Payload: newSnapshotToFollow{},
|
Payload: newSnapshotToFollow{},
|
||||||
}})
|
}})
|
||||||
|
result.buffer.AppendItem(snapFromCache.First)
|
||||||
if snapFromCache != nil {
|
return e.subscriptions.add(req, result.First), nil
|
||||||
snap.buffer.AppendItem(snapFromCache.First)
|
|
||||||
return e.subscriptions.add(req, snap.First), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
snap.appendAndSplice(*req, handler, topicHead)
|
|
||||||
e.setCachedSnapshotLocked(req, snap)
|
|
||||||
return e.subscriptions.add(req, snap.First), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
|
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
|
||||||
|
|
Loading…
Reference in New Issue