eval broker: shed all but one blocked eval per job after ack (#14621)

When an evaluation is acknowledged by a scheduler, the resulting plan is
guaranteed to cover up to the `waitIndex` set by the worker based on the most
recent evaluation for that job in the state store. At that point, we no longer
need to retain blocked evaluations in the broker that are older than that index.
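
Expressed as a hypothetical predicate (illustration only, not a function in the broker), the invariant is:

```go
// A blocked eval whose ModifyIndex is older than the ack'd eval's wait index
// would only ask the scheduler to re-evaluate state the plan already covers,
// so it is safe to shed. Hypothetical helper, not part of this change.
func isShedable(blocked *structs.Evaluation, waitIndex uint64) bool {
	return blocked.ModifyIndex < waitIndex
}
```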

Move all but the highest-priority / highest-`ModifyIndex` blocked eval into a
cancelable set. When the `Eval.Ack` RPC returns from the eval broker, it signals
a reap of a batch of cancelable evals to write to raft. This paces cancelations
by how frequently the schedulers acknowledge evals, which should keep
cancelations from overwhelming raft relative to scheduler progress. To avoid
straggling batches when the cluster is quiet, we also include a periodic sweep
through the cancelable list.
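
As a minimal standalone sketch of that flow, with simplified stand-in types rather than the broker's real internals (the `Eval` struct, `shedBlocked`, and `nextCancelBatch` below are illustration only):

```go
package sketch

import "sort"

// Eval is a stand-in for structs.Evaluation with only the fields this sketch needs.
type Eval struct {
	ID          string
	Priority    int
	ModifyIndex uint64
}

// shedBlocked keeps the single best blocked eval for a job (highest Priority,
// then highest ModifyIndex) for re-enqueue and returns the rest as cancelable.
func shedBlocked(blocked []*Eval) (keep *Eval, cancelable []*Eval) {
	if len(blocked) == 0 {
		return nil, nil
	}
	sort.Slice(blocked, func(i, j int) bool {
		if blocked[i].Priority != blocked[j].Priority {
			return blocked[i].Priority > blocked[j].Priority
		}
		return blocked[i].ModifyIndex > blocked[j].ModifyIndex
	})
	return blocked[0], blocked[1:]
}

// nextCancelBatch takes at most batchSize cancelable evals so that each Ack
// writes a bounded raft entry; anything left over waits for the next Ack or
// for the periodic sweep on a quiet cluster.
func nextCancelBatch(pending []*Eval, batchSize int) (batch, rest []*Eval) {
	if batchSize > len(pending) {
		batchSize = len(pending)
	}
	return pending[:batchSize], pending[batchSize:]
}
```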
Tim Gross 2022-11-16 16:10:11 -05:00 committed by GitHub
parent 45ff0765c7
commit 6415fb4284
8 changed files with 506 additions and 267 deletions

.changelog/14621.txt Normal file

@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: when multiple evaluations are pending for the same job, evaluate the latest and cancel the intermediaries on success
```


@@ -252,6 +252,12 @@ type Config struct {
// retrying a failed evaluation.
EvalFailedFollowupBaselineDelay time.Duration
// EvalReapCancelableInterval is the interval for the periodic reaping of
// cancelable evaluations. Cancelable evaluations are canceled whenever any
// eval is ack'd but this sweeps up on quiescent clusters. This config value
// exists only for testing.
EvalReapCancelableInterval time.Duration
// EvalFailedFollowupDelayRange defines the range of additional time from
// the baseline in which to wait before retrying a failed evaluation. The
// additional delay is selected from this range randomly.
@@ -471,6 +477,7 @@ func DefaultConfig() *Config {
EvalNackSubsequentReenqueueDelay: 20 * time.Second,
EvalFailedFollowupBaselineDelay: 1 * time.Minute,
EvalFailedFollowupDelayRange: 5 * time.Minute,
EvalReapCancelableInterval: 5 * time.Second,
MinHeartbeatTTL: 10 * time.Second,
MaxHeartbeatsPerSecond: 50.0,
HeartbeatGrace: 10 * time.Second,


@@ -64,7 +64,11 @@ type EvalBroker struct {
jobEvals map[structs.NamespacedID]string
// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]PendingEvaluations
blocked map[structs.NamespacedID]BlockedEvaluations
// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable []*structs.Evaluation
// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
@@ -115,11 +119,14 @@ type unackEval struct {
NackTimer *time.Timer
}
// PendingEvaluations is a list of waiting evaluations.
// We implement the container/heap interface so that this is a
// priority queue
// PendingEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation
// BlockedEvaluations is a list of blocked evaluations for a given job. We
// implement the container/heap interface so that this is a priority queue.
type BlockedEvaluations []*structs.Evaluation
// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
@@ -139,7 +146,8 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
@@ -586,15 +594,28 @@ func (b *EvalBroker) Ack(evalID, token string) error {
// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
raw := heap.Pop(&blocked)
// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel()
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalCancelable = len(b.cancelable)
b.stats.TotalBlocked -= len(cancelable)
// If any remain, enqueue an eval
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}
// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}
// Re-enqueue the evaluation.
@@ -733,11 +754,13 @@ func (b *EvalBroker) flush() {
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.TotalCancelable = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]PendingEvaluations)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest)
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
@@ -830,6 +853,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
stats.TotalCancelable = b.stats.TotalCancelable
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
@@ -841,6 +865,24 @@
return stats
}
// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
// This method mutates b.cancelable and the broker stats, so take the write lock.
b.l.Lock()
defer b.l.Unlock()
if batchSize > len(b.cancelable) {
batchSize = len(b.cancelable)
}
cancelable := b.cancelable[:batchSize]
b.cancelable = b.cancelable[batchSize:]
b.stats.TotalCancelable = len(b.cancelable)
return cancelable
}
// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
@@ -856,6 +898,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
@@ -878,12 +921,13 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
// BrokerStats returns all the stats about the broker
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
TotalCancelable int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}
// SchedulerStats returns the stats per scheduler
@@ -934,3 +978,56 @@ func (p PendingEvaluations) Peek() *structs.Evaluation {
}
return p[n-1]
}
// Len is for the sorting interface
func (p BlockedEvaluations) Len() int {
return len(p)
}
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority or highest modify index
func (p BlockedEvaluations) Less(i, j int) bool {
if p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return !(p[i].ModifyIndex < p[j].ModifyIndex)
}
// Swap is for the sorting interface
func (p BlockedEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// Push implements the heap interface and is used to add a new evaluation to the slice
func (p *BlockedEvaluations) Push(e interface{}) {
*p = append(*p, e.(*structs.Evaluation))
}
// Pop implements the heap interface and is used to remove an evaluation from the slice
func (p *BlockedEvaluations) Pop() interface{} {
n := len(*p)
e := (*p)[n-1]
(*p)[n-1] = nil
*p = (*p)[:n-1]
return e
}
// MarkForCancel is used to clear the blocked list of all but the one with the
// highest modify index and highest priority. It returns a slice of cancelable
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
// them. This must be called inside the broker's lock.
func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {
// In pathological cases, we can have a large number of blocked evals but
// will want to cancel most of them. Using heap.Remove requires we re-sort
// for each eval we remove. Because we expect to have at most one remaining,
// we'll just create a new heap.
retain := BlockedEvaluations{(heap.Pop(p)).(*structs.Evaluation)}
cancelable := make([]*structs.Evaluation, len(*p))
copy(cancelable, *p)
*p = retain
return cancelable
}
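
A rough usage sketch of the new heap and `MarkForCancel` (illustration only; it assumes the surrounding `nomad` package and the `mock.Eval` test helper used in the tests below):

```go
blocked := BlockedEvaluations{}
for i := uint64(10); i <= 50; i += 10 {
	eval := mock.Eval() // all mock evals share the same Priority here
	eval.JobID = "example"
	eval.ModifyIndex = i
	heap.Push(&blocked, eval) // "container/heap"; Less prefers Priority, then ModifyIndex
}

// After an ack, everything but the highest-priority / newest blocked eval is shed.
cancelable := blocked.MarkForCancel()
// blocked retains only the eval with ModifyIndex 50; the other four are
// returned for the Eval.Ack RPC to batch into a raft update.
// len(cancelable) == 4, blocked.Len() == 1
_ = cancelable
```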


@@ -1,16 +1,21 @@
package nomad
import (
"container/heap"
"encoding/json"
"errors"
"fmt"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@@ -393,236 +398,107 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
ns1 := "namespace-one"
ns2 := "namespace-two"
eval := mock.Eval()
eval.Namespace = ns1
b.Enqueue(eval)
jobID := "example"
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.Namespace = ns1
eval2.CreateIndex = eval.CreateIndex + 1
b.Enqueue(eval2)
eval3 := mock.Eval()
eval3.JobID = eval.JobID
eval3.Namespace = ns1
eval3.CreateIndex = eval.CreateIndex + 2
b.Enqueue(eval3)
eval4 := mock.Eval()
eval4.JobID = eval.JobID
eval4.Namespace = ns2
eval4.CreateIndex = eval.CreateIndex + 3
b.Enqueue(eval4)
eval5 := mock.Eval()
eval5.JobID = eval.JobID
eval5.Namespace = ns2
eval5.CreateIndex = eval.CreateIndex + 4
b.Enqueue(eval5)
stats := b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 3 {
t.Fatalf("bad: %#v", stats)
newEval := func(idx uint64, ns string) *structs.Evaluation {
eval := mock.Eval()
eval.ID = fmt.Sprintf("eval:%d", idx)
eval.JobID = jobID
eval.Namespace = ns
eval.CreateIndex = idx
eval.ModifyIndex = idx
b.Enqueue(eval)
return eval
}
// Dequeue should work
// first job
eval1 := newEval(1, ns1)
newEval(2, ns1)
newEval(3, ns1)
eval4 := newEval(4, ns1)
// second job
eval5 := newEval(5, ns2)
newEval(6, ns2)
eval7 := newEval(7, ns2)
// retrieve the stats from the broker, minus some stats that aren't
// interesting for this test and would make it much more verbose to include
getStats := func() BrokerStats {
t.Helper()
stats := b.Stats()
stats.DelayedEvals = nil
stats.ByScheduler = nil
return *stats
}
must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
TotalBlocked: 5, TotalCancelable: 0}, getStats())
// Dequeue should get 1st eval
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval1, must.Sprint("expected 1st eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 3 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
TotalBlocked: 5, TotalCancelable: 0}, getStats())
// Ack out
err = b.Ack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
// The current wait index should be 4, but Ack eval1 anyway to exercise the
// behavior when the worker's Eval.getWaitIndex gets a stale index
err = b.Ack(eval1.ID, token)
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
TotalBlocked: 2, TotalCancelable: 2}, getStats())
// Dequeue should work
// eval4 and eval5 are ready
// eval6 and eval7 are blocked
// Dequeue should get 4th eval
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval2 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval4, must.Sprint("expected 4th eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 2 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
TotalBlocked: 2, TotalCancelable: 2}, getStats())
// Ack out
err = b.Ack(eval2.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 2 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval3 {
t.Fatalf("bad : %#v", out)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Ack out
err = b.Ack(eval3.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Dequeue should work
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval4 {
t.Fatalf("bad : %#v", out)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", stats)
}
// Ack out
// Ack should clear the rest of namespace-one blocked but leave
// namespace-two untouched
err = b.Ack(eval4.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
TotalBlocked: 2, TotalCancelable: 2}, getStats())
// Dequeue should work
// Dequeue should get 5th eval
out, token, err = b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval5 {
t.Fatalf("bad : %#v", out)
}
must.NoError(t, err)
must.Eq(t, out, eval5, must.Sprint("expected 5th eval"))
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
TotalBlocked: 2, TotalCancelable: 2}, getStats())
// Ack out
// Ack should clear remaining namespace-two blocked evals
err = b.Ack(eval5.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalBlocked != 0 {
t.Fatalf("bad: %#v", stats)
}
must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
TotalBlocked: 0, TotalCancelable: 3}, getStats())
// Dequeue should get 7th eval because that's all that's left
out, token, err = b.Dequeue(defaultSched, time.Second)
must.NoError(t, err)
must.Eq(t, out, eval7, must.Sprint("expected 7th eval"))
must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
TotalBlocked: 0, TotalCancelable: 3}, getStats())
// Last ack should leave the broker empty except for cancels
err = b.Ack(eval7.ID, token)
must.NoError(t, err)
must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
TotalBlocked: 0, TotalCancelable: 3}, getStats())
}
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
@@ -813,18 +689,18 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
b.SetEnabled(true)
NUM := 100
for i := 0; i < NUM; i++ {
for i := NUM; i > 0; i-- {
eval1 := mock.Eval()
eval1.CreateIndex = uint64(i)
eval1.ModifyIndex = uint64(i)
b.Enqueue(eval1)
}
for i := 0; i < NUM; i++ {
for i := 1; i < NUM; i++ {
out1, _, _ := b.Dequeue(defaultSched, time.Second)
if out1.CreateIndex != uint64(i) {
t.Fatalf("bad: %d %#v", i, out1)
}
must.Eq(t, uint64(i), out1.CreateIndex,
must.Sprintf("eval was not FIFO by CreateIndex"),
)
}
}
@@ -1506,3 +1382,202 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) {
require.Equal(1, len(b.blocked))
}
func TestEvalBroker_PendingEvals_Ordering(t *testing.T) {
ready := PendingEvaluations{}
newEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation {
eval := mock.Eval()
eval.JobID = jobID
eval.ID = evalID
eval.Priority = priority
eval.CreateIndex = uint64(index)
return eval
}
// note: we're intentionally pushing these out-of-order to assert we're
// getting them back out in the intended order and not just as inserted
heap.Push(&ready, newEval("example1", "eval01", 50, 1))
heap.Push(&ready, newEval("example3", "eval03", 70, 3))
heap.Push(&ready, newEval("example2", "eval02", 50, 2))
next := heap.Pop(&ready).(*structs.Evaluation)
test.Eq(t, "eval03", next.ID,
test.Sprint("expected highest Priority to be next ready"))
next = heap.Pop(&ready).(*structs.Evaluation)
test.Eq(t, "eval01", next.ID,
test.Sprint("expected oldest CreateIndex to be next ready"))
heap.Push(&ready, newEval("example4", "eval04", 50, 4))
next = heap.Pop(&ready).(*structs.Evaluation)
test.Eq(t, "eval02", next.ID,
test.Sprint("expected oldest CreateIndex to be next ready"))
}
func TestEvalBroker_BlockedEval_Ordering(t *testing.T) {
blocked := BlockedEvaluations{}
newEval := func(evalID string, priority int, index uint64) *structs.Evaluation {
eval := mock.Eval()
eval.ID = evalID
eval.Priority = priority
eval.ModifyIndex = uint64(index)
return eval
}
// note: we're intentionally pushing these out-of-order to assert we're
// getting them back out in the intended order and not just as inserted
heap.Push(&blocked, newEval("eval03", 50, 3))
heap.Push(&blocked, newEval("eval02", 100, 2))
heap.Push(&blocked, newEval("eval01", 50, 1))
unblocked := heap.Pop(&blocked).(*structs.Evaluation)
test.Eq(t, "eval02", unblocked.ID,
test.Sprint("expected eval with highest priority to get unblocked"))
unblocked = heap.Pop(&blocked).(*structs.Evaluation)
test.Eq(t, "eval03", unblocked.ID,
test.Sprint("expected eval with highest modify index to get unblocked"))
heap.Push(&blocked, newEval("eval04", 30, 4))
unblocked = heap.Pop(&blocked).(*structs.Evaluation)
test.Eq(t, "eval01", unblocked.ID,
test.Sprint("expected eval with highest priority to get unblocked"))
}
func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) {
ci.Parallel(t)
blocked := BlockedEvaluations{}
// note: we're intentionally pushing these out-of-order to assert we're
// getting them back out in the intended order and not just as inserted
for i := 100; i > 0; i -= 10 {
eval := mock.Eval()
eval.JobID = "example"
eval.CreateIndex = uint64(i)
eval.ModifyIndex = uint64(i)
heap.Push(&blocked, eval)
}
canceled := blocked.MarkForCancel()
must.Eq(t, 9, len(canceled))
must.Eq(t, 1, blocked.Len())
raw := heap.Pop(&blocked)
must.NotNil(t, raw)
eval := raw.(*structs.Evaluation)
must.Eq(t, 100, eval.ModifyIndex)
}
// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
// workflows
func TestEvalBroker_IntegrationTest(t *testing.T) {
ci.Parallel(t)
srv, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent dequeue
c.EvalReapCancelableInterval = time.Minute * 10 // Prevent sweep-up
})
defer cleanupS1()
testutil.WaitForLeader(t, srv.RPC)
codec := rpcClient(t, srv)
store := srv.fsm.State()
// create a system job, a node for it to run on, and a set of node up/down
// events that will result in evaluations queued.
job := mock.SystemJob()
jobReq := &structs.JobRegisterRequest{
Job: job,
EvalPriority: 50,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var jobResp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)
node := mock.Node()
nodeReq := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp)
must.NoError(t, err)
for i := 0; i < 10; i++ {
status := structs.NodeStatusDown
if i%2 == 0 {
status = structs.NodeStatusReady
}
statusReq := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: status,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var statusResp structs.NodeUpdateResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp)
must.NoError(t, err)
}
// ensure we have the expected number of evaluations and eval broker state
// retrieve the stats from the broker, minus some uninteresting ones
getStats := func() BrokerStats {
t.Helper()
stats := srv.evalBroker.Stats()
stats.DelayedEvals = nil
stats.ByScheduler = nil
return *stats
}
getEvalStatuses := func() map[string]int {
t.Helper()
statuses := map[string]int{}
iter, err := store.Evals(nil, state.SortDefault)
must.NoError(t, err)
for {
raw := iter.Next()
if raw == nil {
break
}
eval := raw.(*structs.Evaluation)
statuses[eval.Status] += 1
if eval.Status == structs.EvalStatusCancelled {
must.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription)
}
}
return statuses
}
must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses())
must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
TotalBlocked: 10, TotalCancelable: 0}, getStats())
// start schedulers: all the evals are for a single job so there should only
// be one eval processed at a time no matter how many schedulers we run
config := DefaultConfig()
config.NumSchedulers = 4
config.EvalReapCancelableInterval = time.Minute * 10
require.NoError(t, srv.Reload(config))
// assert that all but 2 evals were canceled and that the eval broker state
// has been cleared
require.Eventually(t, func() bool {
got := getEvalStatuses()
return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
}, 2*time.Second, time.Millisecond*100)
must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
TotalBlocked: 0, TotalCancelable: 0}, getStats())
}


@@ -233,7 +233,11 @@ func (e *Eval) Ack(args *structs.EvalAckRequest,
if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil {
return err
}
return nil
// It's not necessary to cancel evals before Ack returns, but it's done here
// to commit canceled evals as close to the Ack'd eval being committed as
// possible.
return cancelCancelableEvals(e.srv)
}
// Nack is used to negative acknowledge completion of a dequeued evaluation.


@@ -365,6 +365,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Reap any duplicate blocked evaluations
go s.reapDupBlockedEvaluations(stopCh)
// Reap any cancelable evaluations
s.reapCancelableEvalsCh = s.reapCancelableEvaluations(stopCh)
// Periodically unblock failed allocations
go s.periodicUnblockFailedEvals(stopCh)
@@ -992,6 +995,66 @@ func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
}
}
// reapCancelableEvaluations is used to reap evaluations that were marked
// cancelable by the eval broker and should be canceled. These get swept up
// whenever an eval is ack'd, but this ensures that we don't have a straggling batch
// when the cluster doesn't have any more work to do. Returns a wake-up channel
// that can be used to trigger a new reap without waiting for the timer
func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
wakeCh := make(chan struct{}, 1)
go func() {
timer, cancel := helper.NewSafeTimer(s.config.EvalReapCancelableInterval)
defer cancel()
for {
select {
case <-stopCh:
return
case <-wakeCh:
cancelCancelableEvals(s)
case <-timer.C:
cancelCancelableEvals(s)
timer.Reset(s.config.EvalReapCancelableInterval)
}
}
}()
return wakeCh
}
// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
// broker and updates their status to canceled.
func cancelCancelableEvals(srv *Server) error {
const cancelDesc = "canceled after more recent eval was processed"
// We *can* send larger raft logs but rough benchmarks show that a smaller
// page size strikes a balance between throughput and time we block the FSM
// apply for other operations
cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
if len(cancelable) > 0 {
for i, eval := range cancelable {
eval = eval.Copy()
eval.Status = structs.EvalStatusCancelled
eval.StatusDescription = cancelDesc
eval.UpdateModifyTime()
cancelable[i] = eval
}
update := &structs.EvalUpdateRequest{
Evals: cancelable,
WriteRequest: structs.WriteRequest{Region: srv.Region()},
}
_, _, err := srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
srv.logger.Warn("eval cancel failed", "error", err, "method", "ack")
return err
}
}
return nil
}
// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
ticker := time.NewTicker(failedEvalUnblockInterval)


@@ -206,6 +206,9 @@ type Server struct {
// transitions to collide and create inconsistent state.
brokerLock sync.Mutex
// reapCancelableEvalsCh is used to signal the cancelable evals reaper to wake up
reapCancelableEvalsCh chan struct{}
// deploymentWatcher is used to watch deployments and their allocations and
// make the required calls to continue to transition the deployment.
deploymentWatcher *deploymentwatcher.Watcher
@@ -362,6 +365,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,


@@ -11,6 +11,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/testlog"
@@ -118,9 +119,10 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
eval2.JobID = eval1.JobID
// Insert the evals into the state store
if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1, eval2}); err != nil {
t.Fatal(err)
}
must.NoError(t, s1.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1}))
must.NoError(t, s1.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval2}))
s1.evalBroker.Enqueue(eval1)
s1.evalBroker.Enqueue(eval2)
@@ -131,45 +133,29 @@
// Attempt dequeue
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex)
}
// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval1) {
t.Fatalf("bad: %#v %#v", eval, eval1)
}
must.False(t, shutdown, must.Sprint("should not be shutdown"))
must.NotEq(t, token, "", must.Sprint("should get a token"))
must.NotEq(t, eval1.ModifyIndex, waitIndex, must.Sprintf("bad wait index"))
must.Eq(t, eval, eval1)
// Update the modify index of the first eval
if err := s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 2000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}
must.NoError(t, s1.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 1500, []*structs.Evaluation{eval1}))
// Send the Ack
w.sendAck(eval1, token)
// Attempt second dequeue
// Attempt second dequeue; it should succeed because the 2nd eval has a
// lower modify index than the snapshot used to schedule the 1st
// eval. Normally this can only happen if the worker is on a follower that's
// trailing behind in raft logs
eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
if shutdown {
t.Fatalf("should not shutdown")
}
if token == "" {
t.Fatalf("should get token")
}
if waitIndex != 2000 {
t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex)
}
// Ensure we get a sane eval
if !reflect.DeepEqual(eval, eval2) {
t.Fatalf("bad: %#v %#v", eval, eval2)
}
must.False(t, shutdown, must.Sprint("should not be shutdown"))
must.NotEq(t, token, "", must.Sprint("should get a token"))
must.Eq(t, waitIndex, 2000, must.Sprintf("bad wait index"))
must.Eq(t, eval, eval2)
}
func TestWorker_dequeueEvaluation_paused(t *testing.T) {