From dc81568f93a97ffcc75e668efb44d7e32629b848 Mon Sep 17 00:00:00 2001 From: Danish Prakash Date: Sat, 17 Dec 2022 05:16:58 +0530 Subject: [PATCH] command/job_stop: accept multiple jobs, stop concurrently (#12582) * command/job_stop: accept multiple jobs, stop concurrently Signed-off-by: danishprakash * command/job_stop_test: add test for multiple job stops Signed-off-by: danishprakash * improve output, add changelog and docs Signed-off-by: danishprakash Co-authored-by: Michael Schurter --- .changelog/12582.txt | 3 + command/job_stop.go | 238 +++++++++++++-------- command/job_stop_test.go | 65 ++++++ website/content/docs/commands/job/stop.mdx | 57 ++++- 4 files changed, 263 insertions(+), 100 deletions(-) create mode 100644 .changelog/12582.txt diff --git a/.changelog/12582.txt b/.changelog/12582.txt new file mode 100644 index 000000000..3dca683bf --- /dev/null +++ b/.changelog/12582.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: `nomad job stop` can be used to stop multiple jobs concurrently. +``` diff --git a/command/job_stop.go b/command/job_stop.go index 1df4d0610..938f7d684 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -3,6 +3,7 @@ package command import ( "fmt" "strings" + "sync" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api/contexts" @@ -118,20 +119,18 @@ func (c *JobStopCommand) Run(args []string) int { return 1 } - // Truncate the id unless full length is requested - length := shortId - if verbose { - length = fullId - } - // Check that we got exactly one job args = flags.Args() - if len(args) != 1 { - c.Ui.Error("This command takes one argument: ") + if len(args) < 1 { + c.Ui.Error("This command takes at least one argument: ") c.Ui.Error(commandErrorText(c)) return 1 } - jobID := strings.TrimSpace(args[0]) + + var jobIDs []string + for _, jobID := range flags.Args() { + jobIDs = append(jobIDs, strings.TrimSpace(jobID)) + } // Get the HTTP client client, err := c.Meta.Client() @@ -140,92 +139,145 @@ func (c *JobStopCommand) Run(args []string) int { return 1 } - // Check if the job exists - jobs, _, err := client.Jobs().PrefixList(jobID) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) - return 1 + statusCh := make(chan int, len(jobIDs)) + + var wg sync.WaitGroup + for _, jobID := range jobIDs { + jobID := jobID + + wg.Add(1) + go func() { + defer wg.Done() + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Check if the job exists + jobs, _, err := client.Jobs().PrefixList(jobID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error finding jobs with prefix: %s err: %s", jobID, err)) + statusCh <- 1 + return + } + if len(jobs) == 0 { + c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID)) + statusCh <- 1 + return + } + if len(jobs) > 1 { + if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) { + c.Ui.Error(fmt.Sprintf("Prefix %q matched multiple jobs\n\n%s", jobID, createStatusListOutput(jobs, c.allNamespaces()))) + statusCh <- 1 + return + } + } + + // Prefix lookup matched a single job + q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace} + job, _, err := client.Jobs().Info(jobs[0].ID, q) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err)) + statusCh <- 1 + return + } + + getConfirmation := func(question string) (int, bool) { + answer, err := c.Ui.Ask(question) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err)) + return 1, false + } + + if answer == "" || strings.ToLower(answer)[0] == 'n' { + // No case + c.Ui.Output("Cancelling job stop") + return 0, false + } else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 { + // Non exact match yes + c.Ui.Output("For confirmation, an exact ‘y’ is required.") + return 0, false + } else if answer != "y" { + c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.") + return 1, false + } + return 0, true + } + + // Confirm the stop if the job was a prefix match + // Ask for confirmation only when there's just one + // job that needs to be stopped. Since we're stopping + // jobs concurrently, we're going to skip confirmation + // for when multiple jobs need to be stopped. + if len(jobIDs) == 1 && jobID != *job.ID && !autoYes { + question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID) + code, confirmed := getConfirmation(question) + if !confirmed { + statusCh <- code + return + } + } + + // Confirm we want to stop only a single region of a multiregion job + if len(jobIDs) == 1 && job.IsMultiregion() && !global && !autoYes { + question := fmt.Sprintf( + "Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID) + code, confirmed := getConfirmation(question) + if !confirmed { + statusCh <- code + return + } + } + + // Invoke the stop + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} + wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} + evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err)) + statusCh <- 1 + return + } + + // If we are stopping a periodic job there won't be an evalID. + if evalID == "" { + statusCh <- 0 + return + } + + // Goroutine won't wait on monitor + if detach { + c.Ui.Output(evalID) + statusCh <- 0 + return + } + + // Start monitoring the stop eval + // and return result on status channel + mon := newMonitor(c.Ui, client, length) + statusCh <- mon.monitor(evalID) + }() } - if len(jobs) == 0 { - c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID)) - return 1 - } - if len(jobs) > 1 { - if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) { - c.Ui.Error(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", createStatusListOutput(jobs, c.allNamespaces()))) - return 1 + // users will still see + // errors if any while we + // wait for the goroutines + // to finish processing + wg.Wait() + + // close the channel to ensure + // the range statement below + // doesn't go on indefinitely + close(statusCh) + + // return a non-zero exit code + // if even a single job stop fails + for status := range statusCh { + if status != 0 { + return status } } - // Prefix lookup matched a single job - q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace} - job, _, err := client.Jobs().Info(jobs[0].ID, q) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) - return 1 - } - - getConfirmation := func(question string) (int, bool) { - answer, err := c.Ui.Ask(question) - if err != nil { - c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err)) - return 1, false - } - - if answer == "" || strings.ToLower(answer)[0] == 'n' { - // No case - c.Ui.Output("Cancelling job stop") - return 0, false - } else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 { - // Non exact match yes - c.Ui.Output("For confirmation, an exact ‘y’ is required.") - return 0, false - } else if answer != "y" { - c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.") - return 1, false - } - return 0, true - } - - // Confirm the stop if the job was a prefix match - if jobID != *job.ID && !autoYes { - question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID) - code, confirmed := getConfirmation(question) - if !confirmed { - return code - } - } - - // Confirm we want to stop only a single region of a multiregion job - if job.IsMultiregion() && !global { - question := fmt.Sprintf( - "Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID) - code, confirmed := getConfirmation(question) - if !confirmed { - return code - } - } - - // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} - wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} - evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) - return 1 - } - - // If we are stopping a periodic job there won't be an evalID. - if evalID == "" { - return 0 - } - - if detach { - c.Ui.Output(evalID) - return 0 - } - - // Start monitoring the stop eval - mon := newMonitor(c.Ui, client, length) - return mon.monitor(evalID) + return 0 } diff --git a/command/job_stop_test.go b/command/job_stop_test.go index a73c08309..fd2978461 100644 --- a/command/job_stop_test.go +++ b/command/job_stop_test.go @@ -3,13 +3,18 @@ package command import ( "strings" "testing" + "time" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/cli" "github.com/posener/complete" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestStopCommand_Implements(t *testing.T) { @@ -17,6 +22,66 @@ func TestStopCommand_Implements(t *testing.T) { var _ cli.Command = &JobStopCommand{} } +func TestStopCommand_JSON(t *testing.T) { + ci.Parallel(t) + ui := cli.NewMockUi() + stop := func(args ...string) (stdout string, stderr string, code int) { + cmd := &JobStopCommand{ + Meta: Meta{Ui: ui}, + } + t.Logf("run: nomad job stop %s", strings.Join(args, " ")) + code = cmd.Run(args) + return ui.OutputWriter.String(), ui.ErrorWriter.String(), code + } + + // Agent startup is slow, do some work while we wait + agentReady := make(chan string) + var srv *agent.TestAgent + var client *api.Client + go func() { + var addr string + srv, client, addr = testServer(t, false, nil) + agentReady <- addr + }() + defer srv.Shutdown() + + // Wait for agent to start and get its address + select { + case <-agentReady: + case <-time.After(20 * time.Second): + t.Fatalf("timed out waiting for agent to start") + } + + // create and run 10 jobs + jobIDs := make([]string, 10) + for i := 0; i < 10; i++ { + jobID := uuid.Generate() + jobIDs = append(jobIDs, jobID) + + job := testJob(jobID) + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "30s", + } + + resp, _, err := client.Jobs().Register(job, nil) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("[DEBUG] waiting for job to register; status code non zero saw %d", code) + } + + require.NoError(t, err) + } + + // stop all jobs + var args []string + args = append(args, "-detach") + args = append(args, jobIDs...) + stdout, stderr, code := stop(args...) + t.Logf("[DEBUG] run: nomad job stop stdout/stderr: %s/%s", stdout, stderr) + require.Zero(t, code) + require.Empty(t, stderr) + +} + func TestStopCommand_Fails(t *testing.T) { ci.Parallel(t) srv, _, url := testServer(t, false, nil) diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index c2a9fca2e..9ad931da4 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -15,17 +15,17 @@ to cancel all the running allocations. ## Usage ```plaintext -nomad job stop [options] +nomad job stop [options] ... ``` -The `job stop` command requires a single argument, specifying the job ID or -prefix to cancel. If there is an exact match based on the provided job ID or -prefix, then the job will be cancelled. Otherwise, a list of matching jobs and -information will be displayed. +The `job stop` command requires at least one job ID or prefix to stop. If there +is an exact match based on the provided job ID or prefix, then the job will be +cancelled. Otherwise, a list of matching jobs and information will be +displayed. -Stop will issue a request to deregister the matched job and then invoke an +Stop will issue a request to deregister the matched jobs and then invoke an interactive monitor that exits automatically once the scheduler has processed -the request. It is safe to exit the monitor early using ctrl+c. +the requests. It is safe to exit the monitor early using ctrl+c. When ACLs are enabled, this command requires a token with the `submit-job`, `read-job`, and `list-jobs` capabilities for the job's namespace. @@ -72,6 +72,49 @@ $ nomad job stop job1 ==> Evaluation "43bfe672" finished with status "complete" ``` +Stop multiple jobs: + +```shell-session +$ nomad job stop job1 job2 +==> 2022-12-16T15:19:28-08:00: Monitoring evaluation "166c39c5" +==> 2022-12-16T15:19:28-08:00: Monitoring evaluation "049404c2" + 2022-12-16T15:19:28-08:00: Evaluation triggered by job "job1" + 2022-12-16T15:19:28-08:00: Evaluation triggered by job "job2" + 2022-12-16T15:19:28-08:00: Evaluation within deployment: "90885ce7" + 2022-12-16T15:19:28-08:00: Evaluation status changed: "pending" -> "complete" +==> 2022-12-16T15:19:28-08:00: Evaluation "166c39c5" finished with status "complete" +==> 2022-12-16T15:19:28-08:00: Monitoring deployment "90885ce7" + ✓ Deployment "90885ce7" successful + + 2022-12-16T15:19:28-08:00 + ID = 90885ce7 + Job ID = job1 + Job Version = 0 + Status = successful + Description = Deployment completed successfully + + Deployed + Task Group Desired Placed Healthy Unhealthy Progress Deadline + example 1 1 1 0 2022-12-16T15:29:03-08:00 +==> 2022-12-16T15:19:29-08:00: Monitoring evaluation "049404c2" + 2022-12-16T15:19:29-08:00: Evaluation within deployment: "a13df8f8" + 2022-12-16T15:19:29-08:00: Evaluation status changed: "pending" -> "complete" +==> 2022-12-16T15:19:29-08:00: Evaluation "049404c2" finished with status "complete" +==> 2022-12-16T15:19:29-08:00: Monitoring deployment "a13df8f8" + ✓ Deployment "a13df8f8" successful + + 2022-12-16T15:19:29-08:00 + ID = a13df8f8 + Job ID = job2 + Job Version = 0 + Status = successful + Description = Deployment completed successfully + + Deployed + Task Group Desired Placed Healthy Unhealthy Progress Deadline + example2 1 1 1 0 2022-12-16T15:29:16-08:00 +``` + Stop the job with ID "job1" and return immediately: ```shell-session