From b0e1f02e2638bf7774ece25e5222357e6476ed7f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 28 Jul 2016 15:23:09 -0700 Subject: [PATCH] Not updating job summaries if jobs are not present --- nomad/state/state_store.go | 27 +++++++++++++------- nomad/state/state_store_test.go | 44 +++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1349e0908..e2e84ea28 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -859,8 +859,14 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo // Handle each of the updated allocations for _, alloc := range allocs { - if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) + rawJob, err := txn.First("jobs", "id", alloc.JobID) + if err != nil { + return fmt.Errorf("unable to query job: %v", err) + } + if rawJob != nil { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } } if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil { return err @@ -938,8 +944,14 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er // Handle the allocations jobs := make(map[string]string, 1) for _, alloc := range allocs { - if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { - return fmt.Errorf("error updating job summary: %v", err) + rawJob, err := txn.First("jobs", "id", alloc.JobID) + if err != nil { + return fmt.Errorf("unable to query job: %v", err) + } + if rawJob != nil { + if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil { + return fmt.Errorf("error updating job summary: %v", err) + } } existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { @@ -1395,7 +1407,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat case structs.AllocClientStatusLost: tgSummary.Lost -= 1 case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: - s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v", + s.logger.Printf("[ERR] state_store: invalid old state of allocation with id: %v, and state: %v", existing.ID, existing.ClientStatus) } summaryChanged = true @@ -1575,9 +1587,6 @@ func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error { } // Calculate the summary for the job for _, alloc := range allocs { - if _, ok := summary.Summary[alloc.TaskGroup]; !ok { - summary.Summary[alloc.TaskGroup] = structs.TaskGroupSummary{} - } tg := summary.Summary[alloc.TaskGroup] switch alloc.ClientStatus { case structs.AllocClientStatusFailed: @@ -1593,8 +1602,8 @@ func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error { } summary.Summary[alloc.TaskGroup] = tg } - // Insert the job summary + // Insert the job summary summary.CreateIndex = r.latestIndex summary.ModifyIndex = r.latestIndex if err := r.txn.Insert("job_summary", summary); err != nil { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 893045a7d..69f831dc1 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1882,6 +1882,50 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) { notify.verify(t) } +// This test ensures an allocation can be updated when there is no job +// associated with it. This will happen when a job is stopped by an user which +// has non-terminal allocations on clients +func TestStateStore_UpdateAlloc_NoJob(t *testing.T) { + state := testStateStore(t) + alloc := mock.Alloc() + + // Upsert a job + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + if err := state.UpsertJob(999, alloc.Job); err != nil { + t.Fatalf("err: %v", err) + } + + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + if err := state.DeleteJob(1001, alloc.JobID); err != nil { + t.Fatalf("err: %v", err) + } + + // Update the desired state of the allocation to stop + allocCopy := alloc.Copy() + allocCopy.DesiredStatus = structs.AllocDesiredStatusStop + if err := state.UpsertAllocs(1002, []*structs.Allocation{allocCopy}); err != nil { + t.Fatalf("err: %v", err) + } + + // Update the client state of the allocation to complete + allocCopy1 := allocCopy.Copy() + allocCopy1.ClientStatus = structs.AllocClientStatusComplete + if err := state.UpdateAllocsFromClient(1003, []*structs.Allocation{allocCopy1}); err != nil { + t.Fatalf("err: %v", err) + } + + out, _ := state.AllocByID(alloc.ID) + // Update the modify index of the alloc before comparing + allocCopy1.ModifyIndex = 1003 + if !reflect.DeepEqual(out, allocCopy1) { + t.Fatalf("expected: %#v \n actual: %#v", allocCopy1, out) + } +} + func TestStateStore_EvictAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc()