open-nomad/scheduler/generic_sched.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

946 lines
33 KiB
Go
Raw Permalink Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package scheduler
import (
"fmt"
"runtime/debug"
"sort"
"time"
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
2015-08-13 22:17:24 +00:00
const (
	// maxServiceScheduleAttempts is used to limit the number of times
	// we will attempt to schedule if we continue to hit conflicts for services.
	maxServiceScheduleAttempts = 5

	// maxBatchScheduleAttempts is used to limit the number of times
	// we will attempt to schedule if we continue to hit conflicts for batch.
	maxBatchScheduleAttempts = 2

	// allocNotNeeded is the status used when a job no longer requires an allocation
	allocNotNeeded = "alloc not needed due to job update"

	// allocReconnected is the status to use when a replacement allocation is stopped
	// because a disconnected node reconnects.
	allocReconnected = "alloc not needed due to disconnected client reconnect"

	// allocMigrating is the status used when we must migrate an allocation
	allocMigrating = "alloc is being migrated"

	// allocUpdating is the status used when a job requires an update
	allocUpdating = "alloc is being updated due to job update"

	// allocLost is the status used when an allocation is lost
	allocLost = "alloc is lost since its node is down"

	// allocUnknown is the status used when an allocation is unknown
	allocUnknown = "alloc is unknown since its node is disconnected"

	// allocInPlace is the status used when speculating on an in-place update
	allocInPlace = "alloc updating in-place"

	// allocNodeTainted is the status used when stopping an alloc because its
	// node is tainted.
	allocNodeTainted = "alloc not needed as node is tainted"

	// allocRescheduled is the status used when an allocation failed and was rescheduled
	allocRescheduled = "alloc was rescheduled because it failed"

	// blockedEvalMaxPlanDesc is the description used for blocked evals that are
	// a result of hitting the max number of plan attempts
	blockedEvalMaxPlanDesc = "created due to placement conflicts"

	// blockedEvalFailedPlacements is the description used for blocked evals
	// that are a result of failing to place all allocations.
	blockedEvalFailedPlacements = "created to place remaining allocations"

	// reschedulingFollowupEvalDesc is the description used when creating follow
	// up evals for delayed rescheduling
	reschedulingFollowupEvalDesc = "created for delayed rescheduling"

	// disconnectTimeoutFollowupEvalDesc is the description used when creating
	// follow up evals for allocations that should be stopped after their
	// disconnect timeout has passed.
	disconnectTimeoutFollowupEvalDesc = "created for delayed disconnect timeout"

	// maxPastRescheduleEvents is the maximum number of past reschedule events
	// that we track when unlimited rescheduling is enabled
	maxPastRescheduleEvents = 5
)
// minVersionMaxClientDisconnect is the minimum version that supports max_client_disconnect.
// computeJobAllocs passes it to the planner's ServersMeetMinimumVersion check
// when it builds the allocation reconciler.
var minVersionMaxClientDisconnect = version.Must(version.NewVersion("1.3.0"))
// SetStatusError is used to set the status of the evaluation to the given error.
// Err is the underlying failure and EvalStatus is the status the evaluation
// should be moved to because of it.
type SetStatusError struct {
	Err        error
	EvalStatus string
}

// Error returns the message of the wrapped error.
func (s *SetStatusError) Error() string {
	return s.Err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can match
// against the underlying cause.
func (s *SetStatusError) Unwrap() error {
	return s.Err
}
// GenericScheduler is used for 'service' and 'batch' type jobs. This scheduler is
2017-12-13 17:36:03 +00:00
// designed for long-lived services, and as such spends more time attempting
// to make a high quality placement. This is the primary scheduler for
// most workloads. It also supports a 'batch' mode to optimize for fast decision
// making at the cost of quality.
type GenericScheduler struct {
logger log.Logger
eventsCh chan<- interface{}
state State
planner Planner
batch bool
2015-08-14 00:11:20 +00:00
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *GenericStack
// followUpEvals are evals with WaitUntil set, which are delayed until that time
// before being rescheduled
2018-03-12 20:44:33 +00:00
followUpEvals []*structs.Evaluation
2017-05-18 19:36:04 +00:00
deployment *structs.Deployment
blocked *structs.Evaluation
failedTGAllocs map[string]*structs.AllocMetric
queuedAllocs map[string]int
}
// NewServiceScheduler is a factory function to instantiate a new service
// scheduler: a GenericScheduler with batch mode off and a logger named
// "service_sched".
func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler {
	return &GenericScheduler{
		logger:   logger.Named("service_sched"),
		eventsCh: eventsCh,
		state:    state,
		planner:  planner,
		batch:    false,
	}
}
// NewBatchScheduler is a factory function to instantiate a new batch
// scheduler: a GenericScheduler with batch mode on and a logger named
// "batch_sched".
func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler {
	return &GenericScheduler{
		logger:   logger.Named("batch_sched"),
		eventsCh: eventsCh,
		state:    state,
		planner:  planner,
		batch:    true,
	}
}
// Process is used to handle a single evaluation
func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) {
defer func() {
if r := recover(); r != nil {
s.logger.Error("processing eval panicked scheduler - please report this as a bug!", "eval_id", eval.ID, "error", r, "stack_trace", string(debug.Stack()))
err = fmt.Errorf("failed to process eval: %v", r)
}
}()
// Store the evaluation
s.eval = eval
2018-09-15 23:23:13 +00:00
// Update our logger with the eval's information
s.logger = s.logger.With("eval_id", eval.ID, "job_id", eval.JobID, "namespace", eval.Namespace)
2015-08-14 00:40:23 +00:00
// Verify the evaluation trigger reason is understood
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister,
structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerAllocStop,
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption,
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout, structs.EvalTriggerReconnect:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
2018-03-12 20:44:33 +00:00
return setStatus(s.logger, s.planner, s.eval, nil, s.blocked,
2017-07-06 00:13:45 +00:00
s.failedTGAllocs, structs.EvalStatusFailed, desc, s.queuedAllocs,
s.deployment.GetID())
}
2015-08-14 00:40:23 +00:00
// Retry up to the maxScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
limit := maxServiceScheduleAttempts
if s.batch {
limit = maxBatchScheduleAttempts
}
if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
// Scheduling was tried but made no forward progress so create a
// blocked eval to retry once resources become available.
var mErr multierror.Error
if err := s.createBlockedEval(true); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
2018-03-12 20:44:33 +00:00
if err := setStatus(s.logger, s.planner, s.eval, nil, s.blocked,
s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
2017-07-06 00:13:45 +00:00
s.queuedAllocs, s.deployment.GetID()); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
return err
}
// If the current evaluation is a blocked evaluation and we didn't place
// everything, do not update the status to complete.
if s.eval.Status == structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 {
e := s.ctx.Eligibility()
newEval := s.eval.Copy()
newEval.EscapedComputedClass = e.HasEscaped()
newEval.ClassEligibility = e.GetClasses()
2017-10-13 21:36:02 +00:00
newEval.QuotaLimitReached = e.QuotaLimitReached()
return s.planner.ReblockEval(newEval)
}
// Update the status to complete
2018-03-12 20:44:33 +00:00
return setStatus(s.logger, s.planner, s.eval, nil, s.blocked,
2017-07-06 00:13:45 +00:00
s.failedTGAllocs, structs.EvalStatusComplete, "", s.queuedAllocs,
s.deployment.GetID())
}
// createBlockedEval builds a blocked evaluation from the current eval, stores
// it in s.blocked and submits it to the planner. When planFailure is true the
// blocked eval's trigger and description reflect that the maximum number of
// plan attempts was hit; otherwise the description records failed placements.
func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
	elig := s.ctx.Eligibility()
	escaped := elig.HasEscaped()

	// Class eligibility is only recorded when the eval has not escaped.
	var classes map[string]bool
	if !escaped {
		classes = elig.GetClasses()
	}

	s.blocked = s.eval.CreateBlockedEval(classes, escaped, elig.QuotaLimitReached(), s.failedTGAllocs)

	if !planFailure {
		s.blocked.StatusDescription = blockedEvalFailedPlacements
	} else {
		s.blocked.TriggeredBy = structs.EvalTriggerMaxPlans
		s.blocked.StatusDescription = blockedEvalMaxPlanDesc
	}

	return s.planner.CreateEval(s.blocked)
}
2015-08-14 00:40:23 +00:00
// process is wrapped in retryMax to iteratively run the handler until we have no
2015-08-14 00:11:20 +00:00
// further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) {
2015-08-11 23:41:48 +00:00
// Lookup the Job by ID
var err error
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
2015-08-11 23:41:48 +00:00
if err != nil {
2017-05-18 19:36:04 +00:00
return false, fmt.Errorf("failed to get job %q: %v", s.eval.JobID, err)
2015-08-14 00:11:20 +00:00
}
2017-05-18 19:36:04 +00:00
numTaskGroups := 0
2017-05-18 19:36:04 +00:00
stopped := s.job.Stopped()
if !stopped {
numTaskGroups = len(s.job.TaskGroups)
}
s.queuedAllocs = make(map[string]int, numTaskGroups)
2018-03-08 00:44:54 +00:00
s.followUpEvals = nil
2015-08-14 00:11:20 +00:00
// Create a plan
s.plan = s.eval.MakePlan(s.job)
2015-08-14 00:11:20 +00:00
2017-05-18 19:36:04 +00:00
if !s.batch {
// Get any existing deployment
2017-09-07 23:56:15 +00:00
s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID)
2017-05-18 19:36:04 +00:00
if err != nil {
return false, fmt.Errorf("failed to get job deployment %q: %v", s.eval.JobID, err)
}
}
// Reset the failed allocations
s.failedTGAllocs = nil
2015-09-07 18:34:59 +00:00
// Create an evaluation context
s.ctx = NewEvalContext(s.eventsCh, s.state, s.plan, s.logger)
2015-09-07 18:34:59 +00:00
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx)
2017-04-19 17:54:03 +00:00
if !s.job.Stopped() {
s.setJob(s.job)
}
2015-09-07 18:34:59 +00:00
2015-08-14 00:40:23 +00:00
// Compute the target job allocations
2017-05-23 23:39:15 +00:00
if err := s.computeJobAllocs(); err != nil {
2018-09-15 23:23:13 +00:00
s.logger.Error("failed to compute job allocations", "error", err)
2015-08-14 00:40:23 +00:00
return false, err
2015-08-11 23:41:48 +00:00
}
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it. If not, submit
// a new eval to the planner in createBlockedEval. If rescheduling should
// be delayed, do that instead.
delayInstead := len(s.followUpEvals) > 0 && s.eval.WaitUntil.IsZero()
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
!delayInstead {
if err := s.createBlockedEval(false); err != nil {
2018-09-15 23:23:13 +00:00
s.logger.Error("failed to make blocked eval", "error", err)
return false, err
}
2018-09-15 23:23:13 +00:00
s.logger.Debug("failed to place all allocations, blocked eval created", "blocked_eval_id", s.blocked.ID)
}
// If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan
// anyways to get the annotations.
if s.plan.IsNoOp() && !s.eval.AnnotatePlan {
2015-08-14 01:16:32 +00:00
return true, nil
}
// Create follow up evals for any delayed reschedule eligible allocations, except in
// the case that this evaluation was already delayed.
if delayInstead {
for _, eval := range s.followUpEvals {
eval.PreviousEval = s.eval.ID
// TODO(preetha) this should be batching evals before inserting them
if err := s.planner.CreateEval(eval); err != nil {
2018-09-15 23:23:13 +00:00
s.logger.Error("failed to make next eval for rescheduling", "error", err)
return false, err
}
2018-09-15 23:23:13 +00:00
s.logger.Debug("found reschedulable allocs, followup eval created", "followup_eval_id", eval.ID)
}
}
// Submit the plan and store the results.
2015-08-14 00:11:20 +00:00
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
2015-08-14 00:11:20 +00:00
if err != nil {
2015-08-14 00:40:23 +00:00
return false, err
2015-08-14 00:11:20 +00:00
}
2016-07-22 06:13:07 +00:00
// Decrement the number of allocations pending per task group based on the
// number of allocations successfully placed
2016-07-22 21:53:49 +00:00
adjustQueuedAllocations(s.logger, result, s.queuedAllocs)
2015-08-14 00:11:20 +00:00
// If we got a state refresh, try again since we have stale data
if newState != nil {
2018-09-15 23:23:13 +00:00
s.logger.Debug("refresh forced")
2015-08-14 00:11:20 +00:00
s.state = newState
2015-08-14 00:40:23 +00:00
return false, nil
2015-08-14 00:11:20 +00:00
}
// Try again if the plan was not fully committed, potential conflict
fullCommit, expected, actual := result.FullCommit(s.plan)
if !fullCommit {
2018-09-15 23:23:13 +00:00
s.logger.Debug("plan didn't fully commit", "attempted", expected, "placed", actual)
2016-02-22 20:15:40 +00:00
if newState == nil {
return false, fmt.Errorf("missing state refresh after partial commit")
}
2015-08-14 00:40:23 +00:00
return false, nil
2015-08-14 00:11:20 +00:00
}
2015-08-14 00:40:23 +00:00
// Success!
return true, nil
2015-08-14 00:11:20 +00:00
}
2017-05-22 17:58:34 +00:00
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
2017-05-23 23:39:15 +00:00
func (s *GenericScheduler) computeJobAllocs() error {
2017-05-22 17:58:34 +00:00
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
2017-05-22 17:58:34 +00:00
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
}
// Determine the tainted nodes containing job allocs
tainted, err := taintedNodes(s.state, allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
s.eval.JobID, err)
}
// Update the allocations which are in pending/running state on tainted
// nodes to lost, but only if the scheduler has already marked them
2017-05-22 17:58:34 +00:00
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
2018-09-15 23:23:13 +00:00
reconciler := NewAllocReconciler(s.logger,
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID,
s.eval.Priority, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))
2017-05-22 17:58:34 +00:00
results := reconciler.Compute()
2018-09-15 23:23:13 +00:00
s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", results))
2017-05-22 17:58:34 +00:00
2017-05-23 20:02:47 +00:00
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: results.desiredTGUpdates,
}
}
2017-05-22 17:58:34 +00:00
// Add the deployment changes to the plan
s.plan.Deployment = results.deployment
2017-05-22 17:58:34 +00:00
s.plan.DeploymentUpdates = results.deploymentUpdates
// Store all the follow up evaluations from rescheduled allocations
if len(results.desiredFollowupEvals) > 0 {
for _, evals := range results.desiredFollowupEvals {
s.followUpEvals = append(s.followUpEvals, evals...)
}
}
2018-03-12 20:44:33 +00:00
// Update the stored deployment
if results.deployment != nil {
s.deployment = results.deployment
}
2017-05-22 17:58:34 +00:00
// Handle the stop
for _, stop := range results.stop {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
2017-05-22 17:58:34 +00:00
}
// Handle disconnect updates
for _, update := range results.disconnectUpdates {
s.plan.AppendUnknownAlloc(update)
}
Update alloc after reconnect and enforece client heartbeat order (#15068) * scheduler: allow updates after alloc reconnects When an allocation reconnects to a cluster the scheduler needs to run special logic to handle the reconnection, check if a replacement was create and stop one of them. If the allocation kept running while the node was disconnected, it will be reconnected with `ClientStatus: running` and the node will have `Status: ready`. This combination is the same as the normal steady state of allocation, where everything is running as expected. In order to differentiate between the two states (an allocation that is reconnecting and one that is just running) the scheduler needs an extra piece of state. The current implementation uses the presence of a `TaskClientReconnected` task event to detect when the allocation has reconnected and thus must go through the reconnection process. But this event remains even after the allocation is reconnected, causing all future evals to consider the allocation as still reconnecting. This commit changes the reconnect logic to use an `AllocState` to register when the allocation was reconnected. This provides the following benefits: - Only a limited number of task states are kept, and they are used for many other events. It's possible that, upon reconnecting, several actions are triggered that could cause the `TaskClientReconnected` event to be dropped. - Task events are set by clients and so their timestamps are subject to time skew from servers. This prevents using time to determine if an allocation reconnected after a disconnect event. - Disconnect events are already stored as `AllocState` and so storing reconnects there as well makes it the only source of information required. With the new logic, the reconnection logic is only triggered if the last `AllocState` is a disconnect event, meaning that the allocation has not been reconnected yet. 
After the reconnection is handled, the new `ClientStatus` is store in `AllocState` allowing future evals to skip the reconnection logic. * scheduler: prevent spurious placement on reconnect When a client reconnects it makes two independent RPC calls: - `Node.UpdateStatus` to heartbeat and set its status as `ready`. - `Node.UpdateAlloc` to update the status of its allocations. These two calls can happen in any order, and in case the allocations are updated before a heartbeat it causes the state to be the same as a node being disconnected: the node status will still be `disconnected` while the allocation `ClientStatus` is set to `running`. The current implementation did not handle this order of events properly, and the scheduler would create an unnecessary placement since it considered the allocation was being disconnected. This extra allocation would then be quickly stopped by the heartbeat eval. This commit adds a new code path to handle this order of events. If the node is `disconnected` and the allocation `ClientStatus` is `running` the scheduler will check if the allocation is actually reconnecting using its `AllocState` events. * rpc: only allow alloc updates from `ready` nodes Clients interact with servers using three main RPC methods: - `Node.GetAllocs` reads allocation data from the server and writes it to the client. - `Node.UpdateAlloc` reads allocation from from the client and writes them to the server. - `Node.UpdateStatus` writes the client status to the server and is used as the heartbeat mechanism. These three methods are called periodically by the clients and are done so independently from each other, meaning that there can't be any assumptions in their ordering. This can generate scenarios that are hard to reason about and to code for. For example, when a client misses too many heartbeats it will be considered `down` or `disconnected` and the allocations it was running are set to `lost` or `unknown`. 
When connectivity is restored the to rest of the cluster, the natural mental model is to think that the client will heartbeat first and then update its allocations status into the servers. But since there's no inherit order in these calls the reverse is just as possible: the client updates the alloc status and then heartbeats. This results in a state where allocs are, for example, `running` while the client is still `disconnected`. This commit adds a new verification to the `Node.UpdateAlloc` method to reject updates from nodes that are not `ready`, forcing clients to heartbeat first. Since this check is done server-side there is no need to coordinate operations client-side: they can continue sending these requests independently and alloc update will succeed after the heartbeat is done. * chagelog: add entry for #15068 * code review * client: skip terminal allocations on reconnect When the client reconnects with the server it synchronizes the state of its allocations by sending data using the `Node.UpdateAlloc` RPC and fetching data using the `Node.GetClientAllocs` RPC. If the data fetch happens before the data write, `unknown` allocations will still be in this state and would trigger the `allocRunner.Reconnect` flow. But when the server `DesiredStatus` for the allocation is `stop` the client should not reconnect the allocation. * apply more code review changes * scheduler: persist changes to reconnected allocs Reconnected allocs have a new AllocState entry that must be persisted by the plan applier. * rpc: read node ID from allocs in UpdateAlloc The AllocUpdateRequest struct is used in three disjoint use cases: 1. Stripped allocs from clients Node.UpdateAlloc RPC using the Allocs, and WriteRequest fields 2. Raft log message using the Allocs, Evals, and WriteRequest fields 3. Plan updates using the AllocsStopped, AllocsUpdated, and Job fields Adding a new field that would only be used in one these cases (1) made things more confusing and error prone. 
While in theory an AllocUpdateRequest could send allocations from different nodes, in practice this never actually happens since only clients call this method with their own allocations. * scheduler: remove logic to handle exceptional case This condition could only be hit if, somehow, the allocation status was set to "running" while the client was "unknown". This was addressed by enforcing an order in "Node.UpdateStatus" and "Node.UpdateAlloc" RPC calls, so this scenario is not expected to happen. Adding unnecessary code to the scheduler makes it harder to read and reason about it. * more code review * remove another unused test
2022-11-04 20:25:11 +00:00
// Handle reconnect updates.
// Reconnected allocs have a new AllocState entry.
for _, update := range results.reconnectUpdates {
s.ctx.Plan().AppendAlloc(update, nil)
}
2017-05-22 17:58:34 +00:00
// Handle the in-place updates
for _, update := range results.inplaceUpdate {
2017-07-06 00:13:45 +00:00
if update.DeploymentID != s.deployment.GetID() {
update.DeploymentID = s.deployment.GetID()
2017-07-05 19:50:40 +00:00
update.DeploymentStatus = nil
}
2020-08-25 21:09:21 +00:00
s.ctx.Plan().AppendAlloc(update, nil)
2017-05-22 17:58:34 +00:00
}
2018-03-23 23:55:21 +00:00
// Handle the annotation updates
2018-03-26 18:06:21 +00:00
for _, update := range results.attributeUpdates {
2020-08-25 21:09:21 +00:00
s.ctx.Plan().AppendAlloc(update, nil)
2018-03-23 23:55:21 +00:00
}
2017-05-23 20:02:47 +00:00
// Nothing remaining to do if placement is not required
2017-07-15 23:31:33 +00:00
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise
// set the queued allocs to zero. This is true if the job is being
// stopped as well.
if s.job != nil {
2017-05-23 20:02:47 +00:00
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}
}
return nil
}
2017-05-22 17:58:34 +00:00
// Compute the placements
place := make([]placementResult, 0, len(results.place))
2017-07-15 23:31:33 +00:00
for _, p := range results.place {
s.queuedAllocs[p.taskGroup.Name] += 1
2017-07-15 23:31:33 +00:00
place = append(place, p)
}
destructive := make([]placementResult, 0, len(results.destructiveUpdate))
2017-07-15 23:31:33 +00:00
for _, p := range results.destructiveUpdate {
s.queuedAllocs[p.placeTaskGroup.Name] += 1
destructive = append(destructive, p)
2017-07-15 23:31:33 +00:00
}
return s.computePlacements(destructive, place, results.taskGroupAllocNameIndexes)
2015-08-14 01:16:32 +00:00
}
// downgradedJobForPlacement returns the job appropriate for non-canary
// placement replacement, together with the ID of the deployment it was found
// through (empty when the job was not found via a deployment). A nil job with
// a nil error means no suitable version was found.
func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) {
	ns, jobID := s.job.Namespace, s.job.ID
	tgName := p.TaskGroup().Name

	// Look through the job's deployments, newest job version first, for the
	// latest promoted or non-canaried one.
	deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false)
	if err != nil {
		return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err)
	}

	sort.Slice(deployments, func(a, b int) bool {
		return deployments[a].JobVersion > deployments[b].JobVersion
	})

	for _, d := range deployments {
		// It's unexpected to have a recent deployment that doesn't contain the TaskGroup; as all allocations
		// should be destroyed. In such cases, attempt to find the deployment for that TaskGroup and hopefully
		// we will kill it soon. This is a defensive measure, have not seen it in practice
		dstate := d.TaskGroups[tgName]
		if dstate == nil {
			continue
		}

		// Zero dstate.DesiredCanaries indicates that the TaskGroup allocates were updated in-place without using canaries.
		if !dstate.Promoted && dstate.DesiredCanaries != 0 {
			continue
		}

		job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion)
		return d.ID, job, err
	}

	// check if the non-promoted version is a job without update block. This version should be the latest "stable" version,
	// as all subsequent versions must be canaried deployments. Otherwise, we would have found a deployment above,
	// or the alloc would have been replaced already by a newer non-deployment job.
	//
	// NOTE(review): a lookup error here is not returned to the caller; it
	// falls through to the "nothing found" result below. Confirm this is
	// intended.
	job, err := s.state.JobByIDAndVersion(nil, ns, jobID, p.MinJobVersion())
	if err == nil && job != nil && job.Update.IsEmpty() {
		return "", job, nil
	}

	return "", nil, nil
}
2017-07-20 19:31:08 +00:00
// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult, nameIndex map[string]*allocNameIndex) error {
2017-05-22 17:58:34 +00:00
// Get the base nodes
nodes, byDC, err := s.setNodes(s.job)
2017-05-22 17:58:34 +00:00
if err != nil {
return err
}
var deploymentID string
if s.deployment != nil && s.deployment.Active() {
2017-05-22 17:58:34 +00:00
deploymentID = s.deployment.ID
}
2018-03-12 19:32:43 +00:00
// Capture current time to use as the start time for any rescheduled allocations
now := time.Now()
// Have to handle destructive changes first as we need to discount their
// resources. To understand this imagine the resources were reduced and the
// count was scaled up.
for _, results := range [][]placementResult{destructive, place} {
for _, missing := range results {
// Get the task group
tg := missing.TaskGroup()
// This is populated from the reconciler via the compute results,
// therefore we cannot have an allocation belonging to a task group
// that has not generated and been through allocation name index
// tracking.
taskGroupNameIndex := nameIndex[tg.Name]
var downgradedJob *structs.Job
if missing.DowngradeNonCanary() {
jobDeploymentID, job, err := s.downgradedJobForPlacement(missing)
if err != nil {
return err
}
// Defensive check - if there is no appropriate deployment for this job, use the latest
if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil {
tg = job.LookupTaskGroup(tg.Name)
downgradedJob = job
deploymentID = jobDeploymentID
} else {
jobVersion := -1
if job != nil {
jobVersion = int(job.Version)
}
s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion, "found_version", jobVersion)
}
}
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
metric.ExhaustResources(tg)
continue
}
2017-07-18 00:18:12 +00:00
// Use downgraded job in scheduling stack to honor old job
// resources, constraints, and node pool scheduler configuration.
if downgradedJob != nil {
s.setJob(downgradedJob)
if needsToSetNodes(downgradedJob, s.job) {
nodes, byDC, err = s.setNodes(downgradedJob)
if err != nil {
return err
}
}
}
// Find the preferred node
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
return err
}
2017-05-22 17:58:34 +00:00
// Check if we should stop the previous allocation upon successful
// placement of its replacement. This allow atomic placements/stops. We
// stop the allocation before trying to find a replacement because this
// frees the resources currently used by the previous allocation.
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "", "")
}
// Compute penalty nodes for rescheduled allocs
selectOptions := getSelectOptions(prevAllocation, preferredNode)
selectOptions.AllocName = missing.Name()
option := s.selectNextOption(tg, selectOptions)
2017-05-22 17:58:34 +00:00
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
s.ctx.Metrics().NodesInPool = len(nodes)
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
// Restore stack job and nodes now that placement is done, to use
// plan job version
if downgradedJob != nil {
s.setJob(s.job)
if needsToSetNodes(downgradedJob, s.job) {
nodes, byDC, err = s.setNodes(s.job)
if err != nil {
return err
}
}
}
// Set fields based on if we found an allocation option
if option != nil {
2018-10-02 20:36:04 +00:00
resources := &structs.AllocatedResources{
2019-12-16 20:34:58 +00:00
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
2019-06-18 17:12:23 +00:00
Shared: structs.AllocatedSharedResources{
DiskMB: int64(tg.EphemeralDisk.SizeMB),
},
2019-06-18 04:55:43 +00:00
}
if option.AllocResources != nil {
2019-06-18 17:12:23 +00:00
resources.Shared.Networks = option.AllocResources.Networks
2020-06-16 15:53:10 +00:00
resources.Shared.Ports = option.AllocResources.Ports
2018-10-02 20:36:04 +00:00
}
// Pull the allocation name as a new variables, so we can alter
// this as needed without making changes to the original
// object.
newAllocName := missing.Name()
// Identify the index from the name, so we can check this
// against the allocation name index tracking for duplicates.
allocIndex := structs.AllocIndexFromName(newAllocName, s.job.ID, tg.Name)
// If the allocation index is a duplicate, we cannot simply
// create a new allocation with the same name. We need to
// generate a new index and use this. The log message is useful
// for debugging and development, but could be removed in a
// future version of Nomad.
if taskGroupNameIndex.IsDuplicate(allocIndex) {
oldAllocName := newAllocName
newAllocName = taskGroupNameIndex.Next(1)[0]
taskGroupNameIndex.UnsetIndex(allocIndex)
s.logger.Debug("duplicate alloc index found and changed",
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}
// Create an allocation for this
alloc := &structs.Allocation{
2018-10-02 20:36:04 +00:00
ID: uuid.Generate(),
Namespace: s.job.Namespace,
EvalID: s.eval.ID,
Name: newAllocName,
2018-10-02 20:36:04 +00:00
JobID: s.job.ID,
TaskGroup: tg.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
NodeName: option.Node.Name,
2018-10-02 20:36:04 +00:00
DeploymentID: deploymentID,
TaskResources: resources.OldTaskResources(),
AllocatedResources: resources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
// SharedResources is considered deprecated, will be removed in 0.11.
// It is only set for compat reasons.
SharedResources: &structs.Resources{
DiskMB: tg.EphemeralDisk.SizeMB,
Networks: resources.Shared.Networks,
},
2017-07-05 19:50:40 +00:00
}
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if prevAllocation != nil {
alloc.PreviousAllocation = prevAllocation.ID
2018-01-19 21:20:00 +00:00
if missing.IsRescheduling() {
updateRescheduleTracker(alloc, prevAllocation, now)
}
// If the allocation has task handles,
// copy them to the new allocation
propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
}
2017-07-15 23:31:33 +00:00
// If we are placing a canary and we found a match, add the canary
// to the deployment state object and mark it as a canary.
if missing.Canary() && s.deployment != nil {
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Canary: true,
}
}
2017-05-22 17:58:34 +00:00
s.handlePreemptions(option, alloc, missing)
// Track the placement
2020-08-25 21:09:21 +00:00
s.plan.AppendAlloc(alloc, downgradedJob)
2017-07-18 00:18:12 +00:00
} else {
// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}
// Update metrics with the resources requested by the task group.
s.ctx.Metrics().ExhaustResources(tg)
// Track the fact that we didn't find a placement
s.failedTGAllocs[tg.Name] = s.ctx.Metrics()
// If we weren't able to find a replacement for the allocation, back
// out the fact that we asked to stop the allocation.
if stopPrevAlloc {
s.plan.PopUpdate(prevAllocation)
}
2017-07-18 00:18:12 +00:00
}
2017-05-22 17:58:34 +00:00
}
}
return nil
}
// setJob points the scheduling stack at the given job and applies the
// scheduler configuration merged with the job's node pool settings.
//
// An error is returned when either the node pool or the scheduler
// configuration cannot be read from state; in that case the stack is left
// untouched.
func (s *GenericScheduler) setJob(job *structs.Job) error {
	// The node pool and the global scheduler configuration are both needed to
	// work out how the stack must be configured for this job.
	nodePool, err := s.state.NodePoolByName(nil, job.NodePool)
	if err != nil {
		return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err)
	}

	_, globalConfig, err := s.state.SchedulerConfig()
	if err != nil {
		return fmt.Errorf("failed to get scheduler configuration: %v", err)
	}

	s.stack.SetJob(job)

	// Node pool values are layered on top of the global configuration.
	effectiveConfig := globalConfig.WithNodePool(nodePool)
	s.stack.SetSchedulerConfiguration(effectiveConfig)

	return nil
}
// setNodes looks up the nodes that are ready for placement in the job's
// datacenters and node pool, and installs them in the scheduling stack.
//
// It returns the ready nodes together with the per-datacenter count reported
// by readyNodesInDCsAndPool. On error the stack is not modified.
func (s *GenericScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) {
	ready, _, countByDC, err := readyNodesInDCsAndPool(s.state, job.Datacenters, job.NodePool)
	if err != nil {
		return nil, nil, err
	}

	s.stack.SetNodes(ready)

	return ready, countByDC, nil
}
// needsToSetNodes reports whether jobs a and b differ in a way that requires
// the set of candidate nodes to be recomputed: either their datacenters differ
// (compared as sets) or they target different node pools.
func needsToSetNodes(a, b *structs.Job) bool {
	if !helper.SliceSetEq(a.Datacenters, b.Datacenters) {
		return true
	}
	return a.NodePool != b.NodePool
}
// propagateTaskState copies task handles from previous allocations to
// replacement allocations when the previous allocation is being drained or was
// lost. Remote task drivers rely on this to reconnect to remote tasks when the
// allocation managing them changes due to a down or draining node.
//
2021-04-29 22:02:12 +00:00
// The previous allocation will be marked as lost after task state has been
// propagated (when the plan is applied), so its ClientStatus is not yet marked
// as lost. Instead, we use the `prevLost` flag to track whether the previous
// allocation will be marked lost.
func propagateTaskState(newAlloc, prev *structs.Allocation, prevLost bool) {
// Don't transfer state from client terminal allocs
if prev.ClientTerminalStatus() {
return
}
// If previous allocation is not lost and not draining, do not copy
// task handles.
if !prevLost && !prev.DesiredTransition.ShouldMigrate() {
return
}
newAlloc.TaskStates = make(map[string]*structs.TaskState, len(newAlloc.AllocatedResources.Tasks))
for taskName, prevState := range prev.TaskStates {
if prevState.TaskHandle == nil {
// No task handle, skip
continue
}
if _, ok := newAlloc.AllocatedResources.Tasks[taskName]; !ok {
// Task dropped in update, skip
continue
}
// Copy state
newState := structs.NewTaskState()
newState.TaskHandle = prevState.TaskHandle.Copy()
newAlloc.TaskStates[taskName] = newState
}
}
// getSelectOptions builds the node selection options for a placement.
//
// When preferredNode is non-nil it becomes the single preferred node. When
// prevAllocation is non-nil, PenaltyNodeIDs is set (possibly to an empty set)
// and contains the previous allocation's node if that allocation failed, plus
// the previous node of every event in its reschedule tracker.
func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs.Node) *SelectOptions {
	opts := new(SelectOptions)

	if preferredNode != nil {
		opts.PreferredNodes = []*structs.Node{preferredNode}
	}

	if prevAllocation == nil {
		return opts
	}

	penalized := map[string]struct{}{}

	// Penalize the node a failed alloc ran on to encourage rescheduling
	// somewhere else.
	if prevAllocation.ClientStatus == structs.AllocClientStatusFailed {
		penalized[prevAllocation.NodeID] = struct{}{}
	}

	// Nodes used by earlier reschedule attempts are penalized as well.
	if tracker := prevAllocation.RescheduleTracker; tracker != nil {
		for _, event := range tracker.Events {
			penalized[event.PrevNodeID] = struct{}{}
		}
	}

	opts.PenaltyNodeIDs = penalized
	return opts
}
2018-01-19 21:20:00 +00:00
// updateRescheduleTracker carries over previous restart attempts and adds the most recent restart
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, now time.Time) {
reschedPolicy := prev.ReschedulePolicy()
var rescheduleEvents []*structs.RescheduleEvent
if prev.RescheduleTracker != nil {
var interval time.Duration
if reschedPolicy != nil {
interval = reschedPolicy.Interval
}
2018-03-08 00:44:54 +00:00
// If attempts is set copy all events in the interval range
if reschedPolicy.Attempts > 0 {
for _, reschedEvent := range prev.RescheduleTracker.Events {
2018-03-08 00:44:54 +00:00
timeDiff := now.UnixNano() - reschedEvent.RescheduleTime
// Only copy over events that are within restart interval
// This keeps the list of events small in cases where there's a long chain of old restart events
if interval > 0 && timeDiff <= interval.Nanoseconds() {
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
}
}
} else {
2018-03-08 00:44:54 +00:00
// Only copy the last n if unlimited is set
start := 0
if len(prev.RescheduleTracker.Events) > maxPastRescheduleEvents {
start = len(prev.RescheduleTracker.Events) - maxPastRescheduleEvents
}
for i := start; i < len(prev.RescheduleTracker.Events); i++ {
reschedEvent := prev.RescheduleTracker.Events[i]
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
}
}
}
nextDelay := prev.NextDelay()
rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay)
2018-01-19 21:20:00 +00:00
rescheduleEvents = append(rescheduleEvents, rescheduleEvent)
alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
}
2017-05-22 17:58:34 +00:00
// findPreferredNode finds the preferred node for an allocation
2018-07-05 15:00:03 +00:00
func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.Node, error) {
prev := place.PreviousAllocation()
if prev == nil {
return nil, nil
}
if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
2017-06-06 21:08:46 +00:00
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
2018-07-11 18:27:10 +00:00
preferredNode, err := s.state.NodeByID(ws, prev.NodeID)
2018-07-05 15:00:03 +00:00
if err != nil {
return nil, err
}
2018-07-02 20:05:38 +00:00
if preferredNode != nil && preferredNode.Ready() {
2018-07-05 15:00:03 +00:00
return preferredNode, nil
2017-05-22 17:58:34 +00:00
}
}
2018-07-05 15:00:03 +00:00
return nil, nil
2017-05-22 17:58:34 +00:00
}
// selectNextOption asks the stack for a node to place tg on. If no node is
// found and preemption is enabled for the job's type, the stack is queried a
// second time with selectOptions.Preempt set to true.
//
// Note that selectOptions is mutated when the preemption pass runs.
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
	choice := s.stack.Select(tg, selectOptions)

	// The scheduler configuration is read directly from state but only
	// values that can't be specified per node pool should be used. Other
	// values must be merged by calling schedConfig.WithNodePool() and set in
	// the stack by calling SetSchedulerConfiguration().
	//
	// The lookup error is discarded: when no configuration is returned,
	// preemption keeps its default of true.
	_, cfg, _ := s.ctx.State().SchedulerConfig()

	preemptionEnabled := true
	switch {
	case cfg == nil:
		// Keep the default.
	case s.job.Type == structs.JobTypeBatch:
		preemptionEnabled = cfg.PreemptionConfig.BatchSchedulerEnabled
	default:
		preemptionEnabled = cfg.PreemptionConfig.ServiceSchedulerEnabled
	}

	if choice != nil || !preemptionEnabled {
		return choice
	}

	// Nothing fit without preemption; run the stack again allowing it.
	selectOptions.Preempt = true
	return s.stack.Select(tg, selectOptions)
}
2020-06-22 14:28:45 +00:00
// handlePreemptions records the allocations preempted by a placement.
//
// Every allocation in option.PreemptedAllocs is appended to the plan as
// preempted by alloc, and its ID is collected into alloc.PreemptedAllocations.
// When the evaluation asks for an annotated plan and annotations are present,
// a stub of the preempted allocation is added to the annotations and the
// preemption counter of the placement's task group is incremented.
//
// The function does nothing when option.PreemptedAllocs is nil.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
	if option.PreemptedAllocs == nil {
		return
	}

	var evictedIDs []string
	for _, victim := range option.PreemptedAllocs {
		s.plan.AppendPreemptedAlloc(victim, alloc.ID)
		evictedIDs = append(evictedIDs, victim.ID)

		// Annotations are only maintained when the eval requested them.
		if !s.eval.AnnotatePlan || s.plan.Annotations == nil {
			continue
		}
		s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, victim.Stub(nil))

		if s.plan.Annotations.DesiredTGUpdates == nil {
			continue
		}
		tgUpdates := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup().Name]
		tgUpdates.Preemptions += 1
	}

	alloc.PreemptedAllocations = evictedIDs
}