Merge pull request #1612 from hashicorp/diptanu-0-5-branch

Chained Allocs
This commit is contained in:
Diptanu Choudhury 2016-08-17 16:23:32 -07:00 committed by GitHub
commit 59892e4004
7 changed files with 195 additions and 0 deletions

View File

@ -184,6 +184,7 @@ func SystemJob() *structs.Job {
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{},
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,

View File

@ -2495,6 +2495,9 @@ type Allocation struct {
// TaskStates stores the state of each task,
TaskStates map[string]*TaskState
// PreviousAllocation is the allocation that this allocation is replacing
PreviousAllocation string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64

View File

@ -457,6 +457,12 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
ClientStatus: structs.AllocClientStatusPending,
}
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map

View File

@ -2054,3 +2054,85 @@ func TestGenericSched_FilterCompleteAllocs(t *testing.T) {
}
}
}
func TestGenericSched_ChainedAlloc(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
var allocIDs []string
for _, allocList := range h.Plans[0].NodeAllocation {
for _, alloc := range allocList {
allocIDs = append(allocIDs, alloc.ID)
}
}
sort.Strings(allocIDs)
// Create a new harness to invoke the scheduler again
h1 := NewHarnessWithState(t, h.State)
job1 := mock.Job()
job1.ID = job.ID
job1.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
job1.TaskGroups[0].Count = 12
noErr(t, h1.State.UpsertJob(h1.NextIndex(), job1))
// Create a mock evaluation to update the job
eval1 := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
}
// Process the evaluation
if err := h1.Process(NewServiceScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
plan := h1.Plans[0]
// Collect all the chained allocation ids and the new allocations which
// don't have any chained allocations
var prevAllocs []string
var newAllocs []string
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
if alloc.PreviousAllocation == "" {
newAllocs = append(newAllocs, alloc.ID)
continue
}
prevAllocs = append(prevAllocs, alloc.PreviousAllocation)
}
}
sort.Strings(prevAllocs)
// Ensure that the new allocations has their corresponging original
// allocation ids
if !reflect.DeepEqual(prevAllocs, allocIDs) {
t.Fatalf("expected: %v, actual: %v", len(allocIDs), len(prevAllocs))
}
// Ensuring two new allocations don't have any chained allocations
if len(newAllocs) != 2 {
t.Fatalf("expected: %v, actual: %v", 2, len(newAllocs))
}
}

View File

@ -321,6 +321,12 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
ClientStatus: structs.AllocClientStatusPending,
}
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map

View File

@ -2,6 +2,7 @@ package scheduler
import (
"reflect"
"sort"
"testing"
"time"
@ -1008,3 +1009,90 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {
t.Fatalf("bad queued allocations: %#v", h.Evals[0].QueuedAllocations)
}
}
func TestSystemSched_ChainedAlloc(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.SystemJob()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
var allocIDs []string
for _, allocList := range h.Plans[0].NodeAllocation {
for _, alloc := range allocList {
allocIDs = append(allocIDs, alloc.ID)
}
}
sort.Strings(allocIDs)
// Create a new harness to invoke the scheduler again
h1 := NewHarnessWithState(t, h.State)
job1 := mock.SystemJob()
job1.ID = job.ID
job1.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
noErr(t, h1.State.UpsertJob(h1.NextIndex(), job1))
// Insert two more nodes
for i := 0; i < 2; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a mock evaluation to update the job
eval1 := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
}
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
plan := h1.Plans[0]
// Collect all the chained allocation ids and the new allocations which
// don't have any chained allocations
var prevAllocs []string
var newAllocs []string
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
if alloc.PreviousAllocation == "" {
newAllocs = append(newAllocs, alloc.ID)
continue
}
prevAllocs = append(prevAllocs, alloc.PreviousAllocation)
}
}
sort.Strings(prevAllocs)
// Ensure that the new allocations has their corresponging original
// allocation ids
if !reflect.DeepEqual(prevAllocs, allocIDs) {
t.Fatalf("expected: %v, actual: %v", len(allocIDs), len(prevAllocs))
}
// Ensuring two new allocations don't have any chained allocations
if len(newAllocs) != 2 {
t.Fatalf("expected: %v, actual: %v", 2, len(newAllocs))
}
}

View File

@ -66,6 +66,15 @@ func NewHarness(t *testing.T) *Harness {
return h
}
// NewHarnessWithState creates a new harness with the given state for testing
// purposes.
func NewHarnessWithState(t *testing.T, state *state.StateStore) *Harness {
return &Harness{
State: state,
nextIndex: 1,
}
}
// SubmitPlan is used to handle plan submission
func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) {
// Ensure sequential plan application