open-nomad/command/job_restart.go

1244 lines
36 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package command
import (
"context"
"errors"
"fmt"
"math"
"os"
"os/signal"
"regexp"
"strconv"
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize/english"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
const (
// jobRestartTimestampPrefixLength is the number of characters in the
// "==> [timestamp]: " string that prefixes most of the outputs of this
// command.
jobRestartTimestampPrefixLength = 31
// jobRestartBatchWaitAsk is the special token used to indicate that the
// command should ask user for confirmation between batches.
jobRestartBatchWaitAsk = "ask"
// jobRestartOnErrorFail is the special token used to indicate that the
// command should exit when a batch has errors.
jobRestartOnErrorFail = "fail"
// jobRestartOnErrorAks is the special token used to indicate that the
// command should ask user for confirmation when a batch has errors.
jobRestartOnErrorAsk = "ask"
)
var (
// jobRestartBatchSizeValueRegex validates that the value passed to
// -batch-size is an integer optionally followed by a % sign.
//
// Use ^...$ to make sure we're matching over the entire input to avoid
// partial matches such as 10%20%.
jobRestartBatchSizeValueRegex = regexp.MustCompile(`^(\d+)%?$`)
)
// ErrJobRestartPlacementFailure is an error that indicates a placement failure
type ErrJobRestartPlacementFailure struct {
EvalID string
TaskGroup string
Failures *api.AllocationMetric
}
func (e ErrJobRestartPlacementFailure) Error() string {
return fmt.Sprintf("Evaluation %q has placement failures for group %q:\n%s",
e.EvalID,
e.TaskGroup,
formatAllocMetrics(e.Failures, false, strings.Repeat(" ", 4)),
)
}
func (e ErrJobRestartPlacementFailure) Is(err error) bool {
_, ok := err.(ErrJobRestartPlacementFailure)
return ok
}
// JobRestartCommand is the implementation for the command that restarts a job.
type JobRestartCommand struct {
Meta
// client is the Nomad API client shared by all functions in the command to
// reuse the same connection.
client *api.Client
// Configuration values read and parsed from command flags and args.
allTasks bool
autoYes bool
batchSize int
batchSizePercent bool
batchWait time.Duration
batchWaitAsk bool
groups *set.Set[string]
jobID string
noShutdownDelay bool
onError string
reschedule bool
tasks *set.Set[string]
verbose bool
length int
// canceled is set to true when the user gives a negative answer to any of
// the questions.
canceled bool
// sigsCh is used to subscribe to signals from the operating system.
sigsCh chan os.Signal
}
func (c *JobRestartCommand) Help() string {
helpText := `
Usage: nomad job restart [options] <job>
Restart or reschedule allocations for a particular job.
Restarting the job calls the 'Restart Allocation' API endpoint to restart the
tasks inside allocations, so the allocations themselves are not modified but
rather restarted in-place.
Rescheduling the job uses the 'Stop Allocation' API endpoint to stop the
allocations and trigger the Nomad scheduler to compute new placements. This
may cause the new allocations to be scheduled in different clients from the
originals.
This command can operate in batches and it waits until all restarted or
rescheduled allocations are running again before proceeding to the next
batch. It is also possible to specify additional time to wait between
batches.
Allocations can be restarted in-place or rescheduled. When restarting
in-place the command may target specific tasks in the allocations, restart
only tasks that are currently running, or restart all tasks, even the ones
that have already run. Allocations can also be targeted by group. When both
groups and tasks are defined only the tasks for the allocations of those
groups are restarted.
When rescheduling, the current allocations are stopped triggering the Nomad
scheduler to create replacement allocations that may be placed in different
clients. The command waits until the new allocations have client status
'ready' before proceeding with the remaining batches. Services health checks
are not taken into account.
By default the command restarts all running tasks in-place with one
allocation per batch.
When ACLs are enabled, this command requires a token with the
'alloc-lifecycle' and 'read-job' capabilities for the job's namespace. The
'list-jobs' capability is required to run the command with a job prefix
instead of the exact job ID.
General Options:
` + generalOptionsUsage(usageOptsDefault) + `
Restart Options:
-all-tasks
If set, all tasks in the allocations are restarted, even the ones that
have already run, such as non-sidecar tasks. Tasks will restart following
their lifecycle order. This option cannot be used with '-task'.
-batch-size=<n|n%>
Number of allocations to restart at once. It may be defined as a percentage
value of the current number of running allocations. Percentage values are
rounded up to increase parallelism. Defaults to 1.
-batch-wait=<duration|'ask'>
Time to wait between restart batches. If set to 'ask' the command halts
between batches and waits for user input on how to proceed. If the answer
is a time duration all remaining batches will use this new value. Defaults
to 0.
-group=<group-name>
Only restart allocations for the given group. Can be specified multiple
times. If no group is set all allocations for the job are restarted.
-no-shutdown-delay
Ignore the group and task 'shutdown_delay' configuration so there is no
delay between service deregistration and task shutdown or restart. Note
that using this flag will result in failed network connections to the
allocation being restarted.
-on-error=<'ask'|'fail'>
Determines what action to take when an error happens during a restart
batch. If 'ask' the command stops and waits for user confirmation on how to
proceed. If 'fail' the command exits immediately. Defaults to 'ask'.
-reschedule
If set, allocations are stopped and rescheduled instead of restarted
in-place. Since the group is not modified the restart does not create a new
deployment, and so values defined in 'update' blocks, such as
'max_parallel', are not taken into account. This option cannot be used with
'-task'. Only jobs of type 'batch', 'service', and 'system' can be
rescheduled.
-task=<task-name>
Specify the task to restart. Can be specified multiple times. If groups are
also specified the task must exist in at least one of them. If no task is
set only tasks that are currently running are restarted. For example,
non-sidecar tasks that already ran are not restarted unless '-all-tasks' is
used instead. This option cannot be used with '-all-tasks' or
'-reschedule'.
-yes
Automatic yes to prompts. If set, the command automatically restarts
multi-region jobs only in the region targeted by the command, ignores batch
errors, and automatically proceeds with the remaining batches without
waiting. Use '-on-error' and '-batch-wait' to adjust these behaviors.
-verbose
Display full information.
`
return strings.TrimSpace(helpText)
}
func (c *JobRestartCommand) Synopsis() string {
return "Restart or reschedule allocations for a job"
}
func (c *JobRestartCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-all-tasks": complete.PredictNothing,
"-batch-size": complete.PredictAnything,
"-batch-wait": complete.PredictAnything,
"-no-shutdown-delay": complete.PredictNothing,
"-on-error": complete.PredictSet(jobRestartOnErrorAsk, jobRestartOnErrorFail),
"-reschedule": complete.PredictNothing,
"-task": complete.PredictAnything,
"-yes": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}
func (c *JobRestartCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Jobs]
})
}
func (c *JobRestartCommand) Name() string { return "job restart" }
func (c *JobRestartCommand) Run(args []string) int {
// Parse and validate command line arguments.
code, err := c.parseAndValidate(args)
if err != nil {
c.Ui.Error(err.Error())
c.Ui.Error(commandErrorText(c))
return code
}
if code != 0 {
return code
}
c.client, err = c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %v", err))
return 1
}
// Use prefix matching to find job.
job, err := c.JobByPrefix(c.client, c.jobID, nil)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
c.jobID = *job.ID
if job.Namespace != nil {
c.client.SetNamespace(*job.Namespace)
}
// Handle SIGINT to prevent accidental cancellations of the long-lived
// restart loop. activeCh is blocked while a signal is being handled to
// prevent new work from starting while the user is deciding if they want
// to cancel the command or not.
activeCh := make(chan any)
c.sigsCh = make(chan os.Signal, 1)
signal.Notify(c.sigsCh, os.Interrupt)
defer signal.Stop(c.sigsCh)
go c.handleSignal(c.sigsCh, activeCh)
// Verify job type can be rescheduled.
if c.reschedule {
switch *job.Type {
case api.JobTypeBatch, api.JobTypeService, api.JobTypeSystem:
default:
c.Ui.Error(fmt.Sprintf("Jobs of type %q are not allowed to be rescheduled.", *job.Type))
return 1
}
}
// Confirm that we should restart a multi-region job in a single region.
if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() {
c.Ui.Output("\nJob restart canceled.")
return 0
}
// Retrieve the job history so we can properly determine if a group or task
// exists in the specific allocation job version.
jobVersions, _, _, err := c.client.Jobs().Versions(c.jobID, false, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error retrieving versions of job %q: %s", c.jobID, err))
return 1
}
// Index jobs by version.
jobVersionIndex := make(map[uint64]*api.Job, len(jobVersions))
for _, job := range jobVersions {
jobVersionIndex[*job.Version] = job
}
// Fetch all allocations for the job and filter out the ones that are not
// eligible for restart.
allocStubs, _, err := c.client.Jobs().Allocations(c.jobID, true, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error retrieving allocations for job %q: %v", c.jobID, err))
return 1
}
allocStubsWithJob := make([]AllocationListStubWithJob, 0, len(allocStubs))
for _, stub := range allocStubs {
allocStubsWithJob = append(allocStubsWithJob, AllocationListStubWithJob{
AllocationListStub: stub,
Job: jobVersionIndex[stub.JobVersion],
})
}
restartAllocs := c.filterAllocs(allocStubsWithJob)
// Exit early if there's nothing to do.
if len(restartAllocs) == 0 {
c.Ui.Output("No allocations to restart")
return 0
}
// Calculate absolute batch size based on the number of eligible
// allocations. Round values up to increase parallelism.
if c.batchSizePercent {
c.batchSize = int(math.Ceil(float64(len(restartAllocs)*c.batchSize) / 100))
}
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Restarting %s[reset]",
formatTime(time.Now()),
english.Plural(len(restartAllocs), "allocation", "allocations"),
)))
// restartErr accumulates the errors that happen in each batch.
var restartErr *multierror.Error
// Restart allocations in batches.
batch := multierror.Group{}
for restartCount, alloc := range restartAllocs {
// Block and wait before each iteration if the command is handling an
// interrupt signal.
<-activeCh
// Make sure there are not active deployments to prevent the restart
// process from interfering with it.
err := c.ensureNoActiveDeployment()
if err != nil {
restartErr = multierror.Append(restartErr, err)
break
}
// Print new batch header every time we restart a multiple of the batch
// size which indicates that we're starting a new batch.
// Skip batch header if batch size is one because it's redundant.
if restartCount%c.batchSize == 0 && c.batchSize > 1 {
batchNumber := restartCount/c.batchSize + 1
remaining := len(restartAllocs) - restartCount
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Restarting %s batch of %d allocations[reset]",
formatTime(time.Now()),
humanize.Ordinal(batchNumber),
min(c.batchSize, remaining),
)))
}
// Restart allocation. Wrap the callback function to capture the
// allocID loop variable and prevent it from changing inside the
// goroutine at each iteration.
batch.Go(func(allocStubWithJob AllocationListStubWithJob) func() error {
return func() error {
return c.handleAlloc(allocStubWithJob)
}
}(alloc))
// Check if we restarted enough allocations to complete a batch or if
// we restarted the last allocation.
batchComplete := (restartCount+1)%c.batchSize == 0
restartComplete := restartCount+1 == len(restartAllocs)
if batchComplete || restartComplete {
// Block and wait for the batch to finish. Handle the
// *mutierror.Error response to add the custom formatting and to
// convert it to an error to avoid problems where an empty
// *multierror.Error is not considered a nil error.
var batchErr error
if batchMerr := batch.Wait(); batchMerr != nil {
restartErr = multierror.Append(restartErr, batchMerr)
batchMerr.ErrorFormat = c.errorFormat(jobRestartTimestampPrefixLength)
batchErr = batchMerr.ErrorOrNil()
}
// Block if the command is handling an interrupt signal.
<-activeCh
// Exit loop before sleeping or asking for user input if we just
// finished the last batch.
if restartComplete {
break
}
// Handle errors that happened in this batch.
if batchErr != nil {
// Exit early if -on-error is 'fail'.
if c.onError == jobRestartOnErrorFail {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Stopping job restart due to error[reset]",
formatTime(time.Now()),
)))
break
}
// Exit early if -yes but error is not recoverable.
if c.autoYes && !c.isErrorRecoverable(batchErr) {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Stopping job restart due to unrecoverable error[reset]",
formatTime(time.Now()),
)))
break
}
}
// Check if we need to ask the user how to proceed. This is needed
// in case -yes is not set and -batch-wait is 'ask' or an error
// happened and -on-error is 'ask'.
askUser := !c.autoYes && (c.batchWaitAsk || c.onError == jobRestartOnErrorAsk && batchErr != nil)
if askUser {
if batchErr != nil {
// Print errors so user can decide what to below.
c.Ui.Warn(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: %s[reset]", formatTime(time.Now()), batchErr,
)))
}
// Exit early if user provides a negative answer.
if !c.shouldProceed(batchErr) {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Job restart canceled[reset]",
formatTime(time.Now()),
)))
c.canceled = true
break
}
}
// Sleep if -batch-wait is set or if -batch-wait is 'ask' and user
// responded with a new interval above.
if c.batchWait > 0 {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Waiting %s before restarting the next batch[reset]",
formatTime(time.Now()),
c.batchWait,
)))
time.Sleep(c.batchWait)
}
// Start a new batch.
batch = multierror.Group{}
}
}
if restartErr != nil && len(restartErr.Errors) > 0 {
if !c.canceled {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Job restart finished with errors[reset]",
formatTime(time.Now()),
)))
}
restartErr.ErrorFormat = c.errorFormat(0)
c.Ui.Error(fmt.Sprintf("\n%s", restartErr))
return 1
}
if !c.canceled {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Job restart finished[reset]",
formatTime(time.Now()),
)))
c.Ui.Output("\nJob restarted successfully!")
}
return 0
}
// parseAndValidate parses and validates the arguments passed to the command.
//
// This function mutates the command and is not thread-safe so it must be
// called only once and early in the command lifecycle.
func (c *JobRestartCommand) parseAndValidate(args []string) (int, error) {
var batchSizeStr string
var batchWaitStr string
var groups []string
var tasks []string
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&c.allTasks, "all-tasks", false, "")
flags.BoolVar(&c.autoYes, "yes", false, "")
flags.StringVar(&batchSizeStr, "batch-size", "1", "")
flags.StringVar(&batchWaitStr, "batch-wait", "0s", "")
flags.StringVar(&c.onError, "on-error", jobRestartOnErrorAsk, "")
flags.BoolVar(&c.noShutdownDelay, "no-shutdown-delay", false, "")
flags.BoolVar(&c.reschedule, "reschedule", false, "")
flags.BoolVar(&c.verbose, "verbose", false, "")
flags.Var((funcVar)(func(s string) error {
groups = append(groups, s)
return nil
}), "group", "")
flags.Var((funcVar)(func(s string) error {
tasks = append(tasks, s)
return nil
}), "task", "")
err := flags.Parse(args)
if err != nil {
// Let the flags library handle and print the error message.
return 1, nil
}
// Truncate IDs unless full length is requested.
c.length = shortId
if c.verbose {
c.length = fullId
}
// Check that we got exactly one job.
args = flags.Args()
if len(args) != 1 {
return 1, fmt.Errorf("This command takes one argument: <job>")
}
c.jobID = strings.TrimSpace(args[0])
// Parse and validate -batch-size.
matches := jobRestartBatchSizeValueRegex.FindStringSubmatch(batchSizeStr)
if len(matches) != 2 {
return 1, fmt.Errorf(
"Invalid -batch-size value %q: batch size must be an integer or a percentage",
batchSizeStr,
)
}
c.batchSizePercent = strings.HasSuffix(batchSizeStr, "%")
c.batchSize, err = strconv.Atoi(matches[1])
if err != nil {
return 1, fmt.Errorf("Invalid -batch-size value %q: %w", batchSizeStr, err)
}
if c.batchSize == 0 {
return 1, fmt.Errorf(
"Invalid -batch-size value %q: number value must be greater than zero",
batchSizeStr,
)
}
// Parse and validate -batch-wait.
if strings.ToLower(batchWaitStr) == jobRestartBatchWaitAsk {
if !isTty() && !c.autoYes {
return 1, fmt.Errorf(
"Invalid -batch-wait value %[1]q: %[1]q cannot be used when terminal is not interactive",
jobRestartBatchWaitAsk,
)
}
c.batchWaitAsk = true
} else {
c.batchWait, err = time.ParseDuration(batchWaitStr)
if err != nil {
return 1, fmt.Errorf("Invalid -batch-wait value %q: %w", batchWaitStr, err)
}
}
// Parse and validate -on-error.
switch c.onError {
case jobRestartOnErrorAsk:
if !isTty() && !c.autoYes {
return 1, fmt.Errorf(
"Invalid -on-error value %[1]q: %[1]q cannot be used when terminal is not interactive",
jobRestartOnErrorAsk,
)
}
case jobRestartOnErrorFail:
default:
return 1, fmt.Errorf(
"Invalid -on-error value %q: valid options are %q and %q",
c.onError,
jobRestartOnErrorAsk,
jobRestartOnErrorFail,
)
}
// -all-tasks conflicts with -task and <task>.
if c.allTasks && len(tasks) != 0 {
return 1, fmt.Errorf("The -all-tasks option cannot be used with -task")
}
// -reschedule conflicts with -task and <task>.
if c.reschedule && len(tasks) != 0 {
return 1, fmt.Errorf("The -reschedule option cannot be used with -task")
}
// Dedup tasks and groups.
c.groups = set.From(groups)
c.tasks = set.From(tasks)
return 0, nil
}
// filterAllocs returns a slice of the allocations that should be restarted.
func (c *JobRestartCommand) filterAllocs(stubs []AllocationListStubWithJob) []AllocationListStubWithJob {
result := []AllocationListStubWithJob{}
for _, stub := range stubs {
shortAllocID := limit(stub.ID, c.length)
// Skip allocations that are not running.
if !stub.IsRunning() {
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Skipping allocation %q because desired status is %q and client status is %q[reset]",
formatTime(time.Now()),
shortAllocID,
stub.ClientStatus,
stub.DesiredStatus,
)))
}
continue
}
// Skip allocations that have already been replaced.
if stub.NextAllocation != "" {
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Skipping allocation %q because it has already been replaced by %q[reset]",
formatTime(time.Now()),
shortAllocID,
limit(stub.NextAllocation, c.length),
)))
}
continue
}
// Skip allocations for groups that were not requested.
if c.groups.Size() > 0 {
if !c.groups.Contains(stub.TaskGroup) {
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Skipping allocation %q because it doesn't have any of requested groups[reset]",
formatTime(time.Now()),
shortAllocID,
)))
}
continue
}
}
// Skip allocations that don't have any of the requested tasks.
if c.tasks.Size() > 0 {
hasTask := false
for _, taskName := range c.tasks.Slice() {
if stub.HasTask(taskName) {
hasTask = true
break
}
}
if !hasTask {
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Skipping allocation %q because it doesn't have any of requested tasks[reset]",
formatTime(time.Now()),
shortAllocID,
)))
}
continue
}
}
result = append(result, stub)
}
return result
}
// ensureNoActiveDeployment returns an error if the job has an active
// deployment.
func (c *JobRestartCommand) ensureNoActiveDeployment() error {
deployments, _, err := c.client.Jobs().Deployments(c.jobID, true, nil)
if err != nil {
return fmt.Errorf("Error retrieving deployments for job %q: %v", c.jobID, err)
}
for _, d := range deployments {
switch d.Status {
case api.DeploymentStatusFailed, api.DeploymentStatusSuccessful, api.DeploymentStatusCancelled:
// Deployment is terminal so it's safe to proceed.
default:
return fmt.Errorf("Deployment %q is %q", limit(d.ID, c.length), d.Status)
}
}
return nil
}
// shouldRestartMultiregion blocks and waits for the user to confirm if the
// restart of a multi-region job should proceed. Returns true if the answer is
// positive.
func (c *JobRestartCommand) shouldRestartMultiregion() bool {
question := fmt.Sprintf(
"Are you sure you want to restart multi-region job %q in a single region? [y/N]",
c.jobID,
)
return c.askQuestion(
question,
false,
func(answer string) (bool, error) {
switch strings.TrimSpace(strings.ToLower(answer)) {
case "", "n", "no":
return false, nil
case "y", "yes":
return true, nil
default:
return false, fmt.Errorf("Invalid answer %q", answer)
}
})
}
// shouldProceed blocks and waits for the user to provide a valid input on how
// to proceed. Returns true if the answer is positive.
//
// The flags -batch-wait and -on-error have an 'ask' option. This function
// handles both to prevent asking the user twice in case they are both set to
// 'ask' and an error happens.
func (c *JobRestartCommand) shouldProceed(err error) bool {
var question, options string
if err == nil {
question = "Proceed with the next batch?"
options = "Y/n"
} else {
question = "Ignore the errors above and proceed with the next batch?"
options = "y/N" // Defaults to 'no' if an error happens.
if !c.isErrorRecoverable(err) {
question = `The errors above are likely to happen again.
Ignore them anyway and proceed with the next batch?`
}
}
// If -batch-wait is 'ask' the user can provide a new wait duration.
if c.batchWaitAsk {
options += "/<wait duration>"
}
return c.askQuestion(
fmt.Sprintf("%s [%s]", question, options),
false,
func(answer string) (bool, error) {
switch strings.ToLower(answer) {
case "":
// Proceed by default only if there is no error.
return err == nil, nil
case "y", "yes":
return true, nil
case "n", "no":
return false, nil
default:
if c.batchWaitAsk {
// Check if user passed a time duration and adjust the
// command to use that moving forward.
batchWait, err := time.ParseDuration(answer)
if err != nil {
return false, fmt.Errorf("Invalid answer %q", answer)
}
c.batchWaitAsk = false
c.batchWait = batchWait
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[bold]==> %s: Proceeding restarts with new wait time of %s[reset]",
formatTime(time.Now()),
c.batchWait,
)))
return true, nil
} else {
return false, fmt.Errorf("Invalid answer %q", answer)
}
}
})
}
// shouldExit blocks and waits for the user for confirmation if they would like
// to interrupt the command. Returns true if the answer is positive.
func (c *JobRestartCommand) shouldExit() bool {
question := `Restart interrupted, no more allocations will be restarted.
Are you sure you want to stop the restart process? [y/N]`
return c.askQuestion(
question,
true,
func(answer string) (bool, error) {
switch strings.ToLower(answer) {
case "n", "no", "":
return false, nil
case "y", "yes":
return true, nil
default:
return false, fmt.Errorf("Invalid answer %q", answer)
}
})
}
// askQuestion asks question to user until they provide a valid response.
func (c *JobRestartCommand) askQuestion(question string, onError bool, cb func(string) (bool, error)) bool {
prefixedQuestion := fmt.Sprintf(
"[bold]==> %s: %s[reset]",
formatTime(time.Now()),
indentString(question, jobRestartTimestampPrefixLength),
)
// Let ui.Ask() handle interrupt signals.
signal.Stop(c.sigsCh)
defer func() {
signal.Notify(c.sigsCh, os.Interrupt)
}()
for {
answer, err := c.Ui.Ask(c.Colorize().Color(prefixedQuestion))
if err != nil {
if err.Error() != "interrupted" {
c.Ui.Output(err.Error())
}
return onError
}
exit, err := cb(strings.TrimSpace(answer))
if err != nil {
c.Ui.Output(fmt.Sprintf("%s%s", strings.Repeat(" ", jobRestartTimestampPrefixLength), err))
continue
}
return exit
}
}
// handleAlloc stops or restarts an allocation in-place. Blocks until the
// allocation is done restarting or the rescheduled allocation is running.
func (c *JobRestartCommand) handleAlloc(alloc AllocationListStubWithJob) error {
var err error
if c.reschedule {
// Stopping an allocation triggers a reschedule.
err = c.stopAlloc(alloc)
} else {
err = c.restartAlloc(alloc)
}
if err != nil {
msg := fmt.Sprintf("Error restarting allocation %q:", limit(alloc.ID, c.length))
if mErr, ok := err.(*multierror.Error); ok {
// Unwrap the errors and prefix them with a common message to
// prevent deep nesting of errors.
return multierror.Prefix(mErr, msg)
}
return fmt.Errorf("%s %w", msg, err)
}
return nil
}
// restartAlloc restarts an allocation in place and blocks until the tasks are
// done restarting.
func (c *JobRestartCommand) restartAlloc(alloc AllocationListStubWithJob) error {
shortAllocID := limit(alloc.ID, c.length)
if c.allTasks {
c.Ui.Output(fmt.Sprintf(
" %s: Restarting all tasks in allocation %q for group %q",
formatTime(time.Now()),
shortAllocID,
alloc.TaskGroup,
))
return c.client.Allocations().RestartAllTasks(&api.Allocation{ID: alloc.ID}, nil)
}
if c.tasks.Size() == 0 {
c.Ui.Output(fmt.Sprintf(
" %s: Restarting running tasks in allocation %q for group %q",
formatTime(time.Now()),
shortAllocID,
alloc.TaskGroup,
))
return c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, "", nil)
}
// Run restarts concurrently when specific tasks were requested.
var restarts multierror.Group
for _, task := range c.tasks.Slice() {
if !alloc.HasTask(task) {
continue
}
c.Ui.Output(fmt.Sprintf(
" %s: Restarting task %q in allocation %q for group %q",
formatTime(time.Now()),
task,
shortAllocID,
alloc.TaskGroup,
))
restarts.Go(func(taskName string) func() error {
return func() error {
err := c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, taskName, nil)
if err != nil {
return fmt.Errorf("Failed to restart task %q: %w", taskName, err)
}
return nil
}
}(task))
}
return restarts.Wait().ErrorOrNil()
}
// stopAlloc stops an allocation and blocks until the replacement allocation is
// running.
func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
shortAllocID := limit(alloc.ID, c.length)
c.Ui.Output(fmt.Sprintf(
" %s: Rescheduling allocation %q for group %q",
formatTime(time.Now()),
shortAllocID,
alloc.TaskGroup,
))
var q *api.QueryOptions
if c.noShutdownDelay {
q = &api.QueryOptions{
Params: map[string]string{"no_shutdown_delay": "true"},
}
}
// Stop allocation and wait for its replacement to be running or for a
// blocked evaluation that prevents placements for this task group to
// happen.
resp, err := c.client.Allocations().Stop(&api.Allocation{ID: alloc.ID}, q)
if err != nil {
return fmt.Errorf("Failed to stop allocation: %w", err)
}
// Allocations for system jobs do not get replaced by the scheduler after
// being stopped, so an eval is needed to trigger the reconciler.
if *alloc.Job.Type == api.JobTypeSystem {
opts := api.EvalOptions{
ForceReschedule: true,
}
_, _, err := c.client.Jobs().EvaluateWithOpts(*alloc.Job.ID, opts, nil)
if err != nil {
return fmt.Errorf("Failed evaluate job: %w", err)
}
}
// errCh receives an error if anything goes wrong or nil when the
// replacement allocation is running.
// Use a buffered channel to prevent both goroutine from blocking trying to
// send a result back.
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Pass the LastIndex from the Stop() call to only monitor data that was
// created after the Stop() call.
go c.monitorPlacementFailures(ctx, alloc, resp.LastIndex, errCh)
go c.monitorReplacementAlloc(ctx, alloc, errCh)
// This process may take a while, so ping user from time to time to
// indicate the command is still alive.
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.Ui.Output(fmt.Sprintf(
" %s: Still waiting for allocation %q to be replaced",
formatTime(time.Now()),
shortAllocID,
))
case err := <-errCh:
return err
}
}
}
// monitorPlacementFailures searches for evaluations of the allocation job that
// have placement failures.
//
// Returns an error in errCh if anything goes wrong or if there are placement
// failures for the allocation task group.
func (c *JobRestartCommand) monitorPlacementFailures(
ctx context.Context,
alloc AllocationListStubWithJob,
index uint64,
errCh chan<- error,
) {
q := &api.QueryOptions{WaitIndex: index}
for {
select {
case <-ctx.Done():
return
default:
}
evals, qm, err := c.client.Jobs().Evaluations(alloc.JobID, q)
if err != nil {
errCh <- fmt.Errorf("Failed to retrieve evaluations for job %q: %w", alloc.JobID, err)
return
}
for _, eval := range evals {
select {
case <-ctx.Done():
return
default:
}
// Skip evaluations created before the allocation was stopped or
// that are not blocked.
if eval.CreateIndex < index || eval.Status != api.EvalStatusBlocked {
continue
}
failures := eval.FailedTGAllocs[alloc.TaskGroup]
if failures != nil {
errCh <- ErrJobRestartPlacementFailure{
EvalID: limit(eval.ID, c.length),
TaskGroup: alloc.TaskGroup,
Failures: failures,
}
return
}
}
q.WaitIndex = qm.LastIndex
}
}
// monitorReplacementAlloc waits for the allocation to have a follow-up
// placement and for the new allocation be running.
//
// Returns an error in errCh if anything goes wrong or nil when the new
// allocation is running.
func (c *JobRestartCommand) monitorReplacementAlloc(
ctx context.Context,
allocStub AllocationListStubWithJob,
errCh chan<- error,
) {
currentAllocID := allocStub.ID
q := &api.QueryOptions{WaitIndex: 1}
for {
select {
case <-ctx.Done():
return
default:
}
alloc, qm, err := c.client.Allocations().Info(currentAllocID, q)
if err != nil {
errCh <- fmt.Errorf("Failed to retrieve allocation %q: %w", limit(currentAllocID, c.length), err)
return
}
// Follow replacement allocations. We expect the original allocation to
// be replaced, but the replacements may be themselves replaced in
// cases such as the allocation failing.
if alloc.NextAllocation != "" {
c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running",
formatTime(time.Now()),
limit(alloc.ID, c.length),
limit(alloc.NextAllocation, c.length),
))
currentAllocID = alloc.NextAllocation
// Reset the blocking query so the Info() API call returns the new
// allocation immediately.
q.WaitIndex = 1
continue
}
switch alloc.ClientStatus {
case api.AllocClientStatusRunning:
// Make sure the running allocation we found is a replacement, not
// the original one.
if alloc.ID != allocStub.ID {
c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q is %q",
formatTime(time.Now()),
limit(alloc.ID, c.length),
alloc.ClientStatus,
))
errCh <- nil
return
}
default:
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Allocation %q is %q[reset]",
formatTime(time.Now()),
limit(alloc.ID, c.length),
alloc.ClientStatus,
)))
}
}
q.WaitIndex = qm.LastIndex
}
}
// handleSignal receives input signals and blocks the activeCh until the user
// confirms how to proceed.
//
// Exit immediately if the user confirms the interrupt, otherwise resume the
// command and feed activeCh to unblock it.
func (c *JobRestartCommand) handleSignal(sigsCh chan os.Signal, activeCh chan any) {
for {
select {
case <-sigsCh:
// Consume activeCh to prevent the main loop from proceeding.
select {
case <-activeCh:
default:
}
if c.shouldExit() {
c.Ui.Output("\nCanceling job restart process")
os.Exit(0)
}
case activeCh <- struct{}{}:
}
}
}
// isErrorRecoverable returns true when the error is likely to impact all
// restarts and so there is not reason to keep going.
func (c *JobRestartCommand) isErrorRecoverable(err error) bool {
if err == nil {
return true
}
if errors.Is(err, ErrJobRestartPlacementFailure{}) {
return false
}
if strings.Contains(err.Error(), api.PermissionDeniedErrorContent) {
return false
}
return true
}
// errorFormat returns a multierror.ErrorFormatFunc that indents each line,
// except for the first one, of the resulting error string with the given
// number of spaces.
func (c *JobRestartCommand) errorFormat(indent int) func([]error) string {
return func(es []error) string {
points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %s", strings.TrimSpace(err.Error()))
}
out := fmt.Sprintf(
"%s occurred while restarting job:\n%s",
english.Plural(len(es), "error", "errors"),
strings.Join(points, "\n"),
)
return indentString(out, indent)
}
}
// AllocationListStubWithJob combines an AllocationListStub with its
// corresponding job at the right version.
type AllocationListStubWithJob struct {
*api.AllocationListStub
Job *api.Job
}
// HasTask returns true if the allocation has the given task in the specific
// job version it was created.
func (a *AllocationListStubWithJob) HasTask(name string) bool {
// Check task state first as it's the fastest and most reliable source.
if _, ok := a.TaskStates[name]; ok {
return true
}
// But task states are only set when the client updates its allocations
// with the server, so they may not be available yet. Lookup the task in
// the job version as a fallback.
if a.Job == nil {
return false
}
var taskGroup *api.TaskGroup
for _, tg := range a.Job.TaskGroups {
if tg.Name == nil || *tg.Name != a.TaskGroup {
continue
}
taskGroup = tg
}
if taskGroup == nil {
return false
}
for _, task := range taskGroup.Tasks {
if task.Name == name {
return true
}
}
return false
}
// IsRunning returns true if the allocation's ClientStatus or DesiredStatus is
// running.
func (a *AllocationListStubWithJob) IsRunning() bool {
return a.ClientStatus == api.AllocClientStatusRunning ||
a.DesiredStatus == api.AllocDesiredStatusRun
}