Not updating job summaries if jobs are not present

This commit is contained in:
Diptanu Choudhury 2016-07-28 15:23:09 -07:00
parent cc029dc11b
commit b0e1f02e26
2 changed files with 62 additions and 9 deletions

View File

@ -859,8 +859,14 @@ func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allo
// Handle each of the updated allocations
for _, alloc := range allocs {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
rawJob, err := txn.First("jobs", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to query job: %v", err)
}
if rawJob != nil {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
}
if err := s.nestedUpdateAllocFromClient(txn, watcher, index, alloc); err != nil {
return err
@ -938,8 +944,14 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
// Handle the allocations
jobs := make(map[string]string, 1)
for _, alloc := range allocs {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
rawJob, err := txn.First("jobs", "id", alloc.JobID)
if err != nil {
return fmt.Errorf("unable to query job: %v", err)
}
if rawJob != nil {
if err := s.updateSummaryWithAlloc(index, alloc, watcher, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}
}
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
@ -1395,7 +1407,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id:%v, and state: %v",
s.logger.Printf("[ERR] state_store: invalid old state of allocation with id: %v, and state: %v",
existing.ID, existing.ClientStatus)
}
summaryChanged = true
@ -1575,9 +1587,6 @@ func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error {
}
// Calculate the summary for the job
for _, alloc := range allocs {
if _, ok := summary.Summary[alloc.TaskGroup]; !ok {
summary.Summary[alloc.TaskGroup] = structs.TaskGroupSummary{}
}
tg := summary.Summary[alloc.TaskGroup]
switch alloc.ClientStatus {
case structs.AllocClientStatusFailed:
@ -1593,8 +1602,8 @@ func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error {
}
summary.Summary[alloc.TaskGroup] = tg
}
// Insert the job summary
// Insert the job summary
summary.CreateIndex = r.latestIndex
summary.ModifyIndex = r.latestIndex
if err := r.txn.Insert("job_summary", summary); err != nil {

View File

@ -1882,6 +1882,50 @@ func TestStateStore_UpdateAlloc_Alloc(t *testing.T) {
notify.verify(t)
}
// This test ensures an allocation can be updated when there is no job
// associated with it. This will happen when a job is stopped by an user which
// has non-terminal allocations on clients
func TestStateStore_UpdateAlloc_NoJob(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
// Upsert a job
state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))
if err := state.UpsertJob(999, alloc.Job); err != nil {
t.Fatalf("err: %v", err)
}
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
if err := state.DeleteJob(1001, alloc.JobID); err != nil {
t.Fatalf("err: %v", err)
}
// Update the desired state of the allocation to stop
allocCopy := alloc.Copy()
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(1002, []*structs.Allocation{allocCopy}); err != nil {
t.Fatalf("err: %v", err)
}
// Update the client state of the allocation to complete
allocCopy1 := allocCopy.Copy()
allocCopy1.ClientStatus = structs.AllocClientStatusComplete
if err := state.UpdateAllocsFromClient(1003, []*structs.Allocation{allocCopy1}); err != nil {
t.Fatalf("err: %v", err)
}
out, _ := state.AllocByID(alloc.ID)
// Update the modify index of the alloc before comparing
allocCopy1.ModifyIndex = 1003
if !reflect.DeepEqual(out, allocCopy1) {
t.Fatalf("expected: %#v \n actual: %#v", allocCopy1, out)
}
}
func TestStateStore_EvictAlloc_Alloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()