From a0cf5db7977ea5d3368da1a363123892f95157e4 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 13 Dec 2021 14:54:53 -0500 Subject: [PATCH] provide `-no-shutdown-delay` flag for job/alloc stop (#11596) Some operators use very long group/task `shutdown_delay` settings to safely drain network connections to their workloads after service deregistration. But during incident response, they may want to cause that drain to be skipped so they can quickly shed load. Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and `nomad job stop` commands that bypasses the delay. This sets a new desired transition state on the affected allocations that the allocation/task runner will identify during pre-kill on the client. Note (as documented here) that using this flag will almost always result in failed inbound network connections for workloads as the tasks will exit before clients receive updated service discovery information and won't be gracefully drained. --- .changelog/11596.txt | 3 + api/jobs.go | 9 +- client/allocrunner/alloc_runner.go | 12 ++ client/allocrunner/alloc_runner_hooks.go | 1 + client/allocrunner/groupservice_hook.go | 13 +- client/allocrunner/taskrunner/task_runner.go | 22 ++++ .../taskrunner/task_runner_test.go | 116 +++++++++++++++--- command/agent/alloc_endpoint.go | 12 +- command/agent/job_endpoint.go | 12 ++ command/alloc_stop.go | 16 ++- command/job_stop.go | 32 +++-- nomad/alloc_endpoint.go | 3 +- nomad/fsm.go | 26 +++- nomad/job_endpoint_test.go | 91 ++++++++++++++ nomad/state/state_store.go | 11 +- nomad/structs/structs.go | 26 +++- website/content/docs/commands/alloc/stop.mdx | 7 ++ website/content/docs/commands/job/stop.mdx | 7 ++ 18 files changed, 372 insertions(+), 47 deletions(-) create mode 100644 .changelog/11596.txt diff --git a/.changelog/11596.txt b/.changelog/11596.txt new file mode 100644 index 000000000..74b451c02 --- /dev/null +++ b/.changelog/11596.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: provide `-no-shutdown-delay` option to `job stop` and `alloc stop` commands to ignore `shutdown_delay` +``` diff --git a/api/jobs.go b/api/jobs.go index 146f65cf7..61d0c2892 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -299,6 +299,11 @@ type DeregisterOptions struct { // is useful when an operator wishes to push through a job deregistration // in busy clusters with a large evaluation backlog. EvalPriority int + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool } // DeregisterOpts is used to remove an existing job. See DeregisterOptions @@ -312,8 +317,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt // Protect against nil opts. url.Values expects a string, and so using // fmt.Sprintf is the best way to do this. if opts != nil { - endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", - opts.Purge, opts.Global, opts.EvalPriority) + endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t", + opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } wm, err := j.client.delete(endpoint, &resp, q) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7dcf072a9..c846b0916 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -171,6 +171,9 @@ type allocRunner struct { taskHookCoordinator *taskHookCoordinator + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. rpcClient RPCer @@ -230,6 +233,10 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks) + shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background()) + ar.shutdownDelayCtx = shutdownDelayCtx + ar.shutdownDelayCancelFn = shutdownDelayCancel + // Initialize the runners hooks. if err := ar.initRunnerHooks(config.ClientConfig); err != nil { return nil, err @@ -265,6 +272,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DriverManager: ar.driverManager, ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + ShutdownDelayCtx: ar.shutdownDelayCtx, } if ar.cpusetManager != nil { @@ -824,6 +832,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) { default: } + if update.DesiredTransition.ShouldIgnoreShutdownDelay() { + ar.shutdownDelayCancelFn() + } + // Queue the new update ar.allocUpdatedCh <- update } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 9624e633c..8b79da4d5 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -159,6 +159,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: envBuilder, networkStatusGetter: ar, logger: hookLogger, + shutdownDelayCtx: ar.shutdownDelayCtx, }), newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 778109e65..69eae41e8 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -1,6 +1,7 @@ package allocrunner import ( + "context" "sync" "time" @@ -29,9 +30,9 @@ type groupServiceHook struct { consulClient consul.ConsulServiceAPI consulNamespace string prerun bool - delay time.Duration deregistered bool networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger @@ -41,6 +42,7 @@ type groupServiceHook struct { networks structs.Networks ports structs.AllocatedPorts taskEnvBuilder *taskenv.Builder + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -54,6 +56,7 @@ type groupServiceHookConfig struct { restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger } @@ -76,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { networkStatusGetter: cfg.networkStatusGetter, logger: cfg.logger.Named(groupServiceHookName), services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services, + shutdownDelayCtx: cfg.shutdownDelayCtx, } if cfg.alloc.AllocatedResources != nil { @@ -187,9 +191,12 @@ func (h *groupServiceHook) preKillLocked() { h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay) - // Wait for specified shutdown_delay + select { + // Wait for specified shutdown_delay unless ignored // This will block an agent from shutting down. - <-time.After(h.delay) + case <-time.After(h.delay): + case <-h.shutdownDelayCtx.Done(): + } } func (h *groupServiceHook) Postrun() error { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 68df0827f..dba2f092f 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -112,6 +112,11 @@ type TaskRunner struct { killErr error killErrLock sync.Mutex + // shutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // Logger is the logger for the task runner. logger log.Logger @@ -287,6 +292,13 @@ type Config struct { // startConditionMetCtx is done when TR should start the task StartConditionMetCtx <-chan struct{} + + // ShutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + ShutdownDelayCtx context.Context + + // ShutdownDelayCancelFn should only be used in testing. + ShutdownDelayCancelFn context.CancelFunc } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -342,6 +354,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { maxEvents: defaultMaxEvents, serversContactedCh: config.ServersContactedCh, startConditionMetCtx: config.StartConditionMetCtx, + shutdownDelayCtx: config.ShutdownDelayCtx, + shutdownDelayCancelFn: config.ShutdownDelayCancelFn, } // Create the logger based on the allocation ID @@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E select { case result := <-resultCh: return result + case <-tr.shutdownDelayCtx.Done(): + break case <-time.After(delay): } } @@ -1478,3 +1494,9 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) { func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { tr.allocHookResources = res } + +// shutdownDelayCancel is used for testing only and cancels the +// shutdownDelayCtx +func (tr *TaskRunner) shutdownDelayCancel() { + tr.shutdownDelayCancelFn() +} diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index f3ef206cf..9b60458b4 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -14,6 +14,10 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" @@ -26,6 +30,7 @@ import ( agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/drivers/rawexec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -33,9 +38,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri cleanup() } + shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background()) + // Create a closed channel to mock TaskHookCoordinator.startConditionForTask. // Closed channel indicates this task is not blocked on prestart hooks. closedCh := make(chan struct{}) close(closedCh) conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - ConsulSI: consulapi.NewMockServiceIdentitiesClient(), - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - ServersContactedCh: make(chan struct{}), - StartConditionMetCtx: closedCh, + Alloc: alloc, + ClientConfig: clientConf, + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + ConsulSI: consulapi.NewMockServiceIdentitiesClient(), + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), + StartConditionMetCtx: closedCh, + ShutdownDelayCtx: shutdownDelayCtx, + ShutdownDelayCancelFn: shutdownDelayCancelFn, } return conf, trCleanup } @@ -996,6 +1002,82 @@ WAIT: } } +// TestTaskRunner_NoShutdownDelay asserts services are removed from +// Consul and tasks are killed without waiting for ${shutdown_delay} +// when the alloc has the NoShutdownDelay transition flag set. +func TestTaskRunner_NoShutdownDelay(t *testing.T) { + t.Parallel() + + // don't set this too high so that we don't block the test runner + // on shutting down the agent if the test fails + maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second + maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second + + alloc := mock.Alloc() + alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Services[0].Tags = []string{"tag1"} + task.Services = task.Services[:1] // only need 1 for this test + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "1000s", + } + task.ShutdownDelay = maxTestDuration + + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + + testWaitForTaskToStart(t, tr) + + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n != 1 { + return false, fmt.Errorf("expected 1 consul operation. Found %d", n) + } + return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration) + defer cancel() + + killed := make(chan error) + go func() { + tr.shutdownDelayCancel() + err := tr.Kill(testCtx, structs.NewTaskEvent("test")) + killed <- err + }() + + // Wait for first de-registration call. Note that unlike + // TestTaskRunner_ShutdownDelay, we're racing with task exit + // and can't assert that we only get the first deregistration op + // (from serviceHook.PreKill). + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n < 2 { + return false, fmt.Errorf("expected at least 2 consul operations.") + } + return ops[1].Op == "remove", fmt.Errorf( + "consul operation was not a deregistration: %#v", ops[1]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Wait for the task to exit + select { + case <-tr.WaitCh(): + case <-time.After(maxTimeToFailDuration): + t.Fatalf("task kill did not ignore shutdown delay") + return + } + + err := <-killed + require.NoError(t, err, "killing task returned unexpected error") +} + // TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the // payload was written to disk. func TestTaskRunner_Dispatch_Payload(t *testing.T) { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index d1a7e210c..f6f724001 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht return nil, CodedError(405, ErrInvalidMethod) } + noShutdownDelay := false + if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS) + if err != nil { + return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err) + } + } + sr := &structs.AllocStopRequest{ - AllocID: allocID, + AllocID: allocID, + NoShutdownDelay: noShutdownDelay, } s.parseWriteRequest(req, &sr.WriteRequest) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 8a9da76fe..1ff8a7bde 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -453,6 +453,18 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, err } + // Identify the no_shutdown_delay query param and parse. + noShutdownDelayStr := req.URL.Query().Get("no_shutdown_delay") + var noShutdownDelay bool + if noShutdownDelayStr != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %qq (%v) as a bool: %v", "no_shutdown_delay", noShutdownDelayStr, err) + } + } + args.NoShutdownDelay = noShutdownDelay + // Validate the evaluation priority if the user supplied a non-default // value. It's more efficient to do it here, within the agent rather than // sending a bad request for the server to reject. diff --git a/command/alloc_stop.go b/command/alloc_stop.go index 3c8f9cbd9..3b1d10218 100644 --- a/command/alloc_stop.go +++ b/command/alloc_stop.go @@ -38,6 +38,12 @@ Stop Specific Options: screen, which can be used to examine the rescheduling evaluation using the eval-status command. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocation + being stopped. + -verbose Show full information. ` @@ -47,12 +53,13 @@ Stop Specific Options: func (c *AllocStopCommand) Name() string { return "alloc stop" } func (c *AllocStopCommand) Run(args []string) int { - var detach, verbose bool + var detach, verbose, noShutdownDelay bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -115,7 +122,12 @@ func (c *AllocStopCommand) Run(args []string) int { return 1 } - resp, err := client.Allocations().Stop(alloc, nil) + var opts *api.QueryOptions + if noShutdownDelay { + opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}} + } + + resp, err := client.Allocations().Stop(alloc, opts) if err != nil { c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) return 1 diff --git a/command/job_stop.go b/command/job_stop.go index 8dd5d8a11..1df4d0610 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -43,14 +43,20 @@ Stop Options: Override the priority of the evaluations produced as a result of this job deregistration. By default, this is set to the priority of the job. - -purge - Purge is used to stop the job and purge it from the system. If not set, the - job will still be queryable and will be purged by the garbage collector. - -global Stop a multi-region job in all its regions. By default job stop will stop only a single region at a time. Ignored for single-region jobs. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so that there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocations + being stopped. + + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -67,12 +73,13 @@ func (c *JobStopCommand) Synopsis() string { func (c *JobStopCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-eval-priority": complete.PredictNothing, - "-purge": complete.PredictNothing, - "-global": complete.PredictNothing, - "-yes": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-eval-priority": complete.PredictNothing, + "-purge": complete.PredictNothing, + "-global": complete.PredictNothing, + "-no-shutdown-delay": complete.PredictNothing, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -94,7 +101,7 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor { func (c *JobStopCommand) Name() string { return "job stop" } func (c *JobStopCommand) Run(args []string) int { - var detach, purge, verbose, global, autoYes bool + var detach, purge, verbose, global, autoYes, noShutdownDelay bool var evalPriority int flags := c.Meta.FlagSet(c.Name(), FlagSetClient) @@ -102,6 +109,7 @@ func (c *JobStopCommand) Run(args []string) int { flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&global, "global", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") flags.BoolVar(&autoYes, "yes", false, "") flags.BoolVar(&purge, "purge", false, "") flags.IntVar(&evalPriority, "eval-priority", 0, "") @@ -199,7 +207,7 @@ func (c *JobStopCommand) Run(args []string) int { } // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority} + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 3a32b5f19..0b44175ad 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes Evals: []*structs.Evaluation{eval}, Allocs: map[string]*structs.DesiredTransition{ args.AllocID: { - Migrate: helper.BoolToPtr(true), + Migrate: helper.BoolToPtr(true), + NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay), }, }, } diff --git a/nomad/fsm.go b/nomad/fsm.go index 847210145..727574312 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i } err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { - err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx) if err != nil { n.logger.Error("deregistering job failed", @@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err) return err } @@ -670,12 +671,31 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) } + if noShutdownDelay { + ws := memdb.NewWatchSet() + allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false) + if err != nil { + return err + } + transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + for _, alloc := range allocs { + err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition) + if err != nil { + return err + } + err = tx.Insert("index", &state.IndexEntry{Key: "allocs", Value: index}) + if err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + } + if purge { if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil { return fmt.Errorf("DeleteJob failed: %w", err) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 9fce33ca7..51da5d219 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -3831,6 +3831,97 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { }) } +func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register requests + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp0 structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp0)) + + // Deregister but don't purge + dereg1 := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp1 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg1, &resp1)) + require.NotZero(resp1.Index) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.True(out.Stop) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp1.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp1.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + var ws memdb.WatchSet + allocs, err := state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.Nil(alloc.DesiredTransition) + } + + // Deregister with no shutdown delay + dereg2 := &structs.JobDeregisterRequest{ + JobID: job.ID, + NoShutdownDelay: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp2)) + require.NotZero(resp2.Index) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.NotNil(alloc.DesiredTransition) + require.True(*(alloc.DesiredTransition.NoShutdownDelay)) + } + +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3d08fdca7..81cbd7c63 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1600,7 +1600,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b } if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { - return fmt.Errorf("unable to update job scaling policies: %v", err) + return fmt.Errorf("unable to update job csi plugins: %v", err) } // Insert the job @@ -3371,7 +3371,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, // Handle each of the updated allocations for id, transition := range allocs { - if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil { + if err := s.UpdateAllocDesiredTransitionTxn(txn, index, id, transition); err != nil { return err } } @@ -3390,9 +3390,9 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, return txn.Commit() } -// nestedUpdateAllocDesiredTransition is used to nest an update of an +// UpdateAllocDesiredTransitionTxn is used to nest an update of an // allocations desired transition -func (s *StateStore) nestedUpdateAllocDesiredTransition( +func (s *StateStore) UpdateAllocDesiredTransitionTxn( txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { @@ -3414,8 +3414,9 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // Merge the desired transitions copyAlloc.DesiredTransition.Merge(transition) - // Update the modify index + // Update the modify indexes copyAlloc.ModifyIndex = index + copyAlloc.AllocModifyIndex = index // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 48c978228..742fce06d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -628,6 +628,11 @@ type JobDeregisterRequest struct { // in busy clusters with a large evaluation backlog. EvalPriority int + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation @@ -955,7 +960,8 @@ type AllocUpdateDesiredTransitionRequest struct { // AllocStopRequest is used to stop and reschedule a running Allocation. type AllocStopRequest struct { - AllocID string + AllocID string + NoShutdownDelay bool WriteRequest } @@ -9140,6 +9146,11 @@ type DesiredTransition struct { // This field is only used when operators want to force a placement even if // a failed allocation is not eligible to be rescheduled ForceReschedule *bool + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay *bool } // Merge merges the two desired transitions, preferring the values from the @@ -9156,6 +9167,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { if o.ForceReschedule != nil { d.ForceReschedule = o.ForceReschedule } + + if o.NoShutdownDelay != nil { + d.NoShutdownDelay = o.NoShutdownDelay + } } // ShouldMigrate returns whether the transition object dictates a migration. @@ -9178,6 +9193,15 @@ func (d *DesiredTransition) ShouldForceReschedule() bool { return d.ForceReschedule != nil && *d.ForceReschedule } +// ShouldIgnoreShutdownDelay returns whether the transition object dictates +// that shutdown skip any shutdown delays. +func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool { + if d == nil { + return false + } + return d.NoShutdownDelay != nil && *d.NoShutdownDelay +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop diff --git a/website/content/docs/commands/alloc/stop.mdx b/website/content/docs/commands/alloc/stop.mdx index b82195172..060d47ad4 100644 --- a/website/content/docs/commands/alloc/stop.mdx +++ b/website/content/docs/commands/alloc/stop.mdx @@ -42,6 +42,12 @@ allocation's namespace. - `-verbose`: Display verbose output. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so that + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocation being stopped. + ## Examples ```shell-session @@ -58,3 +64,4 @@ $ nomad alloc stop -detach eb17e557 ``` [eval status]: /docs/commands/eval-status +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index 004520b97..134715ffe 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -55,6 +55,12 @@ When ACLs are enabled, this command requires a token with the `submit-job`, Stop a [multi-region] job in all its regions. By default, `job stop` will stop only a single region at a time. Ignored for single-region jobs. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so that + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocations being stopped. + ## Examples Stop the job with ID "job1": @@ -75,3 +81,4 @@ $ nomad job stop -detach job1 [eval status]: /docs/commands/eval-status [multi-region]: /docs/job-specification/multiregion +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay