diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03fbb622d..3aa3df85f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ __BACKWARDS INCOMPATIBILITIES:__
    to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
  * client: The format of check IDs in Consul has changed. If you rely upon
    Nomad's check IDs you will need to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
+ * client: On startup a client will reattach to running tasks as before but
+   will not restart exited tasks. Exited tasks will be restarted only after the
+   client has reestablished communication with servers. System jobs will always
+   be restarted. [[GH-5669](https://github.com/hashicorp/nomad/pull/5669)]
 
 FEATURES:
 
diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 7168914b2..e9efa120f 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -136,6 +136,11 @@ type allocRunner struct {
 	// driverManager is responsible for dispensing driver plugins and registering
 	// event handlers
 	driverManager drivermanager.Manager
+
+	// serversContactedCh is passed to TaskRunners so they can detect when
+	// servers have been contacted for the first time in case of a failed
+	// restore.
+	serversContactedCh chan struct{}
 }
 
 // NewAllocRunner returns a new allocation runner.
@@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
 		prevAllocMigrator:  config.PrevAllocMigrator,
 		devicemanager:      config.DeviceManager,
 		driverManager:      config.DriverManager,
+		serversContactedCh: config.ServersContactedCh,
 	}
 
 	// Create the logger based on the allocation ID
@@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
 			DeviceStatsReporter: ar.deviceStatsReporter,
 			DeviceManager:       ar.devicemanager,
 			DriverManager:       ar.driverManager,
+			ServersContactedCh:  ar.serversContactedCh,
 		}
 
 		// Create, but do not Run, the task runner
diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go
index 16f3e27db..42cea978e 100644
--- a/client/allocrunner/config.go
+++ b/client/allocrunner/config.go
@@ -51,4 +51,8 @@ type Config struct {
 
 	// DriverManager handles dispensing of driver plugins
 	DriverManager drivermanager.Manager
+
+	// ServersContactedCh is closed when the first GetClientAllocs call to
+	// servers succeeds and allocs are synced.
+	ServersContactedCh chan struct{}
 }
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index 1cf727e46..8fe87cf36 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -193,6 +193,15 @@ type TaskRunner struct {
 	// maxEvents is the capacity of the TaskEvents on the TaskState.
 	// Defaults to defaultMaxEvents but overrideable for testing.
 	maxEvents int
+
+	// serversContactedCh is passed to TaskRunners so they can detect when
+	// GetClientAllocs has been called in case of a failed restore.
+	serversContactedCh <-chan struct{}
+
+	// waitOnServers defaults to false but will be set true if a restore
+	// fails and the Run method should wait until serversContactedCh is
+	// closed.
+	waitOnServers bool
 }
 
 type Config struct {
@@ -222,6 +231,10 @@ type Config struct {
 	// DriverManager is used to dispense driver plugins and register event
 	// handlers
 	DriverManager drivermanager.Manager
+
+	// ServersContactedCh is closed when the first GetClientAllocs call to
+	// servers succeeds and allocs are synced.
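+	// Note: the TaskRunner only ever receives on this channel; the Client
+	// is responsible for closing it (see Client.runAllocs in this change).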
+	ServersContactedCh chan struct{}
 }
 
 func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -270,6 +283,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
 		devicemanager:       config.DeviceManager,
 		driverManager:       config.DriverManager,
 		maxEvents:           defaultMaxEvents,
+		serversContactedCh:  config.ServersContactedCh,
 	}
 
 	// Create the logger based on the allocation ID
@@ -381,6 +395,19 @@ func (tr *TaskRunner) Run() {
 	// - should be handled serially.
 	go tr.handleUpdates()
 
+	// If restore failed wait until servers are contacted before running.
+	// #1795
+	if tr.waitOnServers {
+		tr.logger.Info("task failed to restore; waiting to contact server before restarting")
+		select {
+		case <-tr.killCtx.Done():
+		case <-tr.shutdownCtx.Done():
+			return
+		case <-tr.serversContactedCh:
+			tr.logger.Trace("server contacted; unblocking waiting task")
+		}
+	}
+
 MAIN:
 	for !tr.Alloc().TerminalStatus() {
 		select {
@@ -858,8 +885,28 @@ func (tr *TaskRunner) Restore() error {
 	if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
 		//TODO if RecoverTask returned the DriverNetwork we wouldn't
 		//     have to persist it at all!
-		tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+		restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
+
+		// If the handle could not be restored, the alloc is
+		// non-terminal, and the task isn't a system job: wait until
+		// servers have been contacted before running. #1795
+		if restored {
+			return nil
+		}
+
+		alloc := tr.Alloc()
+		if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
+			return nil
+		}
+
+		tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
+		tr.waitOnServers = true
+
+		ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
+			SetDisplayMessage("failed to restore task; will not run until server is contacted")
+		tr.UpdateState(structs.TaskStatePending, ev)
 	}
+
 	return nil
 }
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index 99900b785..7d2bd5275 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -25,10 +25,12 @@ import (
 	"github.com/hashicorp/nomad/client/vaultclient"
 	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
 	mockdriver "github.com/hashicorp/nomad/drivers/mock"
+	"github.com/hashicorp/nomad/drivers/rawexec"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/device"
+	"github.com/hashicorp/nomad/plugins/drivers"
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/kr/pretty"
 	"github.com/stretchr/testify/assert"
@@ -93,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
 	}
 
 	conf := &Config{
-		Alloc:         alloc,
-		ClientConfig:  clientConf,
-		Consul:        consulapi.NewMockConsulServiceClient(t, logger),
-		Task:          thisTask,
-		TaskDir:       taskDir,
-		Logger:        clientConf.Logger,
-		Vault:         vaultclient.NewMockVaultClient(),
-		StateDB:       cstate.NoopDB{},
-		StateUpdater:  NewMockTaskStateUpdater(),
-		DeviceManager: devicemanager.NoopMockManager(),
-		DriverManager: drivermanager.TestDriverManager(t),
+		Alloc:              alloc,
+		ClientConfig:       clientConf,
+		Consul:             consulapi.NewMockConsulServiceClient(t, logger),
+		Task:               thisTask,
+		TaskDir:            taskDir,
+		Logger:             clientConf.Logger,
+		Vault:              vaultclient.NewMockVaultClient(),
+		StateDB:            cstate.NoopDB{},
+		StateUpdater:       NewMockTaskStateUpdater(),
+		DeviceManager:      devicemanager.NoopMockManager(),
+		DriverManager:      drivermanager.TestDriverManager(t),
+		ServersContactedCh: make(chan struct{}),
 	}
 	return conf, trCleanup
 }
@@ -124,8 +127,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
 	}
 }
 
-// TestTaskRunner_Restore asserts restoring a running task does not rerun the
-// task.
+// TestTaskRunner_Restore_Running asserts restoring a running task does not
+// rerun the task.
 func TestTaskRunner_Restore_Running(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
@@ -178,6 +181,221 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
 	assert.Equal(t, 1, started)
 }
 
+// setupRestoreFailureTest starts a service, shuts down the task runner, and
+// kills the task before restarting a new TaskRunner. The new TaskRunner is
+// returned once it is running and waiting in pending along with a cleanup
+// func.
+func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) {
+	t.Parallel()
+
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "raw_exec"
+	task.Config = map[string]interface{}{
+		"command": "sleep",
+		"args":    []string{"30"},
+	}
+	conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name)
+	conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
+
+	// Run the first TaskRunner
+	origTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go origTR.Run()
+	cleanup2 := func() {
+		origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+		cleanup1()
+	}
+
+	// Wait for it to be running
+	testWaitForTaskToStart(t, origTR)
+
+	handle := origTR.getDriverHandle()
+	require.NotNil(t, handle)
+	taskID := handle.taskID
+
+	// Cause TR to exit without shutting down task
+	origTR.Shutdown()
+
+	// Get the driver
+	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
+	require.NoError(t, err)
+	rawexecDriver := driverPlugin.(*rawexec.Driver)
+
+	// Assert the task is still running despite TR having exited
+	taskStatus, err := rawexecDriver.InspectTask(taskID)
+	require.NoError(t, err)
+	require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
+
+	// Kill the task so it fails to recover when restore is called
+	require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
+	_, err = rawexecDriver.InspectTask(taskID)
+	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+
+	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
+	newTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+
+	// Assert the TR will wait on servers because reattachment failed
+	require.NoError(t, newTR.Restore())
+	require.True(t, newTR.waitOnServers)
+
+	// Start new TR
+	go newTR.Run()
+	cleanup3 := func() {
+		newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+		cleanup2()
+		cleanup1()
+	}
+
+	// Assert task has not been restarted
+	_, err = rawexecDriver.InspectTask(taskID)
+	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+	ts := newTR.TaskState()
+	require.Equal(t, structs.TaskStatePending, ts.State)
+
+	return newTR, conf, cleanup3
+}
+
+// TestTaskRunner_Restore_Restart asserts a task that fails to restore stays
+// pending until servers are contacted and is then restarted. #1795
+func TestTaskRunner_Restore_Restart(t *testing.T) {
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
+	defer cleanup()
+
+	// Fake contacting the server by closing the chan
+	close(conf.ServersContactedCh)
+
+	testutil.WaitForResult(func() (bool, error) {
+		ts := newTR.TaskState().State
+		return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
+
+// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
+// the task is killed. #1795
+func TestTaskRunner_Restore_Kill(t *testing.T) {
+	newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc())
+	defer cleanup()
+
+	// Sending the task a terminal update shouldn't kill it or unblock it
+	alloc := newTR.Alloc().Copy()
+	alloc.DesiredStatus = structs.AllocDesiredStatusStop
+	newTR.Update(alloc)
+
+	require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
+
+	// AllocRunner will immediately kill tasks after sending a terminal
+	// update.
+	newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling))
+
+	select {
+	case <-newTR.WaitCh():
+		// It died as expected!
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "timeout waiting for task to die")
+	}
+}
+
+// TestTaskRunner_Restore_Update asserts that an update alone does not unblock
+// a task that failed to restore; the task runs only after servers are
+// contacted. #1795
+func TestTaskRunner_Restore_Update(t *testing.T) {
+	newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
+	defer cleanup()
+
+	// Fake Client.runAllocs behavior by calling Update then closing chan
+	alloc := newTR.Alloc().Copy()
+	newTR.Update(alloc)
+
+	// Update alone should not unblock the test
+	require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
+
+	// Fake Client.runAllocs behavior of closing chan after Update
+	close(conf.ServersContactedCh)
+
+	testutil.WaitForResult(func() (bool, error) {
+		ts := newTR.TaskState().State
+		return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
+
+// TestTaskRunner_Restore_System asserts restoring a dead system task does not
+// block.
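+// System allocs are expected to keep running even while servers are
+// unreachable, so Restore does not gate them on server contact.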
+func TestTaskRunner_Restore_System(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	alloc.Job.Type = structs.JobTypeSystem
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "raw_exec"
+	task.Config = map[string]interface{}{
+		"command": "sleep",
+		"args":    []string{"30"},
+	}
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+	conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
+
+	// Run the first TaskRunner
+	origTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go origTR.Run()
+	defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	// Wait for it to be running
+	testWaitForTaskToStart(t, origTR)
+
+	handle := origTR.getDriverHandle()
+	require.NotNil(t, handle)
+	taskID := handle.taskID
+
+	// Cause TR to exit without shutting down task
+	origTR.Shutdown()
+
+	// Get the driver
+	driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
+	require.NoError(t, err)
+	rawexecDriver := driverPlugin.(*rawexec.Driver)
+
+	// Assert the task is still running despite TR having exited
+	taskStatus, err := rawexecDriver.InspectTask(taskID)
+	require.NoError(t, err)
+	require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
+
+	// Kill the task so it fails to recover when restore is called
+	require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
+	_, err = rawexecDriver.InspectTask(taskID)
+	require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
+
+	// Create a new TaskRunner and Restore the task
+	conf.ServersContactedCh = make(chan struct{})
+	newTR, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+
+	// Assert the TR will not wait on servers even though reattachment
+	// failed because it is a system task.
+	require.NoError(t, newTR.Restore())
+	require.False(t, newTR.waitOnServers)
+
+	// Nothing should have closed the chan
+	select {
+	case <-conf.ServersContactedCh:
+		require.Fail(t, "serversContactedCh was closed but should not have been")
+	default:
+	}
+
+	// Run the new TaskRunner; the system task should restart immediately
+	// without waiting for server contact.
+	go newTR.Run()
+	defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	testutil.WaitForResult(func() (bool, error) {
+		ts := newTR.TaskState().State
+		return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+}
+
 // TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
 // interpolated.
 func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {
diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go
index 5933019ea..75806644b 100644
--- a/client/allocrunner/testing.go
+++ b/client/allocrunner/testing.go
@@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
 	clientConf, cleanup := clientconfig.TestClientConfig(t)
 	conf := &Config{
 		// Copy the alloc in case the caller edits and reuses it
-		Alloc:             alloc.Copy(),
-		Logger:            clientConf.Logger,
-		ClientConfig:      clientConf,
-		StateDB:           state.NoopDB{},
-		Consul:            consul.NewMockConsulServiceClient(t, clientConf.Logger),
-		Vault:             vaultclient.NewMockVaultClient(),
-		StateUpdater:      &MockStateUpdater{},
-		PrevAllocWatcher:  allocwatcher.NoopPrevAlloc{},
-		PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
-		DeviceManager:     devicemanager.NoopMockManager(),
-		DriverManager:     drivermanager.TestDriverManager(t),
+		Alloc:              alloc.Copy(),
+		Logger:             clientConf.Logger,
+		ClientConfig:       clientConf,
+		StateDB:            state.NoopDB{},
+		Consul:             consul.NewMockConsulServiceClient(t, clientConf.Logger),
+		Vault:              vaultclient.NewMockVaultClient(),
+		StateUpdater:       &MockStateUpdater{},
+		PrevAllocWatcher:   allocwatcher.NoopPrevAlloc{},
+		PrevAllocMigrator:  allocwatcher.NoopPrevAlloc{},
+		DeviceManager:      devicemanager.NoopMockManager(),
+		DriverManager:      drivermanager.TestDriverManager(t),
+		ServersContactedCh: make(chan struct{}),
 	}
 	return conf, cleanup
 }
diff --git a/client/client.go b/client/client.go
index e45288c28..a15618f77 100644
--- a/client/client.go
+++ b/client/client.go
@@ -259,6 +259,11 @@ type Client struct {
 	// fpInitialized chan is closed when the first batch of fingerprints are
 	// applied to the node and the server is updated
 	fpInitialized chan struct{}
+
+	// serversContactedCh is closed when GetClientAllocs and runAllocs have
+	// successfully run once.
+	serversContactedCh   chan struct{}
+	serversContactedOnce sync.Once
 }
 
 var (
@@ -309,6 +314,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 		fpInitialized:        make(chan struct{}),
 		invalidAllocs:        make(map[string]struct{}),
+		serversContactedCh:   make(chan struct{}),
+		serversContactedOnce: sync.Once{},
 	}
 
 	c.batchNodeUpdates = newBatchNodeUpdates(
@@ -439,6 +446,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
 	}
 
+	// Register and then start heartbeating to the servers.
+	c.shutdownGroup.Go(c.registerAndHeartbeat)
+
 	// Restore the state
 	if err := c.restoreState(); err != nil {
 		logger.Error("failed to restore state", "error", err)
@@ -453,9 +463,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		return nil, fmt.Errorf("failed to restore state")
 	}
 
-	// Register and then start heartbeating to the servers.
-	c.shutdownGroup.Go(c.registerAndHeartbeat)
-
 	// Begin periodic snapshotting of state.
 	c.shutdownGroup.Go(c.periodicSnapshot)
 
@@ -2030,6 +2037,12 @@ func (c *Client) runAllocs(update *allocUpdates) {
 		}
 	}
 
+	// Mark servers as having been contacted so blocked tasks that failed
+	// to restore can now restart.
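+	// The sync.Once guard ensures the channel is closed at most once, so
+	// later runAllocs passes do not panic closing an already-closed channel.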
+	c.serversContactedOnce.Do(func() {
+		close(c.serversContactedCh)
+	})
+
 	// Trigger the GC once more now that new allocs are started that could
 	// have caused thresholds to be exceeded
 	c.garbageCollector.Trigger()
diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go
index 1c99a9a9a..749d4c25c 100644
--- a/drivers/mock/driver.go
+++ b/drivers/mock/driver.go
@@ -555,7 +555,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
 }
 
 func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
-	panic("not implemented")
+	h, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	return h.TaskStatus(), nil
 }
 
 func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
diff --git a/e2e/terraform/shared/config/nomad.service b/e2e/terraform/shared/config/nomad.service
index 995d055e7..622b69a47 100644
--- a/e2e/terraform/shared/config/nomad.service
+++ b/e2e/terraform/shared/config/nomad.service
@@ -10,11 +10,15 @@ KillMode=process
 KillSignal=SIGINT
 LimitNOFILE=infinity
 LimitNPROC=infinity
+TasksMax=infinity
 Restart=on-failure
 RestartSec=2
+
+# systemd >= 230 prefers StartLimitIntervalSec and StartLimitBurst in the
+# [Unit] section, but Ubuntu 16.04 ships systemd 229. Use the old-style
+# settings here since newer systemd versions still support them.
 StartLimitBurst=3
-StartLimitIntervalSec=10
-TasksMax=infinity
+StartLimitInterval=10
 
 [Install]
 WantedBy=multi-user.target
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index c9271efee..6f6d54005 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -5971,6 +5971,10 @@ const (
 
 	// TaskHookFailed indicates that one of the hooks for a task failed.
 	TaskHookFailed = "Task hook failed"
+
+	// TaskRestoreFailed indicates Nomad was unable to reattach to a
+	// restored task.
+	TaskRestoreFailed = "Failed Restoring Task"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data