diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go index 3c95ec6a7..de340bb39 100644 --- a/internal/storage/inmem/watch.go +++ b/internal/storage/inmem/watch.go @@ -26,6 +26,7 @@ type Watch struct { // Next returns the next WatchEvent, blocking until one is available. func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { + var idx uint64 for { e, err := w.nextEvent(ctx) if err == stream.ErrSubForceClosed { @@ -35,6 +36,26 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { return nil, err } + // This works around a *very* rare race-condition in the EventPublisher where + // it's possible to see duplicate events when events are published at the same + // time as the first subscription is created on a {topic, subject} pair. + // + // We see this problem when a call to WriteCAS is happening in parallel with + // a call to WatchList. It happens because our snapshot handler returns events + // that have not yet been published (in the gap between us committing changes + // to MemDB and the EventPublisher dispatching events onto its event buffers). + // + // An intuitive solution to this problem would be to take eventLock in the + // snapshot handler to avoid it racing with publishing, but this does not + // work because publishing is asynchronous. + // + // We should fix this problem at the root, but it's complicated, so for now + // we'll work around it. + if e.Index <= idx { + continue + } + idx = e.Index + event := e.Payload.(eventPayload).event if w.query.matches(event.Resource) { return event, nil