539 lines
15 KiB
Go
539 lines
15 KiB
Go
package nomad
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/scheduler"
|
|
)
|
|
|
|
const (
|
|
// backoffBaselineFast is the baseline time for exponential backoff
|
|
backoffBaselineFast = 20 * time.Millisecond
|
|
|
|
// backoffBaselineSlow is the baseline time for exponential backoff
|
|
// but that is much slower than backoffBaselineFast
|
|
backoffBaselineSlow = 500 * time.Millisecond
|
|
|
|
// backoffLimitFast is the limit of the exponential backoff
|
|
backoffLimitFast = time.Second
|
|
|
|
// backoffLimitSlow is the limit of the exponential backoff for
|
|
// the slower backoff
|
|
backoffLimitSlow = 10 * time.Second
|
|
|
|
// backoffSchedulerVersionMismatch is the backoff between retries when the
|
|
// scheduler version mismatches that of the leader.
|
|
backoffSchedulerVersionMismatch = 30 * time.Second
|
|
|
|
// dequeueTimeout is used to timeout an evaluation dequeue so that
|
|
// we can check if there is a shutdown event
|
|
dequeueTimeout = 500 * time.Millisecond
|
|
|
|
// raftSyncLimit is the limit of time we will wait for Raft replication
|
|
// to catch up to the evaluation. This is used to fast Nack and
|
|
// allow another scheduler to pick it up.
|
|
raftSyncLimit = 5 * time.Second
|
|
|
|
// dequeueErrGrace is the grace period where we don't log about
|
|
// dequeue errors after start. This is to improve the user experience
|
|
// in dev mode where the leader isn't elected for a few seconds.
|
|
dequeueErrGrace = 10 * time.Second
|
|
)
|
|
|
|
// Worker is a single threaded scheduling worker. There may be multiple
|
|
// running per server (leader or follower). They are responsible for dequeuing
|
|
// pending evaluations, invoking schedulers, plan submission and the
|
|
// lifecycle around making task allocations. They bridge the business logic
|
|
// of the scheduler with the plumbing required to make it all work.
|
|
type Worker struct {
|
|
srv *Server
|
|
logger *log.Logger
|
|
start time.Time
|
|
|
|
paused bool
|
|
pauseLock sync.Mutex
|
|
pauseCond *sync.Cond
|
|
|
|
failures uint
|
|
|
|
evalToken string
|
|
|
|
// snapshotIndex is the index of the snapshot in which the scheduler was
|
|
// first envoked. It is used to mark the SnapshotIndex of evaluations
|
|
// Created, Updated or Reblocked.
|
|
snapshotIndex uint64
|
|
}
|
|
|
|
// NewWorker starts a new worker associated with the given server
|
|
func NewWorker(srv *Server) (*Worker, error) {
|
|
w := &Worker{
|
|
srv: srv,
|
|
logger: srv.logger,
|
|
start: time.Now(),
|
|
}
|
|
w.pauseCond = sync.NewCond(&w.pauseLock)
|
|
go w.run()
|
|
return w, nil
|
|
}
|
|
|
|
// SetPause is used to pause or unpause a worker
|
|
func (w *Worker) SetPause(p bool) {
|
|
w.pauseLock.Lock()
|
|
w.paused = p
|
|
w.pauseLock.Unlock()
|
|
if !p {
|
|
w.pauseCond.Broadcast()
|
|
}
|
|
}
|
|
|
|
// checkPaused is used to park the worker when paused
|
|
func (w *Worker) checkPaused() {
|
|
w.pauseLock.Lock()
|
|
for w.paused {
|
|
w.pauseCond.Wait()
|
|
}
|
|
w.pauseLock.Unlock()
|
|
}
|
|
|
|
// run is the long-lived goroutine which is used to run the worker
|
|
func (w *Worker) run() {
|
|
for {
|
|
// Dequeue a pending evaluation
|
|
eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout)
|
|
if shutdown {
|
|
return
|
|
}
|
|
|
|
// Check for a shutdown
|
|
if w.srv.IsShutdown() {
|
|
w.sendAck(eval.ID, token, false)
|
|
return
|
|
}
|
|
|
|
// Wait for the raft log to catchup to the evaluation
|
|
if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
|
|
w.sendAck(eval.ID, token, false)
|
|
continue
|
|
}
|
|
|
|
// Invoke the scheduler to determine placements
|
|
if err := w.invokeScheduler(eval, token); err != nil {
|
|
w.sendAck(eval.ID, token, false)
|
|
continue
|
|
}
|
|
|
|
// Complete the evaluation
|
|
w.sendAck(eval.ID, token, true)
|
|
}
|
|
}
|
|
|
|
// dequeueEvaluation is used to fetch the next ready evaluation.
|
|
// This blocks until an evaluation is available or a timeout is reached.
|
|
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
|
|
// Setup the request
|
|
req := structs.EvalDequeueRequest{
|
|
Schedulers: w.srv.config.EnabledSchedulers,
|
|
Timeout: timeout,
|
|
SchedulerVersion: scheduler.SchedulerVersion,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.EvalDequeueResponse
|
|
|
|
REQ:
|
|
// Check if we are paused
|
|
w.checkPaused()
|
|
|
|
// Make a blocking RPC
|
|
start := time.Now()
|
|
err := w.srv.RPC("Eval.Dequeue", &req, &resp)
|
|
metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start)
|
|
if err != nil {
|
|
if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() {
|
|
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
|
|
}
|
|
|
|
// Adjust the backoff based on the error. If it is a scheduler version
|
|
// mismatch we increase the baseline.
|
|
base, limit := backoffBaselineFast, backoffLimitSlow
|
|
if strings.Contains(err.Error(), "calling scheduler version") {
|
|
base = backoffSchedulerVersionMismatch
|
|
limit = backoffSchedulerVersionMismatch
|
|
}
|
|
|
|
if w.backoffErr(base, limit) {
|
|
return nil, "", true
|
|
}
|
|
goto REQ
|
|
}
|
|
w.backoffReset()
|
|
|
|
// Check if we got a response
|
|
if resp.Eval != nil {
|
|
w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID)
|
|
return resp.Eval, resp.Token, false
|
|
}
|
|
|
|
// Check for potential shutdown
|
|
if w.srv.IsShutdown() {
|
|
return nil, "", true
|
|
}
|
|
goto REQ
|
|
}
|
|
|
|
// sendAck makes a best effort to ack or nack the evaluation.
|
|
// Any errors are logged but swallowed.
|
|
func (w *Worker) sendAck(evalID, token string, ack bool) {
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now())
|
|
// Setup the request
|
|
req := structs.EvalAckRequest{
|
|
EvalID: evalID,
|
|
Token: token,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.GenericResponse
|
|
|
|
// Determine if this is an Ack or Nack
|
|
verb := "ack"
|
|
endpoint := "Eval.Ack"
|
|
if !ack {
|
|
verb = "nack"
|
|
endpoint = "Eval.Nack"
|
|
}
|
|
|
|
// Make the RPC call
|
|
err := w.srv.RPC(endpoint, &req, &resp)
|
|
if err != nil {
|
|
w.logger.Printf("[ERR] worker: failed to %s evaluation '%s': %v",
|
|
verb, evalID, err)
|
|
} else {
|
|
w.logger.Printf("[DEBUG] worker: %s for evaluation %s", verb, evalID)
|
|
}
|
|
}
|
|
|
|
// waitForIndex ensures that the local state is at least as fresh
|
|
// as the given index. This is used before starting an evaluation,
|
|
// but also potentially mid-stream. If a Plan fails because of stale
|
|
// state (attempt to allocate to a failed/dead node), we may need
|
|
// to sync our state again and do the planning with more recent data.
|
|
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
|
|
// XXX: Potential optimization is to set up a watch on the state stores
|
|
// index table and only unblock via a trigger rather than timing out and
|
|
// checking.
|
|
|
|
start := time.Now()
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
|
|
CHECK:
|
|
// Get the states current index
|
|
snapshotIndex, err := w.srv.fsm.State().LatestIndex()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to determine state store's index: %v", err)
|
|
}
|
|
|
|
// We only need the FSM state to be as recent as the given index
|
|
if index <= snapshotIndex {
|
|
w.backoffReset()
|
|
return nil
|
|
}
|
|
|
|
// Check if we've reached our limit
|
|
if time.Now().Sub(start) > timeout {
|
|
return fmt.Errorf("sync wait timeout reached")
|
|
}
|
|
|
|
// Exponential back off if we haven't yet reached it
|
|
if w.backoffErr(backoffBaselineFast, backoffLimitFast) {
|
|
return fmt.Errorf("shutdown while waiting for state sync")
|
|
}
|
|
goto CHECK
|
|
}
|
|
|
|
// invokeScheduler is used to invoke the business logic of the scheduler
|
|
func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error {
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "invoke_scheduler", eval.Type}, time.Now())
|
|
// Store the evaluation token
|
|
w.evalToken = token
|
|
|
|
// Snapshot the current state
|
|
snap, err := w.srv.fsm.State().Snapshot()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to snapshot state: %v", err)
|
|
}
|
|
|
|
// Store the snapshot's index
|
|
w.snapshotIndex, err = snap.LatestIndex()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to determine snapshot's index: %v", err)
|
|
}
|
|
|
|
// Create the scheduler, or use the special system scheduler
|
|
var sched scheduler.Scheduler
|
|
if eval.Type == structs.JobTypeCore {
|
|
sched = NewCoreScheduler(w.srv, snap)
|
|
} else {
|
|
sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to instantiate scheduler: %v", err)
|
|
}
|
|
}
|
|
|
|
// Process the evaluation
|
|
err = sched.Process(eval)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to process evaluation: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SubmitPlan is used to submit a plan for consideration. This allows
|
|
// the worker to act as the planner for the scheduler.
|
|
func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) {
|
|
// Check for a shutdown before plan submission
|
|
if w.srv.IsShutdown() {
|
|
return nil, nil, fmt.Errorf("shutdown while planning")
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "submit_plan"}, time.Now())
|
|
|
|
// Add the evaluation token to the plan
|
|
plan.EvalToken = w.evalToken
|
|
|
|
// Setup the request
|
|
req := structs.PlanRequest{
|
|
Plan: plan,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.PlanResponse
|
|
|
|
SUBMIT:
|
|
// Make the RPC call
|
|
if err := w.srv.RPC("Plan.Submit", &req, &resp); err != nil {
|
|
w.logger.Printf("[ERR] worker: failed to submit plan for evaluation %s: %v",
|
|
plan.EvalID, err)
|
|
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
|
goto SUBMIT
|
|
}
|
|
return nil, nil, err
|
|
} else {
|
|
w.logger.Printf("[DEBUG] worker: submitted plan for evaluation %s", plan.EvalID)
|
|
w.backoffReset()
|
|
}
|
|
|
|
// Look for a result
|
|
result := resp.Result
|
|
if result == nil {
|
|
return nil, nil, fmt.Errorf("missing result")
|
|
}
|
|
|
|
// Check if a state update is required. This could be required if we
|
|
// planning based on stale data, which is causing issues. For example, a
|
|
// node failure since the time we've started planning or conflicting task
|
|
// allocations.
|
|
var state scheduler.State
|
|
if result.RefreshIndex != 0 {
|
|
// Wait for the raft log to catchup to the evaluation
|
|
w.logger.Printf("[DEBUG] worker: refreshing state to index %d for %q", result.RefreshIndex, plan.EvalID)
|
|
if err := w.waitForIndex(result.RefreshIndex, raftSyncLimit); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Snapshot the current state
|
|
snap, err := w.srv.fsm.State().Snapshot()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to snapshot state: %v", err)
|
|
}
|
|
state = snap
|
|
}
|
|
|
|
// Return the result and potential state update
|
|
return result, state, nil
|
|
}
|
|
|
|
// UpdateEval is used to submit an updated evaluation. This allows
|
|
// the worker to act as the planner for the scheduler.
|
|
func (w *Worker) UpdateEval(eval *structs.Evaluation) error {
|
|
// Check for a shutdown before plan submission
|
|
if w.srv.IsShutdown() {
|
|
return fmt.Errorf("shutdown while planning")
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "update_eval"}, time.Now())
|
|
|
|
// Store the snapshot index in the eval
|
|
eval.SnapshotIndex = w.snapshotIndex
|
|
|
|
// Setup the request
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval},
|
|
EvalToken: w.evalToken,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.GenericResponse
|
|
|
|
SUBMIT:
|
|
// Make the RPC call
|
|
if err := w.srv.RPC("Eval.Update", &req, &resp); err != nil {
|
|
w.logger.Printf("[ERR] worker: failed to update evaluation %#v: %v",
|
|
eval, err)
|
|
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
|
goto SUBMIT
|
|
}
|
|
return err
|
|
} else {
|
|
w.logger.Printf("[DEBUG] worker: updated evaluation %#v", eval)
|
|
w.backoffReset()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CreateEval is used to create a new evaluation. This allows
|
|
// the worker to act as the planner for the scheduler.
|
|
func (w *Worker) CreateEval(eval *structs.Evaluation) error {
|
|
// Check for a shutdown before plan submission
|
|
if w.srv.IsShutdown() {
|
|
return fmt.Errorf("shutdown while planning")
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "create_eval"}, time.Now())
|
|
|
|
// Store the snapshot index in the eval
|
|
eval.SnapshotIndex = w.snapshotIndex
|
|
|
|
// Setup the request
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval},
|
|
EvalToken: w.evalToken,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.GenericResponse
|
|
|
|
SUBMIT:
|
|
// Make the RPC call
|
|
if err := w.srv.RPC("Eval.Create", &req, &resp); err != nil {
|
|
w.logger.Printf("[ERR] worker: failed to create evaluation %#v: %v",
|
|
eval, err)
|
|
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
|
goto SUBMIT
|
|
}
|
|
return err
|
|
} else {
|
|
w.logger.Printf("[DEBUG] worker: created evaluation %#v", eval)
|
|
w.backoffReset()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReblockEval is used to reinsert a blocked evaluation into the blocked eval
|
|
// tracker. This allows the worker to act as the planner for the scheduler.
|
|
func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
|
|
// Check for a shutdown before plan submission
|
|
if w.srv.IsShutdown() {
|
|
return fmt.Errorf("shutdown while planning")
|
|
}
|
|
defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now())
|
|
|
|
// Update the evaluation if the queued jobs is not same as what is
|
|
// recorded in the job summary
|
|
ws := memdb.NewWatchSet()
|
|
summary, err := w.srv.fsm.state.JobSummaryByID(ws, eval.Namespace, eval.JobID)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't retrieve job summary: %v", err)
|
|
}
|
|
if summary != nil {
|
|
var hasChanged bool
|
|
for tg, summary := range summary.Summary {
|
|
if queued, ok := eval.QueuedAllocations[tg]; ok {
|
|
if queued != summary.Queued {
|
|
hasChanged = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if hasChanged {
|
|
if err := w.UpdateEval(eval); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Store the snapshot index in the eval
|
|
eval.SnapshotIndex = w.snapshotIndex
|
|
|
|
// Setup the request
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval},
|
|
EvalToken: w.evalToken,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: w.srv.config.Region,
|
|
},
|
|
}
|
|
var resp structs.GenericResponse
|
|
|
|
SUBMIT:
|
|
// Make the RPC call
|
|
if err := w.srv.RPC("Eval.Reblock", &req, &resp); err != nil {
|
|
w.logger.Printf("[ERR] worker: failed to reblock evaluation %#v: %v",
|
|
eval, err)
|
|
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
|
goto SUBMIT
|
|
}
|
|
return err
|
|
} else {
|
|
w.logger.Printf("[DEBUG] worker: reblocked evaluation %#v", eval)
|
|
w.backoffReset()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// shouldResubmit checks if a given error should be swallowed and the plan
|
|
// resubmitted after a backoff. Usually these are transient errors that
|
|
// the cluster should heal from quickly.
|
|
func (w *Worker) shouldResubmit(err error) bool {
|
|
s := err.Error()
|
|
switch {
|
|
case strings.Contains(s, "No cluster leader"):
|
|
return true
|
|
case strings.Contains(s, "plan queue is disabled"):
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// backoffErr is used to do an exponential back off on error. This is
|
|
// maintained statefully for the worker. Returns if attempts should be
|
|
// abandoneded due to shutdown.
|
|
func (w *Worker) backoffErr(base, limit time.Duration) bool {
|
|
backoff := (1 << (2 * w.failures)) * base
|
|
if backoff > limit {
|
|
backoff = limit
|
|
} else {
|
|
w.failures++
|
|
}
|
|
select {
|
|
case <-time.After(backoff):
|
|
return false
|
|
case <-w.srv.shutdownCh:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// backoffReset is used to reset the failure count for
|
|
// exponential backoff
|
|
func (w *Worker) backoffReset() {
|
|
w.failures = 0
|
|
}
|