command/job_stop: accept multiple jobs, stop concurrently (#12582)

* command/job_stop: accept multiple jobs, stop concurrently

Signed-off-by: danishprakash <grafitykoncept@gmail.com>

* command/job_stop_test: add test for multiple job stops

Signed-off-by: danishprakash <grafitykoncept@gmail.com>

* improve output, add changelog and docs

Signed-off-by: danishprakash <grafitykoncept@gmail.com>
Co-authored-by: Michael Schurter <mschurter@hashicorp.com>
This commit is contained in:
Danish Prakash 2022-12-17 05:16:58 +05:30 committed by GitHub
parent 336d730b9c
commit dc81568f93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 263 additions and 100 deletions

3
.changelog/12582.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
cli: `nomad job stop` can be used to stop multiple jobs concurrently.
```

View File

@ -3,6 +3,7 @@ package command
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts" "github.com/hashicorp/nomad/api/contexts"
@ -118,20 +119,18 @@ func (c *JobStopCommand) Run(args []string) int {
return 1 return 1
} }
// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}
// Check that we got exactly one job // Check that we got exactly one job
args = flags.Args() args = flags.Args()
if len(args) != 1 { if len(args) < 1 {
c.Ui.Error("This command takes one argument: <job>") c.Ui.Error("This command takes at least one argument: <job>")
c.Ui.Error(commandErrorText(c)) c.Ui.Error(commandErrorText(c))
return 1 return 1
} }
jobID := strings.TrimSpace(args[0])
var jobIDs []string
for _, jobID := range flags.Args() {
jobIDs = append(jobIDs, strings.TrimSpace(jobID))
}
// Get the HTTP client // Get the HTTP client
client, err := c.Meta.Client() client, err := c.Meta.Client()
@ -140,92 +139,145 @@ func (c *JobStopCommand) Run(args []string) int {
return 1 return 1
} }
// Check if the job exists statusCh := make(chan int, len(jobIDs))
jobs, _, err := client.Jobs().PrefixList(jobID)
if err != nil { var wg sync.WaitGroup
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) for _, jobID := range jobIDs {
return 1 jobID := jobID
wg.Add(1)
go func() {
defer wg.Done()
// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}
// Check if the job exists
jobs, _, err := client.Jobs().PrefixList(jobID)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error finding jobs with prefix: %s err: %s", jobID, err))
statusCh <- 1
return
}
if len(jobs) == 0 {
c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID))
statusCh <- 1
return
}
if len(jobs) > 1 {
if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) {
c.Ui.Error(fmt.Sprintf("Prefix %q matched multiple jobs\n\n%s", jobID, createStatusListOutput(jobs, c.allNamespaces())))
statusCh <- 1
return
}
}
// Prefix lookup matched a single job
q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace}
job, _, err := client.Jobs().Info(jobs[0].ID, q)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err))
statusCh <- 1
return
}
getConfirmation := func(question string) (int, bool) {
answer, err := c.Ui.Ask(question)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
return 1, false
}
if answer == "" || strings.ToLower(answer)[0] == 'n' {
// No case
c.Ui.Output("Cancelling job stop")
return 0, false
} else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 {
// Non exact match yes
c.Ui.Output("For confirmation, an exact y is required.")
return 0, false
} else if answer != "y" {
c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
return 1, false
}
return 0, true
}
// Confirm the stop if the job was a prefix match
// Ask for confirmation only when there's just one
// job that needs to be stopped. Since we're stopping
// jobs concurrently, we're going to skip confirmation
// for when multiple jobs need to be stopped.
if len(jobIDs) == 1 && jobID != *job.ID && !autoYes {
question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
statusCh <- code
return
}
}
// Confirm we want to stop only a single region of a multiregion job
if len(jobIDs) == 1 && job.IsMultiregion() && !global && !autoYes {
question := fmt.Sprintf(
"Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
statusCh <- code
return
}
}
// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err))
statusCh <- 1
return
}
// If we are stopping a periodic job there won't be an evalID.
if evalID == "" {
statusCh <- 0
return
}
// Goroutine won't wait on monitor
if detach {
c.Ui.Output(evalID)
statusCh <- 0
return
}
// Start monitoring the stop eval
// and return result on status channel
mon := newMonitor(c.Ui, client, length)
statusCh <- mon.monitor(evalID)
}()
} }
if len(jobs) == 0 { // users will still see
c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID)) // errors if any while we
return 1 // wait for the goroutines
} // to finish processing
if len(jobs) > 1 { wg.Wait()
if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) {
c.Ui.Error(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", createStatusListOutput(jobs, c.allNamespaces()))) // close the channel to ensure
return 1 // the range statement below
// doesn't go on indefinitely
close(statusCh)
// return a non-zero exit code
// if even a single job stop fails
for status := range statusCh {
if status != 0 {
return status
} }
} }
// Prefix lookup matched a single job return 0
q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace}
job, _, err := client.Jobs().Info(jobs[0].ID, q)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
}
getConfirmation := func(question string) (int, bool) {
answer, err := c.Ui.Ask(question)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
return 1, false
}
if answer == "" || strings.ToLower(answer)[0] == 'n' {
// No case
c.Ui.Output("Cancelling job stop")
return 0, false
} else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 {
// Non exact match yes
c.Ui.Output("For confirmation, an exact y is required.")
return 0, false
} else if answer != "y" {
c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
return 1, false
}
return 0, true
}
// Confirm the stop if the job was a prefix match
if jobID != *job.ID && !autoYes {
question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
return code
}
}
// Confirm we want to stop only a single region of a multiregion job
if job.IsMultiregion() && !global {
question := fmt.Sprintf(
"Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
return code
}
}
// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
}
// If we are stopping a periodic job there won't be an evalID.
if evalID == "" {
return 0
}
if detach {
c.Ui.Output(evalID)
return 0
}
// Start monitoring the stop eval
mon := newMonitor(c.Ui, client, length)
return mon.monitor(evalID)
} }

View File

@ -3,13 +3,18 @@ package command
import ( import (
"strings" "strings"
"testing" "testing"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
"github.com/posener/complete" "github.com/posener/complete"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestStopCommand_Implements(t *testing.T) { func TestStopCommand_Implements(t *testing.T) {
@ -17,6 +22,66 @@ func TestStopCommand_Implements(t *testing.T) {
var _ cli.Command = &JobStopCommand{} var _ cli.Command = &JobStopCommand{}
} }
func TestStopCommand_JSON(t *testing.T) {
ci.Parallel(t)
ui := cli.NewMockUi()
stop := func(args ...string) (stdout string, stderr string, code int) {
cmd := &JobStopCommand{
Meta: Meta{Ui: ui},
}
t.Logf("run: nomad job stop %s", strings.Join(args, " "))
code = cmd.Run(args)
return ui.OutputWriter.String(), ui.ErrorWriter.String(), code
}
// Agent startup is slow, do some work while we wait
agentReady := make(chan string)
var srv *agent.TestAgent
var client *api.Client
go func() {
var addr string
srv, client, addr = testServer(t, false, nil)
agentReady <- addr
}()
defer srv.Shutdown()
// Wait for agent to start and get its address
select {
case <-agentReady:
case <-time.After(20 * time.Second):
t.Fatalf("timed out waiting for agent to start")
}
// create and run 10 jobs
jobIDs := make([]string, 10)
for i := 0; i < 10; i++ {
jobID := uuid.Generate()
jobIDs = append(jobIDs, jobID)
job := testJob(jobID)
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "30s",
}
resp, _, err := client.Jobs().Register(job, nil)
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
t.Fatalf("[DEBUG] waiting for job to register; status code non zero saw %d", code)
}
require.NoError(t, err)
}
// stop all jobs
var args []string
args = append(args, "-detach")
args = append(args, jobIDs...)
stdout, stderr, code := stop(args...)
t.Logf("[DEBUG] run: nomad job stop stdout/stderr: %s/%s", stdout, stderr)
require.Zero(t, code)
require.Empty(t, stderr)
}
func TestStopCommand_Fails(t *testing.T) { func TestStopCommand_Fails(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
srv, _, url := testServer(t, false, nil) srv, _, url := testServer(t, false, nil)

View File

@ -15,17 +15,17 @@ to cancel all the running allocations.
## Usage ## Usage
```plaintext ```plaintext
nomad job stop [options] <job> nomad job stop [options] <job 1> <job 2> ... <job N>
``` ```
The `job stop` command requires a single argument, specifying the job ID or The `job stop` command requires at least one job ID or prefix to stop. If there
prefix to cancel. If there is an exact match based on the provided job ID or is an exact match based on the provided job ID or prefix, then the job will be
prefix, then the job will be cancelled. Otherwise, a list of matching jobs and cancelled. Otherwise, a list of matching jobs and information will be
information will be displayed. displayed.
Stop will issue a request to deregister the matched job and then invoke an Stop will issue a request to deregister the matched jobs and then invoke an
interactive monitor that exits automatically once the scheduler has processed interactive monitor that exits automatically once the scheduler has processed
the request. It is safe to exit the monitor early using ctrl+c. the requests. It is safe to exit the monitor early using ctrl+c.
When ACLs are enabled, this command requires a token with the `submit-job`, When ACLs are enabled, this command requires a token with the `submit-job`,
`read-job`, and `list-jobs` capabilities for the job's namespace. `read-job`, and `list-jobs` capabilities for the job's namespace.
@ -72,6 +72,49 @@ $ nomad job stop job1
==> Evaluation "43bfe672" finished with status "complete" ==> Evaluation "43bfe672" finished with status "complete"
``` ```
Stop multiple jobs:
```shell-session
$ nomad job stop job1 job2
==> 2022-12-16T15:19:28-08:00: Monitoring evaluation "166c39c5"
==> 2022-12-16T15:19:28-08:00: Monitoring evaluation "049404c2"
2022-12-16T15:19:28-08:00: Evaluation triggered by job "job1"
2022-12-16T15:19:28-08:00: Evaluation triggered by job "job2"
2022-12-16T15:19:28-08:00: Evaluation within deployment: "90885ce7"
2022-12-16T15:19:28-08:00: Evaluation status changed: "pending" -> "complete"
==> 2022-12-16T15:19:28-08:00: Evaluation "166c39c5" finished with status "complete"
==> 2022-12-16T15:19:28-08:00: Monitoring deployment "90885ce7"
✓ Deployment "90885ce7" successful
2022-12-16T15:19:28-08:00
ID = 90885ce7
Job ID = job1
Job Version = 0
Status = successful
Description = Deployment completed successfully
Deployed
Task Group Desired Placed Healthy Unhealthy Progress Deadline
example 1 1 1 0 2022-12-16T15:29:03-08:00
==> 2022-12-16T15:19:29-08:00: Monitoring evaluation "049404c2"
2022-12-16T15:19:29-08:00: Evaluation within deployment: "a13df8f8"
2022-12-16T15:19:29-08:00: Evaluation status changed: "pending" -> "complete"
==> 2022-12-16T15:19:29-08:00: Evaluation "049404c2" finished with status "complete"
==> 2022-12-16T15:19:29-08:00: Monitoring deployment "a13df8f8"
✓ Deployment "a13df8f8" successful
2022-12-16T15:19:29-08:00
ID = a13df8f8
Job ID = job2
Job Version = 0
Status = successful
Description = Deployment completed successfully
Deployed
Task Group Desired Placed Healthy Unhealthy Progress Deadline
example2 1 1 1 0 2022-12-16T15:29:16-08:00
```
Stop the job with ID "job1" and return immediately: Stop the job with ID "job1" and return immediately:
```shell-session ```shell-session