Fix incorrect backoff-wait logic.
This commit is contained in:
parent
2bd2054e09
commit
00312bcf57
|
@ -93,30 +93,27 @@ func (m *subscriptionManager) syncViaBlockingQuery(
|
|||
|
||||
store := m.getStore()
|
||||
|
||||
for {
|
||||
for ctx.Err() == nil {
|
||||
ws := memdb.NewWatchSet()
|
||||
ws.Add(store.AbandonCh())
|
||||
ws.Add(ctx.Done())
|
||||
|
||||
if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil {
|
||||
if result, err := queryFn(ctx, store, ws); err != nil {
|
||||
// Return immediately if the context was cancelled.
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
}
|
||||
logger.Error("failed to sync from query", "error", err)
|
||||
|
||||
waiter.Wait(ctx)
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}:
|
||||
waiter.Reset()
|
||||
}
|
||||
|
||||
// Block for any changes to the state store.
|
||||
ws.WatchCtx(ctx)
|
||||
}
|
||||
|
||||
err := waiter.Wait(ctx)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return
|
||||
} else if err != nil {
|
||||
logger.Error("failed to wait before re-trying sync", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue