1036 lines
29 KiB
Go
1036 lines
29 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package nomad
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/helper/broker"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/lib/delayheap"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
|
|
// failedQueue is the queue we add Evaluations to once
|
|
// they've reached the deliveryLimit. This allows the leader to
|
|
// set the status to failed.
|
|
failedQueue = "_failed"
|
|
)
|
|
|
|
var (
|
|
// ErrNotOutstanding is returned if an evaluation is not outstanding
|
|
ErrNotOutstanding = errors.New("evaluation is not outstanding")
|
|
|
|
// ErrTokenMismatch is the outstanding eval has a different token
|
|
ErrTokenMismatch = errors.New("evaluation token does not match")
|
|
|
|
// ErrNackTimeoutReached is returned if an expired evaluation is reset
|
|
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
|
|
)
|
|
|
|
// EvalBroker is used to manage brokering of evaluations. When an evaluation is
|
|
// created, due to a change in a job specification or a node, we put it into the
|
|
// broker. The broker sorts by evaluations by priority and scheduler type. This
|
|
// allows us to dequeue the highest priority work first, while also allowing sub-schedulers
|
|
// to only dequeue work they know how to handle. The broker is designed to be entirely
|
|
// in-memory and is managed by the leader node.
|
|
//
|
|
// The broker must provide at-least-once delivery semantics. It relies on explicit
|
|
// Ack/Nack messages to handle this. If a delivery is not Ack'd in a sufficient time
|
|
// span, it will be assumed Nack'd.
|
|
type EvalBroker struct {
|
|
nackTimeout time.Duration
|
|
deliveryLimit int
|
|
|
|
enabled bool
|
|
enabledNotifier *broker.GenericNotifier
|
|
|
|
stats *BrokerStats
|
|
|
|
// evals tracks queued evaluations by ID to de-duplicate enqueue.
|
|
// The counter is the number of times we've attempted delivery,
|
|
// and is used to eventually fail an evaluation.
|
|
evals map[string]int
|
|
|
|
// jobEvals tracks queued evaluations by a job's ID and namespace to serialize them
|
|
jobEvals map[structs.NamespacedID]string
|
|
|
|
// pending tracks the pending evaluations by JobID in a priority queue
|
|
pending map[structs.NamespacedID]PendingEvaluations
|
|
|
|
// cancelable tracks previously pending evaluations (for any job) that are
|
|
// now safe for the Eval.Ack RPC to cancel in batches
|
|
cancelable []*structs.Evaluation
|
|
|
|
// ready tracks the ready jobs by scheduler in a priority queue
|
|
ready map[string]ReadyEvaluations
|
|
|
|
// unack is a map of evalID to an un-acknowledged evaluation
|
|
unack map[string]*unackEval
|
|
|
|
// waiting is used to notify on a per-scheduler basis of ready work
|
|
waiting map[string]chan struct{}
|
|
|
|
// requeue tracks evaluations that need to be re-enqueued once the current
|
|
// evaluation finishes by token. If the token is Nacked or rejected the
|
|
// evaluation is dropped but if Acked successfully, the evaluation is
|
|
// queued.
|
|
requeue map[string]*structs.Evaluation
|
|
|
|
// timeWait has evaluations that are waiting for time to elapse
|
|
timeWait map[string]*time.Timer
|
|
|
|
// delayedEvalCancelFunc is used to stop the long running go routine
|
|
// that processes delayed evaluations
|
|
delayedEvalCancelFunc context.CancelFunc
|
|
|
|
// delayHeap is a heap used to track incoming evaluations that are
|
|
// not eligible to enqueue until their WaitTime
|
|
delayHeap *delayheap.DelayHeap
|
|
|
|
// delayedEvalsUpdateCh is used to trigger notifications for updates
|
|
// to the delayHeap
|
|
delayedEvalsUpdateCh chan struct{}
|
|
|
|
// initialNackDelay is the delay applied before re-enqueuing a
|
|
// Nacked evaluation for the first time.
|
|
initialNackDelay time.Duration
|
|
|
|
// subsequentNackDelay is the delay applied before reenqueuing
|
|
// an evaluation that has been Nacked more than once. This delay is
|
|
// compounding after the first Nack.
|
|
subsequentNackDelay time.Duration
|
|
|
|
l sync.RWMutex
|
|
}
|
|
|
|
// unackEval tracks an unacknowledged evaluation along with the Nack timer
|
|
type unackEval struct {
|
|
Eval *structs.Evaluation
|
|
Token string
|
|
NackTimer *time.Timer
|
|
}
|
|
|
|
// ReadyEvaluations is a list of ready evaluations across multiple jobs. We
|
|
// implement the container/heap interface so that this is a priority queue.
|
|
type ReadyEvaluations []*structs.Evaluation
|
|
|
|
// PendingEvaluations is a list of pending evaluations for a given job. We
|
|
// implement the container/heap interface so that this is a priority queue.
|
|
type PendingEvaluations []*structs.Evaluation
|
|
|
|
// NewEvalBroker creates a new evaluation broker. This is parameterized
|
|
// with the timeout used for messages that are not acknowledged before we
|
|
// assume a Nack and attempt to redeliver as well as the deliveryLimit
|
|
// which prevents a failing eval from being endlessly delivered. The
|
|
// initialNackDelay is the delay before making a Nacked evaluation available
|
|
// again for the first Nack and subsequentNackDelay is the compounding delay
|
|
// after the first Nack.
|
|
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
|
|
if timeout < 0 {
|
|
return nil, fmt.Errorf("timeout cannot be negative")
|
|
}
|
|
b := &EvalBroker{
|
|
nackTimeout: timeout,
|
|
deliveryLimit: deliveryLimit,
|
|
enabled: false,
|
|
enabledNotifier: broker.NewGenericNotifier(),
|
|
stats: new(BrokerStats),
|
|
evals: make(map[string]int),
|
|
jobEvals: make(map[structs.NamespacedID]string),
|
|
pending: make(map[structs.NamespacedID]PendingEvaluations),
|
|
cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest),
|
|
ready: make(map[string]ReadyEvaluations),
|
|
unack: make(map[string]*unackEval),
|
|
waiting: make(map[string]chan struct{}),
|
|
requeue: make(map[string]*structs.Evaluation),
|
|
timeWait: make(map[string]*time.Timer),
|
|
initialNackDelay: initialNackDelay,
|
|
subsequentNackDelay: subsequentNackDelay,
|
|
delayHeap: delayheap.NewDelayHeap(),
|
|
delayedEvalsUpdateCh: make(chan struct{}, 1),
|
|
}
|
|
b.stats.ByScheduler = make(map[string]*SchedulerStats)
|
|
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
|
|
|
|
return b, nil
|
|
}
|
|
|
|
// Enabled is used to check if the broker is enabled.
|
|
func (b *EvalBroker) Enabled() bool {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
return b.enabled
|
|
}
|
|
|
|
// SetEnabled is used to control if the broker is enabled. The broker
|
|
// should only be enabled on the active leader.
|
|
func (b *EvalBroker) SetEnabled(enabled bool) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
prevEnabled := b.enabled
|
|
b.enabled = enabled
|
|
if !prevEnabled && enabled {
|
|
// start the go routine for delayed evals
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
b.delayedEvalCancelFunc = cancel
|
|
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
|
|
}
|
|
|
|
if !enabled {
|
|
b.flush()
|
|
}
|
|
|
|
// Notify all subscribers to state changes of the broker enabled value.
|
|
b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
|
|
}
|
|
|
|
// Enqueue is used to enqueue a new evaluation
|
|
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
b.processEnqueue(eval, "")
|
|
}
|
|
|
|
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
|
|
// that are being re-enqueued to include their token.
|
|
//
|
|
// When requeuing an evaluation that potentially may be already
|
|
// enqueued. The evaluation is handled in one of the following ways:
|
|
// * Evaluation not outstanding: Process as a normal Enqueue
|
|
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
|
// - Ack received: Unblock the evaluation allowing it to be dequeued
|
|
// - Nack received: Drop the evaluation as it was created as a result of a
|
|
// scheduler run that was Nack'd
|
|
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
|
|
// The lock needs to be held until all evaluations are enqueued. This is so
|
|
// that when Dequeue operations are unblocked they will pick the highest
|
|
// priority evaluations.
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
for eval, token := range evals {
|
|
b.processEnqueue(eval, token)
|
|
}
|
|
}
|
|
|
|
// processEnqueue deduplicates evals and either enqueue immediately or enforce
|
|
// the evals wait time. If the token is passed, and the evaluation ID is
|
|
// outstanding, the evaluation is blocked until an Ack/Nack is received.
|
|
// processEnqueue must be called with the lock held.
|
|
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
|
// If we're not enabled, don't enable more queuing.
|
|
if !b.enabled {
|
|
return
|
|
}
|
|
|
|
// Check if already enqueued
|
|
if _, ok := b.evals[eval.ID]; ok {
|
|
if token == "" {
|
|
return
|
|
}
|
|
|
|
// If the token has been passed, the evaluation is being reblocked by
|
|
// the scheduler and should be processed once the outstanding evaluation
|
|
// is Acked or Nacked.
|
|
if unack, ok := b.unack[eval.ID]; ok && unack.Token == token {
|
|
b.requeue[token] = eval
|
|
}
|
|
return
|
|
} else if b.enabled {
|
|
b.evals[eval.ID] = 0
|
|
}
|
|
|
|
// Check if we need to enforce a wait
|
|
if eval.Wait > 0 {
|
|
b.processWaitingEnqueue(eval)
|
|
return
|
|
}
|
|
|
|
if !eval.WaitUntil.IsZero() {
|
|
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
|
|
b.stats.TotalWaiting += 1
|
|
b.stats.DelayedEvals[eval.ID] = eval
|
|
// Signal an update.
|
|
select {
|
|
case b.delayedEvalsUpdateCh <- struct{}{}:
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
|
|
b.enqueueLocked(eval, eval.Type)
|
|
}
|
|
|
|
// processWaitingEnqueue waits the given duration on the evaluation before
|
|
// enqueuing.
|
|
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
|
|
timer := time.AfterFunc(eval.Wait, func() {
|
|
b.enqueueWaiting(eval)
|
|
})
|
|
b.timeWait[eval.ID] = timer
|
|
b.stats.TotalWaiting += 1
|
|
}
|
|
|
|
// enqueueWaiting is used to enqueue a waiting evaluation
|
|
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
delete(b.timeWait, eval.ID)
|
|
b.stats.TotalWaiting -= 1
|
|
|
|
b.enqueueLocked(eval, eval.Type)
|
|
}
|
|
|
|
// enqueueLocked is used to enqueue with the lock held
|
|
func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) {
|
|
// Do nothing if not enabled
|
|
if !b.enabled {
|
|
return
|
|
}
|
|
|
|
// Check if there is a ready evaluation for this JobID
|
|
namespacedID := structs.NamespacedID{
|
|
ID: eval.JobID,
|
|
Namespace: eval.Namespace,
|
|
}
|
|
readyEval := b.jobEvals[namespacedID]
|
|
if readyEval == "" {
|
|
b.jobEvals[namespacedID] = eval.ID
|
|
} else if readyEval != eval.ID {
|
|
pending := b.pending[namespacedID]
|
|
heap.Push(&pending, eval)
|
|
b.pending[namespacedID] = pending
|
|
b.stats.TotalPending += 1
|
|
return
|
|
}
|
|
|
|
// Find the next ready eval by scheduler class
|
|
readyQueue, ok := b.ready[sched]
|
|
if !ok {
|
|
readyQueue = make([]*structs.Evaluation, 0, 16)
|
|
if _, ok := b.waiting[sched]; !ok {
|
|
b.waiting[sched] = make(chan struct{}, 1)
|
|
}
|
|
}
|
|
|
|
// Push onto the heap
|
|
heap.Push(&readyQueue, eval)
|
|
b.ready[sched] = readyQueue
|
|
|
|
// Update the stats
|
|
b.stats.TotalReady += 1
|
|
bySched, ok := b.stats.ByScheduler[sched]
|
|
if !ok {
|
|
bySched = &SchedulerStats{}
|
|
b.stats.ByScheduler[sched] = bySched
|
|
}
|
|
bySched.Ready += 1
|
|
|
|
// Unblock any pending dequeues
|
|
select {
|
|
case b.waiting[sched] <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Dequeue is used to perform a blocking dequeue. The next available evalution
|
|
// is returned as well as a unique token identifier for this dequeue. The token
|
|
// changes on leadership election to ensure a Dequeue prior to a leadership
|
|
// election cannot conflict with a Dequeue of the same evaluation after a
|
|
// leadership election.
|
|
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) {
|
|
var timeoutTimer *time.Timer
|
|
var timeoutCh <-chan time.Time
|
|
SCAN:
|
|
// Scan for work
|
|
eval, token, err := b.scanForSchedulers(schedulers)
|
|
if err != nil {
|
|
if timeoutTimer != nil {
|
|
timeoutTimer.Stop()
|
|
}
|
|
return nil, "", err
|
|
}
|
|
|
|
// Check if we have something
|
|
if eval != nil {
|
|
if timeoutTimer != nil {
|
|
timeoutTimer.Stop()
|
|
}
|
|
return eval, token, nil
|
|
}
|
|
|
|
// Setup the timeout channel the first time around
|
|
if timeoutTimer == nil && timeout != 0 {
|
|
timeoutTimer = time.NewTimer(timeout)
|
|
timeoutCh = timeoutTimer.C
|
|
}
|
|
|
|
// Block until we get work
|
|
scan := b.waitForSchedulers(schedulers, timeoutCh)
|
|
if scan {
|
|
goto SCAN
|
|
}
|
|
return nil, "", nil
|
|
}
|
|
|
|
// scanForSchedulers scans for work on any of the schedulers. The highest priority work
|
|
// is dequeued first. This may return nothing if there is no work waiting.
|
|
func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, string, error) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
// Do nothing if not enabled
|
|
if !b.enabled {
|
|
return nil, "", fmt.Errorf("eval broker disabled")
|
|
}
|
|
|
|
// Scan for eligible work
|
|
var eligibleSched []string
|
|
var eligiblePriority int
|
|
for _, sched := range schedulers {
|
|
// Get the ready queue for this scheduler
|
|
readyQueue, ok := b.ready[sched]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Peek at the next item
|
|
ready := readyQueue.Peek()
|
|
if ready == nil {
|
|
continue
|
|
}
|
|
|
|
// Add to eligible if equal or greater priority
|
|
if len(eligibleSched) == 0 || ready.Priority > eligiblePriority {
|
|
eligibleSched = []string{sched}
|
|
eligiblePriority = ready.Priority
|
|
|
|
} else if eligiblePriority > ready.Priority {
|
|
continue
|
|
|
|
} else if eligiblePriority == ready.Priority {
|
|
eligibleSched = append(eligibleSched, sched)
|
|
}
|
|
}
|
|
|
|
// Determine behavior based on eligible work
|
|
switch n := len(eligibleSched); n {
|
|
case 0:
|
|
// No work to do!
|
|
return nil, "", nil
|
|
|
|
case 1:
|
|
// Only a single task, dequeue
|
|
return b.dequeueForSched(eligibleSched[0])
|
|
|
|
default:
|
|
// Multiple tasks. We pick a random task so that we fairly
|
|
// distribute work.
|
|
offset := rand.Intn(n)
|
|
return b.dequeueForSched(eligibleSched[offset])
|
|
}
|
|
}
|
|
|
|
// dequeueForSched is used to dequeue the next work item for a given scheduler.
|
|
// This assumes locks are held and that this scheduler has work
|
|
func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) {
|
|
readyQueue := b.ready[sched]
|
|
raw := heap.Pop(&readyQueue)
|
|
b.ready[sched] = readyQueue
|
|
eval := raw.(*structs.Evaluation)
|
|
|
|
// Generate a UUID for the token
|
|
token := uuid.Generate()
|
|
|
|
// Setup Nack timer
|
|
nackTimer := time.AfterFunc(b.nackTimeout, func() {
|
|
b.Nack(eval.ID, token)
|
|
})
|
|
|
|
// Add to the unack queue
|
|
b.unack[eval.ID] = &unackEval{
|
|
Eval: eval,
|
|
Token: token,
|
|
NackTimer: nackTimer,
|
|
}
|
|
|
|
// Increment the dequeue count
|
|
b.evals[eval.ID] += 1
|
|
|
|
// Update the stats
|
|
b.stats.TotalReady -= 1
|
|
b.stats.TotalUnacked += 1
|
|
bySched := b.stats.ByScheduler[sched]
|
|
bySched.Ready -= 1
|
|
bySched.Unacked += 1
|
|
|
|
return eval, token, nil
|
|
}
|
|
|
|
// waitForSchedulers is used to wait for work on any of the scheduler or until a timeout.
|
|
// Returns if there is work waiting potentially.
|
|
func (b *EvalBroker) waitForSchedulers(schedulers []string, timeoutCh <-chan time.Time) bool {
|
|
doneCh := make(chan struct{})
|
|
readyCh := make(chan struct{}, 1)
|
|
defer close(doneCh)
|
|
|
|
// Start all the watchers
|
|
b.l.Lock()
|
|
for _, sched := range schedulers {
|
|
waitCh, ok := b.waiting[sched]
|
|
if !ok {
|
|
waitCh = make(chan struct{}, 1)
|
|
b.waiting[sched] = waitCh
|
|
}
|
|
|
|
// Start a goroutine that either waits for the waitCh on this scheduler
|
|
// to unblock or for this waitForSchedulers call to return
|
|
go func() {
|
|
select {
|
|
case <-waitCh:
|
|
select {
|
|
case readyCh <- struct{}{}:
|
|
default:
|
|
}
|
|
case <-doneCh:
|
|
}
|
|
}()
|
|
}
|
|
b.l.Unlock()
|
|
|
|
// Block until we have ready work and should scan, or until we timeout
|
|
// and should not make an attempt to scan for work
|
|
select {
|
|
case <-readyCh:
|
|
return true
|
|
case <-timeoutCh:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Outstanding checks if an EvalID has been delivered but not acknowledged
|
|
// and returns the associated token for the evaluation.
|
|
func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
return unack.Token, true
|
|
}
|
|
|
|
// OutstandingReset resets the Nack timer for the EvalID if the
|
|
// token matches and the eval is outstanding
|
|
func (b *EvalBroker) OutstandingReset(evalID, token string) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
if !unack.NackTimer.Reset(b.nackTimeout) {
|
|
return ErrNackTimeoutReached
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ack is used to positively acknowledge handling an evaluation
|
|
func (b *EvalBroker) Ack(evalID, token string) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
// Always delete the requeued evaluation. Either the Ack is successful and
|
|
// we requeue it or it isn't and we want to remove it.
|
|
defer delete(b.requeue, token)
|
|
|
|
// Lookup the unack'd eval
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return fmt.Errorf("Evaluation ID not found")
|
|
}
|
|
if unack.Token != token {
|
|
return fmt.Errorf("Token does not match for Evaluation ID")
|
|
}
|
|
jobID := unack.Eval.JobID
|
|
|
|
// Ensure we were able to stop the timer
|
|
if !unack.NackTimer.Stop() {
|
|
return fmt.Errorf("Evaluation ID Ack'd after Nack timer expiration")
|
|
}
|
|
|
|
// Update the stats
|
|
b.stats.TotalUnacked -= 1
|
|
queue := unack.Eval.Type
|
|
if b.evals[evalID] > b.deliveryLimit {
|
|
queue = failedQueue
|
|
}
|
|
bySched := b.stats.ByScheduler[queue]
|
|
bySched.Unacked -= 1
|
|
|
|
// Cleanup
|
|
delete(b.unack, evalID)
|
|
delete(b.evals, evalID)
|
|
|
|
namespacedID := structs.NamespacedID{
|
|
ID: jobID,
|
|
Namespace: unack.Eval.Namespace,
|
|
}
|
|
delete(b.jobEvals, namespacedID)
|
|
|
|
// Check if there are any pending evaluations
|
|
if pending := b.pending[namespacedID]; len(pending) != 0 {
|
|
|
|
// Any pending evaluations with ModifyIndexes older than the just-ack'd
|
|
// evaluation are no longer useful, so it's safe to drop them.
|
|
cancelable := pending.MarkForCancel()
|
|
b.cancelable = append(b.cancelable, cancelable...)
|
|
b.stats.TotalCancelable = len(b.cancelable)
|
|
b.stats.TotalPending -= len(cancelable)
|
|
|
|
// If any remain, enqueue an eval
|
|
if len(pending) > 0 {
|
|
raw := heap.Pop(&pending)
|
|
eval := raw.(*structs.Evaluation)
|
|
b.stats.TotalPending -= 1
|
|
b.enqueueLocked(eval, eval.Type)
|
|
}
|
|
|
|
// Clean up if there are no more after that
|
|
if len(pending) > 0 {
|
|
b.pending[namespacedID] = pending
|
|
} else {
|
|
delete(b.pending, namespacedID)
|
|
}
|
|
}
|
|
|
|
// Re-enqueue the evaluation.
|
|
if eval, ok := b.requeue[token]; ok {
|
|
b.processEnqueue(eval, "")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Nack is used to negatively acknowledge handling an evaluation
|
|
func (b *EvalBroker) Nack(evalID, token string) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
// Always delete the requeued evaluation since the Nack means the requeue is
|
|
// invalid.
|
|
delete(b.requeue, token)
|
|
|
|
// Lookup the unack'd eval
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return fmt.Errorf("Evaluation ID not found")
|
|
}
|
|
if unack.Token != token {
|
|
return fmt.Errorf("Token does not match for Evaluation ID")
|
|
}
|
|
|
|
// Stop the timer, doesn't matter if we've missed it
|
|
unack.NackTimer.Stop()
|
|
|
|
// Cleanup
|
|
delete(b.unack, evalID)
|
|
|
|
// Update the stats
|
|
b.stats.TotalUnacked -= 1
|
|
bySched := b.stats.ByScheduler[unack.Eval.Type]
|
|
bySched.Unacked -= 1
|
|
|
|
// Check if we've hit the delivery limit, and re-enqueue
|
|
// in the failedQueue
|
|
if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
|
|
b.enqueueLocked(unack.Eval, failedQueue)
|
|
} else {
|
|
e := unack.Eval
|
|
e.Wait = b.nackReenqueueDelay(e, dequeues)
|
|
|
|
// See if there should be a delay before re-enqueuing
|
|
if e.Wait > 0 {
|
|
b.processWaitingEnqueue(e)
|
|
} else {
|
|
b.enqueueLocked(e, e.Type)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// nackReenqueueDelay is used to determine the delay that should be applied on
|
|
// the evaluation given the number of previous attempts
|
|
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
|
|
switch {
|
|
case prevDequeues <= 0:
|
|
return 0
|
|
case prevDequeues == 1:
|
|
return b.initialNackDelay
|
|
default:
|
|
// For each subsequent nack compound a delay
|
|
return time.Duration(prevDequeues-1) * b.subsequentNackDelay
|
|
}
|
|
}
|
|
|
|
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
|
|
// progress but is in a potentially unbounded operation such as the plan queue.
|
|
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
if !unack.NackTimer.Stop() {
|
|
return ErrNackTimeoutReached
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
|
|
// paused. It should be resumed after leaving an unbounded operation.
|
|
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
unack.NackTimer.Reset(b.nackTimeout)
|
|
return nil
|
|
}
|
|
|
|
// Flush is used to clear the state of the broker. It must be called from within
|
|
// the lock.
|
|
func (b *EvalBroker) flush() {
|
|
// Unblock any waiters
|
|
for _, waitCh := range b.waiting {
|
|
close(waitCh)
|
|
}
|
|
b.waiting = make(map[string]chan struct{})
|
|
|
|
// Cancel any Nack timers
|
|
for _, unack := range b.unack {
|
|
unack.NackTimer.Stop()
|
|
}
|
|
|
|
// Cancel any time wait evals
|
|
for _, wait := range b.timeWait {
|
|
wait.Stop()
|
|
}
|
|
|
|
// Cancel the delayed evaluations goroutine
|
|
if b.delayedEvalCancelFunc != nil {
|
|
b.delayedEvalCancelFunc()
|
|
}
|
|
|
|
// Clear out the update channel for delayed evaluations
|
|
b.delayedEvalsUpdateCh = make(chan struct{}, 1)
|
|
|
|
// Reset the broker
|
|
b.stats.TotalReady = 0
|
|
b.stats.TotalUnacked = 0
|
|
b.stats.TotalPending = 0
|
|
b.stats.TotalWaiting = 0
|
|
b.stats.TotalCancelable = 0
|
|
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
|
|
b.stats.ByScheduler = make(map[string]*SchedulerStats)
|
|
b.evals = make(map[string]int)
|
|
b.jobEvals = make(map[structs.NamespacedID]string)
|
|
b.pending = make(map[structs.NamespacedID]PendingEvaluations)
|
|
b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest)
|
|
b.ready = make(map[string]ReadyEvaluations)
|
|
b.unack = make(map[string]*unackEval)
|
|
b.timeWait = make(map[string]*time.Timer)
|
|
b.delayHeap = delayheap.NewDelayHeap()
|
|
}
|
|
|
|
// evalWrapper satisfies the HeapNode interface
|
|
type evalWrapper struct {
|
|
eval *structs.Evaluation
|
|
}
|
|
|
|
func (d *evalWrapper) Data() interface{} {
|
|
return d.eval
|
|
}
|
|
|
|
func (d *evalWrapper) ID() string {
|
|
return d.eval.ID
|
|
}
|
|
|
|
func (d *evalWrapper) Namespace() string {
|
|
return d.eval.Namespace
|
|
}
|
|
|
|
// runDelayedEvalsWatcher is a long-lived function that waits till a time
|
|
// deadline is met for pending evaluations before enqueuing them
|
|
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) {
|
|
var timerChannel <-chan time.Time
|
|
var delayTimer *time.Timer
|
|
for {
|
|
eval, waitUntil := b.nextDelayedEval()
|
|
if waitUntil.IsZero() {
|
|
timerChannel = nil
|
|
} else {
|
|
launchDur := waitUntil.Sub(time.Now().UTC())
|
|
if delayTimer == nil {
|
|
delayTimer = time.NewTimer(launchDur)
|
|
} else {
|
|
delayTimer.Reset(launchDur)
|
|
}
|
|
timerChannel = delayTimer.C
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timerChannel:
|
|
// remove from the heap since we can enqueue it now
|
|
b.l.Lock()
|
|
b.delayHeap.Remove(&evalWrapper{eval})
|
|
b.stats.TotalWaiting -= 1
|
|
delete(b.stats.DelayedEvals, eval.ID)
|
|
b.enqueueLocked(eval, eval.Type)
|
|
b.l.Unlock()
|
|
case <-updateCh:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// nextDelayedEval returns the next delayed eval to launch and when it should be enqueued.
|
|
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
|
|
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
// If there is nothing wait for an update.
|
|
if b.delayHeap.Length() == 0 {
|
|
return nil, time.Time{}
|
|
}
|
|
nextEval := b.delayHeap.Peek()
|
|
if nextEval == nil {
|
|
return nil, time.Time{}
|
|
}
|
|
eval := nextEval.Node.Data().(*structs.Evaluation)
|
|
return eval, nextEval.WaitUntil
|
|
}
|
|
|
|
// Stats is used to query the state of the broker
|
|
func (b *EvalBroker) Stats() *BrokerStats {
|
|
// Allocate a new stats struct
|
|
stats := new(BrokerStats)
|
|
stats.DelayedEvals = make(map[string]*structs.Evaluation)
|
|
stats.ByScheduler = make(map[string]*SchedulerStats)
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
// Copy all the stats
|
|
stats.TotalReady = b.stats.TotalReady
|
|
stats.TotalUnacked = b.stats.TotalUnacked
|
|
stats.TotalPending = b.stats.TotalPending
|
|
stats.TotalWaiting = b.stats.TotalWaiting
|
|
stats.TotalCancelable = b.stats.TotalCancelable
|
|
for id, eval := range b.stats.DelayedEvals {
|
|
evalCopy := *eval
|
|
stats.DelayedEvals[id] = &evalCopy
|
|
}
|
|
for sched, subStat := range b.stats.ByScheduler {
|
|
subStatCopy := *subStat
|
|
stats.ByScheduler[sched] = &subStatCopy
|
|
}
|
|
return stats
|
|
}
|
|
|
|
// Cancelable retrieves a batch of previously-pending evaluations that are now
|
|
// stale and ready to mark for canceling. The eval RPC will call this with a
|
|
// batch size set to avoid sending overly large raft messages.
|
|
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
if batchSize > len(b.cancelable) {
|
|
batchSize = len(b.cancelable)
|
|
}
|
|
|
|
cancelable := b.cancelable[:batchSize]
|
|
b.cancelable = b.cancelable[batchSize:]
|
|
|
|
b.stats.TotalCancelable = len(b.cancelable)
|
|
return cancelable
|
|
}
|
|
|
|
// EmitStats is used to export metrics about the broker while enabled
|
|
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
|
|
timer, stop := helper.NewSafeTimer(period)
|
|
defer stop()
|
|
|
|
for {
|
|
timer.Reset(period)
|
|
|
|
select {
|
|
case <-timer.C:
|
|
stats := b.Stats()
|
|
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
|
|
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
|
|
metrics.SetGauge([]string{"nomad", "broker", "total_pending"}, float32(stats.TotalPending))
|
|
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
|
|
metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
|
|
for _, eval := range stats.DelayedEvals {
|
|
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
|
|
float32(time.Until(eval.WaitUntil).Seconds()),
|
|
[]metrics.Label{
|
|
{Name: "eval_id", Value: eval.ID},
|
|
{Name: "job", Value: eval.JobID},
|
|
{Name: "namespace", Value: eval.Namespace},
|
|
})
|
|
}
|
|
for sched, schedStats := range stats.ByScheduler {
|
|
metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready))
|
|
metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked))
|
|
}
|
|
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// BrokerStats returns all the stats about the broker
|
|
type BrokerStats struct {
|
|
TotalReady int
|
|
TotalUnacked int
|
|
TotalPending int
|
|
TotalWaiting int
|
|
TotalCancelable int
|
|
DelayedEvals map[string]*structs.Evaluation
|
|
ByScheduler map[string]*SchedulerStats
|
|
}
|
|
|
|
// SchedulerStats returns the stats per scheduler
|
|
type SchedulerStats struct {
|
|
Ready int
|
|
Unacked int
|
|
}
|
|
|
|
// Len is for the sorting interface
|
|
func (r ReadyEvaluations) Len() int {
|
|
return len(r)
|
|
}
|
|
|
|
// Less is for the sorting interface. We flip the check
|
|
// so that the "min" in the min-heap is the element with the
|
|
// highest priority
|
|
func (r ReadyEvaluations) Less(i, j int) bool {
|
|
if r[i].JobID != r[j].JobID && r[i].Priority != r[j].Priority {
|
|
return !(r[i].Priority < r[j].Priority)
|
|
}
|
|
return r[i].CreateIndex < r[j].CreateIndex
|
|
}
|
|
|
|
// Swap is for the sorting interface
|
|
func (r ReadyEvaluations) Swap(i, j int) {
|
|
r[i], r[j] = r[j], r[i]
|
|
}
|
|
|
|
// Push is used to add a new evaluation to the slice
|
|
func (r *ReadyEvaluations) Push(e interface{}) {
|
|
*r = append(*r, e.(*structs.Evaluation))
|
|
}
|
|
|
|
// Pop is used to remove an evaluation from the slice
|
|
func (r *ReadyEvaluations) Pop() interface{} {
|
|
n := len(*r)
|
|
e := (*r)[n-1]
|
|
(*r)[n-1] = nil
|
|
*r = (*r)[:n-1]
|
|
return e
|
|
}
|
|
|
|
// Peek is used to peek at the next element that would be popped
|
|
func (r ReadyEvaluations) Peek() *structs.Evaluation {
|
|
n := len(r)
|
|
if n == 0 {
|
|
return nil
|
|
}
|
|
return r[n-1]
|
|
}
|
|
|
|
// Len is for the sorting interface
|
|
func (p PendingEvaluations) Len() int {
|
|
return len(p)
|
|
}
|
|
|
|
// Less is for the sorting interface. We flip the check
|
|
// so that the "min" in the min-heap is the element with the
|
|
// highest priority or highest modify index
|
|
func (p PendingEvaluations) Less(i, j int) bool {
|
|
if p[i].Priority != p[j].Priority {
|
|
return !(p[i].Priority < p[j].Priority)
|
|
}
|
|
return !(p[i].ModifyIndex < p[j].ModifyIndex)
|
|
}
|
|
|
|
// Swap is for the sorting interface
|
|
func (p PendingEvaluations) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
}
|
|
|
|
// Push implements the heap interface and is used to add a new evaluation to the slice
|
|
func (p *PendingEvaluations) Push(e interface{}) {
|
|
*p = append(*p, e.(*structs.Evaluation))
|
|
}
|
|
|
|
// Pop implements the heap interface and is used to remove an evaluation from the slice
|
|
func (p *PendingEvaluations) Pop() interface{} {
|
|
n := len(*p)
|
|
e := (*p)[n-1]
|
|
(*p)[n-1] = nil
|
|
*p = (*p)[:n-1]
|
|
return e
|
|
}
|
|
|
|
// MarkForCancel is used to clear the pending list of all but the one with the
|
|
// highest modify index and highest priority. It returns a slice of cancelable
|
|
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
|
|
// them. This must be called inside the broker's lock.
|
|
func (p *PendingEvaluations) MarkForCancel() []*structs.Evaluation {
|
|
|
|
// In pathological cases, we can have a large number of pending evals but
|
|
// will want to cancel most of them. Using heap.Remove requires we re-sort
|
|
// for each eval we remove. Because we expect to have at most one remaining,
|
|
// we'll just create a new heap.
|
|
retain := PendingEvaluations{(heap.Pop(p)).(*structs.Evaluation)}
|
|
|
|
cancelable := make([]*structs.Evaluation, len(*p))
|
|
copy(cancelable, *p)
|
|
|
|
*p = retain
|
|
return cancelable
|
|
}
|