Schedulers create blocked eval if there are failed allocations

This commit is contained in:
Alex Dadgar 2016-01-28 13:43:48 -08:00
parent 45a733600a
commit 9045d7e989
9 changed files with 298 additions and 0 deletions

1
nomad/blocked_evals.go Normal file
View File

@ -0,0 +1 @@
package nomad

View File

@ -0,0 +1 @@
package nomad

View File

@ -1824,6 +1824,7 @@ func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
}
const (
EvalStatusBlocked = "blocked"
EvalStatusPending = "pending"
EvalStatusComplete = "complete"
EvalStatusFailed = "failed"
@ -1912,6 +1913,18 @@ type Evaluation struct {
// This is used to support rolling upgrades, where we need a chain of evaluations.
PreviousEval string
// EligibleClasses are the computed node classes that have explicitly been
// marked as eligible for placement for some task groups of the job.
EligibleClasses []uint64
// IneligibleClasses are the computed node classes that have explicitly been
// marked as ineligible for placement for some task groups of the job.
IneligibleClasses []uint64
// EscapedComputedClass marks whether the job has constraints that are not
// captured by computed node classes.
EscapedComputedClass bool
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
@ -1980,6 +1993,25 @@ func (e *Evaluation) NextRollingEval(wait time.Duration) *Evaluation {
}
}
// BlockedEval creates a blocked evaluation to followup this eval to place any
// failed allocations. It takes the classes marked explicitly eligible or
// ineligible and whether the job has escaped computed node classes. The
// returned eval chains to this one via PreviousEval so the blocked eval can
// be correlated back to the eval whose placements failed.
func (e *Evaluation) BlockedEval(elig, inelig []uint64, escaped bool) *Evaluation {
	return &Evaluation{
		ID:                   GenerateUUID(),
		Priority:             e.Priority,
		Type:                 e.Type,
		TriggeredBy:          e.TriggeredBy,
		JobID:                e.JobID,
		JobModifyIndex:       e.JobModifyIndex,
		Status:               EvalStatusBlocked,
		PreviousEval:         e.ID,
		EligibleClasses:      elig,
		IneligibleClasses:    inelig,
		EscapedComputedClass: escaped,
	}
}
// Plan is used to submit a commit plan for task allocations. These
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.

View File

@ -220,6 +220,36 @@ func (e *EvalEligibility) HasEscaped() bool {
return false
}
// GetClasses returns the eligible classes and the ineligible classes,
// respectively, across the job and task groups.
func (e *EvalEligibility) GetClasses() ([]uint64, []uint64) {
	var eligible, ineligible []uint64

	// Collect the classifications recorded at the job level.
	for class, status := range e.job {
		if status == EvalComputedClassEligible {
			eligible = append(eligible, class)
		} else if status == EvalComputedClassIneligible {
			ineligible = append(ineligible, class)
		}
	}

	// Collect the classifications recorded per task group.
	for _, tgClasses := range e.taskGroups {
		for class, status := range tgClasses {
			if status == EvalComputedClassEligible {
				eligible = append(eligible, class)
			} else if status == EvalComputedClassIneligible {
				ineligible = append(ineligible, class)
			}
		}
	}

	return eligible, ineligible
}
// JobStatus returns the eligibility status of the job.
func (e *EvalEligibility) JobStatus(class uint64) ComputedClassFeasibility {
// COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3

View File

@ -3,6 +3,8 @@ package scheduler
import (
"log"
"os"
"reflect"
"sort"
"testing"
"github.com/hashicorp/nomad/nomad/mock"
@ -206,3 +208,31 @@ func TestEvalEligibility_SetJob(t *testing.T) {
t.Fatalf("SetJob() should mark task group as escaped")
}
}
// uint64Array implements sort.Interface for a slice of uint64 so tests can
// sort class sets deterministically before comparing them.
type uint64Array []uint64

// Len returns the number of elements in the slice.
func (u uint64Array) Len() int { return len(u) }

// Less reports whether element i orders before element j. sort.Interface
// requires a strict ordering (Less(i, i) must be false), so this uses <
// rather than <=.
func (u uint64Array) Less(i, j int) bool { return u[i] < u[j] }

// Swap exchanges the elements at indexes i and j.
func (u uint64Array) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
// TestEvalEligibility_GetClasses checks that classes marked at both the job
// level and the task group level are aggregated into the eligible and
// ineligible sets returned by GetClasses.
func TestEvalEligibility_GetClasses(t *testing.T) {
	e := NewEvalEligibility()

	// Record a mix of eligible and ineligible classes across the job and
	// two task groups.
	e.SetJobEligibility(true, 1)
	e.SetJobEligibility(false, 2)
	e.SetTaskGroupEligibility(true, "foo", 3)
	e.SetTaskGroupEligibility(false, "bar", 4)
	e.SetTaskGroupEligibility(true, "bar", 5)

	actElig, actInelig := e.GetClasses()

	// Map iteration order is random, so sort before comparing.
	sort.Sort(uint64Array(actElig))
	sort.Sort(uint64Array(actInelig))

	expElig := []uint64{1, 3, 5}
	expInelig := []uint64{2, 4}
	if !reflect.DeepEqual(actElig, expElig) {
		t.Fatalf("GetClasses() returned %#v; want %#v", actElig, expElig)
	}
	if !reflect.DeepEqual(actInelig, expInelig) {
		t.Fatalf("GetClasses() returned %#v; want %#v", actInelig, expInelig)
	}
}

View File

@ -58,6 +58,8 @@ type GenericScheduler struct {
limitReached bool
nextEval *structs.Evaluation
blocked *structs.Evaluation
}
// NewServiceScheduler is a factory function to instantiate a new service scheduler
@ -158,6 +160,19 @@ func (s *GenericScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
e := s.ctx.Eligibility()
elig, inelig := e.GetClasses()
s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped())
if err := s.planner.CreateEval(s.blocked); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}
// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
if err != nil {

View File

@ -104,6 +104,11 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
}
plan := h.Plans[0]
// Ensure the plan has created a follow up eval.
if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked {
t.Fatalf("bad: %#v", h.CreateEvals)
}
// Ensure the plan failed to alloc
if len(plan.FailedAllocs) != 1 {
t.Fatalf("bad: %#v", plan)
@ -131,6 +136,93 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// TestServiceSched_JobRegister_BlockedEval verifies that when placements
// fail, the service scheduler creates a blocked eval that captures the
// computed node class eligibility so the job can be retried later.
func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
	h := NewHarness(t)

	// Create a full node
	node := mock.Node()
	node.Reserved = node.Resources
	node.ComputeClass()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create an ineligible node
	node2 := mock.Node()
	node2.Attributes["kernel.name"] = "windows"
	node2.ComputeClass()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node2))

	// Create a job
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan has created a follow up eval.
	if len(h.CreateEvals) != 1 {
		t.Fatalf("bad: %#v", h.CreateEvals)
	}
	created := h.CreateEvals[0]
	if created.Status != structs.EvalStatusBlocked {
		t.Fatalf("bad: %#v", created)
	}
	// One class should be eligible (the full node) and one ineligible (the
	// windows node); use || so a mismatch on either side fails the test.
	if len(created.EligibleClasses) != 1 || len(created.IneligibleClasses) != 1 {
		t.Fatalf("bad: %#v", created)
	}
	if created.EscapedComputedClass {
		t.Fatalf("bad: %#v", created)
	}

	// Ensure the plan failed to alloc
	if len(plan.FailedAllocs) != 1 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	if len(out) != 1 {
		for _, a := range out {
			t.Logf("%#v", a)
		}
		t.Fatalf("bad: %#v", out)
	}

	// Check the coalesced failures
	if out[0].Metrics.CoalescedFailures != 9 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	// Check the available nodes
	if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobModify(t *testing.T) {
h := NewHarness(t)

View File

@ -35,6 +35,8 @@ type SystemScheduler struct {
limitReached bool
nextEval *structs.Evaluation
blocked *structs.Evaluation
}
// NewSystemScheduler is a factory function to instantiate a new system
@ -127,6 +129,19 @@ func (s *SystemScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
e := s.ctx.Eligibility()
elig, inelig := e.GetClasses()
s.blocked = s.eval.BlockedEval(elig, inelig, e.HasEscaped())
if err := s.planner.CreateEval(s.blocked); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}
// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
if err != nil {

View File

@ -184,6 +184,88 @@ func TestSystemSched_JobRegister_AllocFail(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// TestSystemSched_JobRegister_BlockedEval verifies that when system job
// placements fail, the system scheduler creates a blocked eval that captures
// the computed node class eligibility so the job can be retried later.
func TestSystemSched_JobRegister_BlockedEval(t *testing.T) {
	h := NewHarness(t)

	// Create a full node
	node := mock.Node()
	node.Reserved = node.Resources
	node.ComputeClass()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create an ineligible node
	node2 := mock.Node()
	node2.Attributes["kernel.name"] = "windows"
	node2.ComputeClass()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node2))

	// Create a job
	job := mock.SystemJob()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation with the system scheduler since this is a
	// system job; using the service scheduler here was a copy-paste bug.
	err := h.Process(NewSystemScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan has created a follow up eval.
	if len(h.CreateEvals) != 1 {
		t.Fatalf("bad: %#v", h.CreateEvals)
	}
	created := h.CreateEvals[0]
	if created.Status != structs.EvalStatusBlocked {
		t.Fatalf("bad: %#v", created)
	}
	// One class should be eligible (the full node) and one ineligible (the
	// windows node); use || so a mismatch on either side fails the test.
	if len(created.EligibleClasses) != 1 || len(created.IneligibleClasses) != 1 {
		t.Fatalf("bad: %#v", created)
	}
	if created.EscapedComputedClass {
		t.Fatalf("bad: %#v", created)
	}

	// Ensure the plan failed to alloc
	if len(plan.FailedAllocs) != 1 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	if len(out) != 1 {
		for _, a := range out {
			t.Logf("%#v", a)
		}
		t.Fatalf("bad: %#v", out)
	}

	// Check the available nodes
	if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_JobModify(t *testing.T) {
h := NewHarness(t)