From 6415fb4284ff93312802e80d6aa7d74831a86eee Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 16 Nov 2022 16:10:11 -0500 Subject: [PATCH] eval broker: shed all but one blocked eval per job after ack (#14621) When an evaluation is acknowledged by a scheduler, the resulting plan is guaranteed to cover up to the `waitIndex` set by the worker based on the most recent evaluation for that job in the state store. At that point, we no longer need to retain blocked evaluations in the broker that are older than that index. Move all but the highest priority / highest `ModifyIndex` blocked eval into a canceled set. When the `Eval.Ack` RPC returns from the eval broker it will signal a reap of a batch of cancelable evals to write to raft. This paces the cancelations limited by how frequently the schedulers are acknowledging evals; this should reduce the risk of cancelations from overwhelming raft relative to scheduler progress. In order to avoid straggling batches when the cluster is quiet, we also include a periodic sweep through the cancelable list. --- .changelog/14621.txt | 3 + nomad/config.go | 7 + nomad/eval_broker.go | 129 ++++++++-- nomad/eval_broker_test.go | 507 ++++++++++++++++++++++---------------- nomad/eval_endpoint.go | 6 +- nomad/leader.go | 63 +++++ nomad/server.go | 4 + nomad/worker_test.go | 54 ++-- 8 files changed, 506 insertions(+), 267 deletions(-) create mode 100644 .changelog/14621.txt diff --git a/.changelog/14621.txt b/.changelog/14621.txt new file mode 100644 index 000000000..81a5032d4 --- /dev/null +++ b/.changelog/14621.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: when multiple evaluations are pending for the same job, evaluate the latest and cancel the intermediaries on success +``` diff --git a/nomad/config.go b/nomad/config.go index 7014ef987..2bcb8714b 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -252,6 +252,12 @@ type Config struct { // retrying a failed evaluation. EvalFailedFollowupBaselineDelay time.Duration + // EvalReapCancelableInterval is the interval for the periodic reaping of + // cancelable evaluations. Cancelable evaluations are canceled whenever any + // eval is ack'd but this sweeps up on quiescent clusters. This config value + // exists only for testing. + EvalReapCancelableInterval time.Duration + // EvalFailedFollowupDelayRange defines the range of additional time from // the baseline in which to wait before retrying a failed evaluation. The // additional delay is selected from this range randomly. @@ -471,6 +477,7 @@ func DefaultConfig() *Config { EvalNackSubsequentReenqueueDelay: 20 * time.Second, EvalFailedFollowupBaselineDelay: 1 * time.Minute, EvalFailedFollowupDelayRange: 5 * time.Minute, + EvalReapCancelableInterval: 5 * time.Second, MinHeartbeatTTL: 10 * time.Second, MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e13394b17..73591e16d 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -64,7 +64,11 @@ type EvalBroker struct { jobEvals map[structs.NamespacedID]string // blocked tracks the blocked evaluations by JobID in a priority queue - blocked map[structs.NamespacedID]PendingEvaluations + blocked map[structs.NamespacedID]BlockedEvaluations + + // cancelable tracks previously blocked evaluations (for any job) that are + // now safe for the Eval.Ack RPC to cancel in batches + cancelable []*structs.Evaluation // ready tracks the ready jobs by scheduler in a priority queue ready map[string]PendingEvaluations @@ -115,11 +119,14 @@ type unackEval struct { NackTimer *time.Timer } -// PendingEvaluations is a list of waiting evaluations. -// We implement the container/heap interface so that this is a -// priority queue +// PendingEvaluations is a list of ready evaluations across multiple jobs. We +// implement the container/heap interface so that this is a priority queue. type PendingEvaluations []*structs.Evaluation +// BlockedEvaluations is a list of blocked evaluations for a given job. We +// implement the container/heap interface so that this is a priority queue. +type BlockedEvaluations []*structs.Evaluation + // NewEvalBroker creates a new evaluation broker. This is parameterized // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit @@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), - blocked: make(map[structs.NamespacedID]PendingEvaluations), + blocked: make(map[structs.NamespacedID]BlockedEvaluations), + cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest), ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error { // Check if there are any blocked evaluations if blocked := b.blocked[namespacedID]; len(blocked) != 0 { - raw := heap.Pop(&blocked) + + // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // evaluation are no longer useful, so it's safe to drop them. + cancelable := blocked.MarkForCancel() + b.cancelable = append(b.cancelable, cancelable...) + b.stats.TotalCancelable = len(b.cancelable) + b.stats.TotalBlocked -= len(cancelable) + + // If any remain, enqueue an eval + if len(blocked) > 0 { + raw := heap.Pop(&blocked) + eval := raw.(*structs.Evaluation) + b.stats.TotalBlocked -= 1 + b.enqueueLocked(eval, eval.Type) + } + + // Clean up if there are no more after that if len(blocked) > 0 { b.blocked[namespacedID] = blocked } else { delete(b.blocked, namespacedID) } - eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 - b.enqueueLocked(eval, eval.Type) } // Re-enqueue the evaluation. @@ -733,11 +754,13 @@ func (b *EvalBroker) flush() { b.stats.TotalUnacked = 0 b.stats.TotalBlocked = 0 b.stats.TotalWaiting = 0 + b.stats.TotalCancelable = 0 b.stats.DelayedEvals = make(map[string]*structs.Evaluation) b.stats.ByScheduler = make(map[string]*SchedulerStats) b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) - b.blocked = make(map[structs.NamespacedID]PendingEvaluations) + b.blocked = make(map[structs.NamespacedID]BlockedEvaluations) + b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest) b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) @@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats { stats.TotalUnacked = b.stats.TotalUnacked stats.TotalBlocked = b.stats.TotalBlocked stats.TotalWaiting = b.stats.TotalWaiting + stats.TotalCancelable = b.stats.TotalCancelable for id, eval := range b.stats.DelayedEvals { evalCopy := *eval stats.DelayedEvals[id] = &evalCopy @@ -841,6 +865,24 @@ func (b *EvalBroker) Stats() *BrokerStats { return stats } +// Cancelable retrieves a batch of previously-blocked evaluations that are now +// stale and ready to mark for canceling. The eval RPC will call this with a +// batch size set to avoid sending overly large raft messages. +func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { + b.l.RLock() + defer b.l.RUnlock() + + if batchSize > len(b.cancelable) { + batchSize = len(b.cancelable) + } + + cancelable := b.cancelable[:batchSize] + b.cancelable = b.cancelable[batchSize:] + + b.stats.TotalCancelable = len(b.cancelable) + return cancelable +} + // EmitStats is used to export metrics about the broker while enabled func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) @@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked)) metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting)) + metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable)) for _, eval := range stats.DelayedEvals { metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"}, float32(time.Until(eval.WaitUntil).Seconds()), @@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { // BrokerStats returns all the stats about the broker type BrokerStats struct { - TotalReady int - TotalUnacked int - TotalBlocked int - TotalWaiting int - DelayedEvals map[string]*structs.Evaluation - ByScheduler map[string]*SchedulerStats + TotalReady int + TotalUnacked int + TotalBlocked int + TotalWaiting int + TotalCancelable int + DelayedEvals map[string]*structs.Evaluation + ByScheduler map[string]*SchedulerStats } // SchedulerStats returns the stats per scheduler @@ -934,3 +978,56 @@ func (p PendingEvaluations) Peek() *structs.Evaluation { } return p[n-1] } + +// Len is for the sorting interface +func (p BlockedEvaluations) Len() int { + return len(p) +} + +// Less is for the sorting interface. We flip the check +// so that the "min" in the min-heap is the element with the +// highest priority or highest modify index +func (p BlockedEvaluations) Less(i, j int) bool { + if p[i].Priority != p[j].Priority { + return !(p[i].Priority < p[j].Priority) + } + return !(p[i].ModifyIndex < p[j].ModifyIndex) +} + +// Swap is for the sorting interface +func (p BlockedEvaluations) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + +// Push implements the heap interface and is used to add a new evaluation to the slice +func (p *BlockedEvaluations) Push(e interface{}) { + *p = append(*p, e.(*structs.Evaluation)) +} + +// Pop implements the heap interface and is used to remove an evaluation from the slice +func (p *BlockedEvaluations) Pop() interface{} { + n := len(*p) + e := (*p)[n-1] + (*p)[n-1] = nil + *p = (*p)[:n-1] + return e +} + +// MarkForCancel is used to clear the blocked list of all but the one with the +// highest modify index and highest priority. It returns a slice of cancelable +// evals so that Eval.Ack RPCs can write batched raft entries to cancel +// them. This must be called inside the broker's lock. +func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation { + + // In pathological cases, we can have a large number of blocked evals but + // will want to cancel most of them. Using heap.Remove requires we re-sort + // for each eval we remove. Because we expect to have at most one remaining, + // we'll just create a new heap. + retain := BlockedEvaluations{(heap.Pop(p)).(*structs.Evaluation)} + + cancelable := make([]*structs.Evaluation, len(*p)) + copy(cancelable, *p) + + *p = retain + return cancelable +} diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 3b0988eae..c46f8bf4a 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,16 +1,21 @@ package nomad import ( + "container/heap" "encoding/json" "errors" "fmt" "testing" "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -393,236 +398,107 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { ns1 := "namespace-one" ns2 := "namespace-two" - eval := mock.Eval() - eval.Namespace = ns1 - b.Enqueue(eval) + jobID := "example" - eval2 := mock.Eval() - eval2.JobID = eval.JobID - eval2.Namespace = ns1 - eval2.CreateIndex = eval.CreateIndex + 1 - b.Enqueue(eval2) - - eval3 := mock.Eval() - eval3.JobID = eval.JobID - eval3.Namespace = ns1 - eval3.CreateIndex = eval.CreateIndex + 2 - b.Enqueue(eval3) - - eval4 := mock.Eval() - eval4.JobID = eval.JobID - eval4.Namespace = ns2 - eval4.CreateIndex = eval.CreateIndex + 3 - b.Enqueue(eval4) - - eval5 := mock.Eval() - eval5.JobID = eval.JobID - eval5.Namespace = ns2 - eval5.CreateIndex = eval.CreateIndex + 4 - b.Enqueue(eval5) - - stats := b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) + newEval := func(idx uint64, ns string) *structs.Evaluation { + eval := mock.Eval() + eval.ID = fmt.Sprintf("eval:%d", idx) + eval.JobID = jobID + eval.Namespace = ns + eval.CreateIndex = idx + eval.ModifyIndex = idx + b.Enqueue(eval) + return eval } - // Dequeue should work + // first job + eval1 := newEval(1, ns1) + newEval(2, ns1) + newEval(3, ns1) + eval4 := newEval(4, ns1) + + // second job + eval5 := newEval(5, ns2) + newEval(6, ns2) + eval7 := newEval(7, ns2) + + // retreive the stats from the broker, less some stats that aren't + // interesting for this test and make the test much more verbose + // to include + getStats := func() BrokerStats { + t.Helper() + stats := b.Stats() + stats.DelayedEvals = nil + stats.ByScheduler = nil + return *stats + } + + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) + + // Dequeue should get 1st eval out, token, err := b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 3 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 5, TotalCancelable: 0}, getStats()) - // Ack out - err = b.Ack(eval.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + // Current wait index should be 4 but Ack to exercise behavior + // when worker's Eval.getWaitIndex gets a stale index + err = b.Ack(eval1.ID, token) + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Dequeue should work + // eval4 and eval5 are ready + // eval6 and eval7 are blocked + // Dequeue should get 4th eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval2 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval4, must.Sprint("expected 4th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 2 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Ack out - err = b.Ack(eval2.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 2 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval3 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out - err = b.Ack(eval3.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Dequeue should work - out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval4 { - t.Fatalf("bad : %#v", out) - } - - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 1 { - t.Fatalf("bad: %#v", stats) - } - - // Ack out + // Ack should clear the rest of namespace-one blocked but leave + // namespace-two untouched err = b.Ack(eval4.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Dequeue should work + // Dequeue should get 5th eval out, token, err = b.Dequeue(defaultSched, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != eval5 { - t.Fatalf("bad : %#v", out) - } + must.NoError(t, err) + must.Eq(t, out, eval5, must.Sprint("expected 5th eval")) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 2, TotalCancelable: 2}, getStats()) - // Ack out + // Ack should clear remaining namespace-two blocked evals err = b.Ack(eval5.ID, token) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) - // Check the stats - stats = b.Stats() - if stats.TotalReady != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalBlocked != 0 { - t.Fatalf("bad: %#v", stats) - } + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) + + // Dequeue should get 7th eval because that's all that's left + out, token, err = b.Dequeue(defaultSched, time.Second) + must.NoError(t, err) + must.Eq(t, out, eval7, must.Sprint("expected 7th eval")) + + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) + + // Last ack should leave the broker empty except for cancels + err = b.Ack(eval7.ID, token) + must.NoError(t, err) + + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 3}, getStats()) } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -813,18 +689,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { b.SetEnabled(true) NUM := 100 - for i := 0; i < NUM; i++ { + for i := NUM; i > 0; i-- { eval1 := mock.Eval() eval1.CreateIndex = uint64(i) eval1.ModifyIndex = uint64(i) b.Enqueue(eval1) } - for i := 0; i < NUM; i++ { + for i := 1; i < NUM; i++ { out1, _, _ := b.Dequeue(defaultSched, time.Second) - if out1.CreateIndex != uint64(i) { - t.Fatalf("bad: %d %#v", i, out1) - } + must.Eq(t, uint64(i), out1.CreateIndex, + must.Sprintf("eval was not FIFO by CreateIndex"), + ) } } @@ -1506,3 +1382,202 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Equal(1, len(b.blocked)) } + +func TestEvalBroker_PendingEvals_Ordering(t *testing.T) { + + ready := PendingEvaluations{} + + newEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation { + eval := mock.Eval() + eval.JobID = jobID + eval.ID = evalID + eval.Priority = priority + eval.CreateIndex = uint64(index) + return eval + } + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + heap.Push(&ready, newEval("example1", "eval01", 50, 1)) + heap.Push(&ready, newEval("example3", "eval03", 70, 3)) + heap.Push(&ready, newEval("example2", "eval02", 50, 2)) + + next := heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval03", next.ID, + test.Sprint("expected highest Priority to be next ready")) + + next = heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval01", next.ID, + test.Sprint("expected oldest CreateIndex to be next ready")) + + heap.Push(&ready, newEval("example4", "eval04", 50, 4)) + + next = heap.Pop(&ready).(*structs.Evaluation) + test.Eq(t, "eval02", next.ID, + test.Sprint("expected oldest CreateIndex to be next ready")) + +} + +func TestEvalBroker_BlockedEval_Ordering(t *testing.T) { + blocked := BlockedEvaluations{} + + newEval := func(evalID string, priority int, index uint64) *structs.Evaluation { + eval := mock.Eval() + eval.ID = evalID + eval.Priority = priority + eval.ModifyIndex = uint64(index) + return eval + } + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + heap.Push(&blocked, newEval("eval03", 50, 3)) + heap.Push(&blocked, newEval("eval02", 100, 2)) + heap.Push(&blocked, newEval("eval01", 50, 1)) + + unblocked := heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval02", unblocked.ID, + test.Sprint("expected eval with highest priority to get unblocked")) + + unblocked = heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval03", unblocked.ID, + test.Sprint("expected eval with highest modify index to get unblocked")) + + heap.Push(&blocked, newEval("eval04", 30, 4)) + unblocked = heap.Pop(&blocked).(*structs.Evaluation) + test.Eq(t, "eval01", unblocked.ID, + test.Sprint("expected eval with highest priority to get unblocked")) + +} + +func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { + ci.Parallel(t) + + blocked := BlockedEvaluations{} + + // note: we're intentionally pushing these out-of-order to assert we're + // getting them back out in the intended order and not just as inserted + for i := 100; i > 0; i -= 10 { + eval := mock.Eval() + eval.JobID = "example" + eval.CreateIndex = uint64(i) + eval.ModifyIndex = uint64(i) + heap.Push(&blocked, eval) + } + + canceled := blocked.MarkForCancel() + must.Eq(t, 9, len(canceled)) + must.Eq(t, 1, blocked.Len()) + + raw := heap.Pop(&blocked) + must.NotNil(t, raw) + eval := raw.(*structs.Evaluation) + must.Eq(t, 100, eval.ModifyIndex) +} + +// TestEvalBroker_IntegrationTest exercises the eval broker with realistic +// workflows +func TestEvalBroker_IntegrationTest(t *testing.T) { + ci.Parallel(t) + + srv, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent dequeue + c.EvalReapCancelableInterval = time.Minute * 10 // Prevent sweep-up + }) + + defer cleanupS1() + testutil.WaitForLeader(t, srv.RPC) + + codec := rpcClient(t, srv) + store := srv.fsm.State() + + // create a system job, a node for it to run on, and a set of node up/down + // events that will result in evaluations queued. + + job := mock.SystemJob() + jobReq := &structs.JobRegisterRequest{ + Job: job, + EvalPriority: 50, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var jobResp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) + must.NoError(t, err) + + node := mock.Node() + nodeReq := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeResp structs.NodeUpdateResponse + err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp) + must.NoError(t, err) + + for i := 0; i < 10; i++ { + status := structs.NodeStatusDown + if i%2 == 0 { + status = structs.NodeStatusReady + } + statusReq := &structs.NodeUpdateStatusRequest{ + NodeID: node.ID, + Status: status, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var statusResp structs.NodeUpdateResponse + err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp) + must.NoError(t, err) + } + + // ensure we have the expected number of evaluations and eval broker state + + // retreive the stats from the broker, less some uninteresting ones + getStats := func() BrokerStats { + t.Helper() + stats := srv.evalBroker.Stats() + stats.DelayedEvals = nil + stats.ByScheduler = nil + return *stats + } + + getEvalStatuses := func() map[string]int { + t.Helper() + statuses := map[string]int{} + iter, err := store.Evals(nil, state.SortDefault) + must.NoError(t, err) + for { + raw := iter.Next() + if raw == nil { + break + } + eval := raw.(*structs.Evaluation) + statuses[eval.Status] += 1 + if eval.Status == structs.EvalStatusCancelled { + must.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription) + } + } + return statuses + } + + must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses()) + must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, + TotalBlocked: 10, TotalCancelable: 0}, getStats()) + + // start schedulers: all the evals are for a single job so there should only + // be one eval processesed at a time no matter how many schedulers we run + + config := DefaultConfig() + config.NumSchedulers = 4 + config.EvalReapCancelableInterval = time.Minute * 10 + require.NoError(t, srv.Reload(config)) + + // assert that all but 2 evals were canceled and that the eval broker state + // has been cleared + + require.Eventually(t, func() bool { + got := getEvalStatuses() + return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9 + }, 2*time.Second, time.Millisecond*100) + + must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, + TotalBlocked: 0, TotalCancelable: 0}, getStats()) +} diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index ed5dc15ad..0413ebc4f 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -233,7 +233,11 @@ func (e *Eval) Ack(args *structs.EvalAckRequest, if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } - return nil + + // It's not necessary to cancel evals before Ack returns, but it's done here + // to commit canceled evals as close to the Ack'd eval being committed as + // possible. + return cancelCancelableEvals(e.srv) } // Nack is used to negative acknowledge completion of a dequeued evaluation. diff --git a/nomad/leader.go b/nomad/leader.go index b6b2895fa..b15285d13 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -365,6 +365,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Reap any duplicate blocked evaluations go s.reapDupBlockedEvaluations(stopCh) + // Reap any cancelable evaluations + s.reapCancelableEvalsCh = s.reapCancelableEvaluations(stopCh) + // Periodically unblock failed allocations go s.periodicUnblockFailedEvals(stopCh) @@ -992,6 +995,66 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) { } } +// reapCancelableEvaluations is used to reap evaluations that were marked +// cancelable by the eval broker and should be canceled. These get swept up +// whenever an eval Acks, but this ensures that we don't have a straggling batch +// when the cluster doesn't have any more work to do. Returns a wake-up channel +// that can be used to trigger a new reap without waiting for the timer +func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} { + + wakeCh := make(chan struct{}, 1) + go func() { + + timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval) + defer cancel() + for { + select { + case <-stopCh: + return + case <-wakeCh: + cancelCancelableEvals(s) + case <-timer.C: + cancelCancelableEvals(s) + timer.Reset(s.config.EvalReapCancelableInterval) + } + } + }() + + return wakeCh +} + +// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval +// broker and updates their status to canceled. +func cancelCancelableEvals(srv *Server) error { + + const cancelDesc = "canceled after more recent eval was processed" + + // We *can* send larger raft logs but rough benchmarks show that a smaller + // page size strikes a balance between throughput and time we block the FSM + // apply for other operations + cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10) + if len(cancelable) > 0 { + for i, eval := range cancelable { + eval = eval.Copy() + eval.Status = structs.EvalStatusCancelled + eval.StatusDescription = cancelDesc + eval.UpdateModifyTime() + cancelable[i] = eval + } + + update := &structs.EvalUpdateRequest{ + Evals: cancelable, + WriteRequest: structs.WriteRequest{Region: srv.Region()}, + } + _, _, err := srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + srv.logger.Warn("eval cancel failed", "error", err, "method", "ack") + return err + } + } + return nil +} + // periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations. func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { ticker := time.NewTicker(failedEvalUnblockInterval) diff --git a/nomad/server.go b/nomad/server.go index 40fee1b4f..a00781d3a 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -206,6 +206,9 @@ type Server struct { // transitions to collide and create inconsistent state. brokerLock sync.Mutex + // reapCancelableEvalsCh is used to signal the cancelable evals reaper to wake up + reapCancelableEvalsCh chan struct{} + // deploymentWatcher is used to watch deployments and their allocations and // make the required calls to continue to transition the deployment. deploymentWatcher *deploymentwatcher.Watcher @@ -362,6 +365,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr readyForConsistentReads: &atomic.Bool{}, eventCh: make(chan serf.Event, 256), evalBroker: evalBroker, + reapCancelableEvalsCh: make(chan struct{}), blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, diff --git a/nomad/worker_test.go b/nomad/worker_test.go index a72304db9..9d4131e2d 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/helper/testlog" @@ -118,9 +119,10 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { eval2.JobID = eval1.JobID // Insert the evals into the state store - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})) + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval2})) s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval2) @@ -131,45 +133,29 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { // Attempt dequeue eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != eval1.ModifyIndex { - t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex) - } - - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval1) { - t.Fatalf("bad: %#v %#v", eval, eval1) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.NotEq(t, eval1.ModifyIndex, waitIndex, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval1) // Update the modify index of the first eval - if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval1}); err != nil { - t.Fatal(err) - } + must.NoError(t, s1.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 1500, []*structs.Evaluation{eval1})) // Send the Ack w.sendAck(eval1, token) - // Attempt second dequeue + // Attempt second dequeue; it should succeed because the 2nd eval has a + // lower modify index than the snapshot used to schedule the 1st + // eval. Normally this can only happen if the worker is on a follower that's + // trailing behind in raft logs eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) - if shutdown { - t.Fatalf("should not shutdown") - } - if token == "" { - t.Fatalf("should get token") - } - if waitIndex != 2000 { - t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex) - } - // Ensure we get a sane eval - if !reflect.DeepEqual(eval, eval2) { - t.Fatalf("bad: %#v %#v", eval, eval2) - } + must.False(t, shutdown, must.Sprint("should not be shutdown")) + must.NotEq(t, token, "", must.Sprint("should get a token")) + must.Eq(t, waitIndex, 2000, must.Sprintf("bad wait index")) + must.Eq(t, eval, eval2) + } func TestWorker_dequeueEvaluation_paused(t *testing.T) {