Merge pull request #957 from hashicorp/b-near-node-capacity

core: ProposedAllocs dedups in-place updated allocations
This commit is contained in:
Alex Dadgar 2016-03-21 18:09:54 -07:00
commit c744f7a0bf
5 changed files with 169 additions and 45 deletions

View File

@ -680,11 +680,13 @@ func (r *Resources) Copy() *Resources {
}
newR := new(Resources)
*newR = *r
if r.Networks != nil {
n := len(r.Networks)
newR.Networks = make([]*NetworkResource, n)
for i := 0; i < n; i++ {
newR.Networks[i] = r.Networks[i].Copy()
}
}
return newR
}
@ -942,11 +944,13 @@ func (j *Job) Copy() *Job {
nj.Datacenters = CopySliceString(nj.Datacenters)
nj.Constraints = CopySliceConstraints(nj.Constraints)
if j.TaskGroups != nil {
tgs := make([]*TaskGroup, len(nj.TaskGroups))
for i, tg := range nj.TaskGroups {
tgs[i] = tg.Copy()
}
nj.TaskGroups = tgs
}
nj.Periodic = nj.Periodic.Copy()
nj.Meta = CopyMapStringString(nj.Meta)
@ -1317,11 +1321,13 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.RestartPolicy = ntg.RestartPolicy.Copy()
if tg.Tasks != nil {
tasks := make([]*Task, len(ntg.Tasks))
for i, t := range ntg.Tasks {
tasks[i] = t.Copy()
}
ntg.Tasks = tasks
}
ntg.Meta = CopyMapStringString(ntg.Meta)
return ntg
@ -1483,14 +1489,14 @@ func (s *Service) Copy() *Service {
*ns = *s
ns.Tags = CopySliceString(ns.Tags)
var checks []*ServiceCheck
if l := len(ns.Checks); l != 0 {
checks = make([]*ServiceCheck, len(ns.Checks))
if s.Checks != nil {
checks := make([]*ServiceCheck, len(ns.Checks))
for i, c := range ns.Checks {
checks[i] = c.Copy()
}
}
ns.Checks = checks
}
return ns
}
@ -1623,21 +1629,26 @@ func (t *Task) Copy() *Task {
*nt = *t
nt.Env = CopyMapStringString(nt.Env)
if t.Services != nil {
services := make([]*Service, len(nt.Services))
for i, s := range nt.Services {
services[i] = s.Copy()
}
nt.Services = services
}
nt.Constraints = CopySliceConstraints(nt.Constraints)
nt.Resources = nt.Resources.Copy()
nt.Meta = CopyMapStringString(nt.Meta)
artifacts := make([]*TaskArtifact, len(nt.Artifacts))
for i, a := range nt.Artifacts {
artifacts[i] = a.Copy()
if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))
for _, a := range nt.Artifacts {
artifacts = append(artifacts, a.Copy())
}
nt.Artifacts = artifacts
}
if i, err := copystructure.Copy(nt.Config); err != nil {
nt.Config = i.(map[string]interface{})
@ -1767,10 +1778,13 @@ func (ts *TaskState) Copy() *TaskState {
}
copy := new(TaskState)
copy.State = ts.State
if ts.Events != nil {
copy.Events = make([]*TaskEvent, len(ts.Events))
for i, e := range ts.Events {
copy.Events[i] = e.Copy()
}
}
return copy
}
@ -2148,25 +2162,31 @@ func (a *Allocation) Copy() *Allocation {
na.Job = na.Job.Copy()
na.Resources = na.Resources.Copy()
if a.TaskResources != nil {
tr := make(map[string]*Resources, len(na.TaskResources))
for task, resource := range na.TaskResources {
tr[task] = resource.Copy()
}
na.TaskResources = tr
}
if a.Services != nil {
s := make(map[string]string, len(na.Services))
for service, id := range na.Services {
s[service] = id
}
na.Services = s
}
na.Metrics = na.Metrics.Copy()
if a.TaskStates != nil {
ts := make(map[string]*TaskState, len(na.TaskStates))
for task, state := range na.TaskStates {
ts[task] = state.Copy()
}
na.TaskStates = ts
}
return na
}

View File

@ -119,12 +119,23 @@ func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, erro
if update := e.plan.NodeUpdate[nodeID]; len(update) > 0 {
proposed = structs.RemoveAllocs(existingAlloc, update)
}
proposed = append(proposed, e.plan.NodeAllocation[nodeID]...)
// Ensure the return is not nil
if proposed == nil {
proposed = make([]*structs.Allocation, 0)
// We create an index of the existing allocations so that if an inplace
// update occurs, we do not double count and we override the old allocation.
proposedIDs := make(map[string]*structs.Allocation, len(proposed))
for _, alloc := range proposed {
proposedIDs[alloc.ID] = alloc
}
for _, alloc := range e.plan.NodeAllocation[nodeID] {
proposedIDs[alloc.ID] = alloc
}
// Materialize the proposed slice
proposed = make([]*structs.Allocation, 0, len(proposedIDs))
for _, alloc := range proposedIDs {
proposed = append(proposed, alloc)
}
return proposed, nil
}

View File

@ -487,24 +487,28 @@ func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) {
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
ID: structs.GenerateUUID(),
},
// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: "ignore 2",
ID: structs.GenerateUUID(),
},
}
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: job.ID,
ID: structs.GenerateUUID(),
},
// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: "ignore 2",
ID: structs.GenerateUUID(),
},
}

View File

@ -436,6 +436,93 @@ func TestServiceSched_JobModify(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// Have a single node and submit a job. Increment the count such that all fit
// on the node but the node doesn't have enough resources to fit the new count +
// 1. This tests that we properly discount the resources of existing allocs.
func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) {
h := NewHarness(t)
// Create one node
node := mock.Node()
node.Resources.CPU = 1000
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Generate a fake job with one allocation
job := mock.Job()
job.TaskGroups[0].Tasks[0].Resources.CPU = 256
job2 := job.Copy()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.Resources.CPU = 256
allocs = append(allocs, alloc)
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Update the job to count 3
job2.TaskGroups[0].Count = 3
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan didn't evicted the alloc
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
if len(update) != 0 {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 3 {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan didn't to alloc
if len(plan.FailedAllocs) != 0 {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
out = structs.FilterTerminalAllocs(out)
if len(out) != 3 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobModify_CountZero(t *testing.T) {
h := NewHarness(t)
@ -1079,7 +1166,7 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Ensure no plan as it should be a no-op
// Ensure a plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}

View File

@ -345,9 +345,11 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
&structs.Allocation{
ID: structs.GenerateUUID(),
JobID: "foo",
},
&structs.Allocation{
ID: structs.GenerateUUID(),
JobID: "foo",
},
}
@ -369,7 +371,7 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != -10.0 {
t.Fatalf("Bad: %v", out[0])
t.Fatalf("Bad: %#v", out[0])
}
if out[1] != nodes[1] {