Merge pull request #1188 from hashicorp/f-no-failed-allocs

Failed Allocation Metrics stored in Evaluation
Alex Dadgar 2016-05-24 20:06:28 -07:00
commit 3cbb89c61e
11 changed files with 258 additions and 167 deletions

View File

@ -67,6 +67,8 @@ type Evaluation struct {
Wait time.Duration
NextEval string
PreviousEval string
BlockedEval string
FailedTGAllocs map[string]*AllocationMetric
CreateIndex uint64
ModifyIndex uint64
}
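
The new BlockedEval and FailedTGAllocs fields are exposed through the Go API client, so an operator can inspect placement failures directly from the evaluation instead of from synthetic failed allocations. A minimal sketch of reading them, assuming a Nomad agent at the default address and a known evaluation ID (both hypothetical here):

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Assumes a local Nomad agent reachable via the default config.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	evalID := "c3a5c0d4-..." // hypothetical evaluation ID obtained elsewhere
	eval, _, err := client.Evaluations().Info(evalID, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Per-task-group metrics for placements that could not be made.
	for tg, metric := range eval.FailedTGAllocs {
		fmt.Printf("task group %q: %d coalesced failures\n",
			tg, metric.CoalescedFailures)
	}

	// ID of the blocked eval spawned to retry the failed placements.
	if eval.BlockedEval != "" {
		fmt.Printf("blocked eval: %s\n", eval.BlockedEval)
	}
}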

View File

@ -147,10 +147,14 @@ func (m *monitor) update(update *evalState) {
} else {
switch {
case existing.client != alloc.client:
description := ""
if alloc.clientDesc != "" {
description = fmt.Sprintf(" (%s)", alloc.clientDesc)
}
// Allocation status has changed
m.ui.Output(fmt.Sprintf(
"Allocation %q status changed: %q -> %q (%s)",
limit(alloc.id, m.length), existing.client, alloc.client, alloc.clientDesc))
"Allocation %q status changed: %q -> %q%s",
limit(alloc.id, m.length), existing.client, alloc.client, description))
}
}
}
@ -288,9 +292,31 @@ func (m *monitor) monitor(evalID string, allowPrefix bool) int {
m.update(state)
switch eval.Status {
case structs.EvalStatusComplete, structs.EvalStatusFailed:
m.ui.Info(fmt.Sprintf("Evaluation %q finished with status %q",
limit(eval.ID, m.length), eval.Status))
case structs.EvalStatusComplete, structs.EvalStatusFailed, structs.EvalStatusCancelled:
if len(eval.FailedTGAllocs) == 0 {
m.ui.Info(fmt.Sprintf("Evaluation %q finished with status %q",
limit(eval.ID, m.length), eval.Status))
} else {
// There were failures making the allocations
schedFailure = true
m.ui.Info(fmt.Sprintf("Evaluation %q finished with status %q but failed to place all allocations:",
limit(eval.ID, m.length), eval.Status))
// Print the failures per task group
for tg, metrics := range eval.FailedTGAllocs {
noun := "allocation"
if metrics.CoalescedFailures > 0 {
noun += "s"
}
m.ui.Output(fmt.Sprintf("Task Group %q (failed to place %d %s):", tg, metrics.CoalescedFailures+1, noun))
dumpAllocMetrics(m.ui, metrics, false)
}
if eval.BlockedEval != "" {
m.ui.Output(fmt.Sprintf("Evaluation %q waiting for additional capacity to place remainder",
limit(eval.BlockedEval, m.length)))
}
}
default:
// Wait for the next update
time.Sleep(updateWait)
@ -332,41 +358,46 @@ func dumpAllocStatus(ui cli.Ui, alloc *api.Allocation, length int) {
ui.Output(fmt.Sprintf("Allocation %q status %q (%d/%d nodes filtered)",
limit(alloc.ID, length), alloc.ClientStatus,
alloc.Metrics.NodesFiltered, alloc.Metrics.NodesEvaluated))
dumpAllocMetrics(ui, alloc.Metrics, true)
}
func dumpAllocMetrics(ui cli.Ui, metrics *api.AllocationMetric, scores bool) {
// Print a helpful message if we have an eligibility problem
if alloc.Metrics.NodesEvaluated == 0 {
if metrics.NodesEvaluated == 0 {
ui.Output(" * No nodes were eligible for evaluation")
}
// Print a helpful message if the user has asked for a DC that has no
// available nodes.
for dc, available := range alloc.Metrics.NodesAvailable {
for dc, available := range metrics.NodesAvailable {
if available == 0 {
ui.Output(fmt.Sprintf(" * No nodes are available in datacenter %q", dc))
}
}
// Print filter info
for class, num := range alloc.Metrics.ClassFiltered {
for class, num := range metrics.ClassFiltered {
ui.Output(fmt.Sprintf(" * Class %q filtered %d nodes", class, num))
}
for cs, num := range alloc.Metrics.ConstraintFiltered {
for cs, num := range metrics.ConstraintFiltered {
ui.Output(fmt.Sprintf(" * Constraint %q filtered %d nodes", cs, num))
}
// Print exhaustion info
if ne := alloc.Metrics.NodesExhausted; ne > 0 {
if ne := metrics.NodesExhausted; ne > 0 {
ui.Output(fmt.Sprintf(" * Resources exhausted on %d nodes", ne))
}
for class, num := range alloc.Metrics.ClassExhausted {
for class, num := range metrics.ClassExhausted {
ui.Output(fmt.Sprintf(" * Class %q exhausted on %d nodes", class, num))
}
for dim, num := range alloc.Metrics.DimensionExhausted {
for dim, num := range metrics.DimensionExhausted {
ui.Output(fmt.Sprintf(" * Dimension %q exhausted on %d nodes", dim, num))
}
// Print scores
for name, score := range alloc.Metrics.Scores {
ui.Output(fmt.Sprintf(" * Score %q = %f", name, score))
if scores {
for name, score := range metrics.Scores {
ui.Output(fmt.Sprintf(" * Score %q = %f", name, score))
}
}
}

View File

@ -124,7 +124,6 @@ func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *s
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
minUpdates += len(result.FailedAllocs)
// Setup the update request
req := structs.AllocUpdateRequest{
@ -137,7 +136,6 @@ func (s *Server) applyPlan(job *structs.Job, result *structs.PlanResult, snap *s
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
req.Alloc = append(req.Alloc, result.FailedAllocs...)
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
@ -200,7 +198,6 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
result := &structs.PlanResult{
NodeUpdate: make(map[string][]*structs.Allocation),
NodeAllocation: make(map[string][]*structs.Allocation),
FailedAllocs: plan.FailedAllocs,
}
// Collect all the nodeIDs

View File

@ -51,12 +51,10 @@ func TestPlanApply_applyPlan(t *testing.T) {
// Register alloc
alloc := mock.Alloc()
allocFail := mock.Alloc()
plan := &structs.PlanResult{
NodeAllocation: map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc},
},
FailedAllocs: []*structs.Allocation{allocFail},
}
// Snapshot the state
@ -94,15 +92,6 @@ func TestPlanApply_applyPlan(t *testing.T) {
t.Fatalf("missing alloc")
}
// Lookup the allocation
out, err = s1.fsm.State().AllocByID(allocFail.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing alloc")
}
// Evict alloc, Register alloc2
allocEvict := new(structs.Allocation)
*allocEvict = *alloc
@ -178,12 +167,10 @@ func TestPlanApply_EvalPlan_Simple(t *testing.T) {
snap, _ := state.Snapshot()
alloc := mock.Alloc()
allocFail := mock.Alloc()
plan := &structs.Plan{
NodeAllocation: map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc},
},
FailedAllocs: []*structs.Allocation{allocFail},
}
pool := NewEvaluatePool(workerPoolSize, workerPoolBufferSize)
@ -196,8 +183,8 @@ func TestPlanApply_EvalPlan_Simple(t *testing.T) {
if result == nil {
t.Fatalf("missing result")
}
if !reflect.DeepEqual(result.FailedAllocs, plan.FailedAllocs) {
t.Fatalf("missing failed allocs")
if !reflect.DeepEqual(result.NodeAllocation, plan.NodeAllocation) {
t.Fatalf("incorrect node allocations")
}
}

View File

@ -2649,6 +2649,16 @@ type Evaluation struct {
// This is used to support rolling upgrades, where we need a chain of evaluations.
PreviousEval string
// BlockedEval is the evaluation ID for a created blocked eval. A
// blocked eval will be created if not all allocations could be placed due
// to constraints or a lack of resources.
BlockedEval string
// FailedTGAllocs are task groups which have allocations that could not be
// made, but the metrics are persisted so that the user can use the feedback
// to determine the cause.
FailedTGAllocs map[string]*AllocMetric
// ClassEligibility tracks computed node classes that have been explicitly
// marked as eligible or ineligible.
ClassEligibility map[string]bool
@ -2687,6 +2697,25 @@ func (e *Evaluation) Copy() *Evaluation {
}
ne := new(Evaluation)
*ne = *e
// Copy ClassEligibility
if e.ClassEligibility != nil {
classes := make(map[string]bool, len(e.ClassEligibility))
for class, elig := range e.ClassEligibility {
classes[class] = elig
}
ne.ClassEligibility = classes
}
// Copy FailedTGAllocs
if e.FailedTGAllocs != nil {
failedTGs := make(map[string]*AllocMetric, len(e.FailedTGAllocs))
for tg, metric := range e.FailedTGAllocs {
failedTGs[tg] = metric.Copy()
}
ne.FailedTGAllocs = failedTGs
}
return ne
}
@ -2747,10 +2776,10 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation {
}
}
// BlockedEval creates a blocked evaluation to follow up this eval to place any
// CreateBlockedEval creates a blocked evaluation to follow up this eval to place any
// failed allocations. It takes the classes marked explicitly eligible or
// ineligible and whether the job has escaped computed node classes.
func (e *Evaluation) BlockedEval(classEligibility map[string]bool, escaped bool) *Evaluation {
func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped bool) *Evaluation {
return &Evaluation{
ID: GenerateUUID(),
Priority: e.Priority,
@ -2801,11 +2830,6 @@ type Plan struct {
// The evicts must be considered prior to the allocations.
NodeAllocation map[string][]*Allocation
// FailedAllocs are allocations that could not be made,
// but are persisted so that the user can use the feedback
// to determine the cause.
FailedAllocs []*Allocation
// Annotations contains annotations by the scheduler to be used by operators
// to understand the decisions made by the scheduler.
Annotations *PlanAnnotations
@ -2853,13 +2877,9 @@ func (p *Plan) AppendAlloc(alloc *Allocation) {
p.NodeAllocation[node] = append(existing, alloc)
}
func (p *Plan) AppendFailed(alloc *Allocation) {
p.FailedAllocs = append(p.FailedAllocs, alloc)
}
// IsNoOp checks if this plan would do nothing
func (p *Plan) IsNoOp() bool {
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && len(p.FailedAllocs) == 0
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0
}
// PlanResult is the result of a plan submitted to the leader.
@ -2870,11 +2890,6 @@ type PlanResult struct {
// NodeAllocation contains all the allocations that were committed.
NodeAllocation map[string][]*Allocation
// FailedAllocs are allocations that could not be made,
// but are persisted so that the user can use the feedback
// to determine the cause.
FailedAllocs []*Allocation
// RefreshIndex is the index the worker should refresh state up to.
// This allows all evictions and allocations to be materialized.
// If any allocations were rejected due to stale data (node state,
@ -2888,7 +2903,7 @@ type PlanResult struct {
// IsNoOp checks if this plan result would do nothing
func (p *PlanResult) IsNoOp() bool {
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && len(p.FailedAllocs) == 0
return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0
}
// FullCommit is used to check if all the allocations in a plan

View File

@ -100,7 +100,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusFailed, desc)
}
// Retry up to the maxScheduleAttempts and reset if progress is made.
@ -117,7 +117,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
if err := s.createBlockedEval(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()); err != nil {
if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
@ -126,7 +126,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
}
// Update the status to complete
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "")
}
// createBlockedEval creates a blocked eval and stores it.
@ -140,7 +140,7 @@ func (s *GenericScheduler) createBlockedEval() error {
classEligibility = e.GetClasses()
}
s.blocked = s.eval.BlockedEval(classEligibility, escaped)
s.blocked = s.eval.CreateBlockedEval(classEligibility, escaped)
return s.planner.CreateEval(s.blocked)
}
@ -158,6 +158,9 @@ func (s *GenericScheduler) process() (bool, error) {
// Create a plan
s.plan = s.eval.MakePlan(s.job)
// Reset the failed allocations
s.eval.FailedTGAllocs = nil
// Create an evaluation context
s.ctx = NewEvalContext(s.state, s.plan, s.logger)
@ -173,6 +176,16 @@ func (s *GenericScheduler) process() (bool, error) {
return false, err
}
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
if err := s.createBlockedEval(); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}
// If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan
// anyways to get the annotations.
if s.plan.IsNoOp() && !s.eval.AnnotatePlan {
@ -190,16 +203,6 @@ func (s *GenericScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
if err := s.createBlockedEval(); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}
// Submit the plan and store the results.
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
@ -365,50 +368,47 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Update the set of placement nodes
s.stack.SetNodes(nodes)
// Track the failed task groups so that we can coalesce
// the failures together to avoid creating many failed allocs.
failedTG := make(map[*structs.TaskGroup]*structs.Allocation)
for _, missing := range place {
// Check if this task group has already failed
if alloc, ok := failedTG[missing.TaskGroup]; ok {
alloc.Metrics.CoalescedFailures += 1
if metric, ok := s.eval.FailedTGAllocs[missing.TaskGroup.Name]; ok {
metric.CoalescedFailures += 1
continue
}
// Attempt to match the task group
option, _ := s.stack.Select(missing.TaskGroup)
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
}
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
// Set fields based on if we found an allocation option
if option != nil {
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
// Generate service IDs for tasks in this allocation
// COMPAT - This is no longer required and would be removed in v0.4
alloc.PopulateServiceIDs(missing.TaskGroup)
alloc.NodeID = option.Node.ID
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
// Lazy initialize the failed map
if s.eval.FailedTGAllocs == nil {
s.eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.eval.FailedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
}
}
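
The scheduler no longer creates a failed Allocation for every unplaced instance; it records one AllocMetric per task group on the evaluation and bumps CoalescedFailures for repeats. A standalone sketch of that coalescing pattern, using a simplified stand-in type rather than the scheduler's actual structs.AllocMetric:

package main

import "fmt"

// failMetric is a simplified stand-in for structs.AllocMetric.
type failMetric struct {
	CoalescedFailures int
}

// recordFailure lazily initializes the map and coalesces repeated
// failures for the same task group into a single metric entry.
func recordFailure(failed map[string]*failMetric, taskGroup string) map[string]*failMetric {
	if failed == nil {
		failed = make(map[string]*failMetric)
	}
	if m, ok := failed[taskGroup]; ok {
		m.CoalescedFailures++
		return failed
	}
	failed[taskGroup] = &failMetric{}
	return failed
}

func main() {
	var failed map[string]*failMetric
	// Ten failed placements of task group "web": one entry, nine coalesced.
	for i := 0; i < 10; i++ {
		failed = recordFailure(failed, "web")
	}
	fmt.Println(failed["web"].CoalescedFailures) // 9
}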

View File

@ -48,6 +48,14 @@ func TestServiceSched_JobRegister(t *testing.T) {
t.Fatalf("expected no annotations")
}
// Ensure the eval has no spawned blocked eval
if len(h.Evals) != 1 {
t.Fatalf("bad: %#v", h.Evals)
if h.Evals[0].BlockedEval != "" {
t.Fatalf("bad: %#v", h.Evals[0])
}
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
@ -224,39 +232,44 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
// Ensure no plan
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan has created a follow up eval.
// Ensure there is a follow up eval.
if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked {
t.Fatalf("bad: %#v", h.CreateEvals)
}
// Ensure the plan failed to alloc
if len(plan.FailedAllocs) != 1 {
t.Fatalf("bad: %#v", plan)
if len(h.Evals) != 1 {
t.Fatalf("incorrect number of updated eval: %#v", h.Evals)
}
outEval := h.Evals[0]
// Ensure the eval has its spawned blocked eval
if outEval.BlockedEval != h.CreateEvals[0].ID {
t.Fatalf("bad: %#v", outEval)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure the plan failed to alloc
if outEval == nil || len(outEval.FailedTGAllocs) != 1 {
t.Fatalf("bad: %#v", outEval)
}
// Ensure all allocations placed
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
metrics, ok := outEval.FailedTGAllocs[job.TaskGroups[0].Name]
if !ok {
t.Fatalf("no failed metrics: %#v", outEval.FailedTGAllocs)
}
// Check the coalesced failures
if out[0].Metrics.CoalescedFailures != 9 {
t.Fatalf("bad: %#v", out[0].Metrics)
if metrics.CoalescedFailures != 9 {
t.Fatalf("bad: %#v", metrics)
}
// Check the available nodes
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 0 {
t.Fatalf("bad: %#v", out[0].Metrics)
if count, ok := metrics.NodesAvailable["dc1"]; !ok || count != 0 {
t.Fatalf("bad: %#v", metrics)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
@ -295,11 +308,10 @@ func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
// Ensure no plan
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan has created a follow up eval.
if len(h.CreateEvals) != 1 {
@ -320,31 +332,34 @@ func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
t.Fatalf("bad: %#v", created)
}
// Ensure the plan failed to alloc
if len(plan.FailedAllocs) != 1 {
t.Fatalf("bad: %#v", plan)
// Ensure there is a follow up eval.
if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked {
t.Fatalf("bad: %#v", h.CreateEvals)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
if len(h.Evals) != 1 {
t.Fatalf("incorrect number of updated eval: %#v", h.Evals)
}
outEval := h.Evals[0]
// Ensure all allocations placed
if len(out) != 1 {
for _, a := range out {
t.Logf("%#v", a)
}
t.Fatalf("bad: %#v", out)
// Ensure the plan failed to alloc
if outEval == nil || len(outEval.FailedTGAllocs) != 1 {
t.Fatalf("bad: %#v", outEval)
}
metrics, ok := outEval.FailedTGAllocs[job.TaskGroups[0].Name]
if !ok {
t.Fatalf("no failed metrics: %#v", outEval.FailedTGAllocs)
}
// Check the coalesced failures
if out[0].Metrics.CoalescedFailures != 9 {
t.Fatalf("bad: %#v", out[0].Metrics)
if metrics.CoalescedFailures != 9 {
t.Fatalf("bad: %#v", metrics)
}
// Check the available nodes
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
t.Fatalf("bad: %#v", out[0].Metrics)
if count, ok := metrics.NodesAvailable["dc1"]; !ok || count != 2 {
t.Fatalf("bad: %#v", metrics)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
@ -403,19 +418,39 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
if len(planned) != 2 {
t.Fatalf("bad: %#v", plan)
}
if len(plan.FailedAllocs) != 1 {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
// Ensure two allocations placed
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
if len(out) != 3 {
if len(out) != 2 {
t.Fatalf("bad: %#v", out)
}
if len(h.Evals) != 1 {
t.Fatalf("incorrect number of updated eval: %#v", h.Evals)
}
outEval := h.Evals[0]
// Ensure the eval has its spawned blocked eval
if outEval.BlockedEval != h.CreateEvals[0].ID {
t.Fatalf("bad: %#v", outEval)
}
// Ensure the plan failed to alloc one tg
if outEval == nil || len(outEval.FailedTGAllocs) != 1 {
t.Fatalf("bad: %#v", outEval)
}
metrics, ok := outEval.FailedTGAllocs[tg2.Name]
if !ok {
t.Fatalf("no failed metrics: %#v", outEval.FailedTGAllocs)
}
// Check the coalesced failures
if metrics.CoalescedFailures != tg2.Count-1 {
t.Fatalf("bad: %#v", metrics)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
@ -586,9 +621,13 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan didn't to alloc
if len(plan.FailedAllocs) != 0 {
t.Fatalf("bad: %#v", plan)
// Ensure the plan had no failures
if len(h.Evals) != 1 {
t.Fatalf("incorrect number of updated eval: %#v", h.Evals)
}
outEval := h.Evals[0]
if outEval == nil || len(outEval.FailedTGAllocs) != 0 {
t.Fatalf("bad: %#v", outEval)
}
// Lookup the allocations by JobID

View File

@ -60,20 +60,20 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusFailed, desc)
}
// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, statusErr.EvalStatus, err.Error())
}
return err
}
// Update the status to complete
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusComplete, "")
}
// process is wrapped in retryMax to iteratively run the handler until we have no
@ -98,6 +98,9 @@ func (s *SystemScheduler) process() (bool, error) {
// Create a plan
s.plan = s.eval.MakePlan(s.job)
// Reset the failed allocations
s.eval.FailedTGAllocs = nil
// Create an evaluation context
s.ctx = NewEvalContext(s.state, s.plan, s.logger)
@ -220,10 +223,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
nodeByID[node.ID] = node
}
// Track the failed task groups so that we can coalesce
// the failures together to avoid creating many failed allocs.
failedTG := make(map[*structs.TaskGroup]*structs.Allocation)
nodes := make([]*structs.Node, 1)
for _, missing := range place {
node, ok := nodeByID[missing.Alloc.NodeID]
@ -240,43 +239,45 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
if option == nil {
// Check if this task group has already failed
if alloc, ok := failedTG[missing.TaskGroup]; ok {
alloc.Metrics.CoalescedFailures += 1
if metric, ok := s.eval.FailedTGAllocs[missing.TaskGroup.Name]; ok {
metric.CoalescedFailures += 1
continue
}
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
}
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = s.nodesByDC
// Set fields based on if we found an allocation option
if option != nil {
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
// Generate service IDs for tasks in this allocation
// COMPAT - This is no longer required and would be removed in v0.4
alloc.PopulateServiceIDs(missing.TaskGroup)
alloc.NodeID = option.Node.ID
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
// Lazy initialize the failed map
if s.eval.FailedTGAllocs == nil {
s.eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.eval.FailedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
}
}
return nil
}

View File

@ -91,7 +91,6 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
for _, allocList := range plan.NodeAllocation {
allocs = append(allocs, allocList...)
}
allocs = append(allocs, plan.FailedAllocs...)
// Attach the plan to all the allocations. It is pulled out in the
// payload to avoid the redundancy of encoding, but should be denormalized

View File

@ -366,7 +366,7 @@ func networkPortMap(n *structs.NetworkResource) map[string]int {
}
// setStatus is used to update the status of the evaluation
func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Evaluation, status, desc string) error {
func setStatus(logger *log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, status, desc string) error {
logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status)
newEval := eval.Copy()
newEval.Status = status
@ -374,6 +374,9 @@ func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Eval
if nextEval != nil {
newEval.NextEval = nextEval.ID
}
if spawnedBlocked != nil {
newEval.BlockedEval = spawnedBlocked.ID
}
return planner.UpdateEval(newEval)
}

View File

@ -488,7 +488,7 @@ func TestSetStatus(t *testing.T) {
eval := mock.Eval()
status := "a"
desc := "b"
if err := setStatus(logger, h, eval, nil, status, desc); err != nil {
if err := setStatus(logger, h, eval, nil, nil, status, desc); err != nil {
t.Fatalf("setStatus() failed: %v", err)
}
@ -501,9 +501,10 @@ func TestSetStatus(t *testing.T) {
t.Fatalf("setStatus() submited invalid eval: %v", newEval)
}
// Test next evals
h = NewHarness(t)
next := mock.Eval()
if err := setStatus(logger, h, eval, next, status, desc); err != nil {
if err := setStatus(logger, h, eval, next, nil, status, desc); err != nil {
t.Fatalf("setStatus() failed: %v", err)
}
@ -515,6 +516,22 @@ func TestSetStatus(t *testing.T) {
if newEval.NextEval != next.ID {
t.Fatalf("setStatus() didn't set nextEval correctly: %v", newEval)
}
// Test blocked evals
h = NewHarness(t)
blocked := mock.Eval()
if err := setStatus(logger, h, eval, nil, blocked, status, desc); err != nil {
t.Fatalf("setStatus() failed: %v", err)
}
if len(h.Evals) != 1 {
t.Fatalf("setStatus() didn't update plan: %v", h.Evals)
}
newEval = h.Evals[0]
if newEval.BlockedEval != blocked.ID {
t.Fatalf("setStatus() didn't set BlockedEval correctly: %v", newEval)
}
}
func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) {