fix multiple overflow errors in exponential backoff (#18200)
We use capped exponential backoff in several places in the code when handling failures. The code we've copy-and-pasted all over has a check to see if the backoff is greater than the limit, but this check happens after the bitshift, and we always increment the number of attempts. This causes an overflow with a fairly small number of failures (e.g., at one place I tested it occurs after only 24 iterations), resulting in a negative backoff which then never recovers. The backoff becomes a tight loop consuming resources and/or DoS'ing a Nomad RPC handler or an external API such as Vault.

Note this doesn't occur in places where we cap the number of iterations so the loop breaks (usually to return an error), so long as the number of iterations is reasonable.

Introduce a helper with a check on the cap before the bitshift to avoid overflow in all places this can occur.

Fixes: #18199

Co-authored-by: stswidwinski <stan.swidwinski@gmail.com>
This commit is contained in:
parent 04484a10a0
commit 0a19fe3b60
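To make the failure mode concrete, here is a standalone Go sketch (not code from this commit; the 50ms baseline and the retry range are illustrative) of the pattern being removed. Because the shift happens before the cap check, the product wraps around to a negative duration once the attempt counter grows, and a negative duration makes time.Sleep (or time.After) return immediately, turning the backoff into a hot loop:

```go
// Standalone illustration of the overflow described above (not Nomad code).
package main

import (
	"fmt"
	"time"
)

func main() {
	baseline := 50 * time.Millisecond // illustrative baseline
	limit := time.Minute              // illustrative cap

	for retry := uint64(0); retry < 22; retry++ {
		// Buggy pattern removed by this commit: the shift runs before the
		// cap check, so the multiplication eventually overflows int64.
		backoff := (1 << (2 * retry)) * baseline
		if backoff > limit {
			backoff = limit
		}
		// With this baseline the value goes negative around retry 19; a
		// time.Sleep(backoff) would then return immediately, so the caller
		// retries in a tight loop.
		fmt.Printf("retry %2d: backoff %v\n", retry, backoff)
	}
}
```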
@@ -0,0 +1,3 @@
+```release-note:bug
+core: Fixed a bug where exponential backoff could result in excessive CPU usage
+```
@@ -11,6 +11,7 @@ import (
 	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 	bstructs "github.com/hashicorp/nomad/plugins/base/structs"
 )
@@ -127,7 +128,9 @@ MAIN:
 //
 // It logs the errors with appropriate log levels; don't log returned error
 func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
-	var retry int
+	var retry uint64
+	var backoff time.Duration
+	limit := time.Second * 5

 MAIN:
 	if ctx.Err() != nil {
@@ -162,13 +165,7 @@ MAIN:
 			h.logger.Error("failed to start stats collection for task", "error", err)
 		}

-		limit := time.Second * 5
-		backoff := 1 << (2 * uint64(retry)) * time.Second
-		if backoff > limit || retry > 5 {
-			backoff = limit
-		}
-
-		// Increment retry counter
+		backoff = helper.Backoff(time.Second, limit, retry)
 		retry++

 		time.Sleep(backoff)
@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
 	"github.com/hashicorp/nomad/client/vaultclient"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )

@@ -311,7 +312,8 @@ OUTER:
 // deriveVaultToken derives the Vault token using exponential backoffs. It
 // returns the Vault token and whether the manager should exit.
 func (h *vaultHook) deriveVaultToken() (token string, exit bool) {
-	attempts := 0
+	var attempts uint64
+	var backoff time.Duration
 	for {
 		tokens, err := h.client.DeriveToken(h.alloc, []string{h.taskName})
 		if err == nil {
@@ -339,14 +341,11 @@ func (h *vaultHook) deriveVaultToken() (token string, exit bool) {
 		}

 		// Handle the retry case
-		backoff := (1 << (2 * uint64(attempts))) * vaultBackoffBaseline
-		if backoff > vaultBackoffLimit {
-			backoff = vaultBackoffLimit
-		}
-		h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff)
-
+		backoff = helper.Backoff(vaultBackoffBaseline, vaultBackoffLimit, attempts)
 		attempts++

+		h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff)
+
 		// Wait till retrying
 		select {
 		case <-h.ctx.Done():
@@ -11,6 +11,7 @@ import (

 	log "github.com/hashicorp/go-hclog"
 	multierror "github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/pluginutils/loader"
 	"github.com/hashicorp/nomad/helper/pluginutils/singleton"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -450,7 +451,8 @@ func (i *instanceManager) handleFingerprint(f *device.FingerprintResponse) error
 // collectStats is a long lived goroutine for collecting device statistics. It
 // handles errors by backing off exponentially and retrying.
 func (i *instanceManager) collectStats() {
-	attempt := 0
+	var attempt uint64
+	var backoff time.Duration

 START:
 	// Get a device plugin
@@ -495,10 +497,7 @@ START:
 			}

 			// Retry with an exponential backoff
-			backoff := (1 << (2 * uint64(attempt))) * statsBackoffBaseline
-			if backoff > statsBackoffLimit {
-				backoff = statsBackoffLimit
-			}
+			backoff = helper.Backoff(statsBackoffBaseline, statsBackoffLimit, attempt)
 			attempt++

 			i.logger.Error("stats returned an error", "error", err, "retry", backoff)
@@ -511,7 +510,7 @@ START:
 		}
 	}

-	// Reset the attempt since we got statistics
+	// Reset the attempts since we got statistics
 	attempt = 0

 	// Store the new stats
@@ -10,6 +10,7 @@ import (
 	"time"

 	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/pluginutils/loader"
 	"github.com/hashicorp/nomad/helper/pluginutils/singleton"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -290,7 +291,7 @@ func (i *instanceManager) fingerprint() {

 	// backoff and retry used if the RPC is closed by the other end
 	var backoff time.Duration
-	var retry int
+	var retry uint64
 	for {
 		if backoff > 0 {
 			select {
@@ -329,11 +330,7 @@ func (i *instanceManager) fingerprint() {
 			i.handleFingerprintError()

 			// Calculate the new backoff
-			backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
-			if backoff > driverFPBackoffLimit {
-				backoff = driverFPBackoffLimit
-			}
-			// Increment retry counter
+			backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry)
 			retry++
 			continue
 		}
@@ -426,7 +423,7 @@ func (i *instanceManager) handleEvents() {
 	}

 	var backoff time.Duration
-	var retry int
+	var retry uint64
 	for {
 		if backoff > 0 {
 			select {
|
@ -453,10 +450,7 @@ func (i *instanceManager) handleEvents() {
|
||||||
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
|
i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry)
|
||||||
|
|
||||||
// Calculate the new backoff
|
// Calculate the new backoff
|
||||||
backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline
|
backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry)
|
||||||
if backoff > driverFPBackoffLimit {
|
|
||||||
backoff = driverFPBackoffLimit
|
|
||||||
}
|
|
||||||
retry++
|
retry++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@@ -23,11 +23,12 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age

 	defer close(done)

-	i := uint64(0)
-
 	timer, stop := helper.NewSafeTimer(limit)
 	defer stop()

+	var attempts uint64
+	var backoff time.Duration
+
 	for {
 		self, err := client.Self()
 		if err == nil {
@@ -40,13 +41,8 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
 			return
 		}

-		backoff := (1 << (2 * i)) * baseline
-		if backoff > limit {
-			backoff = limit
-		} else {
-			i++
-		}
-
+		backoff = helper.Backoff(baseline, limit, attempts)
+		attempts++
 		timer.Reset(backoff)

 		select {
@@ -483,7 +483,9 @@ type createContainerClient interface {
 func (d *Driver) createContainer(client createContainerClient, config docker.CreateContainerOptions,
 	image string) (*docker.Container, error) {
 	// Create a container
-	attempted := 0
+	var attempted uint64
+	var backoff time.Duration
+
 CREATE:
 	container, createErr := client.CreateContainer(config)
 	if createErr == nil {
@@ -533,16 +535,19 @@ CREATE:

 		if attempted < 5 {
 			attempted++
-			time.Sleep(nextBackoff(attempted))
+			backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
+			time.Sleep(backoff)
 			goto CREATE
 		}

 	} else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") {
 		// There is still a very small chance this is possible even with the
 		// coordinator so retry.
 		return nil, nstructs.NewRecoverableError(createErr, true)
 	} else if isDockerTransientError(createErr) && attempted < 5 {
 		attempted++
-		time.Sleep(nextBackoff(attempted))
+		backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
+		time.Sleep(backoff)
 		goto CREATE
 	}

@@ -557,8 +562,9 @@ func (d *Driver) startContainer(c *docker.Container) error {
 		return err
 	}

-	// Start a container
-	attempted := 0
+	var attempted uint64
+	var backoff time.Duration
+
 START:
 	startErr := dockerClient.StartContainer(c.ID, c.HostConfig)
 	if startErr == nil || strings.Contains(startErr.Error(), "Container already running") {
@@ -570,7 +576,8 @@ START:
 	if isDockerTransientError(startErr) {
 		if attempted < 5 {
 			attempted++
-			time.Sleep(nextBackoff(attempted))
+			backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted)
+			time.Sleep(backoff)
 			goto START
 		}
 		return nstructs.NewRecoverableError(startErr, true)
@@ -579,13 +586,6 @@ START:
 	return recoverableErrTimeouts(startErr)
 }

-// nextBackoff returns appropriate docker backoff durations after attempted attempts.
-func nextBackoff(attempted int) time.Duration {
-	// attempts in 200ms, 800ms, 3.2s, 12.8s, 51.2s
-	// TODO: add randomization factor and extract to a helper
-	return 1 << (2 * uint64(attempted)) * 50 * time.Millisecond
-}
-
 // createImage creates a docker image either by pulling it from a registry or by
 // loading it from the file system
 func (d *Driver) createImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (string, error) {
@@ -95,8 +95,8 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
 	defer destCh.close()

 	// backoff and retry used if the docker stats API returns an error
-	var backoff time.Duration = 0
-	var retry int
+	var backoff time.Duration
+	var retry uint64

 	// create an interval timer
 	timer, stop := helper.NewSafeTimer(backoff)
@@ -136,11 +136,7 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
 			h.logger.Debug("error collecting stats from container", "error", err)

 			// Calculate the new backoff
-			backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline
-			if backoff > statsCollectorBackoffLimit {
-				backoff = statsCollectorBackoffLimit
-			}
-			// Increment retry counter
+			backoff = helper.Backoff(statsCollectorBackoffBaseline, statsCollectorBackoffLimit, retry)
 			retry++
 			continue
 		}
@@ -0,0 +1,31 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package helper
+
+import (
+	"time"
+)
+
+func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint64) time.Duration {
+	const MaxUint = ^uint64(0)
+	const MaxInt = int64(MaxUint >> 1)
+
+	// Ensure lack of non-positive backoffs since these make no sense
+	if backoffBase.Nanoseconds() <= 0 {
+		return max(backoffBase, 0*time.Second)
+	}
+
+	// Ensure that a large attempt will not cause an overflow
+	if attempt > 62 || MaxInt/backoffBase.Nanoseconds() < (1<<attempt) {
+		return backoffLimit
+	}
+
+	// Compute deadline and clamp it to backoffLimit
+	deadline := 1 << attempt * backoffBase
+	if deadline > backoffLimit {
+		deadline = backoffLimit
+	}
+
+	return deadline
+}
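For reference, the call sites changed above all converge on the same shape: compute the clamped backoff from the attempt counter, increment the counter, then wait. A minimal sketch of that pattern follows; the doWork callback, the package name, and the 50ms base / 1m limit are illustrative assumptions, while the helper.Backoff signature is the one introduced in this commit.

```go
// Sketch of the retry pattern used at the call sites above, assuming the
// helper.Backoff helper added in this commit.
package retrysketch

import (
	"time"

	"github.com/hashicorp/nomad/helper"
)

// retryWithBackoff retries doWork until it succeeds, sleeping between
// attempts. helper.Backoff clamps the wait to limit and cannot overflow,
// so the loop never degrades into a busy spin no matter how many times
// the work fails.
func retryWithBackoff(doWork func() error) {
	const base = 50 * time.Millisecond // illustrative baseline
	const limit = time.Minute          // illustrative cap

	var attempt uint64
	for {
		if err := doWork(); err == nil {
			return
		}
		backoff := helper.Backoff(base, limit, attempt)
		attempt++
		time.Sleep(backoff)
	}
}
```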
@@ -0,0 +1,72 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package helper
+
+import (
+	"testing"
+	"time"
+
+	"github.com/shoenig/test/must"
+)
+
+func Test_Backoff(t *testing.T) {
+	const MaxUint = ^uint64(0)
+	const MaxInt = int64(MaxUint >> 1)
+
+	cases := []struct {
+		name           string
+		backoffBase    time.Duration
+		backoffLimit   time.Duration
+		attempt        uint64
+		expectedResult time.Duration
+	}{
+		{
+			name:           "backoff limit clamps for high base",
+			backoffBase:    time.Hour,
+			backoffLimit:   time.Minute,
+			attempt:        1,
+			expectedResult: time.Minute,
+		},
+		{
+			name:           "backoff limit clamps for boundary attempt",
+			backoffBase:    time.Hour,
+			backoffLimit:   time.Minute,
+			attempt:        63,
+			expectedResult: time.Minute,
+		},
+		{
+			name:           "small retry value",
+			backoffBase:    time.Minute,
+			backoffLimit:   time.Hour,
+			attempt:        0,
+			expectedResult: time.Minute,
+		},
+		{
+			name:           "first retry value",
+			backoffBase:    time.Minute,
+			backoffLimit:   time.Hour,
+			attempt:        1,
+			expectedResult: 2 * time.Minute,
+		},
+		{
+			name:           "fifth retry value",
+			backoffBase:    time.Minute,
+			backoffLimit:   time.Hour,
+			attempt:        5,
+			expectedResult: 32 * time.Minute,
+		},
+		{
+			name:           "sixth retry value",
+			backoffBase:    time.Minute,
+			backoffLimit:   time.Hour,
+			attempt:        6,
+			expectedResult: time.Hour,
+		},
+	}
+
+	for _, tc := range cases {
+		result := Backoff(tc.backoffBase, tc.backoffLimit, tc.attempt)
+		must.Eq(t, tc.expectedResult, result)
+	}
+}
@@ -17,6 +17,7 @@ import (
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/go-set"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/lib/lang"
 	"github.com/hashicorp/nomad/nomad/stream"
@@ -254,8 +255,9 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State

 	const backoffBase = 20 * time.Millisecond
 	const backoffLimit = 1 * time.Second
-	var retries uint
+	var retries uint64
 	var retryTimer *time.Timer
+	var deadline time.Duration

 	// XXX: Potential optimization is to set up a watch on the state
 	// store's index table and only unblock via a trigger rather than
@@ -273,16 +275,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State
 		}

 		// Exponential back off
-		retries++
 		if retryTimer == nil {
 			// First retry, start at baseline
 			retryTimer = time.NewTimer(backoffBase)
 		} else {
 			// Subsequent retry, reset timer
-			deadline := 1 << (2 * retries) * backoffBase
-			if deadline > backoffLimit {
-				deadline = backoffLimit
-			}
+			deadline = helper.Backoff(backoffBase, backoffLimit, retries)
+			retries++
 			retryTimer.Reset(deadline)
 		}

@@ -16,6 +16,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-version"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -109,8 +110,9 @@ type Worker struct {

 	// failures is the count of errors encountered while dequeueing evaluations
 	// and is used to calculate backoff.
-	failures  uint
-	evalToken string
+	failures       uint64
+	failureBackoff time.Duration
+	evalToken      string

 	// snapshotIndex is the index of the snapshot in which the scheduler was
 	// first invoked. It is used to mark the SnapshotIndex of evaluations
@@ -133,6 +135,7 @@ func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) *
 		start:             time.Now(),
 		status:            WorkerStarting,
 		enabledSchedulers: make([]string, len(args.EnabledSchedulers)),
+		failureBackoff:    time.Duration(0),
 	}
 	copy(w.enabledSchedulers, args.EnabledSchedulers)

@@ -874,12 +877,10 @@ func (w *Worker) shouldResubmit(err error) bool {
 // backoff if the server or the worker is shutdown.
 func (w *Worker) backoffErr(base, limit time.Duration) bool {
 	w.setWorkloadStatus(WorkloadBackoff)
-	backoff := (1 << (2 * w.failures)) * base
-	if backoff > limit {
-		backoff = limit
-	} else {
-		w.failures++
-	}
+
+	backoff := helper.Backoff(base, limit, w.failures)
+	w.failures++
+
 	select {
 	case <-time.After(backoff):
 		return false