2015-07-27 22:31:09 +00:00
|
|
|
package nomad
|
|
|
|
|
2015-08-04 23:32:46 +00:00
|
|
|
import (
|
|
|
|
"fmt"
|
2015-08-04 23:35:49 +00:00
|
|
|
"time"
|
2015-08-04 23:32:46 +00:00
|
|
|
|
2015-08-04 23:35:49 +00:00
|
|
|
"github.com/armon/go-metrics"
|
2016-02-20 21:12:14 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2015-08-11 21:27:14 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/state"
|
2015-08-04 23:32:46 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
2015-10-11 21:48:18 +00:00
|
|
|
"github.com/hashicorp/raft"
|
2015-08-04 23:32:46 +00:00
|
|
|
)
|
2015-07-27 22:31:09 +00:00
|
|
|
|
|
|
|
// planApply is a long lived goroutine that reads plan allocations from
|
|
|
|
// the plan queue, determines if they can be applied safely and applies
|
|
|
|
// them via Raft.
|
2015-10-11 21:48:18 +00:00
|
|
|
//
|
|
|
|
// Naively, we could simply dequeue a plan, verify, apply and then respond.
|
|
|
|
// However, the plan application is bounded by the Raft apply time and
|
|
|
|
// subject to some latency. This creates a stall condition, where we are
|
2015-10-12 21:35:17 +00:00
|
|
|
// not evaluating, but simply waiting for a transaction to apply.
|
2015-10-11 21:48:18 +00:00
|
|
|
//
|
|
|
|
// To avoid this, we overlap verification with apply. This means once
|
|
|
|
// we've verified plan N we attempt to apply it. However, while waiting
|
|
|
|
// for apply, we begin to verify plan N+1 under the assumption that plan
|
|
|
|
// N has succeeded.
|
|
|
|
//
|
|
|
|
// In this sense, we track two parallel versions of the world. One is
|
|
|
|
// the pessimistic one driven by the Raft log which is replicated. The
|
|
|
|
// other is optimistic and assumes our transactions will succeed. In the
|
|
|
|
// happy path, this lets us do productive work during the latency of
|
|
|
|
// apply.
|
|
|
|
//
|
|
|
|
// In the unhappy path (Raft transaction fails), effectively we only
|
|
|
|
// wasted work during a time we would have been waiting anyways. However,
|
|
|
|
// in anticipation of this case we cannot respond to the plan until
|
|
|
|
// the Raft log is updated. This means our schedulers will stall,
|
|
|
|
// but there are many of those and only a single plan verifier.
|
|
|
|
//
|
2015-07-27 22:31:09 +00:00
|
|
|
func (s *Server) planApply() {
|
2015-10-12 21:35:17 +00:00
|
|
|
// waitCh is used to track an outstanding application while snap
|
|
|
|
// holds an optimistic state which includes that plan application.
|
2015-10-11 22:34:52 +00:00
|
|
|
var waitCh chan struct{}
|
|
|
|
var snap *state.StateSnapshot
|
2016-02-20 21:12:14 +00:00
|
|
|
pool := NewEvaluatePool(workerPoolSize, workerPoolBufferSize)
|
|
|
|
defer pool.Shutdown()
|
2015-10-11 22:34:52 +00:00
|
|
|
|
2015-07-27 22:31:09 +00:00
|
|
|
for {
|
|
|
|
// Pull the next pending plan, exit if we are no longer leader
|
|
|
|
pending, err := s.planQueue.Dequeue(0)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-08-12 22:44:36 +00:00
|
|
|
// Verify the evaluation is outstanding, and that the tokens match.
|
2015-10-23 17:22:44 +00:00
|
|
|
if err := s.evalBroker.OutstandingReset(pending.plan.EvalID, pending.plan.EvalToken); err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: plan rejected for evaluation %s: %v",
|
|
|
|
pending.plan.EvalID, err)
|
|
|
|
pending.respond(nil, err)
|
2015-08-12 22:44:36 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2015-10-11 22:34:52 +00:00
|
|
|
// Check if out last plan has completed
|
|
|
|
select {
|
|
|
|
case <-waitCh:
|
|
|
|
waitCh = nil
|
|
|
|
snap = nil
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2015-08-05 01:30:05 +00:00
|
|
|
// Snapshot the state so that we have a consistent view of the world
|
2015-10-11 22:34:52 +00:00
|
|
|
// if no snapshot is available
|
2015-10-11 22:38:07 +00:00
|
|
|
if waitCh == nil || snap == nil {
|
2015-10-11 22:34:52 +00:00
|
|
|
snap, err = s.fsm.State().Snapshot()
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
|
|
|
|
pending.respond(nil, err)
|
|
|
|
continue
|
|
|
|
}
|
2015-08-05 01:30:05 +00:00
|
|
|
}
|
|
|
|
|
2015-08-04 23:32:46 +00:00
|
|
|
// Evaluate the plan
|
2016-02-20 21:12:14 +00:00
|
|
|
result, err := evaluatePlan(pool, snap, pending.plan)
|
2015-08-04 23:32:46 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: failed to evaluate plan: %v", err)
|
|
|
|
pending.respond(nil, err)
|
|
|
|
continue
|
|
|
|
}
|
2015-07-27 22:31:09 +00:00
|
|
|
|
2015-10-11 21:57:36 +00:00
|
|
|
// Fast-path the response if there is nothing to do
|
|
|
|
if result.IsNoOp() {
|
|
|
|
pending.respond(result, nil)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2015-10-12 21:35:17 +00:00
|
|
|
// Ensure any parallel apply is complete before starting the next one.
|
|
|
|
// This also limits how out of date our snapshot can be.
|
2015-10-11 22:34:52 +00:00
|
|
|
if waitCh != nil {
|
|
|
|
<-waitCh
|
|
|
|
snap, err = s.fsm.State().Snapshot()
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: failed to snapshot state: %v", err)
|
|
|
|
pending.respond(nil, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-11 21:57:36 +00:00
|
|
|
// Dispatch the Raft transaction for the plan
|
2015-10-11 22:19:01 +00:00
|
|
|
future, err := s.applyPlan(result, snap)
|
2015-10-11 21:57:36 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Printf("[ERR] nomad: failed to submit plan: %v", err)
|
|
|
|
pending.respond(nil, err)
|
|
|
|
continue
|
2015-07-27 22:31:09 +00:00
|
|
|
}
|
|
|
|
|
2015-10-11 21:57:36 +00:00
|
|
|
// Respond to the plan in async
|
2015-10-11 22:34:52 +00:00
|
|
|
waitCh = make(chan struct{})
|
2015-10-11 21:57:36 +00:00
|
|
|
go s.asyncPlanWait(waitCh, future, result, pending)
|
2015-07-27 22:31:09 +00:00
|
|
|
}
|
|
|
|
}
|
2015-08-04 23:32:46 +00:00
|
|
|
|
2015-08-05 01:30:05 +00:00
|
|
|
// applyPlan is used to apply the plan result and to return the alloc index
|
2015-10-11 22:19:01 +00:00
|
|
|
func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
|
2015-08-05 01:30:05 +00:00
|
|
|
req := structs.AllocUpdateRequest{}
|
2015-08-26 00:36:52 +00:00
|
|
|
for _, updateList := range result.NodeUpdate {
|
|
|
|
req.Alloc = append(req.Alloc, updateList...)
|
2015-08-05 01:30:05 +00:00
|
|
|
}
|
|
|
|
for _, allocList := range result.NodeAllocation {
|
|
|
|
req.Alloc = append(req.Alloc, allocList...)
|
|
|
|
}
|
2015-08-15 20:33:20 +00:00
|
|
|
req.Alloc = append(req.Alloc, result.FailedAllocs...)
|
2015-08-05 01:30:05 +00:00
|
|
|
|
2016-02-09 05:58:05 +00:00
|
|
|
// Set the time the alloc was applied for the first time. This can be used
|
|
|
|
// to approximate the scheduling time.
|
|
|
|
now := time.Now().UTC().UnixNano()
|
|
|
|
for _, alloc := range req.Alloc {
|
|
|
|
if alloc.CreateTime == 0 {
|
|
|
|
alloc.CreateTime = now
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-11 22:19:01 +00:00
|
|
|
// Dispatch the Raft transaction
|
|
|
|
future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Optimistically apply to our state view
|
|
|
|
if snap != nil {
|
|
|
|
nextIdx := s.raft.AppliedIndex() + 1
|
|
|
|
if err := snap.UpsertAllocs(nextIdx, req.Alloc); err != nil {
|
|
|
|
return future, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return future, nil
|
2015-10-11 21:48:18 +00:00
|
|
|
}
|
|
|
|
|
2015-10-11 21:57:36 +00:00
|
|
|
// asyncPlanWait is used to apply and respond to a plan async
|
|
|
|
func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
|
|
|
|
result *structs.PlanResult, pending *pendingPlan) {
|
2015-10-11 21:48:18 +00:00
|
|
|
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
|
2015-10-11 21:57:36 +00:00
|
|
|
defer close(waitCh)
|
|
|
|
|
|
|
|
// Wait for the plan to apply
|
2015-10-11 21:48:18 +00:00
|
|
|
if err := future.Error(); err != nil {
|
2015-10-11 21:57:36 +00:00
|
|
|
s.logger.Printf("[ERR] nomad: failed to apply plan: %v", err)
|
|
|
|
pending.respond(nil, err)
|
|
|
|
return
|
2015-10-11 21:48:18 +00:00
|
|
|
}
|
2015-10-11 21:57:36 +00:00
|
|
|
|
|
|
|
// Respond to the plan
|
|
|
|
result.AllocIndex = future.Index()
|
|
|
|
pending.respond(result, nil)
|
2015-08-05 01:30:05 +00:00
|
|
|
}
|
|
|
|
|
2015-08-04 23:32:46 +00:00
|
|
|
// evaluatePlan is used to determine what portions of a plan
|
|
|
|
// can be applied if any. Returns if there should be a plan application
|
|
|
|
// which may be partial or if there was an error
|
2016-02-20 21:12:14 +00:00
|
|
|
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan) (*structs.PlanResult, error) {
|
2015-08-04 23:35:49 +00:00
|
|
|
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())
|
2015-08-05 01:10:57 +00:00
|
|
|
|
2015-08-04 23:32:46 +00:00
|
|
|
// Create a result holder for the plan
|
|
|
|
result := &structs.PlanResult{
|
2015-08-26 00:36:52 +00:00
|
|
|
NodeUpdate: make(map[string][]*structs.Allocation),
|
2015-08-04 23:32:46 +00:00
|
|
|
NodeAllocation: make(map[string][]*structs.Allocation),
|
2015-08-15 20:33:20 +00:00
|
|
|
FailedAllocs: plan.FailedAllocs,
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
|
|
|
|
2015-08-26 00:36:52 +00:00
|
|
|
// Collect all the nodeIDs
|
|
|
|
nodeIDs := make(map[string]struct{})
|
|
|
|
for nodeID := range plan.NodeUpdate {
|
|
|
|
nodeIDs[nodeID] = struct{}{}
|
|
|
|
}
|
2015-08-05 01:10:57 +00:00
|
|
|
for nodeID := range plan.NodeAllocation {
|
2015-08-26 00:36:52 +00:00
|
|
|
nodeIDs[nodeID] = struct{}{}
|
|
|
|
}
|
|
|
|
|
2016-02-20 21:12:14 +00:00
|
|
|
// Setup a multierror to handle potentially getting many
|
|
|
|
// errors since we are processing in parallel.
|
|
|
|
var mErr multierror.Error
|
|
|
|
|
|
|
|
// handleResult is used to process the result of evaluateNodePlan
|
|
|
|
handleResult := func(nodeID string, fit bool, err error) (cancel bool) {
|
2015-08-05 01:10:57 +00:00
|
|
|
// Evaluate the plan for this node
|
2015-08-04 23:32:46 +00:00
|
|
|
if err != nil {
|
2016-02-20 21:12:14 +00:00
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
return true
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
2015-08-05 01:10:57 +00:00
|
|
|
if !fit {
|
2015-08-04 23:32:46 +00:00
|
|
|
// Scheduler must have stale data, RefreshIndex should force
|
|
|
|
// the latest view of allocations and nodes
|
2015-09-07 03:56:38 +00:00
|
|
|
allocIndex, err := snap.Index("allocs")
|
2015-08-04 23:32:46 +00:00
|
|
|
if err != nil {
|
2016-02-20 21:12:14 +00:00
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
return true
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
2015-09-07 03:56:38 +00:00
|
|
|
nodeIndex, err := snap.Index("nodes")
|
2015-08-04 23:32:46 +00:00
|
|
|
if err != nil {
|
2016-02-20 21:12:14 +00:00
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
return true
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
2015-08-05 00:13:40 +00:00
|
|
|
result.RefreshIndex = maxUint64(nodeIndex, allocIndex)
|
2015-08-04 23:32:46 +00:00
|
|
|
|
|
|
|
// If we require all-at-once scheduling, there is no point
|
|
|
|
// to continue the evaluation, as we've already failed.
|
|
|
|
if plan.AllAtOnce {
|
2015-08-26 00:36:52 +00:00
|
|
|
result.NodeUpdate = nil
|
2015-08-05 01:30:05 +00:00
|
|
|
result.NodeAllocation = nil
|
2016-02-20 21:12:14 +00:00
|
|
|
return true
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Skip this node, since it cannot be used.
|
2016-02-20 21:12:14 +00:00
|
|
|
return
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Add this to the plan result
|
2015-08-26 00:36:52 +00:00
|
|
|
if nodeUpdate := plan.NodeUpdate[nodeID]; len(nodeUpdate) > 0 {
|
|
|
|
result.NodeUpdate[nodeID] = nodeUpdate
|
2015-08-07 16:37:45 +00:00
|
|
|
}
|
|
|
|
if nodeAlloc := plan.NodeAllocation[nodeID]; len(nodeAlloc) > 0 {
|
|
|
|
result.NodeAllocation[nodeID] = nodeAlloc
|
|
|
|
}
|
2016-02-20 21:12:14 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the pool channels
|
|
|
|
req := pool.RequestCh()
|
|
|
|
resp := pool.ResultCh()
|
|
|
|
outstanding := 0
|
|
|
|
didCancel := false
|
|
|
|
|
|
|
|
// Evalute each node in the plan, handling results as
|
|
|
|
// they are ready to avoid blocking.
|
|
|
|
for nodeID := range nodeIDs {
|
|
|
|
select {
|
|
|
|
case req <- evaluateRequest{snap, plan, nodeID}:
|
|
|
|
outstanding++
|
|
|
|
case r := <-resp:
|
|
|
|
outstanding--
|
2016-02-20 21:41:49 +00:00
|
|
|
|
|
|
|
// Handle a result that allows us to cancel evaluation,
|
|
|
|
// which may save time processing additional entries.
|
2016-02-20 21:12:14 +00:00
|
|
|
if cancel := handleResult(r.nodeID, r.fit, r.err); cancel {
|
|
|
|
didCancel = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Drain the remaining results
|
|
|
|
for outstanding > 0 {
|
|
|
|
r := <-resp
|
|
|
|
if !didCancel {
|
|
|
|
if cancel := handleResult(r.nodeID, r.fit, r.err); cancel {
|
|
|
|
didCancel = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
outstanding--
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
2016-02-20 21:12:14 +00:00
|
|
|
return result, mErr.ErrorOrNil()
|
2015-08-04 23:32:46 +00:00
|
|
|
}
|
|
|
|
|
2015-08-05 01:10:57 +00:00
|
|
|
// evaluateNodePlan is used to evalute the plan for a single node,
|
|
|
|
// returning if the plan is valid or if an error is encountered
|
2015-08-11 21:27:14 +00:00
|
|
|
func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID string) (bool, error) {
|
2015-08-07 16:37:45 +00:00
|
|
|
// If this is an evict-only plan, it always 'fits' since we are removing things.
|
|
|
|
if len(plan.NodeAllocation[nodeID]) == 0 {
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
|
2015-08-05 01:10:57 +00:00
|
|
|
// Get the node itself
|
2015-09-07 03:56:38 +00:00
|
|
|
node, err := snap.NodeByID(nodeID)
|
2015-08-05 01:10:57 +00:00
|
|
|
if err != nil {
|
2015-10-07 10:18:19 +00:00
|
|
|
return false, fmt.Errorf("failed to get node '%s': %v", nodeID, err)
|
2015-08-05 01:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// If the node does not exist or is not ready for schduling it is not fit
|
2015-08-16 01:03:05 +00:00
|
|
|
// XXX: There is a potential race between when we do this check and when
|
|
|
|
// the Raft commit happens.
|
2015-09-07 02:47:02 +00:00
|
|
|
if node == nil || node.Status != structs.NodeStatusReady || node.Drain {
|
2015-08-05 01:10:57 +00:00
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
|
2016-02-20 19:26:38 +00:00
|
|
|
// Get the existing allocations that are non-terminal
|
|
|
|
existingAlloc, err := snap.AllocsByNodeTerminal(nodeID, false)
|
2015-08-05 01:10:57 +00:00
|
|
|
if err != nil {
|
2015-10-07 10:18:19 +00:00
|
|
|
return false, fmt.Errorf("failed to get existing allocations for '%s': %v", nodeID, err)
|
2015-08-05 01:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Determine the proposed allocation by first removing allocations
|
|
|
|
// that are planned evictions and adding the new allocations.
|
|
|
|
proposed := existingAlloc
|
2015-08-26 00:36:52 +00:00
|
|
|
var remove []*structs.Allocation
|
|
|
|
if update := plan.NodeUpdate[nodeID]; len(update) > 0 {
|
|
|
|
remove = append(remove, update...)
|
2015-08-05 01:10:57 +00:00
|
|
|
}
|
2015-08-23 02:37:21 +00:00
|
|
|
if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
|
|
|
|
for _, alloc := range updated {
|
2015-08-26 00:36:52 +00:00
|
|
|
remove = append(remove, alloc)
|
2015-08-23 02:37:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
proposed = structs.RemoveAllocs(existingAlloc, remove)
|
2015-08-05 01:10:57 +00:00
|
|
|
proposed = append(proposed, plan.NodeAllocation[nodeID]...)
|
|
|
|
|
|
|
|
// Check if these allocations fit
|
2015-09-14 01:46:40 +00:00
|
|
|
fit, _, _, err := structs.AllocsFit(node, proposed, nil)
|
2015-08-13 18:54:59 +00:00
|
|
|
return fit, err
|
2015-08-05 01:10:57 +00:00
|
|
|
}
|