Merge pull request #5205 from hashicorp/b-queuedallocs-bugfix

Reconcile child summaries correctly
This commit is contained in:
Preetha 2019-01-18 18:13:19 -06:00 committed by GitHub
commit 2e71168e5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 242 additions and 1 deletions

View file

@ -10,7 +10,6 @@ name = "client1"
# Enable the client
client {
enabled = true
server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}

View file

@ -1401,6 +1401,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
break
}
job := rawJob.(*structs.Job)
// Nothing to do for queued allocations if the job is a parent periodic/parameterized job
if job.IsParameterized() || job.IsPeriodic() {
continue
}
planner := &scheduler.Harness{
State: &snap.StateStore,
}

View file

@ -2816,6 +2816,78 @@ func TestFSM_ReconcileSummaries(t *testing.T) {
}
}
// COMPAT: Remove in 0.11
func TestFSM_ReconcileParentJobSummary(t *testing.T) {
// This test exercises code to handle https://github.com/hashicorp/nomad/issues/3886
t.Parallel()
require := require.New(t)
// Add some state
fsm := testFSM(t)
state := fsm.State()
// Add a node
node := mock.Node()
state.UpsertNode(800, node)
// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(1000, job1)
// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning
// Create an alloc for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning
state.UpsertJob(1010, childJob)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)
summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}
req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
CreateIndex: 1000,
ModifyIndex: out1.ModifyIndex,
Children: &structs.JobChildrenSummary{
Running: 1,
},
}
require.Equal(&expected, out1)
}
func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)

View file

@ -6,6 +6,8 @@ import (
"sort"
"time"
"reflect"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
@ -3049,12 +3051,86 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
if err != nil {
return err
}
// COMPAT: Remove after 0.11
// Iterate over jobs to build a list of parent jobs and their children
parentMap := make(map[string][]*structs.Job)
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
if job.ParentID != "" {
children := parentMap[job.ParentID]
children = append(children, job)
parentMap[job.ParentID] = children
}
}
// Get all the jobs again
iter, err = txn.Get("jobs", "id")
if err != nil {
return err
}
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
if job.IsParameterized() || job.IsPeriodic() {
// COMPAT: Remove after 0.11
// The following block of code fixes incorrect child summaries due to a bug
// See https://github.com/hashicorp/nomad/issues/3886 for details
rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID)
if err != nil {
return err
}
if rawSummary == nil {
continue
}
oldSummary := rawSummary.(*structs.JobSummary)
// Create an empty summary
summary := &structs.JobSummary{
JobID: job.ID,
Namespace: job.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{},
}
// Iterate over children of this job if any to fix summary counts
children := parentMap[job.ID]
for _, childJob := range children {
switch childJob.Status {
case structs.JobStatusPending:
summary.Children.Pending++
case structs.JobStatusDead:
summary.Children.Dead++
case structs.JobStatusRunning:
summary.Children.Running++
}
}
// Insert the job summary if its different
if !reflect.DeepEqual(summary, oldSummary) {
// Set the create index of the summary same as the job's create index
// and the modify index to the current index
summary.CreateIndex = job.CreateIndex
summary.ModifyIndex = index
if err := txn.Insert("job_summary", summary); err != nil {
return fmt.Errorf("error inserting job summary: %v", err)
}
}
// Done with handling a parent job, continue to next
continue
}
// Create a job summary for the job
summary := &structs.JobSummary{

View file

@ -4354,6 +4354,95 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
}
}
func TestStateStore_ReconcileParentJobSummary(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
// Add a node
node := mock.Node()
state.UpsertNode(80, node)
// Make a parameterized job
job1 := mock.BatchJob()
job1.ID = "test"
job1.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: "random",
}
job1.TaskGroups[0].Count = 1
state.UpsertJob(100, job1)
// Make a child job
childJob := job1.Copy()
childJob.ID = job1.ID + "dispatch-23423423"
childJob.ParentID = job1.ID
childJob.Dispatched = true
childJob.Status = structs.JobStatusRunning
// Make some allocs for child job
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.Job = childJob
alloc.JobID = childJob.ID
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
alloc2.Job = childJob
alloc2.JobID = childJob.ID
alloc2.ClientStatus = structs.AllocClientStatusFailed
require.Nil(state.UpsertJob(110, childJob))
require.Nil(state.UpsertAllocs(111, []*structs.Allocation{alloc, alloc2}))
// Make the summary incorrect in the state store
summary, err := state.JobSummaryByID(nil, job1.Namespace, job1.ID)
require.Nil(err)
summary.Children = nil
summary.Summary = make(map[string]structs.TaskGroupSummary)
summary.Summary["web"] = structs.TaskGroupSummary{
Queued: 1,
}
// Delete the child job summary
state.DeleteJobSummary(125, childJob.Namespace, childJob.ID)
state.ReconcileJobSummaries(120)
ws := memdb.NewWatchSet()
// Verify parent summary is corrected
summary, _ = state.JobSummaryByID(ws, alloc.Namespace, job1.ID)
expectedSummary := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: make(map[string]structs.TaskGroupSummary),
Children: &structs.JobChildrenSummary{
Running: 1,
},
CreateIndex: 100,
ModifyIndex: 120,
}
require.Equal(&expectedSummary, summary)
// Verify child job summary is also correct
childSummary, _ := state.JobSummaryByID(ws, childJob.Namespace, childJob.ID)
expectedChildSummary := structs.JobSummary{
JobID: childJob.ID,
Namespace: childJob.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Running: 1,
Failed: 1,
},
},
CreateIndex: 110,
ModifyIndex: 120,
}
require.Equal(&expectedChildSummary, childSummary)
}
func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) {
state := testStateStore(t)