diff --git a/.changelog/9772.txt b/.changelog/9772.txt new file mode 100644 index 000000000..6dd048f99 --- /dev/null +++ b/.changelog/9772.txt @@ -0,0 +1,4 @@ +```release-note:bug +streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients +to error until the cache expired. +``` diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 769e875d8..db9ceb656 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } snapFromCache := e.getCachedSnapshotLocked(req) - if req.Index == 0 && snapFromCache != nil { + if snapFromCache == nil { + snap := newEventSnapshot() + snap.appendAndSplice(*req, handler, topicHead) + e.setCachedSnapshotLocked(req, snap) + snapFromCache = snap + } + + // If the request.Index is 0 the client has no view, send a full snapshot. + if req.Index == 0 { return e.subscriptions.add(req, snapFromCache.First), nil } - snap := newEventSnapshot() - // if the request has an Index the client view is stale and must be reset + // otherwise the request has an Index, the client view is stale and must be reset // with a NewSnapshotToFollow event. - if req.Index > 0 { - snap.buffer.Append([]Event{{ - Topic: req.Topic, - Payload: newSnapshotToFollow{}, - }}) - - if snapFromCache != nil { - snap.buffer.AppendItem(snapFromCache.First) - return e.subscriptions.add(req, snap.First), nil - } - } - - snap.appendAndSplice(*req, handler, topicHead) - e.setCachedSnapshotLocked(req, snap) - return e.subscriptions.add(req, snap.First), nil + result := newEventSnapshot() + result.buffer.Append([]Event{{ + Topic: req.Topic, + Payload: newSnapshotToFollow{}, + }}) + result.buffer.AppendItem(snapFromCache.First) + return e.subscriptions.add(req, result.First), nil } func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {