From e5d31bca615859c15dd0e2ea1c8f631fe54823e7 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 23 Mar 2023 18:28:26 -0400 Subject: [PATCH] cli: job restart command (#16278) Implement the new `nomad job restart` command that allows operators to restart allocations tasks or reschedule then entire allocation. Restarts can be batched to target multiple allocations in parallel. Between each batch the command can stop and hold for a predefined time or until the user confirms that the process should proceed. This implements the "Stateless Restarts" alternative from the original RFC (https://gist.github.com/schmichael/e0b8b2ec1eb146301175fd87ddd46180). The original concept is still worth implementing, as it allows this functionality to be exposed over an API that can be consumed by the Nomad UI and other clients. But the implementation turned out to be more complex than we initially expected so we thought it would be better to release a stateless CLI-based implementation first to gather feedback and validate the restart behaviour. Co-authored-by: Shishir Mahajan --- .changelog/16278.txt | 3 + api/tasks.go | 6 + command/commands.go | 8 + command/helpers.go | 7 + command/job_restart.go | 1205 +++++++++++++ command/job_restart_test.go | 1591 +++++++++++++++++ command/meta.go | 31 +- command/testing_test.go | 35 +- website/content/docs/commands/job/restart.mdx | 234 +++ website/data/docs-nav-data.json | 4 + 10 files changed, 3119 insertions(+), 5 deletions(-) create mode 100644 .changelog/16278.txt create mode 100644 command/job_restart.go create mode 100644 command/job_restart_test.go create mode 100644 website/content/docs/commands/job/restart.mdx diff --git a/.changelog/16278.txt b/.changelog/16278.txt new file mode 100644 index 000000000..7609e3ae1 --- /dev/null +++ b/.changelog/16278.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Added new `nomad job restart` command to restart all allocations for a job +``` diff --git a/api/tasks.go b/api/tasks.go index ecf357271..ecc14a8ea 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -976,6 +976,12 @@ func (t *Task) SetLogConfig(l *LogConfig) *Task { return t } +// SetLifecycle is used to set lifecycle config to a task. +func (t *Task) SetLifecycle(l *TaskLifecycle) *Task { + t.Lifecycle = l + return t +} + // TaskState tracks the current state of a task and events that caused state // transitions. type TaskState struct { diff --git a/command/commands.go b/command/commands.go index 7cb978006..b7b041ca3 100644 --- a/command/commands.go +++ b/command/commands.go @@ -415,6 +415,14 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "job restart": func() (cli.Command, error) { + // Use a *cli.ConcurrentUi because this command spawns several + // goroutines that write to the terminal concurrently. + meta.Ui = &cli.ConcurrentUi{Ui: meta.Ui} + return &JobRestartCommand{ + Meta: meta, + }, nil + }, "job deployments": func() (cli.Command, error) { return &JobDeploymentsCommand{ Meta: meta, diff --git a/command/helpers.go b/command/helpers.go index 22ba4dd7f..1d9de2467 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -62,6 +62,13 @@ func limit(s string, length int) string { return s[:length] } +// indentString returns the string s padded with the given number of empty +// spaces before each line except for the first one. 
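+//
+// For example, indentString("a\nb", 2) returns "a\n  b": the first line is
+// left untouched and every subsequent line is prefixed with two spaces.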
+func indentString(s string, pad int) string { + prefix := strings.Repeat(" ", pad) + return strings.Join(strings.Split(s, "\n"), fmt.Sprintf("\n%s", prefix)) +} + // wrapAtLengthWithPadding wraps the given text at the maxLineLength, taking // into account any provided left padding. func wrapAtLengthWithPadding(s string, pad int) string { diff --git a/command/job_restart.go b/command/job_restart.go new file mode 100644 index 000000000..55084bf79 --- /dev/null +++ b/command/job_restart.go @@ -0,0 +1,1205 @@ +package command + +import ( + "context" + "errors" + "fmt" + "math" + "os" + "os/signal" + "regexp" + "strconv" + "strings" + "time" + + humanize "github.com/dustin/go-humanize" + "github.com/dustin/go-humanize/english" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/api/contexts" + "github.com/hashicorp/nomad/helper" + "github.com/posener/complete" +) + +const ( + // jobRestartTimestampPrefixLength is the number of characters in the + // "==> [timestamp]: " string that prefixes most of the outputs of this + // command. + jobRestartTimestampPrefixLength = 31 + + // jobRestartBatchWaitAsk is the special token used to indicate that the + // command should ask user for confirmation between batches. + jobRestartBatchWaitAsk = "ask" + + // jobRestartOnErrorFail is the special token used to indicate that the + // command should exit when a batch has errors. + jobRestartOnErrorFail = "fail" + + // jobRestartOnErrorAks is the special token used to indicate that the + // command should ask user for confirmation when a batch has errors. + jobRestartOnErrorAsk = "ask" +) + +var ( + // jobRestartBatchSizeValueRegex validates that the value passed to + // -batch-size is an integer optionally followed by a % sign. + // + // Use ^...$ to make sure we're matching over the entire input to avoid + // partial matches such as 10%20%. + jobRestartBatchSizeValueRegex = regexp.MustCompile(`^(\d+)%?$`) +) + +// ErrJobRestartPlacementFailure is an error that indicates a placement failure +type ErrJobRestartPlacementFailure struct { + EvalID string + TaskGroup string + Failures *api.AllocationMetric +} + +func (e ErrJobRestartPlacementFailure) Error() string { + return fmt.Sprintf("Evaluation %q has placement failures for group %q:\n%s", + e.EvalID, + e.TaskGroup, + formatAllocMetrics(e.Failures, false, strings.Repeat(" ", 4)), + ) +} + +func (e ErrJobRestartPlacementFailure) Is(err error) bool { + _, ok := err.(ErrJobRestartPlacementFailure) + return ok +} + +// JobRestartCommand is the implementation for the command that restarts a job. +type JobRestartCommand struct { + Meta + + // client is the Nomad API client shared by all functions in the command to + // reuse the same connection. + client *api.Client + + // Configuration values read and parsed from command flags and args. + allTasks bool + autoYes bool + batchSize int + batchSizePercent bool + batchWait time.Duration + batchWaitAsk bool + groups *set.Set[string] + jobID string + noShutdownDelay bool + onError string + reschedule bool + tasks *set.Set[string] + verbose bool + length int + + // canceled is set to true when the user gives a negative answer to any of + // the questions. + canceled bool + + // sigsCh is used to subscribe to signals from the operating system. + sigsCh chan os.Signal +} + +func (c *JobRestartCommand) Help() string { + helpText := ` +Usage: nomad job restart [options] + + Restart or reschedule allocations for a particular job. 
+ + Restarting the job calls the 'Restart Allocation' API endpoint to restart the + tasks inside allocations, so the allocations themselves are not modified but + rather restarted in-place. + + Rescheduling the job uses the 'Stop Allocation' API endpoint to stop the + allocations and trigger the Nomad scheduler to compute new placements. This + may cause the new allocations to be scheduled in different clients from the + originals. + + This command can operate in batches and it waits until all restarted or + rescheduled allocations are running again before proceeding to the next + batch. It is also possible to specify additional time to wait between + batches. + + Allocations can be restarted in-place or rescheduled. When restarting + in-place the command may target specific tasks in the allocations, restart + only tasks that are currently running, or restart all tasks, even the ones + that have already run. Allocations can also be targeted by group. When both + groups and tasks are defined only the tasks for the allocations of those + groups are restarted. + + When rescheduling, the current allocations are stopped triggering the Nomad + scheduler to create replacement allocations that may be placed in different + clients. The command waits until the new allocations have client status + 'ready' before proceeding with the remaining batches. Services health checks + are not taken into account. + + By default the command restarts all running tasks in-place with one + allocation per batch. + + When ACLs are enabled, this command requires a token with the + 'alloc-lifecycle' and 'read-job' capabilities for the job's namespace. The + 'list-jobs' capability is required to run the command with a job prefix + instead of the exact job ID. + +General Options: + + ` + generalOptionsUsage(usageOptsDefault) + ` + +Restart Options: + + -all-tasks + If set, all tasks in the allocations are restarted, even the ones that + have already run, such as non-sidecar tasks. Tasks will restart following + their lifecycle order. This option cannot be used with '-task'. + + -batch-size= + Number of allocations to restart at once. It may be defined as a percentage + value of the current number of running allocations. Percentage values are + rounded up to increase parallelism. Defaults to 1. + + -batch-wait= + Time to wait between restart batches. If set to 'ask' the command halts + between batches and waits for user input on how to proceed. If the answer + is a time duration all remaining batches will use this new value. Defaults + to 0. + + -group= + Only restart allocations for the given group. Can be specified multiple + times. If no group is set all allocations for the job are restarted. + + -no-shutdown-delay + Ignore the group and task 'shutdown_delay' configuration so there is no + delay between service deregistration and task shutdown or restart. Note + that using this flag will result in failed network connections to the + allocation being restarted. + + -on-error=<'ask'|'fail'> + Determines what action to take when an error happens during a restart + batch. If 'ask' the command stops and waits for user confirmation on how to + proceed. If 'fail' the command exits immediately. Defaults to 'ask'. + + -reschedule + If set, allocations are stopped and rescheduled instead of restarted + in-place. Since the group is not modified the restart does not create a new + deployment, and so values defined in 'update' blocks, such as + 'max_parallel', are not taken into account. This option cannot be used with + '-task'. 
+ + -task= + Specify the task to restart. Can be specified multiple times. If groups are + also specified the task must exist in at least one of them. If no task is + set only tasks that are currently running are restarted. For example, + non-sidecar tasks that already ran are not restarted unless '-all-tasks' is + used instead. This option cannot be used with '-all-tasks' or + '-reschedule'. + + -yes + Automatic yes to prompts. If set, the command automatically restarts + multi-region jobs only in the region targeted by the command, ignores batch + errors, and automatically proceeds with the remaining batches without + waiting. Use '-on-error' and '-batch-wait' to adjust these behaviors. + + -verbose + Display full information. +` + return strings.TrimSpace(helpText) +} + +func (c *JobRestartCommand) Synopsis() string { + return "Restart or reschedule allocations for a job" +} + +func (c *JobRestartCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-all-tasks": complete.PredictNothing, + "-batch-size": complete.PredictAnything, + "-batch-wait": complete.PredictAnything, + "-no-shutdown-delay": complete.PredictNothing, + "-on-error": complete.PredictSet(jobRestartOnErrorAsk, jobRestartOnErrorFail), + "-reschedule": complete.PredictNothing, + "-task": complete.PredictAnything, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} + +func (c *JobRestartCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Jobs] + }) +} + +func (c *JobRestartCommand) Name() string { return "job restart" } + +func (c *JobRestartCommand) Run(args []string) int { + // Parse and validate command line arguments. + code, err := c.parseAndValidate(args) + if err != nil { + c.Ui.Error(err.Error()) + c.Ui.Error(commandErrorText(c)) + return code + } + if code != 0 { + return code + } + + c.client, err = c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + // Use prefix matching to find job. + job, err := c.JobByPrefix(c.client, c.jobID, nil) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + c.jobID = *job.ID + if job.Namespace != nil { + c.client.SetNamespace(*job.Namespace) + } + + // Confirm that we should restart a multi-region job in a single region. + if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() { + c.Ui.Output("\nJob restart canceled.") + return 0 + } + + // Retrieve the job history so we can properly determine if a group or task + // exists in the specific allocation job version. + jobVersions, _, _, err := c.client.Jobs().Versions(c.jobID, false, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving versions of job %q: %s", c.jobID, err)) + return 1 + } + + // Index jobs by version. + jobVersionIndex := make(map[uint64]*api.Job, len(jobVersions)) + for _, job := range jobVersions { + jobVersionIndex[*job.Version] = job + } + + // Fetch all allocations for the job and filter out the ones that are not + // eligible for restart. 
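+	// Passing 'true' also includes allocations left over from previous
+	// registrations of the same job ID.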
+ allocStubs, _, err := c.client.Jobs().Allocations(c.jobID, true, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving allocations for job %q: %v", c.jobID, err)) + return 1 + } + allocStubsWithJob := make([]AllocationListStubWithJob, 0, len(allocStubs)) + for _, stub := range allocStubs { + allocStubsWithJob = append(allocStubsWithJob, AllocationListStubWithJob{ + AllocationListStub: stub, + Job: jobVersionIndex[stub.JobVersion], + }) + } + restartAllocs := c.filterAllocs(allocStubsWithJob) + + // Exit early if there's nothing to do. + if len(restartAllocs) == 0 { + c.Ui.Output("No allocations to restart") + return 0 + } + + // Calculate absolute batch size based on the number of eligible + // allocations. Round values up to increase parallelism. + if c.batchSizePercent { + c.batchSize = int(math.Ceil(float64(len(restartAllocs)*c.batchSize) / 100)) + } + + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Restarting %s[reset]", + formatTime(time.Now()), + english.Plural(len(restartAllocs), "allocation", "allocations"), + ))) + + // Handle SIGINT to prevent accidental cancellations of the long-lived + // restart loop. activeCh is blocked while a signal is being handled to + // prevent new work from starting while the user is deciding if they want + // to cancel the command or not. + activeCh := make(chan any) + c.sigsCh = make(chan os.Signal, 1) + signal.Notify(c.sigsCh, os.Interrupt) + defer signal.Stop(c.sigsCh) + + go c.handleSignal(c.sigsCh, activeCh) + + // restartErr accumulates the errors that happen in each batch. + var restartErr *multierror.Error + + // Restart allocations in batches. + batch := multierror.Group{} + for restartCount, alloc := range restartAllocs { + // Block and wait before each iteration if the command is handling an + // interrupt signal. + <-activeCh + + // Make sure there are not active deployments to prevent the restart + // process from interfering with it. + err := c.ensureNoActiveDeployment() + if err != nil { + restartErr = multierror.Append(restartErr, err) + break + } + + // Print new batch header every time we restart a multiple of the batch + // size which indicates that we're starting a new batch. + // Skip batch header if batch size is one because it's redundant. + if restartCount%c.batchSize == 0 && c.batchSize > 1 { + batchNumber := restartCount/c.batchSize + 1 + remaining := len(restartAllocs) - restartCount + + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Restarting %s batch of %d allocations[reset]", + formatTime(time.Now()), + humanize.Ordinal(batchNumber), + helper.Min(c.batchSize, remaining), + ))) + } + + // Restart allocation. Wrap the callback function to capture the + // allocID loop variable and prevent it from changing inside the + // goroutine at each iteration. + batch.Go(func(allocStubWithJob AllocationListStubWithJob) func() error { + return func() error { + return c.handleAlloc(allocStubWithJob) + } + }(alloc)) + + // Check if we restarted enough allocations to complete a batch or if + // we restarted the last allocation. + batchComplete := (restartCount+1)%c.batchSize == 0 + restartComplete := restartCount+1 == len(restartAllocs) + if batchComplete || restartComplete { + + // Block and wait for the batch to finish. Handle the + // *mutierror.Error response to add the custom formatting and to + // convert it to an error to avoid problems where an empty + // *multierror.Error is not considered a nil error. 
+ var batchErr error + if batchMerr := batch.Wait(); batchMerr != nil { + restartErr = multierror.Append(restartErr, batchMerr) + batchMerr.ErrorFormat = c.errorFormat(jobRestartTimestampPrefixLength) + batchErr = batchMerr.ErrorOrNil() + } + + // Block if the command is handling an interrupt signal. + <-activeCh + + // Exit loop before sleeping or asking for user input if we just + // finished the last batch. + if restartComplete { + break + } + + // Handle errors that happened in this batch. + if batchErr != nil { + // Exit early if -on-error is 'fail'. + if c.onError == jobRestartOnErrorFail { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Stopping job restart due to error[reset]", + formatTime(time.Now()), + ))) + break + } + + // Exit early if -yes but error is not recoverable. + if c.autoYes && !c.isErrorRecoverable(batchErr) { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Stopping job restart due to unrecoverable error[reset]", + formatTime(time.Now()), + ))) + break + } + } + + // Check if we need to ask the user how to proceed. This is needed + // in case -yes is not set and -batch-wait is 'ask' or an error + // happened and -on-error is 'ask'. + askUser := !c.autoYes && (c.batchWaitAsk || c.onError == jobRestartOnErrorAsk && batchErr != nil) + if askUser { + if batchErr != nil { + // Print errors so user can decide what to below. + c.Ui.Warn(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: %s[reset]", formatTime(time.Now()), batchErr, + ))) + } + + // Exit early if user provides a negative answer. + if !c.shouldProceed(batchErr) { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Job restart canceled[reset]", + formatTime(time.Now()), + ))) + c.canceled = true + break + } + } + + // Sleep if -batch-wait is set or if -batch-wait is 'ask' and user + // responded with a new interval above. + if c.batchWait > 0 { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Waiting %s before restarting the next batch[reset]", + formatTime(time.Now()), + c.batchWait, + ))) + time.Sleep(c.batchWait) + } + + // Start a new batch. + batch = multierror.Group{} + } + } + + if restartErr != nil && len(restartErr.Errors) > 0 { + if !c.canceled { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Job restart finished with errors[reset]", + formatTime(time.Now()), + ))) + } + + restartErr.ErrorFormat = c.errorFormat(0) + c.Ui.Error(fmt.Sprintf("\n%s", restartErr)) + return 1 + } + + if !c.canceled { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Job restart finished[reset]", + formatTime(time.Now()), + ))) + + c.Ui.Output("\nJob restarted successfully!") + } + return 0 +} + +// parseAndValidate parses and validates the arguments passed to the command. +// +// This function mutates the command and is not thread-safe so it must be +// called only once and early in the command lifecycle. 
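+//
+// It returns a non-zero code with a nil error when the flag library has
+// already reported the problem, and a non-zero code together with an error
+// when validation fails; Run prints the error followed by the command usage
+// hint in that case.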
+func (c *JobRestartCommand) parseAndValidate(args []string) (int, error) { + var batchSizeStr string + var batchWaitStr string + var groups []string + var tasks []string + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&c.allTasks, "all-tasks", false, "") + flags.BoolVar(&c.autoYes, "yes", false, "") + flags.StringVar(&batchSizeStr, "batch-size", "1", "") + flags.StringVar(&batchWaitStr, "batch-wait", "0s", "") + flags.StringVar(&c.onError, "on-error", jobRestartOnErrorAsk, "") + flags.BoolVar(&c.noShutdownDelay, "no-shutdown-delay", false, "") + flags.BoolVar(&c.reschedule, "reschedule", false, "") + flags.BoolVar(&c.verbose, "verbose", false, "") + flags.Var((funcVar)(func(s string) error { + groups = append(groups, s) + return nil + }), "group", "") + flags.Var((funcVar)(func(s string) error { + tasks = append(tasks, s) + return nil + }), "task", "") + + err := flags.Parse(args) + if err != nil { + // Let the flags library handle and print the error message. + return 1, nil + } + + // Truncate IDs unless full length is requested. + c.length = shortId + if c.verbose { + c.length = fullId + } + + // Check that we got exactly one job. + args = flags.Args() + if len(args) != 1 { + return 1, fmt.Errorf("This command takes one argument: ") + } + c.jobID = strings.TrimSpace(args[0]) + + // Parse and validate -batch-size. + matches := jobRestartBatchSizeValueRegex.FindStringSubmatch(batchSizeStr) + if len(matches) != 2 { + return 1, fmt.Errorf( + "Invalid -batch-size value %q: batch size must be an integer or a percentage", + batchSizeStr, + ) + } + + c.batchSizePercent = strings.HasSuffix(batchSizeStr, "%") + c.batchSize, err = strconv.Atoi(matches[1]) + if err != nil { + return 1, fmt.Errorf("Invalid -batch-size value %q: %w", batchSizeStr, err) + } + if c.batchSize == 0 { + return 1, fmt.Errorf( + "Invalid -batch-size value %q: number value must be greater than zero", + batchSizeStr, + ) + } + + // Parse and validate -batch-wait. + if strings.ToLower(batchWaitStr) == jobRestartBatchWaitAsk { + if !isTty() && !c.autoYes { + return 1, fmt.Errorf( + "Invalid -batch-wait value %[1]q: %[1]q cannot be used when terminal is not interactive", + jobRestartBatchWaitAsk, + ) + } + c.batchWaitAsk = true + } else { + c.batchWait, err = time.ParseDuration(batchWaitStr) + if err != nil { + return 1, fmt.Errorf("Invalid -batch-wait value %q: %w", batchWaitStr, err) + } + } + + // Parse and validate -on-error. + switch c.onError { + case jobRestartOnErrorAsk: + if !isTty() && !c.autoYes { + return 1, fmt.Errorf( + "Invalid -on-error value %[1]q: %[1]q cannot be used when terminal is not interactive", + jobRestartOnErrorAsk, + ) + } + case jobRestartOnErrorFail: + default: + return 1, fmt.Errorf( + "Invalid -on-error value %q: valid options are %q and %q", + c.onError, + jobRestartOnErrorAsk, + jobRestartOnErrorFail, + ) + } + + // -all-tasks conflicts with -task and . + if c.allTasks && len(tasks) != 0 { + return 1, fmt.Errorf("The -all-tasks option cannot be used with -task") + } + + // -reschedule conflicts with -task and . + if c.reschedule && len(tasks) != 0 { + return 1, fmt.Errorf("The -reschedule option cannot be used with -task") + } + + // Dedup tasks and groups. + c.groups = set.From(groups) + c.tasks = set.From(tasks) + + return 0, nil +} + +// filterAllocs returns a slice of the allocations that should be restarted. 
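+//
+// Allocations are skipped when they are not running, when they do not belong
+// to one of the requested groups, or when they do not contain any of the
+// requested tasks.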
+func (c *JobRestartCommand) filterAllocs(stubs []AllocationListStubWithJob) []AllocationListStubWithJob { + result := []AllocationListStubWithJob{} + for _, stub := range stubs { + shortAllocID := limit(stub.ID, c.length) + + // Skip allocations that are not running. + if !stub.IsRunning() { + if c.verbose { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[dark_gray] %s: Skipping allocation %q because desired status is %q and client status is %q[reset]", + formatTime(time.Now()), + shortAllocID, + stub.ClientStatus, + stub.DesiredStatus, + ))) + } + continue + } + + // Skip allocations for groups that were not requested. + if c.groups.Size() > 0 { + if !c.groups.Contains(stub.TaskGroup) { + if c.verbose { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[dark_gray] %s: Skipping allocation %q because it doesn't have any of requested groups[reset]", + formatTime(time.Now()), + shortAllocID, + ))) + } + continue + } + } + + // Skip allocations that don't have any of the requested tasks. + if c.tasks.Size() > 0 { + hasTask := false + for _, taskName := range c.tasks.Slice() { + if stub.HasTask(taskName) { + hasTask = true + break + } + } + + if !hasTask { + if c.verbose { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[dark_gray] %s: Skipping allocation %q because it doesn't have any of requested tasks[reset]", + formatTime(time.Now()), + shortAllocID, + ))) + } + continue + } + } + + result = append(result, stub) + } + + return result +} + +// ensureNoActiveDeployment returns an error if the job has an active +// deployment. +func (c *JobRestartCommand) ensureNoActiveDeployment() error { + deployments, _, err := c.client.Jobs().Deployments(c.jobID, true, nil) + if err != nil { + return fmt.Errorf("Error retrieving deployments for job %q: %v", c.jobID, err) + + } + + for _, d := range deployments { + switch d.Status { + case api.DeploymentStatusFailed, api.DeploymentStatusSuccessful, api.DeploymentStatusCancelled: + // Deployment is terminal so it's safe to proceed. + default: + return fmt.Errorf("Deployment %q is %q", limit(d.ID, c.length), d.Status) + } + } + return nil +} + +// shouldRestartMultiregion blocks and waits for the user to confirm if the +// restart of a multi-region job should proceed. Returns true if the answer is +// positive. +func (c *JobRestartCommand) shouldRestartMultiregion() bool { + question := fmt.Sprintf( + "Are you sure you want to restart multi-region job %q in a single region? [y/N]", + c.jobID, + ) + + return c.askQuestion( + question, + false, + func(answer string) (bool, error) { + switch strings.TrimSpace(strings.ToLower(answer)) { + case "", "n", "no": + return false, nil + case "y", "yes": + return true, nil + default: + return false, fmt.Errorf("Invalid answer %q", answer) + } + }) +} + +// shouldProceed blocks and waits for the user to provide a valid input on how +// to proceed. Returns true if the answer is positive. +// +// The flags -batch-wait and -on-error have an 'ask' option. This function +// handles both to prevent asking the user twice in case they are both set to +// 'ask' and an error happens. +func (c *JobRestartCommand) shouldProceed(err error) bool { + var question, options string + + if err == nil { + question = "Proceed with the next batch?" + options = "Y/n" + } else { + question = "Ignore the errors above and proceed with the next batch?" + options = "y/N" // Defaults to 'no' if an error happens. + + if !c.isErrorRecoverable(err) { + question = `The errors above are likely to happen again. 
+Ignore them anyway and proceed with the next batch?` + } + } + + // If -batch-wait is 'ask' the user can provide a new wait duration. + if c.batchWaitAsk { + options += "/" + } + + return c.askQuestion( + fmt.Sprintf("%s [%s]", question, options), + false, + func(answer string) (bool, error) { + switch strings.ToLower(answer) { + case "": + // Proceed by default only if there is no error. + return err == nil, nil + case "y", "yes": + return true, nil + case "n", "no": + return false, nil + default: + if c.batchWaitAsk { + // Check if user passed a time duration and adjust the + // command to use that moving forward. + batchWait, err := time.ParseDuration(answer) + if err != nil { + return false, fmt.Errorf("Invalid answer %q", answer) + } + + c.batchWaitAsk = false + c.batchWait = batchWait + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Proceeding restarts with new wait time of %s[reset]", + formatTime(time.Now()), + c.batchWait, + ))) + return true, nil + } else { + return false, fmt.Errorf("Invalid answer %q", answer) + } + } + }) +} + +// shouldExit blocks and waits for the user for confirmation if they would like +// to interrupt the command. Returns true if the answer is positive. +func (c *JobRestartCommand) shouldExit() bool { + question := `Restart interrupted, no more allocations will be restarted. +Are you sure you want to stop the restart process? [y/N]` + + return c.askQuestion( + question, + true, + func(answer string) (bool, error) { + switch strings.ToLower(answer) { + case "n", "no", "": + return false, nil + case "y", "yes": + return true, nil + default: + return false, fmt.Errorf("Invalid answer %q", answer) + } + }) +} + +// askQuestion asks question to user until they provide a valid response. +func (c *JobRestartCommand) askQuestion(question string, onError bool, cb func(string) (bool, error)) bool { + prefixedQuestion := fmt.Sprintf( + "[bold]==> %s: %s[reset]", + formatTime(time.Now()), + indentString(question, jobRestartTimestampPrefixLength), + ) + + // Let ui.Ask() handle interrupt signals. + signal.Stop(c.sigsCh) + defer func() { + signal.Notify(c.sigsCh, os.Interrupt) + }() + + for { + answer, err := c.Ui.Ask(c.Colorize().Color(prefixedQuestion)) + if err != nil { + if err.Error() != "interrupted" { + c.Ui.Output(err.Error()) + } + return onError + } + + exit, err := cb(strings.TrimSpace(answer)) + if err != nil { + c.Ui.Output(fmt.Sprintf("%s%s", strings.Repeat(" ", jobRestartTimestampPrefixLength), err)) + continue + } + return exit + } +} + +// handleAlloc stops or restarts an allocation in-place. Blocks until the +// allocation is done restarting or the rescheduled allocation is running. +func (c *JobRestartCommand) handleAlloc(alloc AllocationListStubWithJob) error { + var err error + if c.reschedule { + // Stopping an allocation triggers a reschedule. + err = c.stopAlloc(alloc) + } else { + err = c.restartAlloc(alloc) + } + if err != nil { + msg := fmt.Sprintf("Error restarting allocation %q:", limit(alloc.ID, c.length)) + if mErr, ok := err.(*multierror.Error); ok { + // Unwrap the errors and prefix them with a common message to + // prevent deep nesting of errors. + return multierror.Prefix(mErr, msg) + } + return fmt.Errorf("%s %w", msg, err) + } + return nil +} + +// restartAlloc restarts an allocation in place and blocks until the tasks are +// done restarting. 
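+//
+// Depending on the flags, it restarts all tasks, only the tasks that are
+// currently running, or the specific tasks requested with -task. Per-task
+// restarts are issued concurrently and their errors are merged.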
+func (c *JobRestartCommand) restartAlloc(alloc AllocationListStubWithJob) error { + shortAllocID := limit(alloc.ID, c.length) + + if c.allTasks { + c.Ui.Output(fmt.Sprintf( + " %s: Restarting all tasks in allocation %q for group %q", + formatTime(time.Now()), + shortAllocID, + alloc.TaskGroup, + )) + + return c.client.Allocations().RestartAllTasks(&api.Allocation{ID: alloc.ID}, nil) + } + + if c.tasks.Size() == 0 { + c.Ui.Output(fmt.Sprintf( + " %s: Restarting running tasks in allocation %q for group %q", + formatTime(time.Now()), + shortAllocID, + alloc.TaskGroup, + )) + + return c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, "", nil) + } + + // Run restarts concurrently when specific tasks were requested. + var restarts multierror.Group + for _, task := range c.tasks.Slice() { + if !alloc.HasTask(task) { + continue + } + + c.Ui.Output(fmt.Sprintf( + " %s: Restarting task %q in allocation %q for group %q", + formatTime(time.Now()), + task, + shortAllocID, + alloc.TaskGroup, + )) + + restarts.Go(func(taskName string) func() error { + return func() error { + err := c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, taskName, nil) + if err != nil { + return fmt.Errorf("Failed to restart task %q: %w", taskName, err) + } + return nil + } + }(task)) + } + return restarts.Wait().ErrorOrNil() +} + +// stopAlloc stops an allocation and blocks until the replacement allocation is +// running. +func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error { + shortAllocID := limit(alloc.ID, c.length) + + c.Ui.Output(fmt.Sprintf( + " %s: Rescheduling allocation %q for group %q", + formatTime(time.Now()), + shortAllocID, + alloc.TaskGroup, + )) + + var q *api.QueryOptions + if c.noShutdownDelay { + q = &api.QueryOptions{ + Params: map[string]string{"no_shutdown_delay": "true"}, + } + } + + // Stop allocation and wait for its replacement to be running or for a + // blocked evaluation that prevents placements for this task group to + // happen. + resp, err := c.client.Allocations().Stop(&api.Allocation{ID: alloc.ID}, q) + if err != nil { + return fmt.Errorf("Failed to stop allocation: %w", err) + } + + // errCh receives an error if anything goes wrong or nil when the + // replacement allocation is running. + // Use a buffered channel to prevent both goroutine from blocking trying to + // send a result back. + errCh := make(chan error, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Pass the LastIndex from the Stop() call to only monitor data that was + // created after the Stop() call. + go c.monitorPlacementFailures(ctx, alloc, resp.LastIndex, errCh) + go c.monitorReplacementAlloc(ctx, alloc, errCh) + + // This process may take a while, so ping user from time to time to + // indicate the command is still alive. + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.Ui.Output(fmt.Sprintf( + " %s: Still waiting for allocation %q to be replaced", + formatTime(time.Now()), + shortAllocID, + )) + case err := <-errCh: + return err + } + } +} + +// monitorPlacementFailures searches for evaluations of the allocation job that +// have placement failures. +// +// Returns an error in errCh if anything goes wrong or if there are placement +// failures for the allocation task group. 
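+//
+// The evaluations are read with blocking queries starting at the index
+// returned by the Stop call, and evaluations created before that index are
+// ignored so older placement failures are not reported.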
+func (c *JobRestartCommand) monitorPlacementFailures( + ctx context.Context, + alloc AllocationListStubWithJob, + index uint64, + errCh chan<- error, +) { + q := &api.QueryOptions{WaitIndex: index} + for { + select { + case <-ctx.Done(): + return + default: + } + + evals, qm, err := c.client.Jobs().Evaluations(alloc.JobID, q) + if err != nil { + errCh <- fmt.Errorf("Failed to retrieve evaluations for job %q: %w", alloc.JobID, err) + return + } + + for _, eval := range evals { + select { + case <-ctx.Done(): + return + default: + } + + // Skip evaluations created before the allocation was stopped or + // that are not blocked. + if eval.CreateIndex < index || eval.Status != api.EvalStatusBlocked { + continue + } + + failures := eval.FailedTGAllocs[alloc.TaskGroup] + if failures != nil { + errCh <- ErrJobRestartPlacementFailure{ + EvalID: limit(eval.ID, c.length), + TaskGroup: alloc.TaskGroup, + Failures: failures, + } + return + } + } + q.WaitIndex = qm.LastIndex + } +} + +// monitorReplacementAlloc waits for the allocation to have a follow-up +// placement and for the new allocation be running. +// +// Returns an error in errCh if anything goes wrong or nil when the new +// allocation is running. +func (c *JobRestartCommand) monitorReplacementAlloc( + ctx context.Context, + allocStub AllocationListStubWithJob, + errCh chan<- error, +) { + currentAllocID := allocStub.ID + q := &api.QueryOptions{WaitIndex: 1} + for { + select { + case <-ctx.Done(): + return + default: + } + + alloc, qm, err := c.client.Allocations().Info(currentAllocID, q) + if err != nil { + errCh <- fmt.Errorf("Failed to retrieve allocation %q: %w", limit(alloc.ID, c.length), err) + return + } + + // Follow replacement allocations. We expect the original allocation to + // be replaced, but the replacements may be themselves replaced in + // cases such as the allocation failing. + if alloc.NextAllocation != "" { + c.Ui.Output(fmt.Sprintf( + " %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running", + formatTime(time.Now()), + limit(alloc.ID, c.length), + limit(alloc.NextAllocation, c.length), + )) + currentAllocID = alloc.NextAllocation + + // Reset the blocking query so the Info() API call returns the new + // allocation immediately. + q.WaitIndex = 1 + continue + } + + switch alloc.ClientStatus { + case api.AllocClientStatusRunning: + // Make sure the running allocation we found is a replacement, not + // the original one. + if alloc.ID != allocStub.ID { + c.Ui.Output(fmt.Sprintf( + " %s: Allocation %q is %q", + formatTime(time.Now()), + limit(alloc.ID, c.length), + alloc.ClientStatus, + )) + errCh <- nil + return + } + + default: + if c.verbose { + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[dark_gray] %s: Allocation %q is %q[reset]", + formatTime(time.Now()), + limit(alloc.ID, c.length), + alloc.ClientStatus, + ))) + } + } + + q.WaitIndex = qm.LastIndex + } +} + +// handleSignal receives input signals and blocks the activeCh until the user +// confirms how to proceed. +// +// Exit immediately if the user confirms the interrupt, otherwise resume the +// command and feed activeCh to unblock it. +func (c *JobRestartCommand) handleSignal(sigsCh chan os.Signal, activeCh chan any) { + for { + select { + case <-sigsCh: + // Consume activeCh to prevent the main loop from proceeding. 
+ select { + case <-activeCh: + default: + } + + if c.shouldExit() { + c.Ui.Output("\nCanceling job restart process") + os.Exit(0) + } + case activeCh <- struct{}{}: + } + } +} + +// isErrorRecoverable returns true when the error is likely to impact all +// restarts and so there is not reason to keep going. +func (c *JobRestartCommand) isErrorRecoverable(err error) bool { + if err == nil { + return true + } + + if errors.Is(err, ErrJobRestartPlacementFailure{}) { + return false + } + + if strings.Contains(err.Error(), api.PermissionDeniedErrorContent) { + return false + } + + return true +} + +// errorFormat returns a multierror.ErrorFormatFunc that indents each line, +// except for the first one, of the resulting error string with the given +// number of spaces. +func (c *JobRestartCommand) errorFormat(indent int) func([]error) string { + return func(es []error) string { + points := make([]string, len(es)) + for i, err := range es { + points[i] = fmt.Sprintf("* %s", strings.TrimSpace(err.Error())) + } + + out := fmt.Sprintf( + "%s occurred while restarting job:\n%s", + english.Plural(len(es), "error", "errors"), + strings.Join(points, "\n"), + ) + return indentString(out, indent) + } +} + +// AllocationListStubWithJob combines an AllocationListStub with its +// corresponding job at the right version. +type AllocationListStubWithJob struct { + *api.AllocationListStub + Job *api.Job +} + +// HasTask returns true if the allocation has the given task in the specific +// job version it was created. +func (a *AllocationListStubWithJob) HasTask(name string) bool { + // Check task state first as it's the fastest and most reliable source. + if _, ok := a.TaskStates[name]; ok { + return true + } + + // But task states are only set when the client updates its allocations + // with the server, so they may not be available yet. Lookup the task in + // the job version as a fallback. + if a.Job == nil { + return false + } + + var taskGroup *api.TaskGroup + for _, tg := range a.Job.TaskGroups { + if tg.Name == nil || *tg.Name != a.TaskGroup { + continue + } + taskGroup = tg + } + if taskGroup == nil { + return false + } + + for _, task := range taskGroup.Tasks { + if task.Name == name { + return true + } + } + + return false +} + +// IsRunning returns true if the allocation's ClientStatus or DesiredStatus is +// running. 
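+//
+// Checking DesiredStatus as well covers allocations that the scheduler still
+// wants running but whose client status has not been updated yet, such as
+// pending allocations.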
+func (a *AllocationListStubWithJob) IsRunning() bool { + return a.ClientStatus == api.AllocClientStatusRunning || + a.DesiredStatus == api.AllocDesiredStatusRun +} diff --git a/command/job_restart_test.go b/command/job_restart_test.go new file mode 100644 index 000000000..d33bf879a --- /dev/null +++ b/command/job_restart_test.go @@ -0,0 +1,1591 @@ +package command + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/http/httputil" + neturl "net/url" + "regexp" + "sort" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" +) + +func TestJobRestartCommand_Implements(t *testing.T) { + ci.Parallel(t) + var _ cli.Command = &JobRestartCommand{} +} + +func TestJobRestartCommand_parseAndValidate(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + args []string + expectedErr string + expectedCmd *JobRestartCommand + }{ + { + name: "missing job", + args: []string{}, + expectedErr: "This command takes one argument", + }, + { + name: "too many args", + args: []string{"one", "two", "three"}, + expectedErr: "This command takes one argument", + }, + { + name: "tasks and groups", + args: []string{ + "-task", "my-task-1", "-task", "my-task-2", + "-group", "my-group-1", "-group", "my-group-2", + "my-job", + }, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + groups: set.From([]string{"my-group-1", "my-group-2"}), + tasks: set.From([]string{"my-task-1", "my-task-2"}), + batchSize: 1, + }, + }, + { + name: "all tasks", + args: []string{"-all-tasks", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + allTasks: true, + batchSize: 1, + }, + }, + { + name: "all tasks conflicts with task", + args: []string{"-all-tasks", "-task", "my-task", "-yes", "my-job"}, + expectedErr: "The -all-tasks option cannot be used with -task", + }, + { + name: "batch size as number", + args: []string{"-batch-size", "10", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 10, + }, + }, + { + name: "batch size as percentage", + args: []string{"-batch-size", "10%", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 10, + batchSizePercent: true, + }, + }, + { + name: "batch size not valid", + args: []string{"-batch-size", "not-valid", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size decimal not valid", + args: []string{"-batch-size", "1.5", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size zero", + args: []string{"-batch-size", "0", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size decimal percent not valid", + args: []string{"-batch-size", "1.5%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size zero percentage", + args: []string{"-batch-size", "0%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size with multiple numbers and percentages", + args: []string{"-batch-size", "15%10%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch wait ask", + args: []string{"-batch-wait", "ask", "my-job"}, + expectedErr: "terminal is not interactive", // Can't test 
non-interactive. + }, + { + name: "batch wait duration", + args: []string{"-batch-wait", "10s", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + batchWait: 10 * time.Second, + }, + }, + { + name: "batch wait invalid", + args: []string{"-batch-wait", "10", "my-job"}, + expectedErr: "Invalid -batch-wait value", + }, + { + name: "on error fail", + args: []string{"-on-error", "fail", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + onError: jobRestartOnErrorFail, + }, + }, + { + name: "on error invalid", + args: []string{"-on-error", "invalid", "my-job"}, + expectedErr: "Invalid -on-error value", + }, + { + name: "no shutdown delay", + args: []string{"-no-shutdown-delay", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + noShutdownDelay: true, + }, + }, + { + name: "reschedule", + args: []string{"-reschedule", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + reschedule: true, + }, + }, + { + name: "reschedule conflicts with task", + args: []string{"-reschedule", "-task", "my-task", "-yes", "my-job"}, + expectedErr: "The -reschedule option cannot be used with -task", + }, + { + name: "verbose", + args: []string{"-verbose", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + verbose: true, + length: fullId, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := &cli.ConcurrentUi{Ui: cli.NewMockUi()} + meta := Meta{Ui: ui} + + // Set some default values if not defined in test case. + if tc.expectedCmd != nil { + tc.expectedCmd.Meta = meta + + if tc.expectedCmd.length == 0 { + tc.expectedCmd.length = shortId + } + if tc.expectedCmd.groups == nil { + tc.expectedCmd.groups = set.New[string](0) + } + if tc.expectedCmd.tasks == nil { + tc.expectedCmd.tasks = set.New[string](0) + } + if tc.expectedCmd.onError == "" { + tc.expectedCmd.onError = jobRestartOnErrorAsk + tc.expectedCmd.autoYes = true + tc.args = append([]string{"-yes"}, tc.args...) + } + } + + cmd := &JobRestartCommand{Meta: meta} + code, err := cmd.parseAndValidate(tc.args) + + if tc.expectedErr != "" { + must.NonZero(t, code) + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + must.Zero(t, code) + must.Eq(t, tc.expectedCmd, cmd, must.Cmp(cmpopts.IgnoreFields(JobRestartCommand{}, "Meta", "Meta.Ui"))) + } + }) + } +} + +func TestJobRestartCommand_Run(t *testing.T) { + ci.Parallel(t) + + // Create a job with multiple tasks, groups, and allocations. + prestartTask := api.NewTask("prestart", "mock_driver"). + SetConfig("run_for", "100ms"). + SetConfig("exit_code", 0). + SetLifecycle(&api.TaskLifecycle{ + Hook: api.TaskLifecycleHookPrestart, + Sidecar: false, + }) + sidecarTask := api.NewTask("sidecar", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0). + SetLifecycle(&api.TaskLifecycle{ + Hook: api.TaskLifecycleHookPoststart, + Sidecar: true, + }) + mainTask := api.NewTask("main", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0) + + jobID := "test_job_restart_cmd" + job := api.NewServiceJob(jobID, jobID, "global", 1). + AddDatacenter("dc1"). + AddTaskGroup( + api.NewTaskGroup("single_task", 3). + AddTask(mainTask), + ). + AddTaskGroup( + api.NewTaskGroup("multiple_tasks", 2). + AddTask(prestartTask). + AddTask(sidecarTask). + AddTask(mainTask), + ) + + testCases := []struct { + name string + args []string // Job arg is added automatically. 
+ expectedCode int + validateFn func(*testing.T, *api.Client, []*api.AllocationListStub, string, string) + }{ + { + name: "restart only running tasks in all groups by default", + args: []string{"-batch-size", "100%"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + }, + }, + { + name: "restart specific task in all groups", + args: []string{"-batch-size", "100%", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart multiple tasks in all groups", + args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart all tasks in all groups", + args: []string{"-batch-size", "100%", "-all-tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": true, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. 
+ batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart running tasks in specific group", + args: []string{"-batch-size", "100%", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + }, + }, + { + name: "restart specific task that is not running", + args: []string{"-batch-size", "100%", "-task", "prestart"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + // Check that we have an error message. + must.StrContains(t, stderr, "Task not running") + }, + expectedCode: 1, + }, + { + name: "restart specific task in specific group", + args: []string{"-batch-size", "100%", "-task", "main", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart multiple tasks in specific group", + args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar", "-group", "multiple_tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. 
+ batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") + must.Len(t, 2, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart all tasks in specific group", + args: []string{"-batch-size", "100%", "-all-tasks", "-group", "multiple_tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": true, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") + must.Len(t, 2, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart in batches", + args: []string{"-batch-size", "3", "-batch-wait", "3s", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations were properly batched. + batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") + + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + + must.Len(t, 2, batches[1]) + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + + // Check that we only waited between batches. + waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") + must.Eq(t, 1, waitMsgCount) + + // Check that batches waited the expected time. + batch1Restart := batches[0][0].TaskStates["main"].LastRestart + batch2Restart := batches[1][0].TaskStates["main"].LastRestart + diff := batch2Restart.Sub(batch1Restart) + must.Between(t, 3*time.Second, diff, 4*time.Second) + }, + }, + { + name: "restart in percent batch", + args: []string{"-batch-size", "50%", "-batch-wait", "3s", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations were properly batched. + batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") + + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + + must.Len(t, 2, batches[1]) + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + + // Check that we only waited between batches. + waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") + must.Eq(t, 1, waitMsgCount) + + // Check that batches waited the expected time. 
+ batch1Restart := batches[0][0].TaskStates["main"].LastRestart + batch2Restart := batches[1][0].TaskStates["main"].LastRestart + diff := batch2Restart.Sub(batch1Restart) + must.Between(t, 3*time.Second, diff, 4*time.Second) + }, + }, + { + name: "restart in batch ask with yes", + args: []string{"-batch-size", "100%", "-batch-wait", "ask", "-yes", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "reschedule in batches", + args: []string{"-reschedule", "-batch-size", "3"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + // Expect all allocations were rescheduled. + reschedules := map[string]bool{} + for _, alloc := range allocs { + reschedules[alloc.ID] = true + } + waitAllocsRescheduled(t, client, reschedules) + + // Check that allocations were properly batched. + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + must.StrNotContains(t, stdout, "Waiting") + }, + }, + { + name: "reschedule specific group", + args: []string{"-reschedule", "-batch-size", "100%", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + // Expect that only allocs for the single_task group were + // rescheduled. + reschedules := map[string]bool{} + for _, alloc := range allocs { + if alloc.TaskGroup == "single_task" { + reschedules[alloc.ID] = true + } + } + waitAllocsRescheduled(t, client, reschedules) + + // Check that allocations restarted in a single batch. + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + // Run each test case in parallel because they are fairly slow. + ci.Parallel(t) + + // Initialize UI and command. + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + // User separate cluster for each test case so they can run in + // parallel without affecting each other. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job and wait for its allocs to be running. + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + + allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Fetch allocations before the restart so we know which ones are + // supposed to be affected in case the test reschedules allocs. 
+ allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + + // Prepend server URL and append job ID to the test case command. + args := []string{"-address", url, "-yes"} + args = append(args, tc.args...) + args = append(args, jobID) + + // Run job restart command. + code = cmd.Run(args) + must.Eq(t, code, tc.expectedCode) + + // Run test case validation function. + if tc.validateFn != nil { + tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String()) + } + }) + } +} + +func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Create non-default namespace. + _, err := client.Namespaces().Register(&api.Namespace{Name: "prod"}, nil) + must.NoError(t, err) + + // Register job with same name in both namespaces. + evalIDs := []string{} + + jobDefault := testJob("test_job_restart") + resp, _, err := client.Jobs().Register(jobDefault, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + jobProd := testJob("test_job_restart") + jobProd.Namespace = pointer.Of("prod") + resp, _, err = client.Jobs().Register(jobProd, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + jobUniqueProd := testJob("test_job_restart_prod_ns") + jobUniqueProd.Namespace = pointer.Of("prod") + resp, _, err = client.Jobs().Register(jobUniqueProd, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + // Wait for evals to be processed. + for _, evalID := range evalIDs { + code := waitForSuccess(ui, client, fullId, t, evalID) + must.Eq(t, 0, code) + } + ui.OutputWriter.Reset() + + testCases := []struct { + name string + args []string + expectedErr string + }{ + { + name: "prefix match in default namespace", + args: []string{"test_job"}, + }, + { + name: "invalid job", + args: []string{"not-valid"}, + expectedErr: "No job(s) with prefix or ID", + }, + { + name: "prefix matches multiple jobs", + args: []string{"-namespace", "prod", "test_job"}, + expectedErr: "matched multiple jobs", + }, + { + name: "prefix matches multiple jobs across namespaces", + args: []string{"-namespace", "*", "test_job"}, + expectedErr: "matched multiple jobs", + }, + { + name: "unique prefix match across namespaces", + args: []string{"-namespace", "*", "test_job_restart_prod"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer func() { + ui.OutputWriter.Reset() + ui.ErrorWriter.Reset() + }() + + cmd := &JobRestartCommand{ + Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, + } + args := append([]string{"-address", url, "-yes"}, tc.args...) + code := cmd.Run(args) + + if tc.expectedErr != "" { + must.NonZero(t, code) + must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) + } else { + must.Zero(t, code) + } + }) + } +} + +func TestJobRestartCommand_noAllocs(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job with impossible constraint so it doesn't get allocs. 
+ jobID := "test_job_restart_no_allocs" + job := testJob(jobID) + job.Datacenters = []string{"invalid"} + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Eq(t, 2, code) // Placement is expected to fail so exit code is not 0. + ui.OutputWriter.Reset() + + // Run job restart command and expect it to exit without restarts. + code = cmd.Run([]string{ + "-address", url, + "-yes", + jobID, + }) + must.Zero(t, code) + must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart") +} + +func TestJobRestartCommand_rescheduleFail(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job with 3 allocs. + jobID := "test_job_restart_reschedule_fail" + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(3) + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Wait for allocs to be running. + allocs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Mark node as ineligible to prevent allocs from being replaced. + nodeID := srv.Agent.Client().NodeID() + client.Nodes().ToggleEligibility(nodeID, false, nil) + + // Run job restart command and expect it to fail. + code = cmd.Run([]string{ + "-address", url, + "-batch-size", "2", + "-reschedule", + "-yes", + jobID, + }) + must.One(t, code) + must.StrContains(t, ui.ErrorWriter.String(), "No nodes were eligible for evaluation") +} + +func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + srv, client, _ := testServer(t, true, nil) + defer srv.Shutdown() + waitForNodes(t, client) + + // Register test job and update it twice so we end up with three + // allocations, one replacing the next one. + jobID := "test_job_restart_monitor_replacement" + job := testJob(jobID) + + for i := 1; i <= 3; i++ { + job.TaskGroups[0].Tasks[0].Config["run_for"] = fmt.Sprintf("%ds", i) + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + } + ui.OutputWriter.Reset() + + // Prepare the command internals. We want to run a specific function and + // target a specific allocation, so we can't run the full command. + cmd.client = client + cmd.verbose = true + cmd.length = fullId + + // Fetch, sort, and monitor the oldest allocation. + allocs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + sort.Slice(allocs, func(i, j int) bool { + return allocs[i].CreateIndex < allocs[j].CreateIndex + }) + + errCh := make(chan error) + go cmd.monitorReplacementAlloc(context.Background(), AllocationListStubWithJob{ + AllocationListStub: allocs[0], + Job: job, + }, errCh) + + // Make sure the command doesn't get stuck and that we traverse the + // follow-up allocations properly. 
+ must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + select { + case err := <-errCh: + return err + default: + return fmt.Errorf("waiting for response") + } + }), + wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), + )) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[0].ID, allocs[1].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[1].ID, allocs[2].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q is %q", allocs[2].ID, api.AllocClientStatusRunning)) +} + +func TestJobRestartCommand_activeDeployment(t *testing.T) { + ci.Parallel(t) + + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + waitForNodes(t, client) + + // Register test job and update it once to trigger a deployment. + jobID := "test_job_restart_deployment" + job := testJob(jobID) + job.Type = pointer.Of(api.JobTypeService) + job.Update = &api.UpdateStrategy{ + Canary: pointer.Of(1), + AutoPromote: pointer.Of(false), + } + + _, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + _, _, err = client.Jobs().Register(job, nil) + must.NoError(t, err) + + // Wait for a deployment to be running. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + deployments, _, err := client.Jobs().Deployments(jobID, true, nil) + if err != nil { + return err + } + for _, d := range deployments { + if d.Status == api.DeploymentStatusRunning { + return nil + } + } + return fmt.Errorf("no running deployments") + }), + wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), + )) + + // Run job restart command and expect it to fail. + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + code := cmd.Run([]string{ + "-address", url, + "-on-error", jobRestartOnErrorFail, + "-verbose", + jobID, + }) + must.One(t, code) + must.RegexMatch(t, regexp.MustCompile(`Deployment .+ is "running"`), ui.ErrorWriter.String()) +} + +func TestJobRestartCommand_ACL(t *testing.T) { + ci.Parallel(t) + + // Start server with ACL enabled. + srv, client, url := testServer(t, true, func(c *agent.Config) { + c.ACL.Enabled = true + }) + defer srv.Shutdown() + + rootTokenOpts := &api.WriteOptions{ + AuthToken: srv.RootToken.SecretID, + } + + // Register test job. + jobID := "test_job_restart_acl" + job := testJob(jobID) + _, _, err := client.Jobs().Register(job, rootTokenOpts) + must.NoError(t, err) + + // Wait for allocs to be running. 
+ waitForJobAllocsStatus(t, client, jobID, api.AllocClientStatusRunning, srv.RootToken.SecretID) + + testCases := []struct { + name string + jobPrefix bool + aclPolicy string + expectedErr string + }{ + { + name: "no token", + aclPolicy: "", + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "alloc-lifecycle not enough", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle"] +} +`, + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "read-job not enough", + aclPolicy: ` +namespace "default" { + capabilities = ["read-job"] +} +`, + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "alloc-lifecycle and read-job allowed", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle", "read-job"] +} +`, + }, + { + name: "job prefix requires list-jobs", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle", "read-job"] +} +`, + jobPrefix: true, + expectedErr: "job not found", + }, + { + name: "job prefix works with list-jobs", + aclPolicy: ` +namespace "default" { + capabilities = ["list-jobs", "alloc-lifecycle", "read-job"] +} +`, + jobPrefix: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + args := []string{ + "-address", url, + "-yes", + } + + if tc.aclPolicy != "" { + // Create ACL token with test case policy. + policy := &api.ACLPolicy{ + Name: nonAlphaNum.ReplaceAllString(tc.name, "-"), + Rules: tc.aclPolicy, + } + _, err := client.ACLPolicies().Upsert(policy, rootTokenOpts) + must.NoError(t, err) + + token := &api.ACLToken{ + Type: "client", + Policies: []string{policy.Name}, + } + token, _, err = client.ACLTokens().Create(token, rootTokenOpts) + must.NoError(t, err) + + // Set token in command args. + args = append(args, "-token", token.SecretID) + } + + // Add job ID or job ID prefix to the command. + if tc.jobPrefix { + args = append(args, jobID[0:3]) + } else { + args = append(args, jobID) + } + + // Run command. + code := cmd.Run(args) + if tc.expectedErr == "" { + must.Zero(t, code) + } else { + must.One(t, code) + must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) + } + }) + } +} + +// TODO(luiz): update once alloc restart supports -no-shutdown-delay. +func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { + ci.Parallel(t) + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + testCases := []struct { + name string + args []string + shutdownDelay bool + }{ + { + name: "job reschedule with shutdown delay by default", + args: []string{"-reschedule"}, + shutdownDelay: true, + }, + { + name: "job reschedule no shutdown delay", + args: []string{"-reschedule", "-no-shutdown-delay"}, + shutdownDelay: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Register job with 2 allocations and shutdown_delay. 
+ shutdownDelay := 3 * time.Second + jobID := nonAlphaNum.ReplaceAllString(tc.name, "-") + + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(2) + job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m" + job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay + job.TaskGroups[0].Tasks[0].Services = []*api.Service{{ + Name: "service", + Provider: "nomad", + }} + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Wait for alloc to be running. + allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Add address and job ID to the command and run. + args := []string{ + "-address", url, + "-batch-size", "1", + "-batch-wait", "0", + "-yes", + } + args = append(args, tc.args...) + args = append(args, jobID) + + code = cmd.Run(args) + must.Zero(t, code) + + // Wait for all allocs to restart. + reschedules := map[string]bool{} + for _, alloc := range allocStubs { + reschedules[alloc.ID] = true + } + allocs := waitAllocsRescheduled(t, client, reschedules) + + // Check that allocs have shutdown delay event. + for _, alloc := range allocs { + for _, s := range alloc.TaskStates { + var killedEv *api.TaskEvent + var killingEv *api.TaskEvent + for _, ev := range s.Events { + if strings.Contains(ev.Type, "Killed") { + killedEv = ev + } + if strings.Contains(ev.Type, "Killing") { + killingEv = ev + } + } + + diff := killedEv.Time - killingEv.Time + if tc.shutdownDelay { + must.GreaterEq(t, shutdownDelay, time.Duration(diff)) + } else { + // Add a bit of slack to account for the actual + // shutdown time of the task. + must.Between(t, shutdownDelay, time.Duration(diff), shutdownDelay+time.Second) + } + } + } + }) + } +} + +func TestJobRestartCommand_filterAllocs(t *testing.T) { + ci.Parallel(t) + + task1 := api.NewTask("task_1", "mock_driver") + task2 := api.NewTask("task_2", "mock_driver") + task3 := api.NewTask("task_3", "mock_driver") + + jobV1 := api.NewServiceJob("example", "example", "global", 1). + AddTaskGroup( + api.NewTaskGroup("group_1", 1). + AddTask(task1), + ). + AddTaskGroup( + api.NewTaskGroup("group_2", 1). + AddTask(task1). + AddTask(task2), + ). + AddTaskGroup( + api.NewTaskGroup("group_3", 1). + AddTask(task3), + ) + jobV1.Version = pointer.Of(uint64(1)) + + jobV2 := api.NewServiceJob("example", "example", "global", 1). + AddTaskGroup( + api.NewTaskGroup("group_1", 1). + AddTask(task1), + ). + AddTaskGroup( + api.NewTaskGroup("group_2", 1). 
+ AddTask(task2), + ) + jobV2.Version = pointer.Of(uint64(2)) + + allAllocs := []AllocationListStubWithJob{} + allocs := map[string]AllocationListStubWithJob{} + for _, job := range []*api.Job{jobV1, jobV2} { + for _, tg := range job.TaskGroups { + for _, desired := range []string{api.AllocDesiredStatusRun, api.AllocDesiredStatusStop} { + for _, client := range []string{api.AllocClientStatusRunning, api.AllocClientStatusComplete} { + key := fmt.Sprintf("job_v%d_%s_%s_%s", *job.Version, *tg.Name, desired, client) + alloc := AllocationListStubWithJob{ + AllocationListStub: &api.AllocationListStub{ + ID: key, + JobVersion: *job.Version, + TaskGroup: *tg.Name, + DesiredStatus: desired, + ClientStatus: client, + }, + Job: job, + } + allocs[key] = alloc + allAllocs = append(allAllocs, alloc) + } + } + } + } + + testCases := []struct { + name string + args []string + expectedAllocs []AllocationListStubWithJob + }{ + { + name: "skip by group", + args: []string{"-group", "group_1"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_1_run_running"], + allocs["job_v1_group_1_run_complete"], + allocs["job_v1_group_1_stop_running"], + allocs["job_v2_group_1_run_running"], + allocs["job_v2_group_1_run_complete"], + allocs["job_v2_group_1_stop_running"], + }, + }, + { + name: "skip by old group", + args: []string{"-group", "group_3"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + }, + }, + { + name: "skip by task", + args: []string{"-task", "task_2"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "skip by old task", + args: []string{"-task", "task_3"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + }, + }, + { + name: "skip by group and task", + args: []string{ + "-group", "group_1", + "-group", "group_2", + "-task", "task_2", + }, + // Only group_2 has task_2 in all job versions. 
+ expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "skip by status", + args: []string{}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_1_run_running"], + allocs["job_v1_group_1_run_complete"], + allocs["job_v1_group_1_stop_running"], + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + allocs["job_v2_group_1_run_running"], + allocs["job_v2_group_1_run_complete"], + allocs["job_v2_group_1_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "no matches by group", + args: []string{"-group", "group_404"}, + expectedAllocs: []AllocationListStubWithJob{}, + }, + { + name: "no matches by task", + args: []string{"-task", "task_404"}, + expectedAllocs: []AllocationListStubWithJob{}, + }, + { + name: "no matches by task with group", + args: []string{ + "-group", "group_1", + "-task", "task_2", // group_1 never has task_2. + }, + expectedAllocs: []AllocationListStubWithJob{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{ + Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, + } + + args := append(tc.args, "-verbose", "-yes", "example") + code, err := cmd.parseAndValidate(args) + must.NoError(t, err) + must.Zero(t, code) + + got := cmd.filterAllocs(allAllocs) + must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool { + return a.ID == b.ID + }) + + expected := set.FromFunc(tc.expectedAllocs, func(a AllocationListStubWithJob) string { + return a.ID + }) + for _, a := range allAllocs { + if !expected.Contains(a.ID) { + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("Skipping allocation %q", a.ID)) + } + } + }) + } +} + +func TestJobRestartCommand_onErrorFail(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + parsedURL, err := neturl.Parse(url) + must.NoError(t, err) + + waitForNodes(t, client) + + // Register a job with 3 allocations. + jobID := "test_job_restart_command_fail_on_error" + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(3) + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Create a proxy to inject an error after 2 allocation restarts. + // Also counts how many restart requests are made so we can check that the + // command stops after the error happens. 
+	var allocRestarts int32
+	proxy := httptest.NewServer(&httputil.ReverseProxy{
+		ModifyResponse: func(resp *http.Response) error {
+			if strings.HasSuffix(resp.Request.URL.Path, "/restart") {
+				count := atomic.AddInt32(&allocRestarts, 1)
+				if count == 2 {
+					return fmt.Errorf("fail")
+				}
+			}
+			return nil
+		},
+		Rewrite: func(r *httputil.ProxyRequest) {
+			r.SetURL(parsedURL)
+		},
+	})
+	defer proxy.Close()
+
+	// Run command with -on-error=fail.
+	// Expect only 2 restart requests even though there are 3 allocations.
+	code = cmd.Run([]string{
+		"-address", proxy.URL,
+		"-on-error", jobRestartOnErrorFail,
+		jobID,
+	})
+	must.One(t, code)
+	must.Eq(t, 2, allocRestarts)
+}
+
+// waitTasksRestarted blocks until the given allocations have restarted (or
+// not) as expected. Returns a list with the updated state of the allocations.
+//
+// To determine if a restart happened the function looks for a "Restart
+// Signaled" event in the list of task events. Allocations that are reused
+// between tests may contain a restart event from a past test case, leading to
+// false positives.
+//
+// The restarts map is keyed by task group name and then task name, with the
+// boolean value indicating whether that task is expected to restart.
+func waitTasksRestarted(
+	t *testing.T,
+	client *api.Client,
+	allocs []*api.AllocationListStub,
+	restarts map[string]map[string]bool,
+) []*api.Allocation {
+	t.Helper()
+
+	var newAllocs []*api.Allocation
+	testutil.WaitForResult(func() (bool, error) {
+		newAllocs = make([]*api.Allocation, 0, len(allocs))
+
+		for _, alloc := range allocs {
+			if _, ok := restarts[alloc.TaskGroup]; !ok {
+				t.Fatalf("Missing group %q in restarts map", alloc.TaskGroup)
+			}
+
+			// Skip allocations that are not supposed to be running.
+			if alloc.DesiredStatus != api.AllocDesiredStatusRun {
+				continue
+			}
+
+			updated, _, err := client.Allocations().Info(alloc.ID, nil)
+			if err != nil {
+				return false, err
+			}
+			newAllocs = append(newAllocs, updated)
+
+			for task, state := range updated.TaskStates {
+				restarted := false
+				for _, ev := range state.Events {
+					if ev.Type == api.TaskRestartSignal {
+						restarted = true
+						break
+					}
+				}
+
+				if restarted && !restarts[updated.TaskGroup][task] {
+					return false, fmt.Errorf(
+						"task %q in alloc %s for group %q not expected to restart",
+						task, updated.ID, updated.TaskGroup,
+					)
+				}
+				if !restarted && restarts[updated.TaskGroup][task] {
+					return false, fmt.Errorf(
+						"task %q in alloc %s for group %q expected to restart but didn't",
+						task, updated.ID, updated.TaskGroup,
+					)
+				}
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		must.NoError(t, err)
+	})
+
+	return newAllocs
+}
+
+// waitAllocsRescheduled blocks until the given allocations have been
+// rescheduled (or not) as expected. Returns a list with the updated state of
+// the allocations.
+//
+// To determine if an allocation has been rescheduled the function looks for
+// a non-empty NextAllocation field.
+//
+// The reschedules map maps allocation IDs to a boolean indicating whether a
+// reschedule is expected for that allocation.
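+//
+// A hypothetical call, using made-up allocation IDs, looks like:
+//
+//	waitAllocsRescheduled(t, client, map[string]bool{
+//		"alloc-1": true,  // must be rescheduled
+//		"alloc-2": false, // must remain in place
+//	})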
+func waitAllocsRescheduled(t *testing.T, client *api.Client, reschedules map[string]bool) []*api.Allocation { + t.Helper() + + var newAllocs []*api.Allocation + testutil.WaitForResult(func() (bool, error) { + newAllocs = make([]*api.Allocation, 0, len(reschedules)) + + for allocID, reschedule := range reschedules { + alloc, _, err := client.Allocations().Info(allocID, nil) + if err != nil { + return false, err + } + newAllocs = append(newAllocs, alloc) + + wasRescheduled := alloc.NextAllocation != "" + if wasRescheduled && !reschedule { + return false, fmt.Errorf("alloc %s not expected to be rescheduled", alloc.ID) + } + if !wasRescheduled && reschedule { + return false, fmt.Errorf("alloc %s expected to be rescheduled but wasn't", alloc.ID) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) + + return newAllocs +} + +// getRestartBatches returns a list of allocations per batch of restarts. +// +// Since restarts are issued concurrently, it's expected that allocations in +// the same batch have fairly close LastRestart times, so a 1s delay between +// restarts may be enough to indicate a new batch. +func getRestartBatches(allocs []*api.Allocation, groups []string, task string) [][]*api.Allocation { + groupsSet := set.From(groups) + batches := [][]*api.Allocation{} + + type allocRestart struct { + alloc *api.Allocation + restart time.Time + } + + restarts := make([]allocRestart, 0, len(allocs)) + for _, alloc := range allocs { + if !groupsSet.Contains(alloc.TaskGroup) { + continue + } + + restarts = append(restarts, allocRestart{ + alloc: alloc, + restart: alloc.TaskStates[task].LastRestart, + }) + } + + sort.Slice(restarts, func(i, j int) bool { + return restarts[i].restart.Before(restarts[j].restart) + }) + + prev := restarts[0].restart + batch := []*api.Allocation{} + for _, r := range restarts { + if r.restart.Sub(prev) >= time.Second { + prev = r.restart + batches = append(batches, batch) + batch = []*api.Allocation{} + } + batch = append(batch, r.alloc) + } + batches = append(batches, batch) + + return batches +} diff --git a/command/meta.go b/command/meta.go index 06aa63e22..b8eac1900 100644 --- a/command/meta.go +++ b/command/meta.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "os" + "reflect" "strings" "github.com/hashicorp/nomad/api" @@ -176,7 +177,35 @@ func (m *Meta) allNamespaces() bool { } func (m *Meta) Colorize() *colorstring.Colorize { - _, coloredUi := m.Ui.(*cli.ColoredUi) + ui := m.Ui + coloredUi := false + + // Meta.Ui may wrap other cli.Ui instances, so unwrap them until we find a + // *cli.ColoredUi or there is nothing left to unwrap. 
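+	//
+	// For example, a wrapped UI may look like the following (hypothetical
+	// chain):
+	//
+	//	&cli.ConcurrentUi{Ui: &cli.ColoredUi{Ui: &cli.BasicUi{}}}
+	//
+	// A single type assertion on m.Ui would never see the ColoredUi in that
+	// case.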
+ for { + if ui == nil { + break + } + + _, coloredUi = ui.(*cli.ColoredUi) + if coloredUi { + break + } + + v := reflect.ValueOf(ui) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + if !v.Field(i).CanInterface() { + continue + } + ui, _ = v.Field(i).Interface().(cli.Ui) + if ui != nil { + break + } + } + } return &colorstring.Colorize{ Colors: colorstring.DefaultColors, diff --git a/command/testing_test.go b/command/testing_test.go index b1ffc23fd..6f8ca0787 100644 --- a/command/testing_test.go +++ b/command/testing_test.go @@ -152,21 +152,48 @@ func waitForNodes(t *testing.T, client *api.Client) { }) } -func waitForAllocRunning(t *testing.T, client *api.Client, allocID string) { +func waitForJobAllocsStatus(t *testing.T, client *api.Client, jobID string, status string, token string) { + testutil.WaitForResult(func() (bool, error) { + q := &api.QueryOptions{AuthToken: token} + + allocs, _, err := client.Jobs().Allocations(jobID, true, q) + if err != nil { + return false, fmt.Errorf("failed to query job allocs: %v", err) + } + if len(allocs) == 0 { + return false, fmt.Errorf("no allocs") + } + + for _, alloc := range allocs { + if alloc.ClientStatus != status { + return false, fmt.Errorf("alloc status is %q not %q", alloc.ClientStatus, status) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) +} + +func waitForAllocStatus(t *testing.T, client *api.Client, allocID string, status string) { testutil.WaitForResult(func() (bool, error) { alloc, _, err := client.Allocations().Info(allocID, nil) if err != nil { return false, err } - if alloc.ClientStatus == api.AllocClientStatusRunning { + if alloc.ClientStatus == status { return true, nil } - return false, fmt.Errorf("alloc status: %s", alloc.ClientStatus) + return false, fmt.Errorf("alloc status is %q not %q", alloc.ClientStatus, status) }, func(err error) { - t.Fatalf("timed out waiting for alloc to be running: %v", err) + must.NoError(t, err) }) } +func waitForAllocRunning(t *testing.T, client *api.Client, allocID string) { + waitForAllocStatus(t, client, allocID, api.AllocClientStatusRunning) +} + func waitForCheckStatus(t *testing.T, client *api.Client, allocID, status string) { testutil.WaitForResult(func() (bool, error) { results, err := client.Allocations().Checks(allocID, nil) diff --git a/website/content/docs/commands/job/restart.mdx b/website/content/docs/commands/job/restart.mdx new file mode 100644 index 000000000..601fce1bf --- /dev/null +++ b/website/content/docs/commands/job/restart.mdx @@ -0,0 +1,234 @@ +--- +layout: docs +page_title: 'Commands: job restart' +description: | + The job restart command is used to restart allocations for a job. +--- + +# Command: job restart + +The `job restart` command is used to restart or reschedule allocations for a +particular job. + +Restarting the job calls the [Restart Allocation][api_alloc_restart] API +endpoint to restart the tasks inside allocations, so the allocations themselves +are not modified but rather restarted in-place. + +Rescheduling the job uses the [Stop Allocation][api_alloc_stop] API endpoint to +stop the allocations and trigger the Nomad scheduler to compute new placements. +This may cause the new allocations to be scheduled in different clients from +the originals. + +## Usage + +```plaintext +nomad job restart [options] +``` + +The `job restart` command requires a single argument, specifying the job ID to +restart. 
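+
+As a point of reference, the in-place restart behavior can be approximated by
+calling the same [Restart Allocation][api_alloc_restart] endpoint through the
+Go `api` package. The sketch below is illustrative only: it assumes a job
+named `example` in the default namespace, restarts every running allocation in
+a single pass, and omits the batching, monitoring, and error handling that the
+command provides.
+
+```go
+package main
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/hashicorp/nomad/api"
+)
+
+func main() {
+	client, err := api.NewClient(api.DefaultConfig())
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// List the allocations currently known for the job.
+	stubs, _, err := client.Jobs().Allocations("example", false, nil)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	for _, stub := range stubs {
+		// Only running allocations can be restarted in-place.
+		if stub.ClientStatus != api.AllocClientStatusRunning {
+			continue
+		}
+
+		alloc, _, err := client.Allocations().Info(stub.ID, nil)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		// An empty task name restarts the tasks that are currently running
+		// in the allocation.
+		if err := client.Allocations().Restart(alloc, "", nil); err != nil {
+			log.Fatal(err)
+		}
+		fmt.Println("restarted allocation", alloc.ID)
+	}
+}
+```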
+
+The command can operate in batches, waiting until all restarted or rescheduled
+allocations in a batch are running again before proceeding to the next one.
+It is also possible to specify additional time to wait between batches.
+
+Allocations can be restarted in-place or rescheduled. When restarting
+in-place, the command may target specific tasks in the allocations, restart
+only tasks that are currently running, or restart all tasks, even the ones
+that have already run. Allocations can also be targeted by groups and tasks.
+When both groups and tasks are defined, only the tasks for the allocations of
+those groups are restarted.
+
+When rescheduling, the current allocations are stopped, triggering the Nomad
+scheduler to create replacement allocations that may be placed on different
+clients. The command waits until the new allocations have client status
+`running` before proceeding with the remaining batches. Service health checks
+are not taken into account.
+
+By default, the command restarts all running tasks in-place with one
+allocation per batch.
+
+When ACLs are enabled, this command requires a token with the
+`alloc-lifecycle` and `read-job` capabilities for the job's namespace. The
+`list-jobs` capability is required to run the command with a job prefix
+instead of the exact job ID.
+
+## General Options
+
+@include 'general_options.mdx'
+
+## Restart Options
+
+- `-all-tasks`: If set, all tasks in the allocations are restarted, even the
+  ones that have already run, such as non-sidecar tasks. Tasks will restart
+  following their [`lifecycle`][] order. This option cannot be used with
+  `-task`.
+
+- `-batch-size=<n|n%>`: Number of allocations to restart at once. It may be
+  defined as a percentage value of the current number of running allocations.
+  Percentage values are rounded up to increase parallelism. Defaults to `1`.
+
+- `-batch-wait=<duration|ask>`: Time to wait between restart batches. If set
+  to `ask`, the command halts between batches and waits for user input on how
+  to proceed. If the answer is a time duration, all remaining batches will use
+  this new value. Defaults to `0`.
+
+- `-group=<group-name>`: Only restart allocations for the given group. Can be
+  specified multiple times. If no group is set, all allocations for the job
+  are restarted.
+
+- `-no-shutdown-delay`: Ignore the group and task [`shutdown_delay`][]
+  configuration so there is no delay between service deregistration and task
+  shutdown or restart. Note that using this flag will result in failed network
+  connections to the allocation being restarted.
+
+- `-reschedule`: If set, allocations are stopped and rescheduled instead of
+  restarted in-place. Since the group is not modified, the restart does not
+  create a new deployment, and so values defined in [`update`][] blocks, such
+  as [`max_parallel`][], are not taken into account. This option cannot be used
+  with `-task`.
+
+- `-on-error=<ask|fail>`: Determines what action to take when an error happens
+  during a restart batch. If `ask`, the command stops and waits for user
+  confirmation on how to proceed. If `fail`, the command exits immediately.
+  Defaults to `ask`.
+
+- `-task=<task-name>`: Specify the task to restart. Can be specified multiple
+  times. If groups are also specified, the task must exist in at least one of
+  them. If no task is set, only tasks that are currently running are restarted.
+  For example, non-sidecar tasks that already ran are not restarted unless
+  `-all-tasks` is used instead. This option cannot be used with `-all-tasks` or
+  `-reschedule`.
+
+- `-yes`: Automatic yes to prompts. 
If set, the command automatically restarts + multi-region jobs only in the region targeted by the command, ignores batch + errors, and automatically proceeds with the remaining batches without + waiting. Use `-on-error` and `-batch-wait` to adjust these behaviors. + + +- `-verbose`: Display full information. + +## Examples + +Restart running tasks of all allocations. + +```shell-session +$ nomad job restart example +==> 2023-02-28T17:36:31-05:00: Restarting 5 allocations + 2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" + 2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "388129e0" for group "web" + 2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" + 2023-02-28T17:36:32-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" + 2023-02-28T17:36:32-05:00: Restarting running tasks in allocation "d4303a30" for group "web" +==> 2023-02-28T17:36:32-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Target allocations of a specific group to restart. + +```shell-session +$ nomad job restart -group=web example +==> 2023-02-28T17:37:36-05:00: Restarting 2 allocations + 2023-02-28T17:37:36-05:00: Restarting running tasks in allocation "388129e0" for group "web" + 2023-02-28T17:37:37-05:00: Restarting running tasks in allocation "d4303a30" for group "web" +==> 2023-02-28T17:37:37-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Reschedule allocations instead of restarting them in-place. + +```shell-session +❯ nomad job restart -group=web -reschedule example +==> 2023-02-28T17:39:14-05:00: Restarting 2 allocations + 2023-02-28T17:39:14-05:00: Rescheduling allocation "388129e0" for group "web" + 2023-02-28T17:39:45-05:00: Rescheduling allocation "d4303a30" for group "web" +==> 2023-02-28T17:40:16-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Batch allocations to restart them 2 at a time. + +```shell-session +$ nomad job restart -batch-size=2 example +==> 2023-02-28T17:40:58-05:00: Restarting 5 allocations +==> 2023-02-28T17:40:58-05:00: Restarting 1st batch of 2 allocations + 2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "653f983e" for group "web" + 2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "4d18e545" for group "web" +==> 2023-02-28T17:40:58-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" + 2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" +==> 2023-02-28T17:40:59-05:00: Restarting 3rd batch of 1 allocations + 2023-02-28T17:40:59-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" +==> 2023-02-28T17:40:59-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Batch allocations as a percentage of total running allocations. 
+ +```shell-session +$ nomad job restart -batch-size=50% example +==> 2023-02-28T18:52:47-05:00: Restarting 5 allocations +==> 2023-02-28T18:52:47-05:00: Restarting 1st batch of 3 allocations + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "d28f6f60" for group "proxy" + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "b931b496" for group "proxy" + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "18673b40" for group "proxy" +==> 2023-02-28T18:52:48-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:52:48-05:00: Restarting running tasks in allocation "439b1632" for group "web" + 2023-02-28T18:52:48-05:00: Restarting running tasks in allocation "8fae60f6" for group "web" +==> 2023-02-28T18:52:48-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Pause between batches of restart and wait for user input on how to proceed. + +```shell-session +$ nomad job restart -batch-size=2 -batch-wait=ask example +==> 2023-02-28T18:04:19-05:00: Restarting 5 allocations +==> 2023-02-28T18:04:19-05:00: Restarting 1st batch of 2 allocations + 2023-02-28T18:04:19-05:00: Restarting running tasks in allocation "4d18e545" for group "web" + 2023-02-28T18:04:19-05:00: Restarting running tasks in allocation "653f983e" for group "web" +==> 2023-02-28T18:04:19-05:00: Proceed with next batch? [Y/n/] y +==> 2023-02-28T18:04:20-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:04:20-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" + 2023-02-28T18:04:20-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" +==> 2023-02-28T18:04:20-05:00: Proceed with next batch? [Y/n/] 10s +==> 2023-02-28T18:04:22-05:00: Proceeding restarts with new wait time of 10s +==> 2023-02-28T18:04:22-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:04:32-05:00: Restarting 3rd batch of 1 allocations + 2023-02-28T18:04:32-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" +==> 2023-02-28T18:04:32-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Wait 10 seconds before each restart batch. + +```shell-session +$ nomad job restart -batch-size=2 -batch-wait=10s example +==> 2023-02-28T18:03:43-05:00: Restarting 5 allocations +==> 2023-02-28T18:03:43-05:00: Restarting 1st batch of 2 allocations + 2023-02-28T18:03:43-05:00: Restarting running tasks in allocation "653f983e" for group "web" + 2023-02-28T18:03:43-05:00: Restarting running tasks in allocation "4d18e545" for group "web" +==> 2023-02-28T18:03:43-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:03:53-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:03:53-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" + 2023-02-28T18:03:53-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" +==> 2023-02-28T18:03:53-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:04:03-05:00: Restarting 3rd batch of 1 allocations + 2023-02-28T18:04:03-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" +==> 2023-02-28T18:04:03-05:00: Finished job restart + +All allocations restarted successfully! 
+``` + +[`lifecycle`]: /nomad/docs/job-specification/lifecycle +[`max_parallel`]: /nomad/docs/job-specification/update#max_parallel +[`shutdown_delay`]: /nomad/docs/job-specification/task#shutdown_delay +[`update`]: /nomad/docs/job-specification/update +[api_alloc_restart]: /nomad/api-docs/allocations#restart-allocation +[api_alloc_stop]: /nomad/api-docs/allocations#stop-allocation diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index e2d2a7e86..2b38a16c4 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -542,6 +542,10 @@ "title": "promote", "path": "commands/job/promote" }, + { + "title": "restart", + "path": "commands/job/restart" + }, { "title": "revert", "path": "commands/job/revert"