open-nomad/nomad/eval_broker.go
Tim Gross 6415fb4284
eval broker: shed all but one blocked eval per job after ack (#14621)
When an evaluation is acknowledged by a scheduler, the resulting plan is
guaranteed to cover up to the `waitIndex` set by the worker based on the most
recent evaluation for that job in the state store. At that point, we no longer
need to retain blocked evaluations in the broker that are older than that index.

Move all but the highest priority / highest `ModifyIndex` blocked eval into a
canceled set. When the `Eval.Ack` RPC returns from the eval broker it will
signal a reap of a batch of cancelable evals to write to raft. This paces the
cancelations limited by how frequently the schedulers are acknowledging evals;
this should reduce the risk of cancelations from overwhelming raft relative to
scheduler progress. In order to avoid straggling batches when the cluster is
quiet, we also include a periodic sweep through the cancelable list.
2022-11-16 16:10:11 -05:00

1034 lines
29 KiB
Go

package nomad
import (
"container/heap"
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/broker"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// failedQueue is the queue we add Evaluations to once
// they've reached the deliveryLimit. This allows the leader to
// set the status to failed.
failedQueue = "_failed"
)
var (
// ErrNotOutstanding is returned if an evaluation is not outstanding
ErrNotOutstanding = errors.New("evaluation is not outstanding")
// ErrTokenMismatch is the outstanding eval has a different token
ErrTokenMismatch = errors.New("evaluation token does not match")
// ErrNackTimeoutReached is returned if an expired evaluation is reset
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
)
// EvalBroker is used to manage brokering of evaluations. When an evaluation is
// created, due to a change in a job specification or a node, we put it into the
// broker. The broker sorts by evaluations by priority and scheduler type. This
// allows us to dequeue the highest priority work first, while also allowing sub-schedulers
// to only dequeue work they know how to handle. The broker is designed to be entirely
// in-memory and is managed by the leader node.
//
// The broker must provide at-least-once delivery semantics. It relies on explicit
// Ack/Nack messages to handle this. If a delivery is not Ack'd in a sufficient time
// span, it will be assumed Nack'd.
type EvalBroker struct {
nackTimeout time.Duration
deliveryLimit int
enabled bool
enabledNotifier *broker.GenericNotifier
stats *BrokerStats
// evals tracks queued evaluations by ID to de-duplicate enqueue.
// The counter is the number of times we've attempted delivery,
// and is used to eventually fail an evaluation.
evals map[string]int
// jobEvals tracks queued evaluations by a job's ID and namespace to serialize them
jobEvals map[structs.NamespacedID]string
// blocked tracks the blocked evaluations by JobID in a priority queue
blocked map[structs.NamespacedID]BlockedEvaluations
// cancelable tracks previously blocked evaluations (for any job) that are
// now safe for the Eval.Ack RPC to cancel in batches
cancelable []*structs.Evaluation
// ready tracks the ready jobs by scheduler in a priority queue
ready map[string]PendingEvaluations
// unack is a map of evalID to an un-acknowledged evaluation
unack map[string]*unackEval
// waiting is used to notify on a per-scheduler basis of ready work
waiting map[string]chan struct{}
// requeue tracks evaluations that need to be re-enqueued once the current
// evaluation finishes by token. If the token is Nacked or rejected the
// evaluation is dropped but if Acked successfully, the evaluation is
// queued.
requeue map[string]*structs.Evaluation
// timeWait has evaluations that are waiting for time to elapse
timeWait map[string]*time.Timer
// delayedEvalCancelFunc is used to stop the long running go routine
// that processes delayed evaluations
delayedEvalCancelFunc context.CancelFunc
// delayHeap is a heap used to track incoming evaluations that are
// not eligible to enqueue until their WaitTime
delayHeap *delayheap.DelayHeap
// delayedEvalsUpdateCh is used to trigger notifications for updates
// to the delayHeap
delayedEvalsUpdateCh chan struct{}
// initialNackDelay is the delay applied before re-enqueuing a
// Nacked evaluation for the first time.
initialNackDelay time.Duration
// subsequentNackDelay is the delay applied before reenqueuing
// an evaluation that has been Nacked more than once. This delay is
// compounding after the first Nack.
subsequentNackDelay time.Duration
l sync.RWMutex
}
// unackEval tracks an unacknowledged evaluation along with the Nack timer
type unackEval struct {
Eval *structs.Evaluation
Token string
NackTimer *time.Timer
}
// PendingEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation
// BlockedEvaluations is a list of blocked evaluations for a given job. We
// implement the container/heap interface so that this is a priority queue.
type BlockedEvaluations []*structs.Evaluation
// NewEvalBroker creates a new evaluation broker. This is parameterized
// with the timeout used for messages that are not acknowledged before we
// assume a Nack and attempt to redeliver as well as the deliveryLimit
// which prevents a failing eval from being endlessly delivered. The
// initialNackDelay is the delay before making a Nacked evaluation available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]BlockedEvaluations),
cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
delayHeap: delayheap.NewDelayHeap(),
delayedEvalsUpdateCh: make(chan struct{}, 1),
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
return b, nil
}
// Enabled is used to check if the broker is enabled.
func (b *EvalBroker) Enabled() bool {
b.l.RLock()
defer b.l.RUnlock()
return b.enabled
}
// SetEnabled is used to control if the broker is enabled. The broker
// should only be enabled on the active leader.
func (b *EvalBroker) SetEnabled(enabled bool) {
b.l.Lock()
defer b.l.Unlock()
prevEnabled := b.enabled
b.enabled = enabled
if !prevEnabled && enabled {
// start the go routine for delayed evals
ctx, cancel := context.WithCancel(context.Background())
b.delayedEvalCancelFunc = cancel
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
}
if !enabled {
b.flush()
}
// Notify all subscribers to state changes of the broker enabled value.
b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
}
// Enqueue is used to enqueue a new evaluation
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
b.processEnqueue(eval, "")
}
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
// that are being re-enqueued to include their token.
//
// When requeuing an evaluation that potentially may be already
// enqueued. The evaluation is handled in one of the following ways:
// * Evaluation not outstanding: Process as a normal Enqueue
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
// - Ack received: Unblock the evaluation allowing it to be dequeued
// - Nack received: Drop the evaluation as it was created as a result of a
// scheduler run that was Nack'd
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
// The lock needs to be held until all evaluations are enqueued. This is so
// that when Dequeue operations are unblocked they will pick the highest
// priority evaluations.
b.l.Lock()
defer b.l.Unlock()
for eval, token := range evals {
b.processEnqueue(eval, token)
}
}
// processEnqueue deduplicates evals and either enqueue immediately or enforce
// the evals wait time. If the token is passed, and the evaluation ID is
// outstanding, the evaluation is blocked until an Ack/Nack is received.
// processEnqueue must be called with the lock held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
// If we're not enabled, don't enable more queuing.
if !b.enabled {
return
}
// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
if token == "" {
return
}
// If the token has been passed, the evaluation is being reblocked by
// the scheduler and should be processed once the outstanding evaluation
// is Acked or Nacked.
if unack, ok := b.unack[eval.ID]; ok && unack.Token == token {
b.requeue[token] = eval
}
return
} else if b.enabled {
b.evals[eval.ID] = 0
}
// Check if we need to enforce a wait
if eval.Wait > 0 {
b.processWaitingEnqueue(eval)
return
}
if !eval.WaitUntil.IsZero() {
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
b.stats.TotalWaiting += 1
b.stats.DelayedEvals[eval.ID] = eval
// Signal an update.
select {
case b.delayedEvalsUpdateCh <- struct{}{}:
default:
}
return
}
b.enqueueLocked(eval, eval.Type)
}
// processWaitingEnqueue waits the given duration on the evaluation before
// enqueuing.
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
}
// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
delete(b.timeWait, eval.ID)
b.stats.TotalWaiting -= 1
b.enqueueLocked(eval, eval.Type)
}
// enqueueLocked is used to enqueue with the lock held
func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, queue string) {
// Do nothing if not enabled
if !b.enabled {
return
}
// Check if there is an evaluation for this JobID pending
namespacedID := structs.NamespacedID{
ID: eval.JobID,
Namespace: eval.Namespace,
}
pendingEval := b.jobEvals[namespacedID]
if pendingEval == "" {
b.jobEvals[namespacedID] = eval.ID
} else if pendingEval != eval.ID {
blocked := b.blocked[namespacedID]
heap.Push(&blocked, eval)
b.blocked[namespacedID] = blocked
b.stats.TotalBlocked += 1
return
}
// Find the pending by scheduler class
pending, ok := b.ready[queue]
if !ok {
pending = make([]*structs.Evaluation, 0, 16)
if _, ok := b.waiting[queue]; !ok {
b.waiting[queue] = make(chan struct{}, 1)
}
}
// Push onto the heap
heap.Push(&pending, eval)
b.ready[queue] = pending
// Update the stats
b.stats.TotalReady += 1
bySched, ok := b.stats.ByScheduler[queue]
if !ok {
bySched = &SchedulerStats{}
b.stats.ByScheduler[queue] = bySched
}
bySched.Ready += 1
// Unblock any blocked dequeues
select {
case b.waiting[queue] <- struct{}{}:
default:
}
}
// Dequeue is used to perform a blocking dequeue. The next available evalution
// is returned as well as a unique token identifier for this dequeue. The token
// changes on leadership election to ensure a Dequeue prior to a leadership
// election cannot conflict with a Dequeue of the same evaluation after a
// leadership election.
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) {
var timeoutTimer *time.Timer
var timeoutCh <-chan time.Time
SCAN:
// Scan for work
eval, token, err := b.scanForSchedulers(schedulers)
if err != nil {
if timeoutTimer != nil {
timeoutTimer.Stop()
}
return nil, "", err
}
// Check if we have something
if eval != nil {
if timeoutTimer != nil {
timeoutTimer.Stop()
}
return eval, token, nil
}
// Setup the timeout channel the first time around
if timeoutTimer == nil && timeout != 0 {
timeoutTimer = time.NewTimer(timeout)
timeoutCh = timeoutTimer.C
}
// Block until we get work
scan := b.waitForSchedulers(schedulers, timeoutCh)
if scan {
goto SCAN
}
return nil, "", nil
}
// scanForSchedulers scans for work on any of the schedulers. The highest priority work
// is dequeued first. This may return nothing if there is no work waiting.
func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, string, error) {
b.l.Lock()
defer b.l.Unlock()
// Do nothing if not enabled
if !b.enabled {
return nil, "", fmt.Errorf("eval broker disabled")
}
// Scan for eligible work
var eligibleSched []string
var eligiblePriority int
for _, sched := range schedulers {
// Get the pending queue
pending, ok := b.ready[sched]
if !ok {
continue
}
// Peek at the next item
ready := pending.Peek()
if ready == nil {
continue
}
// Add to eligible if equal or greater priority
if len(eligibleSched) == 0 || ready.Priority > eligiblePriority {
eligibleSched = []string{sched}
eligiblePriority = ready.Priority
} else if eligiblePriority > ready.Priority {
continue
} else if eligiblePriority == ready.Priority {
eligibleSched = append(eligibleSched, sched)
}
}
// Determine behavior based on eligible work
switch n := len(eligibleSched); n {
case 0:
// No work to do!
return nil, "", nil
case 1:
// Only a single task, dequeue
return b.dequeueForSched(eligibleSched[0])
default:
// Multiple tasks. We pick a random task so that we fairly
// distribute work.
offset := rand.Intn(n)
return b.dequeueForSched(eligibleSched[offset])
}
}
// dequeueForSched is used to dequeue the next work item for a given scheduler.
// This assumes locks are held and that this scheduler has work
func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) {
// Get the pending queue
pending := b.ready[sched]
raw := heap.Pop(&pending)
b.ready[sched] = pending
eval := raw.(*structs.Evaluation)
// Generate a UUID for the token
token := uuid.Generate()
// Setup Nack timer
nackTimer := time.AfterFunc(b.nackTimeout, func() {
b.Nack(eval.ID, token)
})
// Add to the unack queue
b.unack[eval.ID] = &unackEval{
Eval: eval,
Token: token,
NackTimer: nackTimer,
}
// Increment the dequeue count
b.evals[eval.ID] += 1
// Update the stats
b.stats.TotalReady -= 1
b.stats.TotalUnacked += 1
bySched := b.stats.ByScheduler[sched]
bySched.Ready -= 1
bySched.Unacked += 1
return eval, token, nil
}
// waitForSchedulers is used to wait for work on any of the scheduler or until a timeout.
// Returns if there is work waiting potentially.
func (b *EvalBroker) waitForSchedulers(schedulers []string, timeoutCh <-chan time.Time) bool {
doneCh := make(chan struct{})
readyCh := make(chan struct{}, 1)
defer close(doneCh)
// Start all the watchers
b.l.Lock()
for _, sched := range schedulers {
waitCh, ok := b.waiting[sched]
if !ok {
waitCh = make(chan struct{}, 1)
b.waiting[sched] = waitCh
}
// Start a goroutine that either waits for the waitCh on this scheduler
// to unblock or for this waitForSchedulers call to return
go func() {
select {
case <-waitCh:
select {
case readyCh <- struct{}{}:
default:
}
case <-doneCh:
}
}()
}
b.l.Unlock()
// Block until we have ready work and should scan, or until we timeout
// and should not make an attempt to scan for work
select {
case <-readyCh:
return true
case <-timeoutCh:
return false
}
}
// Outstanding checks if an EvalID has been delivered but not acknowledged
// and returns the associated token for the evaluation.
func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return "", false
}
return unack.Token, true
}
// OutstandingReset resets the Nack timer for the EvalID if the
// token matches and the eval is outstanding
func (b *EvalBroker) OutstandingReset(evalID, token string) error {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
if !unack.NackTimer.Reset(b.nackTimeout) {
return ErrNackTimeoutReached
}
return nil
}
// Ack is used to positively acknowledge handling an evaluation
func (b *EvalBroker) Ack(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
// Always delete the requeued evaluation. Either the Ack is successful and
// we requeue it or it isn't and we want to remove it.
defer delete(b.requeue, token)
// Lookup the unack'd eval
unack, ok := b.unack[evalID]
if !ok {
return fmt.Errorf("Evaluation ID not found")
}
if unack.Token != token {
return fmt.Errorf("Token does not match for Evaluation ID")
}
jobID := unack.Eval.JobID
// Ensure we were able to stop the timer
if !unack.NackTimer.Stop() {
return fmt.Errorf("Evaluation ID Ack'd after Nack timer expiration")
}
// Update the stats
b.stats.TotalUnacked -= 1
queue := unack.Eval.Type
if b.evals[evalID] > b.deliveryLimit {
queue = failedQueue
}
bySched := b.stats.ByScheduler[queue]
bySched.Unacked -= 1
// Cleanup
delete(b.unack, evalID)
delete(b.evals, evalID)
namespacedID := structs.NamespacedID{
ID: jobID,
Namespace: unack.Eval.Namespace,
}
delete(b.jobEvals, namespacedID)
// Check if there are any blocked evaluations
if blocked := b.blocked[namespacedID]; len(blocked) != 0 {
// Any blocked evaluations with ModifyIndexes older than the just-ack'd
// evaluation are no longer useful, so it's safe to drop them.
cancelable := blocked.MarkForCancel()
b.cancelable = append(b.cancelable, cancelable...)
b.stats.TotalCancelable = len(b.cancelable)
b.stats.TotalBlocked -= len(cancelable)
// If any remain, enqueue an eval
if len(blocked) > 0 {
raw := heap.Pop(&blocked)
eval := raw.(*structs.Evaluation)
b.stats.TotalBlocked -= 1
b.enqueueLocked(eval, eval.Type)
}
// Clean up if there are no more after that
if len(blocked) > 0 {
b.blocked[namespacedID] = blocked
} else {
delete(b.blocked, namespacedID)
}
}
// Re-enqueue the evaluation.
if eval, ok := b.requeue[token]; ok {
b.processEnqueue(eval, "")
}
return nil
}
// Nack is used to negatively acknowledge handling an evaluation
func (b *EvalBroker) Nack(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
// Always delete the requeued evaluation since the Nack means the requeue is
// invalid.
delete(b.requeue, token)
// Lookup the unack'd eval
unack, ok := b.unack[evalID]
if !ok {
return fmt.Errorf("Evaluation ID not found")
}
if unack.Token != token {
return fmt.Errorf("Token does not match for Evaluation ID")
}
// Stop the timer, doesn't matter if we've missed it
unack.NackTimer.Stop()
// Cleanup
delete(b.unack, evalID)
// Update the stats
b.stats.TotalUnacked -= 1
bySched := b.stats.ByScheduler[unack.Eval.Type]
bySched.Unacked -= 1
// Check if we've hit the delivery limit, and re-enqueue
// in the failedQueue
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
b.enqueueLocked(unack.Eval, failedQueue)
} else {
e := unack.Eval
e.Wait = b.nackReenqueueDelay(e, dequeues)
// See if there should be a delay before re-enqueuing
if e.Wait > 0 {
b.processWaitingEnqueue(e)
} else {
b.enqueueLocked(e, e.Type)
}
}
return nil
}
// nackReenqueueDelay is used to determine the delay that should be applied on
// the evaluation given the number of previous attempts
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
switch {
case prevDequeues <= 0:
return 0
case prevDequeues == 1:
return b.initialNackDelay
default:
// For each subsequent nack compound a delay
return time.Duration(prevDequeues-1) * b.subsequentNackDelay
}
}
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
if !unack.NackTimer.Stop() {
return ErrNackTimeoutReached
}
return nil
}
// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
// paused. It should be resumed after leaving an unbounded operation.
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
unack.NackTimer.Reset(b.nackTimeout)
return nil
}
// Flush is used to clear the state of the broker. It must be called from within
// the lock.
func (b *EvalBroker) flush() {
// Unblock any waiters
for _, waitCh := range b.waiting {
close(waitCh)
}
b.waiting = make(map[string]chan struct{})
// Cancel any Nack timers
for _, unack := range b.unack {
unack.NackTimer.Stop()
}
// Cancel any time wait evals
for _, wait := range b.timeWait {
wait.Stop()
}
// Cancel the delayed evaluations goroutine
if b.delayedEvalCancelFunc != nil {
b.delayedEvalCancelFunc()
}
// Clear out the update channel for delayed evaluations
b.delayedEvalsUpdateCh = make(chan struct{}, 1)
// Reset the broker
b.stats.TotalReady = 0
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.TotalCancelable = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
b.blocked = make(map[structs.NamespacedID]BlockedEvaluations)
b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest)
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
b.delayHeap = delayheap.NewDelayHeap()
}
// evalWrapper satisfies the HeapNode interface
type evalWrapper struct {
eval *structs.Evaluation
}
func (d *evalWrapper) Data() interface{} {
return d.eval
}
func (d *evalWrapper) ID() string {
return d.eval.ID
}
func (d *evalWrapper) Namespace() string {
return d.eval.Namespace
}
// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for
// pending evaluations before enqueuing them
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) {
var timerChannel <-chan time.Time
var delayTimer *time.Timer
for {
eval, waitUntil := b.nextDelayedEval()
if waitUntil.IsZero() {
timerChannel = nil
} else {
launchDur := waitUntil.Sub(time.Now().UTC())
if delayTimer == nil {
delayTimer = time.NewTimer(launchDur)
} else {
delayTimer.Reset(launchDur)
}
timerChannel = delayTimer.C
}
select {
case <-ctx.Done():
return
case <-timerChannel:
// remove from the heap since we can enqueue it now
b.l.Lock()
b.delayHeap.Remove(&evalWrapper{eval})
b.stats.TotalWaiting -= 1
delete(b.stats.DelayedEvals, eval.ID)
b.enqueueLocked(eval, eval.Type)
b.l.Unlock()
case <-updateCh:
continue
}
}
}
// nextDelayedEval returns the next delayed eval to launch and when it should be enqueued.
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
b.l.RLock()
defer b.l.RUnlock()
// If there is nothing wait for an update.
if b.delayHeap.Length() == 0 {
return nil, time.Time{}
}
nextEval := b.delayHeap.Peek()
if nextEval == nil {
return nil, time.Time{}
}
eval := nextEval.Node.Data().(*structs.Evaluation)
return eval, nextEval.WaitUntil
}
// Stats is used to query the state of the broker
func (b *EvalBroker) Stats() *BrokerStats {
// Allocate a new stats struct
stats := new(BrokerStats)
stats.DelayedEvals = make(map[string]*structs.Evaluation)
stats.ByScheduler = make(map[string]*SchedulerStats)
b.l.RLock()
defer b.l.RUnlock()
// Copy all the stats
stats.TotalReady = b.stats.TotalReady
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
stats.TotalCancelable = b.stats.TotalCancelable
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
}
for sched, subStat := range b.stats.ByScheduler {
subStatCopy := *subStat
stats.ByScheduler[sched] = &subStatCopy
}
return stats
}
// Cancelable retrieves a batch of previously-blocked evaluations that are now
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
b.l.RLock()
defer b.l.RUnlock()
if batchSize > len(b.cancelable) {
batchSize = len(b.cancelable)
}
cancelable := b.cancelable[:batchSize]
b.cancelable = b.cancelable[batchSize:]
b.stats.TotalCancelable = len(b.cancelable)
return cancelable
}
// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()
for {
timer.Reset(period)
select {
case <-timer.C:
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
[]metrics.Label{
{Name: "eval_id", Value: eval.ID},
{Name: "job", Value: eval.JobID},
{Name: "namespace", Value: eval.Namespace},
})
}
for sched, schedStats := range stats.ByScheduler {
metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready))
metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked))
}
case <-stopCh:
return
}
}
}
// BrokerStats returns all the stats about the broker
type BrokerStats struct {
TotalReady int
TotalUnacked int
TotalBlocked int
TotalWaiting int
TotalCancelable int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}
// SchedulerStats returns the stats per scheduler
type SchedulerStats struct {
Ready int
Unacked int
}
// Len is for the sorting interface
func (p PendingEvaluations) Len() int {
return len(p)
}
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority
func (p PendingEvaluations) Less(i, j int) bool {
if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return p[i].CreateIndex < p[j].CreateIndex
}
// Swap is for the sorting interface
func (p PendingEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// Push is used to add a new evaluation to the slice
func (p *PendingEvaluations) Push(e interface{}) {
*p = append(*p, e.(*structs.Evaluation))
}
// Pop is used to remove an evaluation from the slice
func (p *PendingEvaluations) Pop() interface{} {
n := len(*p)
e := (*p)[n-1]
(*p)[n-1] = nil
*p = (*p)[:n-1]
return e
}
// Peek is used to peek at the next element that would be popped
func (p PendingEvaluations) Peek() *structs.Evaluation {
n := len(p)
if n == 0 {
return nil
}
return p[n-1]
}
// Len is for the sorting interface
func (p BlockedEvaluations) Len() int {
return len(p)
}
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority or highest modify index
func (p BlockedEvaluations) Less(i, j int) bool {
if p[i].Priority != p[j].Priority {
return !(p[i].Priority < p[j].Priority)
}
return !(p[i].ModifyIndex < p[j].ModifyIndex)
}
// Swap is for the sorting interface
func (p BlockedEvaluations) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// Push implements the heap interface and is used to add a new evaluation to the slice
func (p *BlockedEvaluations) Push(e interface{}) {
*p = append(*p, e.(*structs.Evaluation))
}
// Pop implements the heap interface and is used to remove an evaluation from the slice
func (p *BlockedEvaluations) Pop() interface{} {
n := len(*p)
e := (*p)[n-1]
(*p)[n-1] = nil
*p = (*p)[:n-1]
return e
}
// MarkForCancel is used to clear the blocked list of all but the one with the
// highest modify index and highest priority. It returns a slice of cancelable
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
// them. This must be called inside the broker's lock.
func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation {
// In pathological cases, we can have a large number of blocked evals but
// will want to cancel most of them. Using heap.Remove requires we re-sort
// for each eval we remove. Because we expect to have at most one remaining,
// we'll just create a new heap.
retain := BlockedEvaluations{(heap.Pop(p)).(*structs.Evaluation)}
cancelable := make([]*structs.Evaluation, len(*p))
copy(cancelable, *p)
*p = retain
return cancelable
}