Merge pull request #867 from hashicorp/f-reduce-alloc-update-size

Decrease size of Allocation when serializing
This commit is contained in:
Alex Dadgar 2016-03-01 15:38:18 -08:00
commit f91e054ac1
11 changed files with 86 additions and 64 deletions

View file

@ -306,8 +306,8 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
if !ok {
r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName)
return
taskState = &structs.TaskState{}
r.taskStates[taskName] = taskState
}
// Set the tasks state.

View file

@ -356,7 +356,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
// Attach the plan to all the allocations. It is pulled out in the
// Attach the job to all the allocations. It is pulled out in the
// payload to avoid the redundancy of encoding, but should be denormalized
// prior to being inserted into MemDB.
if j := req.Job; j != nil {
@ -367,6 +367,20 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
}
}
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
}
if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err)
return err

View file

@ -587,6 +587,43 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) {
}
}
func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
fsm := testFSM(t)
alloc := mock.Alloc()
job := alloc.Job
resources := alloc.Resources
alloc.Resources = nil
req := structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out, err := fsm.State().AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
// Resources should be recomputed
alloc.Resources = resources
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
}
func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)

View file

@ -224,12 +224,13 @@ func Alloc() *structs.Allocation {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "main", Value: 12345}},
MBits: 100,
ReservedPorts: []structs.Port{{Label: "main", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
@ -238,6 +239,7 @@ func Alloc() *structs.Allocation {
"web": &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
@ -249,11 +251,6 @@ func Alloc() *structs.Allocation {
},
},
},
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
Job: Job(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,

View file

@ -1404,23 +1404,16 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
}
// Create the delta updates
ts := map[string]*structs.TaskState{"web": &structs.TaskState{State: structs.TaskStatePending}}
update := &structs.Allocation{
ID: alloc.ID,
ClientStatus: structs.AllocClientStatusFailed,
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
TaskStates: ts,
}
update2 := &structs.Allocation{
ID: alloc2.ID,
ClientStatus: structs.AllocClientStatusRunning,
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
TaskStates: ts,
}
err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2})
@ -1435,6 +1428,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
alloc.CreateIndex = 1000
alloc.ModifyIndex = 1001
alloc.TaskStates = ts
alloc.ClientStatus = structs.AllocClientStatusFailed
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
@ -1448,6 +1442,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
alloc2.ModifyIndex = 1000
alloc2.ModifyIndex = 1001
alloc2.ClientStatus = structs.AllocClientStatusRunning
alloc2.TaskStates = ts
if !reflect.DeepEqual(alloc2, out) {
t.Fatalf("bad: %#v %#v", alloc2, out)
}

View file

@ -58,8 +58,20 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st
// For each alloc, add the resources
for _, alloc := range allocs {
if err := used.Add(alloc.Resources); err != nil {
return false, "", nil, err
if alloc.Resources != nil {
if err := used.Add(alloc.Resources); err != nil {
return false, "", nil, err
}
} else if alloc.TaskResources != nil {
// Allocations within the plan have the combined resources stripped
// to save space, so sum up the individual task resources.
for _, taskResource := range alloc.TaskResources {
if err := used.Add(taskResource); err != nil {
return false, "", nil, err
}
}
} else {
return false, "", nil, fmt.Errorf("allocation %q has no resources set", alloc.ID)
}
}

View file

@ -2459,6 +2459,10 @@ func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
// Normalize the job
newAlloc.Job = nil
// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil
newAlloc.DesiredStatus = status
newAlloc.DesiredDescription = desc
node := alloc.NodeID

View file

@ -335,7 +335,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
}
// Attempt to match the task group
option, size := s.stack.Select(missing.TaskGroup)
option, _ := s.stack.Select(missing.TaskGroup)
// Create an allocation for this
alloc := &structs.Allocation{
@ -344,7 +344,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
}
@ -360,13 +359,11 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending)
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead)
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
}

View file

@ -228,7 +228,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.stack.SetNodes(nodes)
// Attempt to match the task group
option, size := s.stack.Select(missing.TaskGroup)
option, _ := s.stack.Select(missing.TaskGroup)
if option == nil {
// Check if this task group has already failed
@ -245,7 +245,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
}
@ -261,13 +260,11 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending)
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead)
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
}

View file

@ -366,7 +366,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
allocInPlace)
// Attempt to match the task group
option, size := stack.Select(update.TaskGroup)
option, _ := stack.Select(update.TaskGroup)
// Pop the allocation
ctx.Plan().PopUpdate(update.Alloc)
@ -391,8 +391,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// Update the allocation
newAlloc.EvalID = eval.ID
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = size
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
@ -460,11 +460,3 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
return c
}
func initTaskState(tg *structs.TaskGroup, state string) map[string]*structs.TaskState {
states := make(map[string]*structs.TaskState, len(tg.Tasks))
for _, task := range tg.Tasks {
states[task.Name] = &structs.TaskState{State: state}
}
return states
}

View file

@ -714,29 +714,6 @@ func TestTaskGroupConstraints(t *testing.T) {
}
func TestInitTaskState(t *testing.T) {
tg := &structs.TaskGroup{
Tasks: []*structs.Task{
&structs.Task{Name: "foo"},
&structs.Task{Name: "bar"},
},
}
expPending := map[string]*structs.TaskState{
"foo": &structs.TaskState{State: structs.TaskStatePending},
"bar": &structs.TaskState{State: structs.TaskStatePending},
}
expDead := map[string]*structs.TaskState{
"foo": &structs.TaskState{State: structs.TaskStateDead},
"bar": &structs.TaskState{State: structs.TaskStateDead},
}
actPending := initTaskState(tg, structs.TaskStatePending)
actDead := initTaskState(tg, structs.TaskStateDead)
if !(reflect.DeepEqual(expPending, actPending) && reflect.DeepEqual(expDead, actDead)) {
t.Fatal("Expected and actual not equal")
}
}
func TestProgressMade(t *testing.T) {
noopPlan := &structs.PlanResult{}
if progressMade(nil) || progressMade(noopPlan) {