Merge pull request #1496 from hashicorp/b-summary

Using the parent transaction to query the allocation while updating s…
This commit is contained in:
Diptanu Choudhury 2016-08-01 17:07:16 -07:00 committed by GitHub
commit 1b47f39d76
2 changed files with 147 additions and 50 deletions

View file

@ -226,12 +226,14 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error
return fmt.Errorf("error retrieving any allocations for the node: %v", nodeID) return fmt.Errorf("error retrieving any allocations for the node: %v", nodeID)
} }
for _, alloc := range allocs { for _, alloc := range allocs {
copyAlloc := new(structs.Allocation) copyAlloc := alloc.Copy()
*copyAlloc = *alloc
if alloc.ClientStatus == structs.AllocClientStatusPending || if alloc.ClientStatus == structs.AllocClientStatusPending ||
alloc.ClientStatus == structs.AllocClientStatusRunning { alloc.ClientStatus == structs.AllocClientStatusRunning {
copyAlloc.ClientStatus = structs.AllocClientStatusLost copyAlloc.ClientStatus = structs.AllocClientStatusLost
if err := s.updateSummaryWithAlloc(index, copyAlloc, watcher, txn); err != nil {
// Updating the summary since we are changing the state of the
// allocation to lost
if err := s.updateSummaryWithAlloc(index, copyAlloc, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err) return fmt.Errorf("error updating job summary: %v", err)
} }
if err := txn.Insert("allocs", copyAlloc); err != nil { if err := txn.Insert("allocs", copyAlloc); err != nil {
@ -859,15 +861,6 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo
// Handle each of the updated allocations // Handle each of the updated allocations
for _, alloc := range allocs { for _, alloc := range allocs {
rawJob, err := txn.First("jobs", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to query job: %v", err)
}
if rawJob != nil {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
}
if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil {
return err return err
} }
@ -897,6 +890,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I
} }
exist := existing.(*structs.Allocation) exist := existing.(*structs.Allocation)
if err := s.updateSummaryWithAlloc(index, alloc, exist, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
// Trigger the watcher // Trigger the watcher
watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: exist.EvalID}) watcher.Add(watch.Item{AllocEval: exist.EvalID})
@ -944,21 +941,16 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
// Handle the allocations // Handle the allocations
jobs := make(map[string]string, 1) jobs := make(map[string]string, 1)
for _, alloc := range allocs { for _, alloc := range allocs {
rawJob, err := txn.First("jobs", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to query job: %v", err)
}
if rawJob != nil {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
}
existing, err := txn.First("allocs", "id", alloc.ID) existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil { if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err) return fmt.Errorf("alloc lookup failed: %v", err)
} }
exist, _ := existing.(*structs.Allocation) exist, _ := existing.(*structs.Allocation)
if err := s.updateSummaryWithAlloc(index, alloc, exist, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
if exist == nil { if exist == nil {
alloc.CreateIndex = index alloc.CreateIndex = index
alloc.ModifyIndex = index alloc.ModifyIndex = index
@ -1343,7 +1335,18 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
// updateSummaryWithAlloc updates the job summary when allocations are updated // updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted // or inserted
func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,
watcher watch.Items, txn *memdb.Txn) error { existingAlloc *structs.Allocation, watcher watch.Items, txn *memdb.Txn) error {
rawJob, err := txn.First("jobs", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to query job: %v", err)
}
// We don't have to update the summary if the job is missing
if rawJob == nil {
return nil
}
summaryRaw, err := txn.First("job_summary", "id", alloc.JobID) summaryRaw, err := txn.First("job_summary", "id", alloc.JobID)
if err != nil { if err != nil {
return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) return fmt.Errorf("unable to lookup job summary for job id %q: %v", err)
@ -1354,18 +1357,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
} }
jobSummary := summary.Copy() jobSummary := summary.Copy()
// Look for existing alloc
existing, err := s.AllocByID(alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] tgSummary, ok := jobSummary.Summary[alloc.TaskGroup]
if !ok { if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
} }
var summaryChanged bool var summaryChanged bool
if existing == nil { if existingAlloc == nil {
switch alloc.DesiredStatus { switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
@ -1383,7 +1380,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v",
alloc.ID, alloc.ClientStatus) alloc.ID, alloc.ClientStatus)
} }
} else if existing.ClientStatus != alloc.ClientStatus { } else if existingAlloc.ClientStatus != alloc.ClientStatus {
// Incrementing the client of the bin of the current state // Incrementing the client of the bin of the current state
switch alloc.ClientStatus { switch alloc.ClientStatus {
case structs.AllocClientStatusRunning: case structs.AllocClientStatusRunning:
@ -1399,7 +1396,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
} }
// Decrementing the count of the bin of the last state // Decrementing the count of the bin of the last state
switch existing.ClientStatus { switch existingAlloc.ClientStatus {
case structs.AllocClientStatusRunning: case structs.AllocClientStatusRunning:
tgSummary.Running -= 1 tgSummary.Running -= 1
case structs.AllocClientStatusPending: case structs.AllocClientStatusPending:
@ -1408,7 +1405,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
tgSummary.Lost -= 1 tgSummary.Lost -= 1
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id: %v, and state: %v", s.logger.Printf("[ERR] state_store: invalid old state of allocation with id: %v, and state: %v",
existing.ID, existing.ClientStatus) existingAlloc.ID, existingAlloc.ClientStatus)
} }
summaryChanged = true summaryChanged = true
} }

View file

@ -106,12 +106,12 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
watch.Item{Table: "nodes"}, watch.Item{Table: "nodes"},
watch.Item{Node: node.ID}) watch.Item{Node: node.ID})
err := state.UpsertNode(1000, node) err := state.UpsertNode(800, node)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
err = state.UpdateNodeStatus(1001, node.ID, structs.NodeStatusReady) err = state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -124,7 +124,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
if out.Status != structs.NodeStatusReady { if out.Status != structs.NodeStatusReady {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
if out.ModifyIndex != 1001 { if out.ModifyIndex != 801 {
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
@ -132,7 +132,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if index != 1001 { if index != 801 {
t.Fatalf("bad: %d", index) t.Fatalf("bad: %d", index)
} }
@ -145,13 +145,14 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusPending alloc.ClientStatus = structs.AllocClientStatusPending
alloc1.ClientStatus = structs.AllocClientStatusPending alloc1.ClientStatus = structs.AllocClientStatusPending
alloc2.ClientStatus = structs.AllocClientStatusPending alloc2.ClientStatus = structs.AllocClientStatusPending
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil {
if err := state.UpsertJob(850, alloc.Job); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc1.JobID)); err != nil { if err := state.UpsertJob(851, alloc1.Job); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil { if err := state.UpsertJob(852, alloc2.Job); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil { if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil {
@ -159,16 +160,17 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
} }
// Change the state of the allocs to running and failed // Change the state of the allocs to running and failed
newAlloc := new(structs.Allocation) newAlloc := alloc.Copy()
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning newAlloc.ClientStatus = structs.AllocClientStatusRunning
newAlloc1 := new(structs.Allocation)
*newAlloc1 = *alloc1 newAlloc1 := alloc1.Copy()
newAlloc1.ClientStatus = structs.AllocClientStatusFailed newAlloc1.ClientStatus = structs.AllocClientStatusFailed
if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil { if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Change the state of the node to down
if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil { if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -201,14 +203,45 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) {
js2, _ := state.JobSummaryByID(alloc1.JobID) js2, _ := state.JobSummaryByID(alloc1.JobID)
js3, _ := state.JobSummaryByID(alloc2.JobID) js3, _ := state.JobSummaryByID(alloc2.JobID)
if js1.Summary["web"].Lost != 1 { expectedSummary1 := structs.JobSummary{
t.Fatalf("expected: %v, got: %v", 1, js1.Summary["web"].Lost) JobID: alloc.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 850,
ModifyIndex: 1004,
} }
if js2.Summary["web"].Failed != 1 { expectedSummary2 := structs.JobSummary{
t.Fatalf("expected: %v, got: %v", 1, js2.Summary["web"].Failed) JobID: alloc1.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Failed: 1,
},
},
CreateIndex: 851,
ModifyIndex: 1003,
} }
if js3.Summary["web"].Lost != 1 { expectedSummary3 := structs.JobSummary{
t.Fatalf("expected: %v, got: %v", 1, js3.Summary["web"].Lost) JobID: alloc2.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Lost: 1,
},
},
CreateIndex: 852,
ModifyIndex: 1004,
}
if !reflect.DeepEqual(js1, &expectedSummary1) {
t.Fatalf("expected: %v, got: %v", expectedSummary1, js1)
}
if !reflect.DeepEqual(js2, &expectedSummary2) {
t.Fatalf("expected: %v, got: %#v", expectedSummary2, js2)
}
if !reflect.DeepEqual(js3, &expectedSummary3) {
t.Fatalf("expected: %v, got: %v", expectedSummary3, js3)
} }
notify.verify(t) notify.verify(t)
@ -1750,6 +1783,73 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
notify.verify(t) notify.verify(t)
} }
func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
state.UpsertJobSummary(900, mock.JobSummary(alloc.JobID))
if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create the delta updates
ts := map[string]*structs.TaskState{"web": &structs.TaskState{State: structs.TaskStatePending}}
update := &structs.Allocation{
ID: alloc.ID,
ClientStatus: structs.AllocClientStatusRunning,
TaskStates: ts,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
}
update2 := &structs.Allocation{
ID: alloc.ID,
ClientStatus: structs.AllocClientStatusPending,
TaskStates: ts,
JobID: alloc.JobID,
TaskGroup: alloc.TaskGroup,
}
err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2})
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = 1000
alloc.ModifyIndex = 1001
alloc.TaskStates = ts
alloc.ClientStatus = structs.AllocClientStatusPending
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v , actual:%#v", alloc, out)
}
summary, err := state.JobSummaryByID(alloc.JobID)
expectedSummary := &structs.JobSummary{
JobID: alloc.JobID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Starting: 1,
},
},
ModifyIndex: 1001,
}
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(summary, expectedSummary) {
t.Fatalf("expected: %#v, actual: %#v", expectedSummary, summary)
}
}
func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
state := testStateStore(t) state := testStateStore(t)
alloc := mock.Alloc() alloc := mock.Alloc()