From b0e048bfa4971be8ba80577e8e7321a004db4d31 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 9 Mar 2021 10:45:36 -0500 Subject: [PATCH 1/3] periodic: always reset periodic children status Fixes a bug where Nomad reports negative or incorrect running children counts for periodic jobs. The periodic dispatcher derives a child job without resetting the status. If the periodic job has a `running` status, the derived job will start as `running` status and transition to `pending`. Since this is an unexpected transition, the counting in StateStore.setJobSummary gets out of sync and results in negative/incorrect values. Note that this only affects periodic jobs after a leader transition. During the first job registration, the job is added with `pending` or `""` status. However, after a leader transition, the new leader repopulates the dispatcher heap with `"running"` status and triggers the bug. --- nomad/periodic.go | 4 +++- nomad/periodic_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/nomad/periodic.go b/nomad/periodic.go index a2ea2219f..c4f40066f 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -444,12 +444,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) ( }() // Create a copy of the periodic job, give it a derived ID/Name and make it - // non-periodic. 
+ // non-periodic in initial status derived = periodicJob.Copy() derived.ParentID = periodicJob.ID derived.ID = p.derivedJobID(periodicJob, time) derived.Name = derived.ID derived.Periodic = nil + derived.Status = "" + derived.StatusDescription = "" return } diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 7618dacd5..605032cd3 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -70,6 +70,20 @@ func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, namespace, pare return launches, nil } +func (m *MockJobEvalDispatcher) dispatchedJobs(parent *structs.Job) []*structs.Job { + m.lock.Lock() + defer m.lock.Unlock() + + jobs := []*structs.Job{} + for _, job := range m.Jobs { + if job.ParentID == parent.ID && job.Namespace == parent.Namespace { + jobs = append(jobs, job) + } + } + + return jobs +} + type times []time.Time func (t times) Len() int { return len(t) } @@ -754,3 +768,22 @@ func TestPeriodicDispatch_RunningChildren_ActiveAllocs(t *testing.T) { t.Fatalf("RunningChildren should return true") } } + +// TestPeriodicDispatch_JobEmptyStatus asserts that a dispatched +// job always has an empty status +func TestPeriodicDispatch_JobEmptyStatus(t *testing.T) { + t.Parallel() + p, m := testPeriodicDispatcher(t) + + job := testPeriodicJob(time.Now().Add(1 * time.Second)) + job.Status = structs.JobStatusRunning + + err := p.Add(job) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + dispatched := m.dispatchedJobs(job) + require.NotEmpty(t, dispatched) + require.Empty(t, dispatched[0].Status) +} From e643742a385441c6a28406216e995a2ee43bd540 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 24 Mar 2021 17:42:10 -0400 Subject: [PATCH 2/3] Add a test for parameterized summary counts --- e2e/e2e_test.go | 1 + e2e/e2eutil/job.go | 50 +++++++++++- e2e/parameterized/input/simple.nomad | 26 +++++++ e2e/parameterized/parameterized.go | 90 ++++++++++++++++++++++ e2e/periodic/input/simple.nomad | 4 +- 
e2e/periodic/periodic.go | 1 + nomad/job_endpoint_test.go | 111 +++++++++++++++++++++++++++ 7 files changed, 279 insertions(+), 4 deletions(-) create mode 100644 e2e/parameterized/input/simple.nomad create mode 100644 e2e/parameterized/parameterized.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index deecc1c4d..fe3f4077c 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -26,6 +26,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/nodedrain" _ "github.com/hashicorp/nomad/e2e/nomad09upgrade" _ "github.com/hashicorp/nomad/e2e/nomadexec" + _ "github.com/hashicorp/nomad/e2e/parameterized" _ "github.com/hashicorp/nomad/e2e/periodic" _ "github.com/hashicorp/nomad/e2e/podman" _ "github.com/hashicorp/nomad/e2e/quotas" diff --git a/e2e/e2eutil/job.go b/e2e/e2eutil/job.go index d0a74a6df..96d29f67e 100644 --- a/e2e/e2eutil/job.go +++ b/e2e/e2eutil/job.go @@ -40,8 +40,7 @@ func Register(jobID, jobFilePath string) error { return nil } -// PeriodicForce forces a periodic job to dispatch, returning the child job ID -// or an error +// PeriodicForce forces a periodic job to dispatch func PeriodicForce(jobID string) error { // nomad job periodic force cmd := exec.Command("nomad", "job", "periodic", "force", jobID) @@ -54,6 +53,29 @@ func PeriodicForce(jobID string) error { return nil } +// Dispatch dispatches a parameterized job +func Dispatch(jobID string, meta map[string]string, payload string) error { + // nomad job dispatch + args := []string{"job", "dispatch"} + for k, v := range meta { + args = append(args, "-meta", fmt.Sprintf("%v=%v", k, v)) + } + args = append(args, jobID) + if payload != "" { + args = append(args, "-") + } + + cmd := exec.Command("nomad", args...) 
+ cmd.Stdin = strings.NewReader(payload) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("could not dispatch job: %w\n%v", err, string(out)) + } + + return nil +} + // JobInspectTemplate runs nomad job inspect and formats the output // using the specified go template func JobInspectTemplate(jobID, template string) (string, error) { @@ -102,7 +124,10 @@ func ChildrenJobSummary(jobID string) ([]map[string]string, error) { section, err := GetSection(out, "Children Job Summary") if err != nil { - return nil, fmt.Errorf("could not find children job summary section: %w", err) + section, err = GetSection(out, "Parameterized Job Summary") + if err != nil { + return nil, fmt.Errorf("could not find children job summary section: %w", err) + } } summary, err := ParseColumns(section) @@ -131,3 +156,22 @@ func PreviouslyLaunched(jobID string) ([]map[string]string, error) { return summary, nil } + +func DispatchedJobs(jobID string) ([]map[string]string, error) { + out, err := Command("nomad", "job", "status", jobID) + if err != nil { + return nil, fmt.Errorf("nomad job status failed: %w", err) + } + + section, err := GetSection(out, "Dispatched Jobs") + if err != nil { + return nil, fmt.Errorf("could not find previously launched jobs section: %w", err) + } + + summary, err := ParseColumns(section) + if err != nil { + return nil, fmt.Errorf("could not parse previously launched jobs section: %w", err) + } + + return summary, nil +} diff --git a/e2e/parameterized/input/simple.nomad b/e2e/parameterized/input/simple.nomad new file mode 100644 index 000000000..868095a57 --- /dev/null +++ b/e2e/parameterized/input/simple.nomad @@ -0,0 +1,26 @@ +job "periodic" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + operator = "set_contains_any" + value = "darwin,linux" + } + + parameterized { + meta_optional = ["i"] + } + + group "group" { + task "task" { + driver = "docker" + + config { + image = "busybox:1" + command 
= "/bin/sh" + args = ["-c", "sleep 5"] + } + } + } +} diff --git a/e2e/parameterized/parameterized.go b/e2e/parameterized/parameterized.go new file mode 100644 index 000000000..33df7cf73 --- /dev/null +++ b/e2e/parameterized/parameterized.go @@ -0,0 +1,90 @@ +package parameterized + +import ( + "fmt" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +type ParameterizedTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "Parameterized", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(ParameterizedTest), + }, + }) +} + +func (tc *ParameterizedTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) +} + +func (tc *ParameterizedTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + j := nomadClient.Jobs() + + for _, id := range tc.jobIDs { + j.Deregister(id, true, nil) + } + _, err := e2eutil.Command("nomad", "system", "gc") + f.NoError(err) +} + +func (tc *ParameterizedTest) TestParameterizedDispatch_Basic(f *framework.F) { + t := f.T() + + uuid := uuid.Generate() + jobID := fmt.Sprintf("dispatch-%s", uuid[0:8]) + tc.jobIDs = append(tc.jobIDs, jobID) + + // register job + require.NoError(t, e2eutil.Register(jobID, "parameterized/input/simple.nomad")) + + // force dispatch + dispatched := 4 + + for i := 0; i < dispatched; i++ { + require.NoError(t, e2eutil.Dispatch(jobID, map[string]string{"i": fmt.Sprintf("%v", i)}, "")) + } + + testutil.WaitForResult(func() (bool, error) { + children, err := e2eutil.DispatchedJobs(jobID) + if err != nil { + return false, err + } + + dead := 0 + for _, c := range children { + if c["Status"] != "dead" { + return false, fmt.Errorf("expected periodic job to be dead") + } + dead++ + } + + if dead != dispatched { + return false, fmt.Errorf("expected %d but found %d 
children", dispatched, dead) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + // Assert there are no pending children + summary, err := e2eutil.ChildrenJobSummary(jobID) + require.NoError(t, err) + require.Len(t, summary, 1) + require.Equal(t, summary[0]["Pending"], "0") + require.Equal(t, summary[0]["Running"], "0") + require.Equal(t, summary[0]["Dead"], fmt.Sprintf("%v", dispatched)) +} diff --git a/e2e/periodic/input/simple.nomad b/e2e/periodic/input/simple.nomad index c63974c81..ca315c23c 100644 --- a/e2e/periodic/input/simple.nomad +++ b/e2e/periodic/input/simple.nomad @@ -4,10 +4,12 @@ job "periodic" { constraint { attribute = "${attr.kernel.name}" - value = "linux" + operator = "set_contains_any" + value = "darwin,linux" } + periodic { cron = "* * * * *" prohibit_overlap = true diff --git a/e2e/periodic/periodic.go b/e2e/periodic/periodic.go index c20a32625..cef0d11f6 100644 --- a/e2e/periodic/periodic.go +++ b/e2e/periodic/periodic.go @@ -74,5 +74,6 @@ func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) { require.NoError(t, err) require.Len(t, summary, 1) require.Equal(t, summary[0]["Pending"], "0") + require.Equal(t, summary[0]["Running"], "0") require.Equal(t, summary[0]["Dead"], "1") } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 82dace1c4..2278ca71c 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6401,6 +6401,117 @@ func TestJobEndpoint_Dispatch(t *testing.T) { } } +// TestJobEndpoint_Dispatch_JobChildrenSummary asserts that the job summary is updated +// appropriately as its dispatched/children jobs status are updated. 
+func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + state := s1.fsm.State() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + node := mock.Node() + require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node)) + + parameterizedJob := mock.BatchJob() + parameterizedJob.ParameterizedJob = &structs.ParameterizedJobConfig{} + + // Create the register request + regReq := &structs.JobRegisterRequest{ + Job: parameterizedJob, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: parameterizedJob.Namespace, + }, + } + var regResp structs.JobRegisterResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, &regResp)) + + jobChildren := func() *structs.JobChildrenSummary { + summary, err := state.JobSummaryByID(nil, parameterizedJob.Namespace, parameterizedJob.ID) + require.NoError(t, err) + + return summary.Children + } + require.Equal(t, &structs.JobChildrenSummary{}, jobChildren()) + + // dispatch a child job + dispatchReq := &structs.JobDispatchRequest{ + JobID: parameterizedJob.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: parameterizedJob.Namespace, + }, + } + var dispatchResp structs.JobDispatchResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatchReq, &dispatchResp) + require.NoError(t, err) + + nextIdx := dispatchResp.Index + 1 + + require.Equal(t, &structs.JobChildrenSummary{Pending: 1}, jobChildren()) + + dispatchedJob, err := state.JobByID(nil, parameterizedJob.Namespace, dispatchResp.DispatchedJobID) + require.NoError(t, err) + require.NotNil(t, dispatchedJob) + + dispatchedStatus := func() string { + job, err := state.JobByID(nil, dispatchedJob.Namespace, dispatchedJob.ID) + require.NoError(t, err) + require.NotNil(t, job) + + return job.Status + } + + // Let's start an alloc for the 
dispatch job and walk through states + // Note that job summary reports 1 running even when alloc is pending! + nextIdx++ + alloc := mock.Alloc() + alloc.Job = dispatchedJob + alloc.JobID = dispatchedJob.ID + alloc.TaskGroup = dispatchedJob.TaskGroups[0].Name + alloc.Namespace = dispatchedJob.Namespace + alloc.ClientStatus = structs.AllocClientStatusPending + err = s1.State().UpsertAllocs(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{alloc}) + require.NoError(t, err) + require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren()) + require.Equal(t, structs.JobStatusRunning, dispatchedStatus()) + + // mark the creation eval as completed + nextIdx++ + eval, err := state.EvalByID(nil, dispatchResp.EvalID) + require.NoError(t, err) + eval = eval.Copy() + eval.Status = structs.EvalStatusComplete + require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, nextIdx, []*structs.Evaluation{eval})) + + updateAllocStatus := func(status string) { + nextIdx++ + nalloc, err := state.AllocByID(nil, alloc.ID) + require.NoError(t, err) + nalloc = nalloc.Copy() + nalloc.ClientStatus = status + err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc}) + require.NoError(t, err) + } + + // job should remain running when alloc runs + updateAllocStatus(structs.AllocClientStatusRunning) + require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren()) + require.Equal(t, structs.JobStatusRunning, dispatchedStatus()) + + // job should be dead after alloc completes + updateAllocStatus(structs.AllocClientStatusComplete) + require.Equal(t, &structs.JobChildrenSummary{Dead: 1}, jobChildren()) + require.Equal(t, structs.JobStatusDead, dispatchedStatus()) +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t) From 5d75705edd01403fae5aff6855370df67d348b71 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Mar 2021 15:14:21 -0400 Subject: [PATCH 3/3] dispatched parameterized 
job should clear status too --- nomad/job_endpoint.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 07e7ea0aa..9f4371481 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1887,13 +1887,15 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } - // Derive the child job and commit it via Raft + // Derive the child job and commit it via Raft - with initial status dispatchJob := parameterizedJob.Copy() dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) dispatchJob.ParentID = parameterizedJob.ID dispatchJob.Name = dispatchJob.ID dispatchJob.SetSubmitTime() dispatchJob.Dispatched = true + dispatchJob.Status = "" + dispatchJob.StatusDescription = "" // Merge in the meta data for k, v := range args.Meta {