From ef2de73315e6cdf608ff7388b16b1906074b7cdf Mon Sep 17 00:00:00 2001
From: hc-github-team-nomad-core <82989552+hc-github-team-nomad-core@users.noreply.github.com>
Date: Wed, 29 Nov 2023 12:33:08 -0600
Subject: [PATCH] backport of commit d29ac461a7229ca61da9561c1058e14aecbe4b90
 (#19223)

Co-authored-by: Luiz Aoqui
---
 .changelog/19147.txt                          |   3 +
 command/job_restart.go                        |  25 +-
 command/job_restart_test.go                   | 290 +++++++++++++++---
 website/content/docs/commands/job/restart.mdx |   3 +-
 4 files changed, 273 insertions(+), 48 deletions(-)
 create mode 100644 .changelog/19147.txt

diff --git a/.changelog/19147.txt b/.changelog/19147.txt
new file mode 100644
index 000000000..2122c8732
--- /dev/null
+++ b/.changelog/19147.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+cli: Fixed the `nomad job restart` command to create replacements for batch and system jobs and to prevent sysbatch jobs from being rescheduled since they never create replacements
+```
diff --git a/command/job_restart.go b/command/job_restart.go
index 7e7d953f9..93eb1e11f 100644
--- a/command/job_restart.go
+++ b/command/job_restart.go
@@ -187,7 +187,8 @@ Restart Options:
     in-place. Since the group is not modified the restart does not create a new
     deployment, and so values defined in 'update' blocks, such as
     'max_parallel', are not taken into account. This option cannot be used with
-    '-task'.
+    '-task'. Only jobs of type 'batch', 'service', and 'system' can be
+    rescheduled.
 
   -task=<task-name>
     Specify the task to restart. Can be specified multiple times. If groups are
@@ -286,6 +287,16 @@ func (c *JobRestartCommand) Run(args []string) int {
 
 	go c.handleSignal(c.sigsCh, activeCh)
 
+	// Verify job type can be rescheduled.
+	if c.reschedule {
+		switch *job.Type {
+		case api.JobTypeBatch, api.JobTypeService, api.JobTypeSystem:
+		default:
+			c.Ui.Error(fmt.Sprintf("Jobs of type %q are not allowed to be rescheduled.", *job.Type))
+			return 1
+		}
+	}
+
 	// Confirm that we should restart a multi-region job in a single region.
 	if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() {
 		c.Ui.Output("\nJob restart canceled.")
@@ -952,6 +963,18 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
 		return fmt.Errorf("Failed to stop allocation: %w", err)
 	}
 
+	// Allocations for system jobs do not get replaced by the scheduler after
+	// being stopped, so an eval is needed to trigger the reconciler.
+	if *alloc.Job.Type == api.JobTypeSystem {
+		opts := api.EvalOptions{
+			ForceReschedule: true,
+		}
+		_, _, err := c.client.Jobs().EvaluateWithOpts(*alloc.Job.ID, opts, nil)
+		if err != nil {
+			return fmt.Errorf("Failed to evaluate job: %w", err)
+		}
+	}
+
 	// errCh receives an error if anything goes wrong or nil when the
 	// replacement allocation is running.
 	// Use a buffered channel to prevent both goroutine from blocking trying to
diff --git a/command/job_restart_test.go b/command/job_restart_test.go
index 6d473dcbf..5f15791a4 100644
--- a/command/job_restart_test.go
+++ b/command/job_restart_test.go
@@ -259,8 +259,19 @@ func TestJobRestartCommand_Run(t *testing.T) {
 		SetConfig("run_for", "1m").
 		SetConfig("exit_code", 0)
 
-	jobID := "test_job_restart_cmd"
-	job := api.NewServiceJob(jobID, jobID, "global", 1).
+	batchJob := api.NewBatchJob("test_job_batch", "test_job_batch", "global", 1).
+		AddDatacenter("dc1").
+		AddTaskGroup(
+			api.NewTaskGroup("single_task", 3).
+				AddTask(mainTask),
+		).
+		AddTaskGroup(
+			api.NewTaskGroup("multiple_tasks", 2).
+				AddTask(prestartTask).
+				AddTask(sidecarTask).
+				AddTask(mainTask),
+		)
+	serviceJob := api.NewServiceJob("test_job_service", "test_job_service", "global", 1).
 		AddDatacenter("dc1").
 		AddTaskGroup(
 			api.NewTaskGroup("single_task", 3).
@@ -613,55 +624,195 @@ func TestJobRestartCommand_Run(t *testing.T) {
 		},
 	}
 
-	for _, tc := range testCases {
-		tc := tc
-		t.Run(tc.name, func(t *testing.T) {
-			// Run each test case in parallel because they are fairly slow.
-			ci.Parallel(t)
+	for _, job := range []*api.Job{batchJob, serviceJob} {
+		for _, tc := range testCases {
+			tc := tc
+			t.Run(fmt.Sprintf("%s/%s", *job.Type, tc.name), func(t *testing.T) {
+				// Run each test case in parallel because they are fairly slow.
+				ci.Parallel(t)
+				// Initialize UI and command.
+				ui := cli.NewMockUi()
+				cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}
+
+				// Start client and server and wait for node to be ready.
+				// Use a separate cluster for each test case so they can run in
+				// parallel without affecting each other.
+				srv, client, url := testServer(t, true, nil)
+				defer srv.Shutdown()
+
+				waitForNodes(t, client)
+
+				// Register test job and wait for its allocs to be running.
+				resp, _, err := client.Jobs().Register(job, nil)
+				must.NoError(t, err)
+
+				code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
+				must.Zero(t, code, must.Sprintf(
+					"stdout: %s\n\nstderr: %s\n",
+					ui.OutputWriter.String(),
+					ui.ErrorWriter.String()),
+				)
+
+				allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
+				must.NoError(t, err)
+				for _, alloc := range allocStubs {
+					waitForAllocRunning(t, client, alloc.ID)
+				}
+
+				// Fetch allocations before the restart so we know which ones are
+				// supposed to be affected in case the test reschedules allocs.
+				allocStubs, _, err = client.Jobs().Allocations(*job.ID, true, nil)
+				must.NoError(t, err)
+
+				// Prepend server URL and append job ID to the test case command.
+				args := []string{"-address", url, "-yes"}
+				args = append(args, tc.args...)
+				args = append(args, *job.ID)
+
+				// Run job restart command.
+				code = cmd.Run(args)
+				must.Eq(t, code, tc.expectedCode)
+
+				// Run test case validation function.
+				if tc.validateFn != nil {
+					tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String())
+				}
+			})
+		}
+	}
+}
+
+func TestJobRestartCommand_Run_system_reschedule(t *testing.T) {
+	ci.Parallel(t)
+
+	// Create a system job.
+	job := api.NewSystemJob("test_job", "test_job", "global", 100).
+		AddDatacenter("dc1").
+		AddTaskGroup(
+			api.NewTaskGroup("group", 1).
+				AddTask(
+					api.NewTask("task", "mock_driver").
+						SetConfig("run_for", "1m").
+						SetConfig("exit_code", 0),
+				),
+		)
+
+	// Start a server and 3 clients.
+	srv, client, url := testServer(t, false, nil)
+	defer srv.Shutdown()
+
+	srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC
+	testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr))
+	testClient(t, "client2", newClientAgentConfigFunc("", "", srvRPCAddr))
+	testClient(t, "client3", newClientAgentConfigFunc("", "", srvRPCAddr))
+
+	waitForNodes(t, client)
+
+	// Initialize UI and command.
+	ui := cli.NewMockUi()
+	cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}
+
+	// Register test job and wait for its allocs to be running.
+	resp, _, err := client.Jobs().Register(job, nil)
+	must.NoError(t, err)
+
+	code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
+	must.Zero(t, code, must.Sprintf(
+		"stdout: %s\n\nstderr: %s\n",
+		ui.OutputWriter.String(),
+		ui.ErrorWriter.String()),
+	)
+
+	allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
+	must.NoError(t, err)
+	for _, alloc := range allocStubs {
+		waitForAllocRunning(t, client, alloc.ID)
+	}
+
+	// Run job restart command.
+	args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *job.ID}
+	code = cmd.Run(args)
+	must.Eq(t, code, 0)
+
+	reschedules := map[string]bool{}
+	for _, alloc := range allocStubs {
+		reschedules[alloc.ID] = true
+	}
+	waitAllocsRescheduled(t, client, reschedules)
+
+	// Check that allocations were rescheduled properly.
+	stdout := ui.OutputWriter.String()
+	must.StrContains(t, stdout, "Restarting 3 allocations")
+	for _, alloc := range allocStubs {
+		must.StrContains(t, stdout, fmt.Sprintf(`Rescheduling allocation "%s"`, alloc.ID))
+		must.StrContains(t, stdout, fmt.Sprintf(`Allocation "%s" replaced by`, alloc.ID))
+	}
+}
+
+func TestJobRestartCommand_Run_rescheduleNotSupported(t *testing.T) {
+	ci.Parallel(t)
+
+	// Create a sysbatch job.
+	sysbatchJob := api.NewSysbatchJob("test_sysbatch_job", "test_sysbatch_job", "global", 100).
+		AddDatacenter("dc1").
+		AddTaskGroup(
+			api.NewTaskGroup("group", 1).
+				AddTask(
+					api.NewTask("task", "mock_driver").
+						SetConfig("run_for", "1m").
+						SetConfig("exit_code", 0),
+				),
+		)
+
+	// Start a server and a client.
+	srv, client, url := testServer(t, false, nil)
+	defer srv.Shutdown()
+
+	srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC
+	testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr))
+	waitForNodes(t, client)
+
+	testCases := []struct {
+		name string
+		job  *api.Job
+	}{
+		{
+			name: "sysbatch job",
+			job:  sysbatchJob,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
 			// Initialize UI and command.
 			ui := cli.NewMockUi()
 			cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}
 
-			// Start client and server and wait for node to be ready.
-			// User separate cluster for each test case so they can run in
-			// parallel without affecting each other.
-			srv, client, url := testServer(t, true, nil)
-			defer srv.Shutdown()
-
-			waitForNodes(t, client)
-
 			// Register test job and wait for its allocs to be running.
-			resp, _, err := client.Jobs().Register(job, nil)
+			resp, _, err := client.Jobs().Register(tc.job, nil)
 			must.NoError(t, err)
 
 			code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
-			must.Zero(t, code)
+			must.Zero(t, code, must.Sprintf(
+				"stdout: %s\n\nstderr: %s\n",
+				ui.OutputWriter.String(),
+				ui.ErrorWriter.String()),
+			)
 
-			allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil)
+			allocStubs, _, err := client.Jobs().Allocations(*tc.job.ID, true, nil)
 			must.NoError(t, err)
 			for _, alloc := range allocStubs {
 				waitForAllocRunning(t, client, alloc.ID)
 			}
 
-			// Fetch allocations before the restart so we know which ones are
-			// supposed to be affected in case the test reschedules allocs.
-			allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil)
-			must.NoError(t, err)
-
-			// Prepend server URL and append job ID to the test case command.
-			args := []string{"-address", url, "-yes"}
-			args = append(args, tc.args...)
-			args = append(args, jobID)
-
-			// Run job restart command.
+			// Run job restart command and expect error.
+			args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *tc.job.ID}
 			code = cmd.Run(args)
-			must.Eq(t, code, tc.expectedCode)
+			must.Eq(t, code, 1)
 
-			// Run test case validation function.
-			if tc.validateFn != nil {
-				tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String())
-			}
+			stderr := ui.ErrorWriter.String()
+			must.StrContains(t, stderr, "not allowed to be rescheduled")
 		})
 	}
 }
@@ -752,10 +903,18 @@ func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) {
 
 			code := cmd.Run(args)
 			if tc.expectedErr != "" {
-				must.NonZero(t, code)
+				must.NonZero(t, code, must.Sprintf(
+					"stdout: %s\n\nstderr: %s\n",
+					ui.OutputWriter.String(),
+					ui.ErrorWriter.String()),
+				)
 				must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
 			} else {
-				must.Zero(t, code)
+				must.Zero(t, code, must.Sprintf(
+					"stdout: %s\n\nstderr: %s\n",
+					ui.OutputWriter.String(),
+					ui.ErrorWriter.String()),
+				)
 			}
 		})
 	}
@@ -791,7 +950,11 @@ func TestJobRestartCommand_noAllocs(t *testing.T) {
 		"-yes",
 		jobID,
 	})
-	must.Zero(t, code)
+	must.Zero(t, code, must.Sprintf(
+		"stdout: %s\n\nstderr: %s\n",
+		ui.OutputWriter.String(),
+		ui.ErrorWriter.String()),
+	)
 	must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart")
 }
 
@@ -810,13 +973,19 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) {
 	// Register test job with 3 allocs.
 	jobID := "test_job_restart_reschedule_fail"
 	job := testJob(jobID)
+	job.Type = pointer.Of(api.JobTypeService)
 	job.TaskGroups[0].Count = pointer.Of(3)
+	job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
 
 	resp, _, err := client.Jobs().Register(job, nil)
 	must.NoError(t, err)
 
 	code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
-	must.Zero(t, code)
+	must.Zero(t, code, must.Sprintf(
+		"stdout: %s\n\nstderr: %s\n",
+		ui.OutputWriter.String(),
+		ui.ErrorWriter.String()),
+	)
 	ui.OutputWriter.Reset()
 
 	// Wait for allocs to be running.
@@ -863,7 +1032,11 @@ func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) {
 		must.NoError(t, err)
 
 		code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
-		must.Zero(t, code)
+		must.Zero(t, code, must.Sprintf(
+			"stdout: %s\n\nstderr: %s\n",
+			ui.OutputWriter.String(),
+			ui.ErrorWriter.String()),
+		)
 	}
 	ui.OutputWriter.Reset()
 
@@ -1076,9 +1249,17 @@ namespace "default" {
 			// Run command.
 			code := cmd.Run(args)
 			if tc.expectedErr == "" {
-				must.Zero(t, code)
+				must.Zero(t, code, must.Sprintf(
+					"stdout: %s\n\nstderr: %s\n",
+					ui.OutputWriter.String(),
+					ui.ErrorWriter.String()),
+				)
 			} else {
-				must.One(t, code)
+				must.One(t, code, must.Sprintf(
+					"stdout: %s\n\nstderr: %s\n",
+					ui.OutputWriter.String(),
+					ui.ErrorWriter.String()),
+				)
 				must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
 			}
 		})
@@ -1122,8 +1303,9 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
 
 			jobID := nonAlphaNum.ReplaceAllString(tc.name, "-")
 			job := testJob(jobID)
+			job.Type = pointer.Of(api.JobTypeService)
 			job.TaskGroups[0].Count = pointer.Of(2)
-			job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m"
+			job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
 			job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay
 			job.TaskGroups[0].Tasks[0].Services = []*api.Service{{
 				Name: "service",
@@ -1134,7 +1316,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
 			must.NoError(t, err)
 
 			code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
-			must.Zero(t, code)
+			must.Zero(t, code, must.Sprintf(
+				"stdout:\n%s\n\nstderr:\n%s\n",
+				ui.OutputWriter.String(),
+				ui.ErrorWriter.String()),
+			)
 			ui.OutputWriter.Reset()
 
 			// Wait for alloc to be running.
@@ -1155,7 +1341,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
 			args = append(args, jobID)
 
 			code = cmd.Run(args)
-			must.Zero(t, code)
+			must.Zero(t, code, must.Sprintf(
+				"stdout:\n%s\n\nstderr:\n%s\n",
+				ui.OutputWriter.String(),
+				ui.ErrorWriter.String()),
+			)
 
 			// Wait for all allocs to restart.
 			reschedules := map[string]bool{}
@@ -1380,7 +1570,11 @@ func TestJobRestartCommand_filterAllocs(t *testing.T) {
 			args := append(tc.args, "-verbose", "-yes", "example")
 			code, err := cmd.parseAndValidate(args)
 			must.NoError(t, err)
-			must.Zero(t, code)
+			must.Zero(t, code, must.Sprintf(
+				"stdout: %s\n\nstderr: %s\n",
+				ui.OutputWriter.String(),
+				ui.ErrorWriter.String()),
+			)
 
 			got := cmd.filterAllocs(allAllocs)
 			must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool {
@@ -1423,7 +1617,11 @@ func TestJobRestartCommand_onErrorFail(t *testing.T) {
 	must.NoError(t, err)
 
 	code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
-	must.Zero(t, code)
+	must.Zero(t, code, must.Sprintf(
+		"stdout: %s\n\nstderr: %s\n",
+		ui.OutputWriter.String(),
+		ui.ErrorWriter.String()),
+	)
 	ui.OutputWriter.Reset()
 
 	// Create a proxy to inject an error after 2 allocation restarts.
diff --git a/website/content/docs/commands/job/restart.mdx b/website/content/docs/commands/job/restart.mdx
index 601fce1bf..6f87e40a8 100644
--- a/website/content/docs/commands/job/restart.mdx
+++ b/website/content/docs/commands/job/restart.mdx
@@ -86,7 +86,8 @@ of the exact job ID.
   restarted in-place. Since the group is not modified the restart does not
   create a new deployment, and so values defined in [`update`][] blocks, such
   as [`max_parallel`][], are not taken into account. This option cannot be used
-  with `-task`.
+  with `-task`. Only jobs of type `batch`, `service`, and `system` can be
+  rescheduled.
 
 - `-on-error=<ask|fail>`: Determines what action to take when an error happens
   during a restart batch. If `ask` the command stops and waits for user
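
For reference, the forced-evaluation pattern this patch adds to `stopAlloc` can also be exercised directly against the Nomad API. The following is a minimal sketch, not part of the patch: it assumes a reachable Nomad agent (via the usual `NOMAD_ADDR` environment settings picked up by `api.DefaultConfig`), and `example-system` is a placeholder job ID. `EvalOptions` and `EvaluateWithOpts` are the same `github.com/hashicorp/nomad/api` identifiers used in the diff above.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// forceEval asks the scheduler to re-evaluate a job with ForceReschedule
// set, mirroring what the patched stopAlloc does for system jobs after it
// stops an allocation: without a new evaluation, stopped system-job
// allocations are never replaced.
func forceEval(client *api.Client, jobID string) error {
	opts := api.EvalOptions{
		ForceReschedule: true,
	}
	// EvaluateWithOpts returns the new evaluation ID, which is ignored here.
	_, _, err := client.Jobs().EvaluateWithOpts(jobID, opts, nil)
	if err != nil {
		return fmt.Errorf("failed to evaluate job %q: %w", jobID, err)
	}
	return nil
}

func main() {
	// DefaultConfig reads NOMAD_ADDR and related settings from the environment.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}
	// "example-system" is a placeholder job ID used for illustration only.
	if err := forceEval(client, "example-system"); err != nil {
		panic(err)
	}
}
```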
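On the CLI side, the docs change above corresponds to an invocation like the one below. This is a hypothetical session: `example` stands in for any registered job of type `batch`, `service`, or `system`. Per the new validation in `Run`, substituting a sysbatch job would instead exit with code 1 and the "not allowed to be rescheduled" error exercised by the tests.

```shell-session
$ nomad job restart -reschedule -yes example
```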