65c7e149d3
The eval broker's `Cancelable` method, used by the cancelable eval reaper, mutates the slice of cancelable evals by removing a batch at a time from the slice. But this method unsafely uses a read lock despite this mutation. Under normal workloads this is likely to be safe, but when the eval broker is under the heavy load that this feature is intended to fix, we're likely to have a race condition. Switch this to a write lock, like the other locks that mutate the eval broker state. This changeset also adjusts the timeout to allow poorly-sized Actions runners more time to schedule the appropriate goroutines. The test has also been updated to use `shoenig/test/wait` so we can have sensible reporting of the results rather than just a timeout error when things go wrong.
1033 lines
29 KiB
Go
1033 lines
29 KiB
Go
package nomad
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/helper/broker"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/lib/delayheap"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
	// failedQueue is the queue we add Evaluations to once
	// they've reached the deliveryLimit. This allows the leader to
	// set the status to failed.
	// NOTE(review): the leading underscore presumably keeps this from
	// colliding with a real scheduler type name — confirm against the
	// scheduler registry.
	failedQueue = "_failed"
)
|
|
|
|
var (
	// ErrNotOutstanding is returned if an evaluation is not outstanding
	// (i.e. it has not been delivered, or has already been Acked/Nacked).
	ErrNotOutstanding = errors.New("evaluation is not outstanding")

	// ErrTokenMismatch is returned when the outstanding eval has a
	// different delivery token than the caller supplied.
	ErrTokenMismatch = errors.New("evaluation token does not match")

	// ErrNackTimeoutReached is returned if an expired evaluation is reset
	// after its Nack timer has already fired.
	ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
)
|
|
|
|
// EvalBroker is used to manage brokering of evaluations. When an evaluation is
// created, due to a change in a job specification or a node, we put it into the
// broker. The broker sorts evaluations by priority and scheduler type. This
// allows us to dequeue the highest priority work first, while also allowing sub-schedulers
// to only dequeue work they know how to handle. The broker is designed to be entirely
// in-memory and is managed by the leader node.
//
// The broker must provide at-least-once delivery semantics. It relies on explicit
// Ack/Nack messages to handle this. If a delivery is not Ack'd in a sufficient time
// span, it will be assumed Nack'd.
type EvalBroker struct {
	// nackTimeout is how long a dequeued evaluation may remain
	// unacknowledged before it is automatically Nacked and redelivered.
	nackTimeout time.Duration

	// deliveryLimit caps delivery attempts; evals exceeding it go to
	// the failedQueue.
	deliveryLimit int

	// enabled gates all enqueue/dequeue operations; only set on the leader.
	enabled bool

	// enabledNotifier broadcasts changes to the enabled flag.
	enabledNotifier *broker.GenericNotifier

	// stats holds the broker's aggregate counters, guarded by l.
	stats *BrokerStats

	// evals tracks queued evaluations by ID to de-duplicate enqueue.
	// The counter is the number of times we've attempted delivery,
	// and is used to eventually fail an evaluation.
	evals map[string]int

	// jobEvals tracks queued evaluations by a job's ID and namespace to serialize them
	jobEvals map[structs.NamespacedID]string

	// pending tracks the pending evaluations by JobID in a priority queue
	pending map[structs.NamespacedID]PendingEvaluations

	// cancelable tracks previously pending evaluations (for any job) that are
	// now safe for the Eval.Ack RPC to cancel in batches
	cancelable []*structs.Evaluation

	// ready tracks the ready jobs by scheduler in a priority queue
	ready map[string]ReadyEvaluations

	// unack is a map of evalID to an un-acknowledged evaluation
	unack map[string]*unackEval

	// waiting is used to notify on a per-scheduler basis of ready work
	waiting map[string]chan struct{}

	// requeue tracks evaluations that need to be re-enqueued once the current
	// evaluation finishes by token. If the token is Nacked or rejected the
	// evaluation is dropped but if Acked successfully, the evaluation is
	// queued.
	requeue map[string]*structs.Evaluation

	// timeWait has evaluations that are waiting for time to elapse
	timeWait map[string]*time.Timer

	// delayedEvalCancelFunc is used to stop the long running go routine
	// that processes delayed evaluations
	delayedEvalCancelFunc context.CancelFunc

	// delayHeap is a heap used to track incoming evaluations that are
	// not eligible to enqueue until their WaitTime
	delayHeap *delayheap.DelayHeap

	// delayedEvalsUpdateCh is used to trigger notifications for updates
	// to the delayHeap
	delayedEvalsUpdateCh chan struct{}

	// initialNackDelay is the delay applied before re-enqueuing a
	// Nacked evaluation for the first time.
	initialNackDelay time.Duration

	// subsequentNackDelay is the delay applied before reenqueuing
	// an evaluation that has been Nacked more than once. This delay is
	// compounding after the first Nack.
	subsequentNackDelay time.Duration

	// l guards all mutable state above. Methods that mutate broker state
	// must take the write lock.
	l sync.RWMutex
}
|
|
|
|
// unackEval tracks an unacknowledged evaluation along with the Nack timer
// that will fire if the worker fails to Ack in time.
type unackEval struct {
	// Eval is the delivered, not-yet-acknowledged evaluation.
	Eval *structs.Evaluation
	// Token is the delivery token returned by Dequeue; Ack/Nack must match it.
	Token string
	// NackTimer auto-Nacks the evaluation when the nackTimeout elapses.
	NackTimer *time.Timer
}
|
|
|
|
// ReadyEvaluations is a list of ready evaluations across multiple jobs. We
// implement the container/heap interface so that this is a priority queue.
type ReadyEvaluations []*structs.Evaluation
|
|
|
|
// PendingEvaluations is a list of pending evaluations for a given job. We
// implement the container/heap interface so that this is a priority queue.
type PendingEvaluations []*structs.Evaluation
|
|
|
|
// NewEvalBroker creates a new evaluation broker. This is parameterized
|
|
// with the timeout used for messages that are not acknowledged before we
|
|
// assume a Nack and attempt to redeliver as well as the deliveryLimit
|
|
// which prevents a failing eval from being endlessly delivered. The
|
|
// initialNackDelay is the delay before making a Nacked evaluation available
|
|
// again for the first Nack and subsequentNackDelay is the compounding delay
|
|
// after the first Nack.
|
|
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
|
|
if timeout < 0 {
|
|
return nil, fmt.Errorf("timeout cannot be negative")
|
|
}
|
|
b := &EvalBroker{
|
|
nackTimeout: timeout,
|
|
deliveryLimit: deliveryLimit,
|
|
enabled: false,
|
|
enabledNotifier: broker.NewGenericNotifier(),
|
|
stats: new(BrokerStats),
|
|
evals: make(map[string]int),
|
|
jobEvals: make(map[structs.NamespacedID]string),
|
|
pending: make(map[structs.NamespacedID]PendingEvaluations),
|
|
cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest),
|
|
ready: make(map[string]ReadyEvaluations),
|
|
unack: make(map[string]*unackEval),
|
|
waiting: make(map[string]chan struct{}),
|
|
requeue: make(map[string]*structs.Evaluation),
|
|
timeWait: make(map[string]*time.Timer),
|
|
initialNackDelay: initialNackDelay,
|
|
subsequentNackDelay: subsequentNackDelay,
|
|
delayHeap: delayheap.NewDelayHeap(),
|
|
delayedEvalsUpdateCh: make(chan struct{}, 1),
|
|
}
|
|
b.stats.ByScheduler = make(map[string]*SchedulerStats)
|
|
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
|
|
|
|
return b, nil
|
|
}
|
|
|
|
// Enabled is used to check if the broker is enabled.
|
|
func (b *EvalBroker) Enabled() bool {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
return b.enabled
|
|
}
|
|
|
|
// SetEnabled is used to control if the broker is enabled. The broker
// should only be enabled on the active leader.
func (b *EvalBroker) SetEnabled(enabled bool) {
	b.l.Lock()
	defer b.l.Unlock()

	prevEnabled := b.enabled
	b.enabled = enabled
	if !prevEnabled && enabled {
		// Transition disabled -> enabled: start the go routine for delayed
		// evals. It is stopped again via delayedEvalCancelFunc in flush().
		ctx, cancel := context.WithCancel(context.Background())
		b.delayedEvalCancelFunc = cancel
		go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
	}

	// Disabling always drops all broker state, even if we were already
	// disabled.
	if !enabled {
		b.flush()
	}

	// Notify all subscribers to state changes of the broker enabled value.
	b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
}
|
|
|
|
// Enqueue is used to enqueue a new evaluation
|
|
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
b.processEnqueue(eval, "")
|
|
}
|
|
|
|
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
|
|
// that are being re-enqueued to include their token.
|
|
//
|
|
// When requeuing an evaluation that potentially may be already
|
|
// enqueued. The evaluation is handled in one of the following ways:
|
|
// * Evaluation not outstanding: Process as a normal Enqueue
|
|
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
|
// - Ack received: Unblock the evaluation allowing it to be dequeued
|
|
// - Nack received: Drop the evaluation as it was created as a result of a
|
|
// scheduler run that was Nack'd
|
|
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
|
|
// The lock needs to be held until all evaluations are enqueued. This is so
|
|
// that when Dequeue operations are unblocked they will pick the highest
|
|
// priority evaluations.
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
for eval, token := range evals {
|
|
b.processEnqueue(eval, token)
|
|
}
|
|
}
|
|
|
|
// processEnqueue deduplicates evals and either enqueue immediately or enforce
|
|
// the evals wait time. If the token is passed, and the evaluation ID is
|
|
// outstanding, the evaluation is blocked until an Ack/Nack is received.
|
|
// processEnqueue must be called with the lock held.
|
|
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
|
// If we're not enabled, don't enable more queuing.
|
|
if !b.enabled {
|
|
return
|
|
}
|
|
|
|
// Check if already enqueued
|
|
if _, ok := b.evals[eval.ID]; ok {
|
|
if token == "" {
|
|
return
|
|
}
|
|
|
|
// If the token has been passed, the evaluation is being reblocked by
|
|
// the scheduler and should be processed once the outstanding evaluation
|
|
// is Acked or Nacked.
|
|
if unack, ok := b.unack[eval.ID]; ok && unack.Token == token {
|
|
b.requeue[token] = eval
|
|
}
|
|
return
|
|
} else if b.enabled {
|
|
b.evals[eval.ID] = 0
|
|
}
|
|
|
|
// Check if we need to enforce a wait
|
|
if eval.Wait > 0 {
|
|
b.processWaitingEnqueue(eval)
|
|
return
|
|
}
|
|
|
|
if !eval.WaitUntil.IsZero() {
|
|
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
|
|
b.stats.TotalWaiting += 1
|
|
b.stats.DelayedEvals[eval.ID] = eval
|
|
// Signal an update.
|
|
select {
|
|
case b.delayedEvalsUpdateCh <- struct{}{}:
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
|
|
b.enqueueLocked(eval, eval.Type)
|
|
}
|
|
|
|
// processWaitingEnqueue waits the given duration on the evaluation before
|
|
// enqueuing.
|
|
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
|
|
timer := time.AfterFunc(eval.Wait, func() {
|
|
b.enqueueWaiting(eval)
|
|
})
|
|
b.timeWait[eval.ID] = timer
|
|
b.stats.TotalWaiting += 1
|
|
}
|
|
|
|
// enqueueWaiting is used to enqueue a waiting evaluation
|
|
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
delete(b.timeWait, eval.ID)
|
|
b.stats.TotalWaiting -= 1
|
|
|
|
b.enqueueLocked(eval, eval.Type)
|
|
}
|
|
|
|
// enqueueLocked is used to enqueue with the lock held. If another evaluation
// for the same job is already ready, the eval is parked in the per-job
// pending heap instead; otherwise it is pushed onto the scheduler's ready
// heap and any blocked Dequeue is signaled.
func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) {
	// Do nothing if not enabled
	if !b.enabled {
		return
	}

	// Check if there is a ready evaluation for this JobID
	namespacedID := structs.NamespacedID{
		ID:        eval.JobID,
		Namespace: eval.Namespace,
	}
	readyEval := b.jobEvals[namespacedID]
	if readyEval == "" {
		// No ready eval for this job yet: this one becomes it.
		b.jobEvals[namespacedID] = eval.ID
	} else if readyEval != eval.ID {
		// Another eval for the job is already ready, so serialize this one
		// behind it in the job's pending heap.
		pending := b.pending[namespacedID]
		heap.Push(&pending, eval)
		b.pending[namespacedID] = pending
		b.stats.TotalPending += 1
		return
	}

	// Find the next ready eval by scheduler class
	readyQueue, ok := b.ready[sched]
	if !ok {
		readyQueue = make([]*structs.Evaluation, 0, 16)
		if _, ok := b.waiting[sched]; !ok {
			b.waiting[sched] = make(chan struct{}, 1)
		}
	}

	// Push onto the heap
	heap.Push(&readyQueue, eval)
	b.ready[sched] = readyQueue

	// Update the stats
	b.stats.TotalReady += 1
	bySched, ok := b.stats.ByScheduler[sched]
	if !ok {
		bySched = &SchedulerStats{}
		b.stats.ByScheduler[sched] = bySched
	}
	bySched.Ready += 1

	// Unblock any pending dequeues; the buffered channel makes this a
	// non-blocking, at-most-one signal.
	select {
	case b.waiting[sched] <- struct{}{}:
	default:
	}
}
|
|
|
|
// Dequeue is used to perform a blocking dequeue. The next available evalution
|
|
// is returned as well as a unique token identifier for this dequeue. The token
|
|
// changes on leadership election to ensure a Dequeue prior to a leadership
|
|
// election cannot conflict with a Dequeue of the same evaluation after a
|
|
// leadership election.
|
|
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) {
|
|
var timeoutTimer *time.Timer
|
|
var timeoutCh <-chan time.Time
|
|
SCAN:
|
|
// Scan for work
|
|
eval, token, err := b.scanForSchedulers(schedulers)
|
|
if err != nil {
|
|
if timeoutTimer != nil {
|
|
timeoutTimer.Stop()
|
|
}
|
|
return nil, "", err
|
|
}
|
|
|
|
// Check if we have something
|
|
if eval != nil {
|
|
if timeoutTimer != nil {
|
|
timeoutTimer.Stop()
|
|
}
|
|
return eval, token, nil
|
|
}
|
|
|
|
// Setup the timeout channel the first time around
|
|
if timeoutTimer == nil && timeout != 0 {
|
|
timeoutTimer = time.NewTimer(timeout)
|
|
timeoutCh = timeoutTimer.C
|
|
}
|
|
|
|
// Block until we get work
|
|
scan := b.waitForSchedulers(schedulers, timeoutCh)
|
|
if scan {
|
|
goto SCAN
|
|
}
|
|
return nil, "", nil
|
|
}
|
|
|
|
// scanForSchedulers scans for work on any of the schedulers. The highest priority work
// is dequeued first. This may return nothing if there is no work waiting.
func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, string, error) {
	b.l.Lock()
	defer b.l.Unlock()

	// Do nothing if not enabled
	if !b.enabled {
		return nil, "", fmt.Errorf("eval broker disabled")
	}

	// Scan for eligible work: collect all schedulers whose next eval ties
	// for the highest priority seen so far.
	var eligibleSched []string
	var eligiblePriority int
	for _, sched := range schedulers {
		// Get the ready queue for this scheduler
		readyQueue, ok := b.ready[sched]
		if !ok {
			continue
		}

		// Peek at the next item
		ready := readyQueue.Peek()
		if ready == nil {
			continue
		}

		// Add to eligible if equal or greater priority
		if len(eligibleSched) == 0 || ready.Priority > eligiblePriority {
			// New best: reset the candidate set to just this scheduler.
			eligibleSched = []string{sched}
			eligiblePriority = ready.Priority

		} else if eligiblePriority > ready.Priority {
			// Strictly lower priority than the current best: skip.
			continue

		} else if eligiblePriority == ready.Priority {
			// Tie with the current best: add to the candidate set.
			eligibleSched = append(eligibleSched, sched)
		}
	}

	// Determine behavior based on eligible work
	switch n := len(eligibleSched); n {
	case 0:
		// No work to do!
		return nil, "", nil

	case 1:
		// Only a single task, dequeue
		return b.dequeueForSched(eligibleSched[0])

	default:
		// Multiple tasks. We pick a random task so that we fairly
		// distribute work.
		offset := rand.Intn(n)
		return b.dequeueForSched(eligibleSched[offset])
	}
}
|
|
|
|
// dequeueForSched is used to dequeue the next work item for a given scheduler.
// This assumes locks are held and that this scheduler has work
func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) {
	readyQueue := b.ready[sched]
	raw := heap.Pop(&readyQueue)
	b.ready[sched] = readyQueue
	eval := raw.(*structs.Evaluation)

	// Generate a UUID for the token; Ack/Nack must present the same token.
	token := uuid.Generate()

	// Setup Nack timer: if the worker doesn't Ack within nackTimeout the
	// eval is auto-Nacked and redelivered.
	nackTimer := time.AfterFunc(b.nackTimeout, func() {
		b.Nack(eval.ID, token)
	})

	// Add to the unack queue
	b.unack[eval.ID] = &unackEval{
		Eval:      eval,
		Token:     token,
		NackTimer: nackTimer,
	}

	// Increment the dequeue count; this feeds the deliveryLimit check.
	b.evals[eval.ID] += 1

	// Update the stats
	b.stats.TotalReady -= 1
	b.stats.TotalUnacked += 1
	bySched := b.stats.ByScheduler[sched]
	bySched.Ready -= 1
	bySched.Unacked += 1

	return eval, token, nil
}
|
|
|
|
// waitForSchedulers is used to wait for work on any of the scheduler or until a timeout.
// Returns if there is work waiting potentially.
func (b *EvalBroker) waitForSchedulers(schedulers []string, timeoutCh <-chan time.Time) bool {
	doneCh := make(chan struct{})
	readyCh := make(chan struct{}, 1)
	// Closing doneCh on return unblocks all watcher goroutines below, so
	// none of them leak past this call.
	defer close(doneCh)

	// Start all the watchers
	b.l.Lock()
	for _, sched := range schedulers {
		waitCh, ok := b.waiting[sched]
		if !ok {
			waitCh = make(chan struct{}, 1)
			b.waiting[sched] = waitCh
		}

		// Start a goroutine that either waits for the waitCh on this scheduler
		// to unblock or for this waitForSchedulers call to return
		go func() {
			select {
			case <-waitCh:
				// Non-blocking send: one signal is enough to trigger a scan.
				select {
				case readyCh <- struct{}{}:
				default:
				}
			case <-doneCh:
			}
		}()
	}
	b.l.Unlock()

	// Block until we have ready work and should scan, or until we timeout
	// and should not make an attempt to scan for work
	select {
	case <-readyCh:
		return true
	case <-timeoutCh:
		return false
	}
}
|
|
|
|
// Outstanding checks if an EvalID has been delivered but not acknowledged
|
|
// and returns the associated token for the evaluation.
|
|
func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
return unack.Token, true
|
|
}
|
|
|
|
// OutstandingReset resets the Nack timer for the EvalID if the
|
|
// token matches and the eval is outstanding
|
|
func (b *EvalBroker) OutstandingReset(evalID, token string) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
if !unack.NackTimer.Reset(b.nackTimeout) {
|
|
return ErrNackTimeoutReached
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ack is used to positively acknowledge handling an evaluation. On success it
// releases the job's serialization slot, marks stale pending evals for
// cancellation, promotes the next pending eval for the job, and re-enqueues
// any evaluation the scheduler blocked on this token.
func (b *EvalBroker) Ack(evalID, token string) error {
	b.l.Lock()
	defer b.l.Unlock()

	// Always delete the requeued evaluation. Either the Ack is successful and
	// we requeue it or it isn't and we want to remove it.
	defer delete(b.requeue, token)

	// Lookup the unack'd eval
	unack, ok := b.unack[evalID]
	if !ok {
		return fmt.Errorf("Evaluation ID not found")
	}
	if unack.Token != token {
		return fmt.Errorf("Token does not match for Evaluation ID")
	}
	jobID := unack.Eval.JobID

	// Ensure we were able to stop the timer; if it already fired, the eval
	// was auto-Nacked and this Ack is too late.
	if !unack.NackTimer.Stop() {
		return fmt.Errorf("Evaluation ID Ack'd after Nack timer expiration")
	}

	// Update the stats. Evals past the delivery limit were re-enqueued on
	// the failedQueue, so decrement that scheduler's counter instead.
	b.stats.TotalUnacked -= 1
	queue := unack.Eval.Type
	if b.evals[evalID] > b.deliveryLimit {
		queue = failedQueue
	}
	bySched := b.stats.ByScheduler[queue]
	bySched.Unacked -= 1

	// Cleanup
	delete(b.unack, evalID)
	delete(b.evals, evalID)

	namespacedID := structs.NamespacedID{
		ID:        jobID,
		Namespace: unack.Eval.Namespace,
	}
	delete(b.jobEvals, namespacedID)

	// Check if there are any pending evaluations. Note that `pending` is a
	// local copy of the heap; it is written back (or deleted) below.
	if pending := b.pending[namespacedID]; len(pending) != 0 {

		// Any pending evaluations with ModifyIndexes older than the just-ack'd
		// evaluation are no longer useful, so it's safe to drop them.
		cancelable := pending.MarkForCancel()
		b.cancelable = append(b.cancelable, cancelable...)
		b.stats.TotalCancelable = len(b.cancelable)
		b.stats.TotalPending -= len(cancelable)

		// If any remain, enqueue an eval
		if len(pending) > 0 {
			raw := heap.Pop(&pending)
			eval := raw.(*structs.Evaluation)
			b.stats.TotalPending -= 1
			b.enqueueLocked(eval, eval.Type)
		}

		// Clean up if there are no more after that
		if len(pending) > 0 {
			b.pending[namespacedID] = pending
		} else {
			delete(b.pending, namespacedID)
		}
	}

	// Re-enqueue the evaluation the scheduler blocked on this token, if any.
	if eval, ok := b.requeue[token]; ok {
		b.processEnqueue(eval, "")
	}

	return nil
}
|
|
|
|
// Nack is used to negatively acknowledge handling an evaluation. The eval is
// re-enqueued (possibly after a backoff delay), or moved to the failedQueue
// once it has exhausted its delivery attempts.
func (b *EvalBroker) Nack(evalID, token string) error {
	b.l.Lock()
	defer b.l.Unlock()

	// Always delete the requeued evaluation since the Nack means the requeue is
	// invalid.
	delete(b.requeue, token)

	// Lookup the unack'd eval
	unack, ok := b.unack[evalID]
	if !ok {
		return fmt.Errorf("Evaluation ID not found")
	}
	if unack.Token != token {
		return fmt.Errorf("Token does not match for Evaluation ID")
	}

	// Stop the timer, doesn't matter if we've missed it
	unack.NackTimer.Stop()

	// Cleanup
	delete(b.unack, evalID)

	// Update the stats
	b.stats.TotalUnacked -= 1
	bySched := b.stats.ByScheduler[unack.Eval.Type]
	bySched.Unacked -= 1

	// Check if we've hit the delivery limit, and re-enqueue
	// in the failedQueue
	if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
		b.enqueueLocked(unack.Eval, failedQueue)
	} else {
		e := unack.Eval
		// Backoff grows with the number of prior dequeue attempts.
		e.Wait = b.nackReenqueueDelay(e, dequeues)

		// See if there should be a delay before re-enqueuing
		if e.Wait > 0 {
			b.processWaitingEnqueue(e)
		} else {
			b.enqueueLocked(e, e.Type)
		}
	}

	return nil
}
|
|
|
|
// nackReenqueueDelay is used to determine the delay that should be applied on
|
|
// the evaluation given the number of previous attempts
|
|
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
|
|
switch {
|
|
case prevDequeues <= 0:
|
|
return 0
|
|
case prevDequeues == 1:
|
|
return b.initialNackDelay
|
|
default:
|
|
// For each subsequent nack compound a delay
|
|
return time.Duration(prevDequeues-1) * b.subsequentNackDelay
|
|
}
|
|
}
|
|
|
|
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
|
|
// progress but is in a potentially unbounded operation such as the plan queue.
|
|
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
if !unack.NackTimer.Stop() {
|
|
return ErrNackTimeoutReached
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
|
|
// paused. It should be resumed after leaving an unbounded operation.
|
|
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
unack, ok := b.unack[evalID]
|
|
if !ok {
|
|
return ErrNotOutstanding
|
|
}
|
|
if unack.Token != token {
|
|
return ErrTokenMismatch
|
|
}
|
|
unack.NackTimer.Reset(b.nackTimeout)
|
|
return nil
|
|
}
|
|
|
|
// flush is used to clear the state of the broker. It must be called from within
// the lock. All waiters are released, all timers stopped, and every queue and
// counter reset to its initial state.
func (b *EvalBroker) flush() {
	// Unblock any waiters; closing the channels wakes every blocked Dequeue.
	for _, waitCh := range b.waiting {
		close(waitCh)
	}
	b.waiting = make(map[string]chan struct{})

	// Cancel any Nack timers
	for _, unack := range b.unack {
		unack.NackTimer.Stop()
	}

	// Cancel any time wait evals
	for _, wait := range b.timeWait {
		wait.Stop()
	}

	// Cancel the delayed evaluations goroutine (started in SetEnabled)
	if b.delayedEvalCancelFunc != nil {
		b.delayedEvalCancelFunc()
	}

	// Clear out the update channel for delayed evaluations
	b.delayedEvalsUpdateCh = make(chan struct{}, 1)

	// Reset the broker
	b.stats.TotalReady = 0
	b.stats.TotalUnacked = 0
	b.stats.TotalPending = 0
	b.stats.TotalWaiting = 0
	b.stats.TotalCancelable = 0
	b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
	b.stats.ByScheduler = make(map[string]*SchedulerStats)
	b.evals = make(map[string]int)
	b.jobEvals = make(map[structs.NamespacedID]string)
	b.pending = make(map[structs.NamespacedID]PendingEvaluations)
	b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest)
	b.ready = make(map[string]ReadyEvaluations)
	b.unack = make(map[string]*unackEval)
	b.timeWait = make(map[string]*time.Timer)
	b.delayHeap = delayheap.NewDelayHeap()
}
|
|
|
|
// evalWrapper satisfies the HeapNode interface so that evaluations can be
// stored in the delayHeap.
type evalWrapper struct {
	eval *structs.Evaluation
}
|
|
|
|
func (d *evalWrapper) Data() interface{} {
|
|
return d.eval
|
|
}
|
|
|
|
func (d *evalWrapper) ID() string {
|
|
return d.eval.ID
|
|
}
|
|
|
|
func (d *evalWrapper) Namespace() string {
|
|
return d.eval.Namespace
|
|
}
|
|
|
|
// runDelayedEvalsWatcher is a long-lived function that waits till a time
|
|
// deadline is met for pending evaluations before enqueuing them
|
|
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) {
|
|
var timerChannel <-chan time.Time
|
|
var delayTimer *time.Timer
|
|
for {
|
|
eval, waitUntil := b.nextDelayedEval()
|
|
if waitUntil.IsZero() {
|
|
timerChannel = nil
|
|
} else {
|
|
launchDur := waitUntil.Sub(time.Now().UTC())
|
|
if delayTimer == nil {
|
|
delayTimer = time.NewTimer(launchDur)
|
|
} else {
|
|
delayTimer.Reset(launchDur)
|
|
}
|
|
timerChannel = delayTimer.C
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timerChannel:
|
|
// remove from the heap since we can enqueue it now
|
|
b.l.Lock()
|
|
b.delayHeap.Remove(&evalWrapper{eval})
|
|
b.stats.TotalWaiting -= 1
|
|
delete(b.stats.DelayedEvals, eval.ID)
|
|
b.enqueueLocked(eval, eval.Type)
|
|
b.l.Unlock()
|
|
case <-updateCh:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// nextDelayedEval returns the next delayed eval to launch and when it should be enqueued.
|
|
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
|
|
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
// If there is nothing wait for an update.
|
|
if b.delayHeap.Length() == 0 {
|
|
return nil, time.Time{}
|
|
}
|
|
nextEval := b.delayHeap.Peek()
|
|
if nextEval == nil {
|
|
return nil, time.Time{}
|
|
}
|
|
eval := nextEval.Node.Data().(*structs.Evaluation)
|
|
return eval, nextEval.WaitUntil
|
|
}
|
|
|
|
// Stats is used to query the state of the broker
|
|
func (b *EvalBroker) Stats() *BrokerStats {
|
|
// Allocate a new stats struct
|
|
stats := new(BrokerStats)
|
|
stats.DelayedEvals = make(map[string]*structs.Evaluation)
|
|
stats.ByScheduler = make(map[string]*SchedulerStats)
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
// Copy all the stats
|
|
stats.TotalReady = b.stats.TotalReady
|
|
stats.TotalUnacked = b.stats.TotalUnacked
|
|
stats.TotalPending = b.stats.TotalPending
|
|
stats.TotalWaiting = b.stats.TotalWaiting
|
|
stats.TotalCancelable = b.stats.TotalCancelable
|
|
for id, eval := range b.stats.DelayedEvals {
|
|
evalCopy := *eval
|
|
stats.DelayedEvals[id] = &evalCopy
|
|
}
|
|
for sched, subStat := range b.stats.ByScheduler {
|
|
subStatCopy := *subStat
|
|
stats.ByScheduler[sched] = &subStatCopy
|
|
}
|
|
return stats
|
|
}
|
|
|
|
// Cancelable retrieves a batch of previously-pending evaluations that are now
|
|
// stale and ready to mark for canceling. The eval RPC will call this with a
|
|
// batch size set to avoid sending overly large raft messages.
|
|
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
if batchSize > len(b.cancelable) {
|
|
batchSize = len(b.cancelable)
|
|
}
|
|
|
|
cancelable := b.cancelable[:batchSize]
|
|
b.cancelable = b.cancelable[batchSize:]
|
|
|
|
b.stats.TotalCancelable = len(b.cancelable)
|
|
return cancelable
|
|
}
|
|
|
|
// EmitStats is used to export metrics about the broker while enabled. It
// publishes a snapshot from Stats() every period until stopCh is closed.
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
	timer, stop := helper.NewSafeTimer(period)
	defer stop()

	for {
		timer.Reset(period)

		select {
		case <-timer.C:
			// Snapshot once per tick; Stats() takes the read lock internally.
			stats := b.Stats()
			metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
			metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
			metrics.SetGauge([]string{"nomad", "broker", "total_pending"}, float32(stats.TotalPending))
			metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
			metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable))
			// Per-delayed-eval gauge: seconds until the eval becomes eligible.
			for _, eval := range stats.DelayedEvals {
				metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
					float32(time.Until(eval.WaitUntil).Seconds()),
					[]metrics.Label{
						{Name: "eval_id", Value: eval.ID},
						{Name: "job", Value: eval.JobID},
						{Name: "namespace", Value: eval.Namespace},
					})
			}
			for sched, schedStats := range stats.ByScheduler {
				metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready))
				metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked))
			}

		case <-stopCh:
			return
		}
	}
}
|
|
|
|
// BrokerStats returns all the stats about the broker
type BrokerStats struct {
	// TotalReady is the number of evals ready to be dequeued.
	TotalReady int
	// TotalUnacked is the number of delivered but unacknowledged evals.
	TotalUnacked int
	// TotalPending is the number of evals serialized behind a ready eval
	// for the same job.
	TotalPending int
	// TotalWaiting is the number of evals waiting on a Wait/WaitUntil delay.
	TotalWaiting int
	// TotalCancelable is the number of stale evals awaiting batch cancel.
	TotalCancelable int
	// DelayedEvals maps eval ID to evals waiting on WaitUntil.
	DelayedEvals map[string]*structs.Evaluation
	// ByScheduler breaks down ready/unacked counts per scheduler type.
	ByScheduler map[string]*SchedulerStats
}
|
|
|
|
// SchedulerStats returns the stats per scheduler
type SchedulerStats struct {
	// Ready is the number of evals queued for this scheduler.
	Ready int
	// Unacked is the number of delivered but unacknowledged evals.
	Unacked int
}
|
|
|
|
// Len is for the sorting interface
|
|
func (r ReadyEvaluations) Len() int {
|
|
return len(r)
|
|
}
|
|
|
|
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority
func (r ReadyEvaluations) Less(i, j int) bool {
	// Order by priority only across distinct jobs; evals for the same job
	// (or with equal priority) fall through to FIFO order by CreateIndex.
	if r[i].JobID != r[j].JobID && r[i].Priority != r[j].Priority {
		return !(r[i].Priority < r[j].Priority)
	}
	return r[i].CreateIndex < r[j].CreateIndex
}
|
|
|
|
// Swap is for the sorting interface
|
|
func (r ReadyEvaluations) Swap(i, j int) {
|
|
r[i], r[j] = r[j], r[i]
|
|
}
|
|
|
|
// Push is used to add a new evaluation to the slice
|
|
func (r *ReadyEvaluations) Push(e interface{}) {
|
|
*r = append(*r, e.(*structs.Evaluation))
|
|
}
|
|
|
|
// Pop is used to remove an evaluation from the slice
|
|
func (r *ReadyEvaluations) Pop() interface{} {
|
|
n := len(*r)
|
|
e := (*r)[n-1]
|
|
(*r)[n-1] = nil
|
|
*r = (*r)[:n-1]
|
|
return e
|
|
}
|
|
|
|
// Peek is used to peek at the next element that would be popped
|
|
func (r ReadyEvaluations) Peek() *structs.Evaluation {
|
|
n := len(r)
|
|
if n == 0 {
|
|
return nil
|
|
}
|
|
return r[n-1]
|
|
}
|
|
|
|
// Len is for the sorting interface
|
|
func (p PendingEvaluations) Len() int {
|
|
return len(p)
|
|
}
|
|
|
|
// Less is for the sorting interface. We flip the check
// so that the "min" in the min-heap is the element with the
// highest priority or highest modify index
func (p PendingEvaluations) Less(i, j int) bool {
	if p[i].Priority != p[j].Priority {
		return !(p[i].Priority < p[j].Priority)
	}
	// Equal priority: prefer the most recently modified eval.
	return !(p[i].ModifyIndex < p[j].ModifyIndex)
}
|
|
|
|
// Swap is for the sorting interface
|
|
func (p PendingEvaluations) Swap(i, j int) {
|
|
p[i], p[j] = p[j], p[i]
|
|
}
|
|
|
|
// Push implements the heap interface and is used to add a new evaluation to the slice
|
|
func (p *PendingEvaluations) Push(e interface{}) {
|
|
*p = append(*p, e.(*structs.Evaluation))
|
|
}
|
|
|
|
// Pop implements the heap interface and is used to remove an evaluation from the slice
|
|
func (p *PendingEvaluations) Pop() interface{} {
|
|
n := len(*p)
|
|
e := (*p)[n-1]
|
|
(*p)[n-1] = nil
|
|
*p = (*p)[:n-1]
|
|
return e
|
|
}
|
|
|
|
// MarkForCancel is used to clear the pending list of all but the one with the
// highest modify index and highest priority. It returns a slice of cancelable
// evals so that Eval.Ack RPCs can write batched raft entries to cancel
// them. This must be called inside the broker's lock.
func (p *PendingEvaluations) MarkForCancel() []*structs.Evaluation {

	// In pathological cases, we can have a large number of pending evals but
	// will want to cancel most of them. Using heap.Remove requires we re-sort
	// for each eval we remove. Because we expect to have at most one remaining,
	// we'll just create a new heap.
	retain := PendingEvaluations{(heap.Pop(p)).(*structs.Evaluation)}

	// Everything left after popping the best eval is stale; copy it out
	// before the receiver is overwritten below.
	cancelable := make([]*structs.Evaluation, len(*p))
	copy(cancelable, *p)

	*p = retain
	return cancelable
}
|