Merge pull request #2555 from hashicorp/f-nack-delay
Back-pressure on Nacks and ensure scheduling progress on failures
This commit is contained in:
commit
5f04fc0d81
|
@ -158,6 +158,30 @@ type Config struct {
|
|||
// complete eventually fails out of the system.
|
||||
EvalDeliveryLimit int
|
||||
|
||||
// EvalNackInitialReenqueueDelay is the delay applied before reenqueuing a
|
||||
// Nacked evaluation for the first time. This value should be small as the
|
||||
// initial Nack can be due to a down machine and the eval should be retried
|
||||
// quickly for liveness.
|
||||
EvalNackInitialReenqueueDelay time.Duration
|
||||
|
||||
// EvalNackSubsequentReenqueueDelay is the delay applied before reenqueuing
|
||||
// an evaluation that has been Nacked more than once. This delay is
|
||||
// compounding after the first Nack. This value should be significantly
|
||||
// longer than the initial delay as the purpose it serves is to apply
|
||||
// back-pressure as evaluations are being Nacked either due to scheduler
|
||||
// failures or because they are hitting their Nack timeout, both of which
|
||||
// are signs of high server resource usage.
|
||||
EvalNackSubsequentReenqueueDelay time.Duration
|
||||
|
||||
// EvalFailedFollowupBaselineDelay is the minimum time waited before
|
||||
// retrying a failed evaluation.
|
||||
EvalFailedFollowupBaselineDelay time.Duration
|
||||
|
||||
// EvalFailedFollowupDelayRange defines the range of additional time from
|
||||
// the baseline in which to wait before retrying a failed evaluation. The
|
||||
// additional delay is selected from this range randomly.
|
||||
EvalFailedFollowupDelayRange time.Duration
|
||||
|
||||
// MinHeartbeatTTL is the minimum time between heartbeats.
|
||||
// This is used as a floor to prevent excessive updates.
|
||||
MinHeartbeatTTL time.Duration
|
||||
|
@ -214,33 +238,37 @@ func DefaultConfig() *Config {
|
|||
}
|
||||
|
||||
c := &Config{
|
||||
Region: DefaultRegion,
|
||||
Datacenter: DefaultDC,
|
||||
NodeName: hostname,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
RaftTimeout: 10 * time.Second,
|
||||
LogOutput: os.Stderr,
|
||||
RPCAddr: DefaultRPCAddr,
|
||||
SerfConfig: serf.DefaultConfig(),
|
||||
NumSchedulers: 1,
|
||||
ReconcileInterval: 60 * time.Second,
|
||||
EvalGCInterval: 5 * time.Minute,
|
||||
EvalGCThreshold: 1 * time.Hour,
|
||||
JobGCInterval: 5 * time.Minute,
|
||||
JobGCThreshold: 4 * time.Hour,
|
||||
NodeGCInterval: 5 * time.Minute,
|
||||
NodeGCThreshold: 24 * time.Hour,
|
||||
EvalNackTimeout: 60 * time.Second,
|
||||
EvalDeliveryLimit: 3,
|
||||
MinHeartbeatTTL: 10 * time.Second,
|
||||
MaxHeartbeatsPerSecond: 50.0,
|
||||
HeartbeatGrace: 10 * time.Second,
|
||||
FailoverHeartbeatTTL: 300 * time.Second,
|
||||
ConsulConfig: config.DefaultConsulConfig(),
|
||||
VaultConfig: config.DefaultVaultConfig(),
|
||||
RPCHoldTimeout: 5 * time.Second,
|
||||
TLSConfig: &config.TLSConfig{},
|
||||
Region: DefaultRegion,
|
||||
Datacenter: DefaultDC,
|
||||
NodeName: hostname,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
RaftTimeout: 10 * time.Second,
|
||||
LogOutput: os.Stderr,
|
||||
RPCAddr: DefaultRPCAddr,
|
||||
SerfConfig: serf.DefaultConfig(),
|
||||
NumSchedulers: 1,
|
||||
ReconcileInterval: 60 * time.Second,
|
||||
EvalGCInterval: 5 * time.Minute,
|
||||
EvalGCThreshold: 1 * time.Hour,
|
||||
JobGCInterval: 5 * time.Minute,
|
||||
JobGCThreshold: 4 * time.Hour,
|
||||
NodeGCInterval: 5 * time.Minute,
|
||||
NodeGCThreshold: 24 * time.Hour,
|
||||
EvalNackTimeout: 60 * time.Second,
|
||||
EvalDeliveryLimit: 3,
|
||||
EvalNackInitialReenqueueDelay: 1 * time.Second,
|
||||
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
|
||||
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
|
||||
EvalFailedFollowupDelayRange: 5 * time.Minute,
|
||||
MinHeartbeatTTL: 10 * time.Second,
|
||||
MaxHeartbeatsPerSecond: 50.0,
|
||||
HeartbeatGrace: 10 * time.Second,
|
||||
FailoverHeartbeatTTL: 300 * time.Second,
|
||||
ConsulConfig: config.DefaultConsulConfig(),
|
||||
VaultConfig: config.DefaultVaultConfig(),
|
||||
RPCHoldTimeout: 5 * time.Second,
|
||||
TLSConfig: &config.TLSConfig{},
|
||||
}
|
||||
|
||||
// Enable all known schedulers by default
|
||||
|
|
|
@ -76,6 +76,15 @@ type EvalBroker struct {
|
|||
// timeWait has evaluations that are waiting for time to elapse
|
||||
timeWait map[string]*time.Timer
|
||||
|
||||
// initialNackDelay is the delay applied before reenqueuing a
|
||||
// Nacked evaluation for the first time.
|
||||
initialNackDelay time.Duration
|
||||
|
||||
// subsequentNackDelay is the delay applied before reenqueuing
|
||||
// an evaluation that has been Nacked more than once. This delay is
|
||||
// compounding after the first Nack.
|
||||
subsequentNackDelay time.Duration
|
||||
|
||||
l sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -94,24 +103,29 @@ type PendingEvaluations []*structs.Evaluation
|
|||
// NewEvalBroker creates a new evaluation broker. This is parameterized
|
||||
// with the timeout used for messages that are not acknowledged before we
|
||||
// assume a Nack and attempt to redeliver as well as the deliveryLimit
|
||||
// which prevents a failing eval from being endlessly delivered.
|
||||
func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error) {
|
||||
// which prevents a failing eval from being endlessly delivered. The
|
||||
// initialNackDelay is the delay before making a Nacked evaluation available
|
||||
// again for the first Nack and subsequentNackDelay is the compounding delay
|
||||
// after the first Nack.
|
||||
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
|
||||
if timeout < 0 {
|
||||
return nil, fmt.Errorf("timeout cannot be negative")
|
||||
}
|
||||
b := &EvalBroker{
|
||||
nackTimeout: timeout,
|
||||
deliveryLimit: deliveryLimit,
|
||||
enabled: false,
|
||||
stats: new(BrokerStats),
|
||||
evals: make(map[string]int),
|
||||
jobEvals: make(map[string]string),
|
||||
blocked: make(map[string]PendingEvaluations),
|
||||
ready: make(map[string]PendingEvaluations),
|
||||
unack: make(map[string]*unackEval),
|
||||
waiting: make(map[string]chan struct{}),
|
||||
requeue: make(map[string]*structs.Evaluation),
|
||||
timeWait: make(map[string]*time.Timer),
|
||||
nackTimeout: timeout,
|
||||
deliveryLimit: deliveryLimit,
|
||||
enabled: false,
|
||||
stats: new(BrokerStats),
|
||||
evals: make(map[string]int),
|
||||
jobEvals: make(map[string]string),
|
||||
blocked: make(map[string]PendingEvaluations),
|
||||
ready: make(map[string]PendingEvaluations),
|
||||
unack: make(map[string]*unackEval),
|
||||
waiting: make(map[string]chan struct{}),
|
||||
requeue: make(map[string]*structs.Evaluation),
|
||||
timeWait: make(map[string]*time.Timer),
|
||||
initialNackDelay: initialNackDelay,
|
||||
subsequentNackDelay: subsequentNackDelay,
|
||||
}
|
||||
b.stats.ByScheduler = make(map[string]*SchedulerStats)
|
||||
return b, nil
|
||||
|
@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
|||
|
||||
// Check if we need to enforce a wait
|
||||
if eval.Wait > 0 {
|
||||
timer := time.AfterFunc(eval.Wait, func() {
|
||||
b.enqueueWaiting(eval)
|
||||
})
|
||||
b.timeWait[eval.ID] = timer
|
||||
b.stats.TotalWaiting += 1
|
||||
b.processWaitingEnqueue(eval)
|
||||
return
|
||||
}
|
||||
|
||||
b.enqueueLocked(eval, eval.Type)
|
||||
}
|
||||
|
||||
// processWaitingEnqueue waits the given duration on the evaluation before
|
||||
// enqueueing.
|
||||
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
|
||||
timer := time.AfterFunc(eval.Wait, func() {
|
||||
b.enqueueWaiting(eval)
|
||||
})
|
||||
b.timeWait[eval.ID] = timer
|
||||
b.stats.TotalWaiting += 1
|
||||
}
|
||||
|
||||
// enqueueWaiting is used to enqueue a waiting evaluation
|
||||
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
|
@ -547,14 +567,37 @@ func (b *EvalBroker) Nack(evalID, token string) error {
|
|||
|
||||
// Check if we've hit the delivery limit, and re-enqueue
|
||||
// in the failedQueue
|
||||
if b.evals[evalID] >= b.deliveryLimit {
|
||||
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
|
||||
b.enqueueLocked(unack.Eval, failedQueue)
|
||||
} else {
|
||||
b.enqueueLocked(unack.Eval, unack.Eval.Type)
|
||||
e := unack.Eval
|
||||
e.Wait = b.nackReenqueueDelay(e, dequeues)
|
||||
|
||||
// See if there should be a delay before re-enqueuing
|
||||
if e.Wait > 0 {
|
||||
b.processWaitingEnqueue(e)
|
||||
} else {
|
||||
b.enqueueLocked(e, e.Type)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nackReenqueueDelay is used to determine the delay that should be applied on
|
||||
// the evaluation given the number of previous attempts
|
||||
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
|
||||
switch {
|
||||
case prevDequeues <= 0:
|
||||
return 0
|
||||
case prevDequeues == 1:
|
||||
return b.initialNackDelay
|
||||
default:
|
||||
// For each subsequent nack compound a delay
|
||||
return time.Duration(prevDequeues-1) * b.subsequentNackDelay
|
||||
}
|
||||
}
|
||||
|
||||
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
|
||||
// progress but is in a potentially unbounded operation such as the plan queue.
|
||||
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -15,14 +17,34 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
func testBrokerConfig() *Config {
|
||||
config := DefaultConfig()
|
||||
|
||||
// Tune the Nack timeout
|
||||
config.EvalNackTimeout = 5 * time.Second
|
||||
|
||||
// Tune the Nack delay
|
||||
config.EvalNackInitialReenqueueDelay = 5 * time.Millisecond
|
||||
config.EvalNackSubsequentReenqueueDelay = 50 * time.Millisecond
|
||||
return config
|
||||
}
|
||||
|
||||
func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
|
||||
if timeout == 0 {
|
||||
timeout = 5 * time.Second
|
||||
config := testBrokerConfig()
|
||||
|
||||
if timeout != 0 {
|
||||
config.EvalNackTimeout = timeout
|
||||
}
|
||||
b, err := NewEvalBroker(timeout, 3)
|
||||
|
||||
return testBrokerFromConfig(t, config)
|
||||
}
|
||||
|
||||
func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
|
||||
b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
|
@ -126,19 +148,28 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check the stats
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 1 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalWaiting != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(e error) {
|
||||
t.Fatal(e)
|
||||
})
|
||||
|
||||
// Dequeue should work again
|
||||
out2, token2, err := b.Dequeue(defaultSched, time.Second)
|
||||
|
@ -192,6 +223,163 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Nack_Delay(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
|
||||
// Enable the broker and enqueue an evaluation
|
||||
b.SetEnabled(true)
|
||||
eval := mock.Eval()
|
||||
b.Enqueue(eval)
|
||||
|
||||
// Dequeue should work
|
||||
out, token, err := b.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out != eval {
|
||||
t.Fatalf("bad : %#v", out)
|
||||
}
|
||||
|
||||
// Nack back into the queue
|
||||
err = b.Nack(eval.ID, token)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if _, ok := b.Outstanding(out.ID); ok {
|
||||
t.Fatalf("should not be outstanding")
|
||||
}
|
||||
|
||||
// Check the stats to ensure that it is waiting
|
||||
stats := b.Stats()
|
||||
if stats.TotalReady != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalWaiting != 1 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
|
||||
// Now wait for it to be re-enqueued
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalWaiting != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(e error) {
|
||||
t.Fatal(e)
|
||||
})
|
||||
|
||||
// Dequeue should work again
|
||||
out2, token2, err := b.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out2 != eval {
|
||||
t.Fatalf("bad : %#v", out2)
|
||||
}
|
||||
if token2 == token {
|
||||
t.Fatalf("should get a new token")
|
||||
}
|
||||
|
||||
// Capture the time
|
||||
start := time.Now()
|
||||
|
||||
// Nack back into the queue
|
||||
err = b.Nack(eval.ID, token2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now wait for it to be re-enqueued
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalWaiting != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(e error) {
|
||||
t.Fatal(e)
|
||||
})
|
||||
|
||||
delay := time.Now().Sub(start)
|
||||
if delay < b.subsequentNackDelay {
|
||||
t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay)
|
||||
}
|
||||
|
||||
// Dequeue should work again
|
||||
out3, token3, err := b.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out3 != eval {
|
||||
t.Fatalf("bad : %#v", out3)
|
||||
}
|
||||
if token3 == token || token3 == token2 {
|
||||
t.Fatalf("should get a new token")
|
||||
}
|
||||
|
||||
// Ack finally
|
||||
err = b.Ack(eval.ID, token3)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if _, ok := b.Outstanding(out.ID); ok {
|
||||
t.Fatalf("should not be outstanding")
|
||||
}
|
||||
|
||||
// Check the stats
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Ready != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.ByScheduler[eval.Type].Unacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
|
@ -472,7 +660,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
|
|||
func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
NUM := 100
|
||||
NUM := 1000
|
||||
|
||||
for i := 0; i < NUM; i++ {
|
||||
eval1 := mock.Eval()
|
||||
|
@ -503,7 +691,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
|
|||
|
||||
// This will fail randomly at times. It is very hard to
|
||||
// test deterministically that it's acting randomly.
|
||||
if counter >= 25 || counter <= -25 {
|
||||
if counter >= 250 || counter <= -250 {
|
||||
t.Fatalf("unlikely sequence: %d", counter)
|
||||
}
|
||||
}
|
||||
|
@ -584,7 +772,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
|
|||
|
||||
// Ensure we nack in a timely manner
|
||||
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
|
||||
b := testBroker(t, 5*time.Millisecond)
|
||||
b := testBroker(t, 50*time.Millisecond)
|
||||
b.SetEnabled(true)
|
||||
|
||||
// Enqueue
|
||||
|
@ -601,8 +789,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
|
|||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
|
||||
// Reset in 2 milliseconds
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
// Reset in 20 milliseconds
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
if err := b.OutstandingReset(out.ID, token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -618,13 +806,13 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check the nack timer
|
||||
if diff := end.Sub(start); diff < 7*time.Millisecond {
|
||||
if diff := end.Sub(start); diff < 75*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
|
||||
b := testBroker(t, 5*time.Millisecond)
|
||||
b := testBroker(t, 50*time.Millisecond)
|
||||
b.SetEnabled(true)
|
||||
|
||||
// Enqueue
|
||||
|
@ -641,14 +829,14 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
|
|||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
|
||||
// Pause in 2 milliseconds
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
// Pause in 20 milliseconds
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
if err := b.PauseNackTimeout(out.ID, token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
if err := b.ResumeNackTimeout(out.ID, token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -665,7 +853,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check the nack timer
|
||||
if diff := end.Sub(start); diff < 9*time.Millisecond {
|
||||
if diff := end.Sub(start); diff < 95*time.Millisecond {
|
||||
t.Fatalf("bad: %#v", diff)
|
||||
}
|
||||
}
|
||||
|
@ -820,7 +1008,7 @@ func TestEvalBroker_Wait(t *testing.T) {
|
|||
}
|
||||
|
||||
// Let the wait elapse
|
||||
time.Sleep(15 * time.Millisecond)
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Verify ready
|
||||
stats = b.Stats()
|
||||
|
@ -976,14 +1164,20 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check stats again as this should cause the re-enqueued one to be dropped
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
t.Fatalf("bad: %#v", stats)
|
||||
}
|
||||
if len(b.requeue) != 0 {
|
||||
t.Fatalf("bad: %#v", b.requeue)
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
stats = b.Stats()
|
||||
if stats.TotalReady != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if stats.TotalUnacked != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", stats)
|
||||
}
|
||||
if len(b.requeue) != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", b.requeue)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(e error) {
|
||||
t.Fatal(e)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
|
@ -387,17 +388,24 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
|
|||
}
|
||||
|
||||
// Update the status to failed
|
||||
newEval := eval.Copy()
|
||||
newEval.Status = structs.EvalStatusFailed
|
||||
newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
|
||||
s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval)
|
||||
updateEval := eval.Copy()
|
||||
updateEval.Status = structs.EvalStatusFailed
|
||||
updateEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
|
||||
s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", updateEval)
|
||||
|
||||
// Create a follow-up evaluation that will be used to retry the
|
||||
// scheduling for the job after the cluster is hopefully more stable
|
||||
// due to the fairly large backoff.
|
||||
followupEvalWait := s.config.EvalFailedFollowupBaselineDelay +
|
||||
time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange)))
|
||||
followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)
|
||||
|
||||
// Update via Raft
|
||||
req := structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{newEval},
|
||||
Evals: []*structs.Evaluation{updateEval, followupEval},
|
||||
}
|
||||
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
|
||||
s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err)
|
||||
s.logger.Printf("[ERR] nomad: failed to update failed eval %#v and create a follow-up: %v", updateEval, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -508,7 +508,7 @@ func TestLeader_ReapFailedEval(t *testing.T) {
|
|||
}
|
||||
s1.evalBroker.Nack(out.ID, token)
|
||||
|
||||
// Wait updated evaluation
|
||||
// Wait for an updated and followup evaluation
|
||||
state := s1.fsm.State()
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ws := memdb.NewWatchSet()
|
||||
|
@ -516,7 +516,45 @@ func TestLeader_ReapFailedEval(t *testing.T) {
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return out != nil && out.Status == structs.EvalStatusFailed, nil
|
||||
if out == nil {
|
||||
return false, fmt.Errorf("expect original evaluation to exist")
|
||||
}
|
||||
if out.Status != structs.EvalStatusFailed {
|
||||
return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
|
||||
}
|
||||
|
||||
// See if there is a followup
|
||||
evals, err := state.EvalsByJob(ws, eval.JobID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if l := len(evals); l != 2 {
|
||||
return false, fmt.Errorf("got %d evals, want 2", l)
|
||||
}
|
||||
|
||||
for _, e := range evals {
|
||||
if e.ID == eval.ID {
|
||||
continue
|
||||
}
|
||||
|
||||
if e.Status != structs.EvalStatusPending {
|
||||
return false, fmt.Errorf("follow up eval has status %v; want %v",
|
||||
e.Status, structs.EvalStatusPending)
|
||||
}
|
||||
|
||||
if e.Wait < s1.config.EvalFailedFollowupBaselineDelay ||
|
||||
e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange {
|
||||
return false, fmt.Errorf("bad wait: %v", e.Wait)
|
||||
}
|
||||
|
||||
if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
|
||||
return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
|
||||
e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
|
|
@ -174,7 +174,11 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger)
|
|||
}
|
||||
|
||||
// Create an eval broker
|
||||
evalBroker, err := NewEvalBroker(config.EvalNackTimeout, config.EvalDeliveryLimit)
|
||||
evalBroker, err := NewEvalBroker(
|
||||
config.EvalNackTimeout,
|
||||
config.EvalNackInitialReenqueueDelay,
|
||||
config.EvalNackSubsequentReenqueueDelay,
|
||||
config.EvalDeliveryLimit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -3736,13 +3736,14 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
EvalTriggerJobRegister = "job-register"
|
||||
EvalTriggerJobDeregister = "job-deregister"
|
||||
EvalTriggerPeriodicJob = "periodic-job"
|
||||
EvalTriggerNodeUpdate = "node-update"
|
||||
EvalTriggerScheduled = "scheduled"
|
||||
EvalTriggerRollingUpdate = "rolling-update"
|
||||
EvalTriggerMaxPlans = "max-plan-attempts"
|
||||
EvalTriggerJobRegister = "job-register"
|
||||
EvalTriggerJobDeregister = "job-deregister"
|
||||
EvalTriggerPeriodicJob = "periodic-job"
|
||||
EvalTriggerNodeUpdate = "node-update"
|
||||
EvalTriggerScheduled = "scheduled"
|
||||
EvalTriggerRollingUpdate = "rolling-update"
|
||||
EvalTriggerFailedFollowUp = "failed-follow-up"
|
||||
EvalTriggerMaxPlans = "max-plan-attempts"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -3985,6 +3986,23 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped
|
|||
}
|
||||
}
|
||||
|
||||
// CreateFailedFollowUpEval creates a follow up evaluation when the current one
|
||||
// has been marked as failed becasue it has hit the delivery limit and will not
|
||||
// be retried by the eval_broker.
|
||||
func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
|
||||
return &Evaluation{
|
||||
ID: GenerateUUID(),
|
||||
Priority: e.Priority,
|
||||
Type: e.Type,
|
||||
TriggeredBy: EvalTriggerFailedFollowUp,
|
||||
JobID: e.JobID,
|
||||
JobModifyIndex: e.JobModifyIndex,
|
||||
Status: EvalStatusPending,
|
||||
Wait: wait,
|
||||
PreviousEval: e.ID,
|
||||
}
|
||||
}
|
||||
|
||||
// Plan is used to submit a commit plan for task allocations. These
|
||||
// are submitted to the leader which verifies that resources have
|
||||
// not been overcommitted before admitting the plan.
|
||||
|
|
Loading…
Reference in a new issue