diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index e2e84ea28..e3b123b47 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -226,12 +226,14 @@ func (s *StateStore) UpdateNodeStatus(index uint64, nodeID, status string) error return fmt.Errorf("error retrieving any allocations for the node: %v", nodeID) } for _, alloc := range allocs { - copyAlloc := new(structs.Allocation) - *copyAlloc = *alloc + copyAlloc := alloc.Copy() if alloc.ClientStatus == structs.AllocClientStatusPending || alloc.ClientStatus == structs.AllocClientStatusRunning { copyAlloc.ClientStatus = structs.AllocClientStatusLost - if err := s.updateSummaryWithAlloc(index, copyAlloc, watcher, txn); err != nil { + + // Updating the summary since we are changing the state of the + // allocation to lost + if err := s.updateSummaryWithAlloc(index, copyAlloc, alloc, watcher, txn); err != nil { return fmt.Errorf("error updating job summary: %v", err) } if err := txn.Insert("allocs", copyAlloc); err != nil { @@ -859,15 +861,6 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo // Handle each of the updated allocations for _, alloc := range allocs { - rawJob, err := txn.First("jobs", "id", alloc.JobID) - if err != nil { - return fmt.Errorf("unable to query job: %v", err) - } - if rawJob != nil { - if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) - } - } if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { return err } @@ -897,6 +890,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I } exist := existing.(*structs.Allocation) + if err := s.updateSummaryWithAlloc(index, alloc, exist, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } + // Trigger the watcher watcher.Add(watch.Item{Alloc: alloc.ID}) watcher.Add(watch.Item{AllocEval: exist.EvalID}) @@ -944,21 +941,16 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { - rawJob, err := txn.First("jobs", "id", alloc.JobID) - if err != nil { - return fmt.Errorf("unable to query job: %v", err) - } - if rawJob != nil { - if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) - } - } existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) } - exist, _ := existing.(*structs.Allocation) + + if err := s.updateSummaryWithAlloc(index, alloc, exist, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } + if exist == nil { alloc.CreateIndex = index alloc.ModifyIndex = index @@ -1343,7 +1335,18 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, - watcher watch.Items, txn *memdb.Txn) error { + existingAlloc *structs.Allocation, watcher watch.Items, txn *memdb.Txn) error { + + rawJob, err := txn.First("jobs", "id", alloc.JobID) + if err != nil { + return fmt.Errorf("unable to query job: %v", err) + } + + // We don't have to update the summary if the job is missing + if rawJob == nil { + return nil + } + summaryRaw, err := txn.First("job_summary", "id", alloc.JobID) if err != nil { return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) @@ -1354,18 +1357,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat } jobSummary := summary.Copy() - // Look for existing alloc - existing, err := s.AllocByID(alloc.ID) - if err != nil { - return fmt.Errorf("alloc lookup failed: %v", err) - } - tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] if !ok { return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) } var summaryChanged bool - if existing == nil { + if existingAlloc == nil { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", @@ -1383,7 +1380,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat s.logger.Printf("[ERR] state_store: new allocation inserted into state store with id: %v and state: %v", alloc.ID, alloc.ClientStatus) } - } else if existing.ClientStatus != alloc.ClientStatus { + } else if existingAlloc.ClientStatus != alloc.ClientStatus { // Incrementing the client of the bin of the current state switch alloc.ClientStatus { case structs.AllocClientStatusRunning: @@ -1399,7 +1396,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat } // Decrementing the count of the bin of the last state - switch existing.ClientStatus { + switch existingAlloc.ClientStatus { case structs.AllocClientStatusRunning: tgSummary.Running -= 1 case structs.AllocClientStatusPending: @@ -1408,7 +1405,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat tgSummary.Lost -= 1 case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: s.logger.Printf("[ERR] state_store: invalid old state of allocation with id: %v, and state: %v", - existing.ID, existing.ClientStatus) + existingAlloc.ID, existingAlloc.ClientStatus) } summaryChanged = true } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 69f831dc1..e601d22a6 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -106,12 +106,12 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { watch.Item{Table: "nodes"}, watch.Item{Node: node.ID}) - err := state.UpsertNode(1000, node) + err := state.UpsertNode(800, node) if err != nil { t.Fatalf("err: %v", err) } - err = state.UpdateNodeStatus(1001, node.ID, structs.NodeStatusReady) + err = state.UpdateNodeStatus(801, node.ID, structs.NodeStatusReady) if err != nil { t.Fatalf("err: %v", err) } @@ -124,7 +124,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { if out.Status != structs.NodeStatusReady { t.Fatalf("bad: %#v", out) } - if out.ModifyIndex != 1001 { + if out.ModifyIndex != 801 { t.Fatalf("bad: %#v", out) } @@ -132,7 +132,7 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if index != 1001 { + if index != 801 { t.Fatalf("bad: %d", index) } @@ -145,13 +145,14 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusPending alloc1.ClientStatus = structs.AllocClientStatusPending alloc2.ClientStatus = structs.AllocClientStatusPending - if err := state.UpsertJobSummary(990, mock.JobSummary(alloc.JobID)); err != nil { + + if err := state.UpsertJob(850, alloc.Job); err != nil { t.Fatal(err) } - if err := state.UpsertJobSummary(990, mock.JobSummary(alloc1.JobID)); err != nil { + if err := state.UpsertJob(851, alloc1.Job); err != nil { t.Fatal(err) } - if err := state.UpsertJobSummary(990, mock.JobSummary(alloc2.JobID)); err != nil { + if err := state.UpsertJob(852, alloc2.Job); err != nil { t.Fatal(err) } if err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc1, alloc2}); err != nil { @@ -159,16 +160,17 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { } // Change the state of the allocs to running and failed - newAlloc := new(structs.Allocation) - *newAlloc = *alloc + newAlloc := alloc.Copy() newAlloc.ClientStatus = structs.AllocClientStatusRunning - newAlloc1 := new(structs.Allocation) - *newAlloc1 = *alloc1 + + newAlloc1 := alloc1.Copy() newAlloc1.ClientStatus = structs.AllocClientStatusFailed + if err = state.UpdateAllocsFromClient(1003, []*structs.Allocation{newAlloc, newAlloc1}); err != nil { t.Fatalf("err: %v", err) } + // Change the state of the node to down if err = state.UpdateNodeStatus(1004, node.ID, structs.NodeStatusDown); err != nil { t.Fatalf("err: %v", err) } @@ -201,14 +203,45 @@ func TestStateStore_UpdateNodeStatus_Node(t *testing.T) { js2, _ := state.JobSummaryByID(alloc1.JobID) js3, _ := state.JobSummaryByID(alloc2.JobID) - if js1.Summary["web"].Lost != 1 { - t.Fatalf("expected: %v, got: %v", 1, js1.Summary["web"].Lost) + expectedSummary1 := structs.JobSummary{ + JobID: alloc.JobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Lost: 1, + }, + }, + CreateIndex: 850, + ModifyIndex: 1004, } - if js2.Summary["web"].Failed != 1 { - t.Fatalf("expected: %v, got: %v", 1, js2.Summary["web"].Failed) + expectedSummary2 := structs.JobSummary{ + JobID: alloc1.JobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Failed: 1, + }, + }, + CreateIndex: 851, + ModifyIndex: 1003, } - if js3.Summary["web"].Lost != 1 { - t.Fatalf("expected: %v, got: %v", 1, js3.Summary["web"].Lost) + expectedSummary3 := structs.JobSummary{ + JobID: alloc2.JobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Lost: 1, + }, + }, + CreateIndex: 852, + ModifyIndex: 1004, + } + + if !reflect.DeepEqual(js1, &expectedSummary1) { + t.Fatalf("expected: %v, got: %v", expectedSummary1, js1) + } + if !reflect.DeepEqual(js2, &expectedSummary2) { + t.Fatalf("expected: %v, got: %#v", expectedSummary2, js2) + } + if !reflect.DeepEqual(js3, &expectedSummary3) { + t.Fatalf("expected: %v, got: %v", expectedSummary3, js3) } notify.verify(t) @@ -1750,6 +1783,73 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { notify.verify(t) } +func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + + state.UpsertJobSummary(900, mock.JobSummary(alloc.JobID)) + if err := state.UpsertJob(999, alloc.Job); err != nil { + t.Fatalf("err: %v", err) + } + + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the delta updates + ts := map[string]*structs.TaskState{"web": &structs.TaskState{State: structs.TaskStatePending}} + update := &structs.Allocation{ + ID: alloc.ID, + ClientStatus: structs.AllocClientStatusRunning, + TaskStates: ts, + JobID: alloc.JobID, + TaskGroup: alloc.TaskGroup, + } + update2 := &structs.Allocation{ + ID: alloc.ID, + ClientStatus: structs.AllocClientStatusPending, + TaskStates: ts, + JobID: alloc.JobID, + TaskGroup: alloc.TaskGroup, + } + + err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + alloc.CreateIndex = 1000 + alloc.ModifyIndex = 1001 + alloc.TaskStates = ts + alloc.ClientStatus = structs.AllocClientStatusPending + if !reflect.DeepEqual(alloc, out) { + t.Fatalf("bad: %#v , actual:%#v", alloc, out) + } + + summary, err := state.JobSummaryByID(alloc.JobID) + expectedSummary := &structs.JobSummary{ + JobID: alloc.JobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Starting: 1, + }, + }, + ModifyIndex: 1001, + } + if err != nil { + t.Fatalf("err: %v", err) + } + if !reflect.DeepEqual(summary, expectedSummary) { + t.Fatalf("expected: %#v, actual: %#v", expectedSummary, summary) + } +} + func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc()