From 43fb0e82dca0ad4f1bd3a2c691bd09f384e725ab Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 29 Sep 2023 12:46:28 -0700 Subject: [PATCH] client: prevent watching stale alloc state (#18612) When waiting on a previous alloc we must query against the leader before switching to a stale query with index set. Also check to ensure the response is fresh before using it like #18269 --- client/allocwatcher/alloc_watcher.go | 34 ++++++++++++++++++++++++---- client/client.go | 4 +++- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go index 9dc5f95d1..3ee610abc 100644 --- a/client/allocwatcher/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -353,9 +353,11 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { req := structs.AllocSpecificRequest{ AllocID: p.prevAllocID, QueryOptions: structs.QueryOptions{ - Region: p.config.Region, - AllowStale: true, - AuthToken: p.config.Node.SecretID, + Region: p.config.Region, + AuthToken: p.config.Node.SecretID, + + // Initially get response from leader, then switch to stale + AllowStale: false, }, } @@ -372,15 +374,36 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { resp := structs.SingleAllocResponse{} err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) if err != nil { - p.logger.Error("error querying previous alloc", "error", err) retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) + timer, stop := helper.NewSafeTimer(retry) + p.logger.Error("error querying previous alloc", "error", err, "wait", retry) select { - case <-time.After(retry): + case <-timer.C: continue case <-ctx.Done(): + stop() return ctx.Err() } } + + // Ensure that we didn't receive a stale response + if req.AllowStale && resp.Index < req.MinQueryIndex { + retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) + timer, stop := helper.NewSafeTimer(retry) + p.logger.Warn("received stale alloc; retrying", + "req_index", req.MinQueryIndex, + "resp_index", resp.Index, + "wait", retry, + ) + select { + case <-timer.C: + continue + case <-ctx.Done(): + stop() + return ctx.Err() + } + } + if resp.Alloc == nil { p.logger.Debug("blocking alloc was GC'd") return nil @@ -392,6 +415,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { // Update the query index and requery. if resp.Index > req.MinQueryIndex { + req.AllowStale = true req.MinQueryIndex = resp.Index } } diff --git a/client/client.go b/client/client.go index c652ecec7..f908e1982 100644 --- a/client/client.go +++ b/client/client.go @@ -2403,6 +2403,7 @@ OUTER: // Node.GetClientAllocs which returns older results. if allocsResp.Index <= allocsReq.MinQueryIndex { retry := c.retryIntv(getAllocRetryIntv) + timer, stop := helper.NewSafeTimer(retry) c.logger.Warn("failed to retrieve updated allocs; retrying", "req_index", allocsReq.MinQueryIndex, "resp_index", allocsResp.Index, @@ -2410,9 +2411,10 @@ OUTER: "wait", retry, ) select { - case <-time.After(retry): + case <-timer.C: continue case <-c.shutdownCh: + stop() return } }