storage: fix bug where WatchList would (rarely) return duplicate events (#17067)
This commit is contained in:
parent
c47af84f85
commit
5979752994
|
@@ -26,6 +26,7 @@ type Watch struct {
|
|||
|
||||
// Next returns the next WatchEvent, blocking until one is available.
|
||||
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
||||
var idx uint64
|
||||
for {
|
||||
e, err := w.nextEvent(ctx)
|
||||
if err == stream.ErrSubForceClosed {
|
||||
|
@@ -35,6 +36,26 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// This works around a *very* rare race condition in the EventPublisher where
|
||||
// it's possible to see duplicate events when events are published at the same
|
||||
// time as the first subscription is created on a {topic, subject} pair.
|
||||
//
|
||||
// We see this problem when a call to WriteCAS is happening in parallel with
|
||||
// a call to WatchList. It happens because our snapshot handler returns events
|
||||
// that have not yet been published (in the gap between us committing changes
|
||||
// to MemDB and the EventPublisher dispatching events onto its event buffers).
|
||||
//
|
||||
// An intuitive solution to this problem would be to take eventLock in the
|
||||
// snapshot handler to avoid it racing with publishing, but this does not
|
||||
// work because publishing is asynchronous.
|
||||
//
|
||||
// We should fix this problem at the root, but it's complicated, so for now
|
||||
// we'll work around it.
|
||||
if e.Index <= idx {
|
||||
continue
|
||||
}
|
||||
idx = e.Index
|
||||
|
||||
event := e.Payload.(eventPayload).event
|
||||
if w.query.matches(event.Resource) {
|
||||
return event, nil
|
||||
|
|
Loading…
Reference in New Issue