backport of commit d29ac461a7229ca61da9561c1058e14aecbe4b90 (#19223)

Co-authored-by: Luiz Aoqui <luiz@hashicorp.com>
This commit is contained in:
hc-github-team-nomad-core 2023-11-29 12:33:08 -06:00 committed by GitHub
parent 406e2a5658
commit ef2de73315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 273 additions and 48 deletions

3
.changelog/19147.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
cli: Fixed the `nomad job restart` command to create replacements for batch and system jobs and to prevent sysbatch jobs from being rescheduled since they never create replacements
```

View File

@ -187,7 +187,8 @@ Restart Options:
in-place. Since the group is not modified the restart does not create a new
deployment, and so values defined in 'update' blocks, such as
'max_parallel', are not taken into account. This option cannot be used with
'-task'.
'-task'. Only jobs of type 'batch', 'service', and 'system' can be
rescheduled.
-task=<task-name>
Specify the task to restart. Can be specified multiple times. If groups are
@ -286,6 +287,16 @@ func (c *JobRestartCommand) Run(args []string) int {
go c.handleSignal(c.sigsCh, activeCh)
// Verify job type can be rescheduled.
if c.reschedule {
switch *job.Type {
case api.JobTypeBatch, api.JobTypeService, api.JobTypeSystem:
default:
c.Ui.Error(fmt.Sprintf("Jobs of type %q are not allowed to be rescheduled.", *job.Type))
return 1
}
}
// Confirm that we should restart a multi-region job in a single region.
if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() {
c.Ui.Output("\nJob restart canceled.")
@ -952,6 +963,18 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
return fmt.Errorf("Failed to stop allocation: %w", err)
}
// Allocations for system jobs do not get replaced by the scheduler after
// being stopped, so an eval is needed to trigger the reconciler.
if *alloc.Job.Type == api.JobTypeSystem {
opts := api.EvalOptions{
ForceReschedule: true,
}
_, _, err := c.client.Jobs().EvaluateWithOpts(*alloc.Job.ID, opts, nil)
if err != nil {
return fmt.Errorf("Failed evaluate job: %w", err)
}
}
// errCh receives an error if anything goes wrong or nil when the
// replacement allocation is running.
// Use a buffered channel to prevent both goroutines from blocking trying to

View File

@ -259,8 +259,19 @@ func TestJobRestartCommand_Run(t *testing.T) {
SetConfig("run_for", "1m").
SetConfig("exit_code", 0)
jobID := "test_job_restart_cmd"
job := api.NewServiceJob(jobID, jobID, "global", 1).
batchJob := api.NewBatchJob("test_job_batch", "test_job_batch", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(
api.NewTaskGroup("single_task", 3).
AddTask(mainTask),
).
AddTaskGroup(
api.NewTaskGroup("multiple_tasks", 2).
AddTask(prestartTask).
AddTask(sidecarTask).
AddTask(mainTask),
)
serviceJob := api.NewServiceJob("test_job_service", "test_job_service", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(
api.NewTaskGroup("single_task", 3).
@ -613,55 +624,195 @@ func TestJobRestartCommand_Run(t *testing.T) {
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// Run each test case in parallel because they are fairly slow.
ci.Parallel(t)
for _, job := range []*api.Job{batchJob, serviceJob} {
for _, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("%s/%s", *job.Type, tc.name), func(t *testing.T) {
// Run each test case in parallel because they are fairly slow.
ci.Parallel(t)
// Initialize UI and command.
ui := cli.NewMockUi()
cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}
// Start client and server and wait for node to be ready.
// Use a separate cluster for each test case so they can run in
// parallel without affecting each other.
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
waitForNodes(t, client)
// Register test job and wait for its allocs to be running.
resp, _, err := client.Jobs().Register(job, nil)
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
must.NoError(t, err)
for _, alloc := range allocStubs {
waitForAllocRunning(t, client, alloc.ID)
}
// Fetch allocations before the restart so we know which ones are
// supposed to be affected in case the test reschedules allocs.
allocStubs, _, err = client.Jobs().Allocations(*job.ID, true, nil)
must.NoError(t, err)
// Prepend server URL and append job ID to the test case command.
args := []string{"-address", url, "-yes"}
args = append(args, tc.args...)
args = append(args, *job.ID)
// Run job restart command.
code = cmd.Run(args)
must.Eq(t, code, tc.expectedCode)
// Run test case validation function.
if tc.validateFn != nil {
tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String())
}
})
}
}
}
// TestJobRestartCommand_Run_system_reschedule runs the job restart command
// with -reschedule against a system job and checks that the command succeeds
// and reports every pre-existing allocation as rescheduled and replaced.
func TestJobRestartCommand_Run_system_reschedule(t *testing.T) {
	ci.Parallel(t)

	// Create a system job.
	job := api.NewSystemJob("test_job", "test_job", "global", 100).
		AddDatacenter("dc1").
		AddTaskGroup(
			api.NewTaskGroup("group", 1).
				AddTask(
					api.NewTask("task", "mock_driver").
						SetConfig("run_for", "1m").
						SetConfig("exit_code", 0),
				),
		)

	// Start a server and 3 clients. The assertions at the end of the test
	// expect exactly 3 allocations to be restarted, one per client started
	// here.
	srv, client, url := testServer(t, false, nil)
	defer srv.Shutdown()

	srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC
	testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr))
	testClient(t, "client2", newClientAgentConfigFunc("", "", srvRPCAddr))
	testClient(t, "client3", newClientAgentConfigFunc("", "", srvRPCAddr))

	waitForNodes(t, client)

	// Initialize UI and command.
	ui := cli.NewMockUi()
	cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}

	// Register test job and wait for its allocs to be running.
	resp, _, err := client.Jobs().Register(job, nil)
	must.NoError(t, err)

	code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
	must.Zero(t, code, must.Sprintf(
		"stdout: %s\n\nstderr: %s\n",
		ui.OutputWriter.String(),
		ui.ErrorWriter.String()),
	)

	// These stubs are the allocations that exist before the restart; they are
	// the ones the command is expected to reschedule.
	allocStubs, _, err := client.Jobs().Allocations(*job.ID, true, nil)
	must.NoError(t, err)
	for _, alloc := range allocStubs {
		waitForAllocRunning(t, client, alloc.ID)
	}

	// Run job restart command. Include the captured UI output in the failure
	// message so a non-zero exit code can be diagnosed from the test log.
	args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *job.ID}
	code = cmd.Run(args)
	must.Zero(t, code, must.Sprintf(
		"stdout: %s\n\nstderr: %s\n",
		ui.OutputWriter.String(),
		ui.ErrorWriter.String()),
	)

	// Every pre-restart allocation is expected to be rescheduled.
	reschedules := map[string]bool{}
	for _, alloc := range allocStubs {
		reschedules[alloc.ID] = true
	}
	waitAllocsRescheduled(t, client, reschedules)

	// Check that allocations were rescheduled properly.
	stdout := ui.OutputWriter.String()
	must.StrContains(t, stdout, "Restarting 3 allocations")
	for _, alloc := range allocStubs {
		must.StrContains(t, stdout, fmt.Sprintf(`Rescheduling allocation "%s"`, alloc.ID))
		must.StrContains(t, stdout, fmt.Sprintf(`Allocation "%s" replaced by`, alloc.ID))
	}
}
func TestJobRestartCommand_Run_rescheduleNotSupported(t *testing.T) {
ci.Parallel(t)
// Create a sysbatch job.
sysbatchJob := api.NewSysbatchJob("test_sysbatch_job", "test_sysbatch_job", "global", 100).
AddDatacenter("dc1").
AddTaskGroup(
api.NewTaskGroup("group", 1).
AddTask(
api.NewTask("task", "mock_driver").
SetConfig("run_for", "1m").
SetConfig("exit_code", 0),
),
)
// Start a server and a client.
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
srvRPCAddr := srv.GetConfig().AdvertiseAddrs.RPC
testClient(t, "client1", newClientAgentConfigFunc("", "", srvRPCAddr))
waitForNodes(t, client)
testCases := []struct {
name string
job *api.Job
}{
{
name: "sysbatch job",
job: sysbatchJob,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Initialize UI and command.
ui := cli.NewMockUi()
cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}
// Start client and server and wait for node to be ready.
// Use a separate cluster for each test case so they can run in
// parallel without affecting each other.
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
waitForNodes(t, client)
// Register test job and wait for its allocs to be running.
resp, _, err := client.Jobs().Register(job, nil)
resp, _, err := client.Jobs().Register(tc.job, nil)
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil)
allocStubs, _, err := client.Jobs().Allocations(*tc.job.ID, true, nil)
must.NoError(t, err)
for _, alloc := range allocStubs {
waitForAllocRunning(t, client, alloc.ID)
}
// Fetch allocations before the restart so we know which ones are
// supposed to be affected in case the test reschedules allocs.
allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil)
must.NoError(t, err)
// Prepend server URL and append job ID to the test case command.
args := []string{"-address", url, "-yes"}
args = append(args, tc.args...)
args = append(args, jobID)
// Run job restart command.
// Run job restart command and expect error.
args := []string{"-address", url, "-yes", "-verbose", "-reschedule", *tc.job.ID}
code = cmd.Run(args)
must.Eq(t, code, tc.expectedCode)
must.Eq(t, code, 1)
// Run test case validation function.
if tc.validateFn != nil {
tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String())
}
stderr := ui.ErrorWriter.String()
must.StrContains(t, stderr, "not allowed to be rescheduled")
})
}
}
@ -752,10 +903,18 @@ func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) {
code := cmd.Run(args)
if tc.expectedErr != "" {
must.NonZero(t, code)
must.NonZero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
} else {
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
}
})
}
@ -791,7 +950,11 @@ func TestJobRestartCommand_noAllocs(t *testing.T) {
"-yes",
jobID,
})
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart")
}
@ -810,13 +973,19 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) {
// Register test job with 3 allocs.
jobID := "test_job_restart_reschedule_fail"
job := testJob(jobID)
job.Type = pointer.Of(api.JobTypeService)
job.TaskGroups[0].Count = pointer.Of(3)
job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
resp, _, err := client.Jobs().Register(job, nil)
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Wait for allocs to be running.
@ -863,7 +1032,11 @@ func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
}
ui.OutputWriter.Reset()
@ -1076,9 +1249,17 @@ namespace "default" {
// Run command.
code := cmd.Run(args)
if tc.expectedErr == "" {
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
} else {
must.One(t, code)
must.One(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr)
}
})
@ -1122,8 +1303,9 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
jobID := nonAlphaNum.ReplaceAllString(tc.name, "-")
job := testJob(jobID)
job.Type = pointer.Of(api.JobTypeService)
job.TaskGroups[0].Count = pointer.Of(2)
job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m"
job.TaskGroups[0].Tasks[0].Config = map[string]any{"run_for": "10m"}
job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay
job.TaskGroups[0].Tasks[0].Services = []*api.Service{{
Name: "service",
@ -1134,7 +1316,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout:\n%s\n\nstderr:\n%s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Wait for alloc to be running.
@ -1155,7 +1341,11 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) {
args = append(args, jobID)
code = cmd.Run(args)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout:\n%s\n\nstderr:\n%s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
// Wait for all allocs to restart.
reschedules := map[string]bool{}
@ -1380,7 +1570,11 @@ func TestJobRestartCommand_filterAllocs(t *testing.T) {
args := append(tc.args, "-verbose", "-yes", "example")
code, err := cmd.parseAndValidate(args)
must.NoError(t, err)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
got := cmd.filterAllocs(allAllocs)
must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool {
@ -1423,7 +1617,11 @@ func TestJobRestartCommand_onErrorFail(t *testing.T) {
must.NoError(t, err)
code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
must.Zero(t, code, must.Sprintf(
"stdout: %s\n\nstderr: %s\n",
ui.OutputWriter.String(),
ui.ErrorWriter.String()),
)
ui.OutputWriter.Reset()
// Create a proxy to inject an error after 2 allocation restarts.

View File

@ -86,7 +86,8 @@ of the exact job ID.
restarted in-place. Since the group is not modified the restart does not
create a new deployment, and so values defined in [`update`][] blocks, such
as [`max_parallel`][], are not taken into account. This option cannot be used
with `-task`.
with `-task`. Only jobs of type `batch`, `service`, and `system` can be
rescheduled.
- `-on-error=<ask|fail>`: Determines what action to take when an error happens
during a restart batch. If `ask` the command stops and waits for user