diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 6022885bb..6ee83a098 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -306,8 +306,8 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv defer r.taskStatusLock.Unlock() taskState, ok := r.taskStates[taskName] if !ok { - r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName) - return + taskState = &structs.TaskState{} + r.taskStates[taskName] = taskState } // Set the tasks state. diff --git a/nomad/fsm.go b/nomad/fsm.go index fb8b9934d..5516b764f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -356,7 +356,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - // Attach the plan to all the allocations. It is pulled out in the + // Attach the job to all the allocations. It is pulled out in the // payload to avoid the redundancy of encoding, but should be denormalized // prior to being inserted into MemDB. if j := req.Job; j != nil { @@ -367,6 +367,20 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { } } + // Calculate the total resources of allocations. It is pulled out in the + // payload to avoid encoding something that can be computed, but should be + // denormalized prior to being inserted into MemDB. + for _, alloc := range req.Alloc { + if alloc.Resources != nil { + continue + } + + alloc.Resources = new(structs.Resources) + for _, task := range alloc.TaskResources { + alloc.Resources.Add(task) + } + } + if err := n.state.UpsertAllocs(index, req.Alloc); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err) return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index a300cb0f4..f6a7bc5e7 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -587,6 +587,43 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) { } } +func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { + fsm := testFSM(t) + + alloc := mock.Alloc() + job := alloc.Job + resources := alloc.Resources + alloc.Resources = nil + req := structs.AllocUpdateRequest{ + Job: job, + Alloc: []*structs.Allocation{alloc}, + } + buf, err := structs.Encode(structs.AllocUpdateRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + out, err := fsm.State().AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + alloc.CreateIndex = out.CreateIndex + alloc.ModifyIndex = out.ModifyIndex + alloc.AllocModifyIndex = out.AllocModifyIndex + + // Resources should be recomputed + alloc.Resources = resources + if !reflect.DeepEqual(alloc, out) { + t.Fatalf("bad: %#v %#v", alloc, out) + } +} + func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { fsm := testFSM(t) fsm.blockedEvals.SetEnabled(true) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index ae71f9f54..eee97a8d4 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -224,12 +224,13 @@ func Alloc() *structs.Allocation { Resources: &structs.Resources{ CPU: 500, MemoryMB: 256, + DiskMB: 10, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "main", Value: 12345}}, - MBits: 100, + ReservedPorts: []structs.Port{{Label: "main", Value: 5000}}, + MBits: 50, DynamicPorts: []structs.Port{{Label: "http"}}, }, }, @@ -238,6 +239,7 @@ func Alloc() *structs.Allocation { "web": &structs.Resources{ CPU: 500, MemoryMB: 256, + DiskMB: 10, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ Device: "eth0", @@ -249,11 +251,6 @@ func Alloc() *structs.Allocation { }, }, }, - TaskStates: map[string]*structs.TaskState{ - "web": &structs.TaskState{ - State: structs.TaskStatePending, - }, - }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 57deebaa8..f7d8351d2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1404,23 +1404,16 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { } // Create the delta updates + ts := map[string]*structs.TaskState{"web": &structs.TaskState{State: structs.TaskStatePending}} update := &structs.Allocation{ ID: alloc.ID, ClientStatus: structs.AllocClientStatusFailed, - TaskStates: map[string]*structs.TaskState{ - "web": &structs.TaskState{ - State: structs.TaskStatePending, - }, - }, + TaskStates: ts, } update2 := &structs.Allocation{ ID: alloc2.ID, ClientStatus: structs.AllocClientStatusRunning, - TaskStates: map[string]*structs.TaskState{ - "web": &structs.TaskState{ - State: structs.TaskStatePending, - }, - }, + TaskStates: ts, } err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2}) @@ -1435,6 +1428,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { alloc.CreateIndex = 1000 alloc.ModifyIndex = 1001 + alloc.TaskStates = ts alloc.ClientStatus = structs.AllocClientStatusFailed if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) @@ -1448,6 +1442,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { alloc2.ModifyIndex = 1000 alloc2.ModifyIndex = 1001 alloc2.ClientStatus = structs.AllocClientStatusRunning + alloc2.TaskStates = ts if !reflect.DeepEqual(alloc2, out) { t.Fatalf("bad: %#v %#v", alloc2, out) } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index bf32e6f06..57d762701 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -58,8 +58,20 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st // For each alloc, add the resources for _, alloc := range allocs { - if err := used.Add(alloc.Resources); err != nil { - return false, "", nil, err + if alloc.Resources != nil { + if err := used.Add(alloc.Resources); err != nil { + return false, "", nil, err + } + } else if alloc.TaskResources != nil { + // Allocations within the plan have the combined resources stripped + // to save space, so sum up the individual task resources. + for _, taskResource := range alloc.TaskResources { + if err := used.Add(taskResource); err != nil { + return false, "", nil, err + } + } + } else { + return false, "", nil, fmt.Errorf("allocation %q has no resources set", alloc.ID) } } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0bc741de2..b562a0ad7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2459,6 +2459,10 @@ func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) { // Normalize the job newAlloc.Job = nil + + // Strip the resources as it can be rebuilt. + newAlloc.Resources = nil + newAlloc.DesiredStatus = status newAlloc.DesiredDescription = desc node := alloc.NodeID diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 4372c7566..72095bf34 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -335,7 +335,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { } // Attempt to match the task group - option, size := s.stack.Select(missing.TaskGroup) + option, _ := s.stack.Select(missing.TaskGroup) // Create an allocation for this alloc := &structs.Allocation{ @@ -344,7 +344,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { Name: missing.Name, JobID: s.job.ID, TaskGroup: missing.TaskGroup.Name, - Resources: size, Metrics: s.ctx.Metrics(), } @@ -360,13 +359,11 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending - alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed - alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 019e78673..34b1bcc18 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -228,7 +228,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { s.stack.SetNodes(nodes) // Attempt to match the task group - option, size := s.stack.Select(missing.TaskGroup) + option, _ := s.stack.Select(missing.TaskGroup) if option == nil { // Check if this task group has already failed @@ -245,7 +245,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { Name: missing.Name, JobID: s.job.ID, TaskGroup: missing.TaskGroup.Name, - Resources: size, Metrics: s.ctx.Metrics(), } @@ -261,13 +260,11 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending - alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed - alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/util.go b/scheduler/util.go index e8f93f272..961b698fe 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -366,7 +366,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, allocInPlace) // Attempt to match the task group - option, size := stack.Select(update.TaskGroup) + option, _ := stack.Select(update.TaskGroup) // Pop the allocation ctx.Plan().PopUpdate(update.Alloc) @@ -391,8 +391,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // Update the allocation newAlloc.EvalID = eval.ID - newAlloc.Job = nil // Use the Job in the Plan - newAlloc.Resources = size + newAlloc.Job = nil // Use the Job in the Plan + newAlloc.Resources = nil // Computed in Plan Apply newAlloc.TaskResources = option.TaskResources newAlloc.Metrics = ctx.Metrics() newAlloc.DesiredStatus = structs.AllocDesiredStatusRun @@ -460,11 +460,3 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { return c } - -func initTaskState(tg *structs.TaskGroup, state string) map[string]*structs.TaskState { - states := make(map[string]*structs.TaskState, len(tg.Tasks)) - for _, task := range tg.Tasks { - states[task.Name] = &structs.TaskState{State: state} - } - return states -} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 0719cc5fb..bcae1d290 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -714,29 +714,6 @@ func TestTaskGroupConstraints(t *testing.T) { } -func TestInitTaskState(t *testing.T) { - tg := &structs.TaskGroup{ - Tasks: []*structs.Task{ - &structs.Task{Name: "foo"}, - &structs.Task{Name: "bar"}, - }, - } - expPending := map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStatePending}, - "bar": &structs.TaskState{State: structs.TaskStatePending}, - } - expDead := map[string]*structs.TaskState{ - "foo": &structs.TaskState{State: structs.TaskStateDead}, - "bar": &structs.TaskState{State: structs.TaskStateDead}, - } - actPending := initTaskState(tg, structs.TaskStatePending) - actDead := initTaskState(tg, structs.TaskStateDead) - - if !(reflect.DeepEqual(expPending, actPending) && reflect.DeepEqual(expDead, actDead)) { - t.Fatal("Expected and actual not equal") - } -} - func TestProgressMade(t *testing.T) { noopPlan := &structs.PlanResult{} if progressMade(nil) || progressMade(noopPlan) {