client: prevent watching stale alloc state (#18612)

When waiting on a previous alloc, we must first query the leader before
switching to stale queries with the minimum query index set.

Also check that the response is fresh before using it, as was done in #18269.
This commit is contained in:
Michael Schurter 2023-09-29 12:46:28 -07:00
parent 547a95795a
commit 43fb0e82dc
2 changed files with 32 additions and 6 deletions

View File

@ -354,8 +354,10 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
AllocID: p.prevAllocID, AllocID: p.prevAllocID,
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Region: p.config.Region, Region: p.config.Region,
AllowStale: true,
AuthToken: p.config.Node.SecretID, AuthToken: p.config.Node.SecretID,
// Initially get response from leader, then switch to stale
AllowStale: false,
}, },
} }
@ -372,15 +374,36 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
resp := structs.SingleAllocResponse{} resp := structs.SingleAllocResponse{}
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil { if err != nil {
p.logger.Error("error querying previous alloc", "error", err)
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Error("error querying previous alloc", "error", err, "wait", retry)
select { select {
case <-time.After(retry): case <-timer.C:
continue continue
case <-ctx.Done(): case <-ctx.Done():
stop()
return ctx.Err() return ctx.Err()
} }
} }
// Ensure that we didn't receive a stale response
if req.AllowStale && resp.Index < req.MinQueryIndex {
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Warn("received stale alloc; retrying",
"req_index", req.MinQueryIndex,
"resp_index", resp.Index,
"wait", retry,
)
select {
case <-timer.C:
continue
case <-ctx.Done():
stop()
return ctx.Err()
}
}
if resp.Alloc == nil { if resp.Alloc == nil {
p.logger.Debug("blocking alloc was GC'd") p.logger.Debug("blocking alloc was GC'd")
return nil return nil
@ -392,6 +415,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
// Update the query index and requery. // Update the query index and requery.
if resp.Index > req.MinQueryIndex { if resp.Index > req.MinQueryIndex {
req.AllowStale = true
req.MinQueryIndex = resp.Index req.MinQueryIndex = resp.Index
} }
} }

View File

@ -2403,6 +2403,7 @@ OUTER:
// Node.GetClientAllocs which returns older results. // Node.GetClientAllocs which returns older results.
if allocsResp.Index <= allocsReq.MinQueryIndex { if allocsResp.Index <= allocsReq.MinQueryIndex {
retry := c.retryIntv(getAllocRetryIntv) retry := c.retryIntv(getAllocRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
c.logger.Warn("failed to retrieve updated allocs; retrying", c.logger.Warn("failed to retrieve updated allocs; retrying",
"req_index", allocsReq.MinQueryIndex, "req_index", allocsReq.MinQueryIndex,
"resp_index", allocsResp.Index, "resp_index", allocsResp.Index,
@ -2410,9 +2411,10 @@ OUTER:
"wait", retry, "wait", retry,
) )
select { select {
case <-time.After(retry): case <-timer.C:
continue continue
case <-c.shutdownCh: case <-c.shutdownCh:
stop()
return return
} }
} }