Merge pull request #5669 from hashicorp/b-block-on-servers-4

client: do not restart restored tasks until server is contacted
Michael Schurter 2019-05-14 14:27:01 -07:00 committed by GitHub
commit 13eeefebb7
10 changed files with 339 additions and 31 deletions

View File

@@ -7,6 +7,10 @@ __BACKWARDS INCOMPATIBILITIES:__
to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
* client: The format of check IDs in Consul has changed. If you rely upon
Nomad's check IDs you will need to update your code. [[GH-5536](https://github.com/hashicorp/nomad/pull/5536)]
* client: On startup a client will reattach to running tasks as before but
will not restart exited tasks. Exited tasks will be restarted only after the
client has reestablished communication with servers. System jobs will always
be restarted. [[GH-5669](https://github.com/hashicorp/nomad/pull/5669)]
FEATURES:

View File

@@ -136,6 +136,11 @@ type allocRunner struct {
// driverManager is responsible for dispensing driver plugins and registering
// event handlers
driverManager drivermanager.Manager
// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}
}
// NewAllocRunner returns a new allocation runner.
@@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
prevAllocMigrator: config.PrevAllocMigrator,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
}
// Create the logger based on the allocation ID
@@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DeviceStatsReporter: ar.deviceStatsReporter,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
}
// Create, but do not Run, the task runner
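The serversContactedCh threaded through these constructors leans on a standard Go idiom: receiving from a closed channel never blocks, so a single close() acts as a one-shot broadcast that unblocks every waiting TaskRunner at once. A minimal standalone sketch of the idiom (illustrative only, not Nomad code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	serversContactedCh := make(chan struct{})
	var wg sync.WaitGroup

	// Several waiters, like TaskRunners parked after a failed restore.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-serversContactedCh // unblocks once the channel is closed
			fmt.Printf("waiter %d unblocked\n", id)
		}(i)
	}

	close(serversContactedCh) // one close wakes every waiter
	wg.Wait()
}

Note, in the TaskRunner struct below, that the field narrows the type to a receive-only <-chan struct{}: runners can only wait on the signal, and only the client that created the channel can close it.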

View File

@@ -51,4 +51,8 @@ type Config struct {
// DriverManager handles dispensing of driver plugins
DriverManager drivermanager.Manager
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}

View File

@@ -193,6 +193,15 @@ type TaskRunner struct {
// maxEvents is the capacity of the TaskEvents on the TaskState.
// Defaults to defaultMaxEvents but overrideable for testing.
maxEvents int
// serversContactedCh is passed to TaskRunners so they can detect when
// GetClientAllocs has been called in case of a failed restore.
serversContactedCh <-chan struct{}
// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
waitOnServers bool
}
type Config struct {
@@ -222,6 +231,10 @@ type Config struct {
// DriverManager is used to dispense driver plugins and register event
// handlers
DriverManager drivermanager.Manager
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}
func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -270,6 +283,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
}
// Create the logger based on the allocation ID
@@ -381,6 +395,19 @@ func (tr *TaskRunner) Run() {
// - should be handled serially.
go tr.handleUpdates()
// If restore failed, wait until servers are contacted before running.
// #1795
if tr.waitOnServers {
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
select {
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
case <-tr.serversContactedCh:
tr.logger.Trace("server contacted; unblocking waiting task")
}
}
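// Note the asymmetry in the select above: only the shutdownCtx case
// returns early, presumably so an agent shutdown leaves task state
// untouched for the next restore, while the killCtx case falls through
// so the MAIN loop below records the kill like any other (the behavior
// TestTaskRunner_Restore_Kill exercises).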
MAIN:
for !tr.Alloc().TerminalStatus() {
select {
@@ -858,8 +885,28 @@ func (tr *TaskRunner) Restore() error {
if taskHandle := tr.localState.TaskHandle; taskHandle != nil {
//TODO if RecoverTask returned the DriverNetwork we wouldn't
// have to persist it at all!
restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
// If the handle could not be restored, the alloc is
// non-terminal, and the task isn't a system job: wait until
// servers have been contacted before running. #1795
if restored {
return nil
}
alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}
tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.waitOnServers = true
ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
}
return nil
}
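Condensed, the new Restore logic reduces to a small predicate: only a non-terminal, non-system alloc whose task handle could not be reattached has to wait for server contact. A standalone sketch (the helper name and shape are mine, simplified from the hunk above):

package main

import "fmt"

// shouldWaitOnServers mirrors the branches of Restore above: a successful
// reattach, a terminal alloc, or a system job never waits; anything else
// must hold until servers are contacted.
func shouldWaitOnServers(restored, terminal bool, jobType string) bool {
	if restored {
		return false // reattached to the still-running task
	}
	if terminal || jobType == "system" {
		return false // nothing to restart, or system jobs always restart
	}
	return true
}

func main() {
	fmt.Println(shouldWaitOnServers(false, false, "service")) // true: gate on server contact
	fmt.Println(shouldWaitOnServers(true, false, "service"))  // false: reattach succeeded
	fmt.Println(shouldWaitOnServers(false, false, "system"))  // false: system jobs restart immediately
}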

View File

@@ -25,10 +25,12 @@ import (
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
mockdriver "github.com/hashicorp/nomad/drivers/mock"
"github.com/hashicorp/nomad/drivers/rawexec"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
@@ -93,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
}
conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, trCleanup
}
@@ -124,8 +127,8 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
}
}
// TestTaskRunner_Restore_Running asserts restoring a running task does not
// rerun the task.
func TestTaskRunner_Restore_Running(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -178,6 +181,221 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
assert.Equal(t, 1, started)
}
// setupRestoreFailureTest starts a task, shuts down its TaskRunner, and
// kills the task before starting a new TaskRunner. The new TaskRunner is
// returned once it is running and blocked in pending, along with a
// cleanup func.
func setupRestoreFailureTest(t *testing.T, alloc *structs.Allocation) (*TaskRunner, *Config, func()) {
t.Parallel()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
"command": "sleep",
"args": []string{"30"},
}
conf, cleanup1 := testTaskRunnerConfig(t, alloc, task.Name)
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
// Run the first TaskRunner
origTR, err := NewTaskRunner(conf)
require.NoError(t, err)
go origTR.Run()
cleanup2 := func() {
origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
cleanup1()
}
// Wait for it to be running
testWaitForTaskToStart(t, origTR)
handle := origTR.getDriverHandle()
require.NotNil(t, handle)
taskID := handle.taskID
// Cause TR to exit without shutting down task
origTR.Shutdown()
// Get the driver
driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
require.NoError(t, err)
rawexecDriver := driverPlugin.(*rawexec.Driver)
// Assert the task is still running despite TR having exited
taskStatus, err := rawexecDriver.InspectTask(taskID)
require.NoError(t, err)
require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
// Kill the task so it fails to recover when restore is called
require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
// Create a new TaskRunner and Restore the task
conf.ServersContactedCh = make(chan struct{})
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)
// Assert the TR will wait on servers because reattachment failed
require.NoError(t, newTR.Restore())
require.True(t, newTR.waitOnServers)
// Start new TR
go newTR.Run()
cleanup3 := func() {
newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
cleanup2() // cleanup2 already chains to cleanup1
}
// Assert task has not been restarted
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
ts := newTR.TaskState()
require.Equal(t, structs.TaskStatePending, ts.State)
return newTR, conf, cleanup3
}
// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
// servers are contacted (ServersContactedCh is closed), then restarts the
// task. #1795
func TestTaskRunner_Restore_Restart(t *testing.T) {
newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()
// Fake contacting the server by closing the chan
close(conf.ServersContactedCh)
testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}
// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
// the task is killed. #1795
func TestTaskRunner_Restore_Kill(t *testing.T) {
newTR, _, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()
// Sending the task a terminal update shouldn't kill it or unblock it
alloc := newTR.Alloc().Copy()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
newTR.Update(alloc)
require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
// AllocRunner will immediately kill tasks after sending a terminal
// update.
newTR.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilling))
select {
case <-newTR.WaitCh():
// It died as expected!
case <-time.After(10 * time.Second):
require.Fail(t, "timeout waiting for task to die")
}
}
// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
// Update is called. #1795
func TestTaskRunner_Restore_Update(t *testing.T) {
newTR, conf, cleanup := setupRestoreFailureTest(t, mock.Alloc())
defer cleanup()
// Fake Client.runAllocs behavior by calling Update then closing chan
alloc := newTR.Alloc().Copy()
newTR.Update(alloc)
// Update alone should not unblock the task
require.Equal(t, structs.TaskStatePending, newTR.TaskState().State)
// Fake Client.runAllocs behavior of closing chan after Update
close(conf.ServersContactedCh)
testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}
// TestTaskRunner_Restore_System asserts restoring a dead system task does not
// block.
func TestTaskRunner_Restore_System(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.Job.Type = structs.JobTypeSystem
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "raw_exec"
task.Config = map[string]interface{}{
"command": "sleep",
"args": []string{"30"},
}
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
defer cleanup()
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between runs
// Run the first TaskRunner
origTR, err := NewTaskRunner(conf)
require.NoError(t, err)
go origTR.Run()
defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
// Wait for it to be running
testWaitForTaskToStart(t, origTR)
handle := origTR.getDriverHandle()
require.NotNil(t, handle)
taskID := handle.taskID
// Cause TR to exit without shutting down task
origTR.Shutdown()
// Get the driver
driverPlugin, err := conf.DriverManager.Dispense(rawexec.PluginID.Name)
require.NoError(t, err)
rawexecDriver := driverPlugin.(*rawexec.Driver)
// Assert the task is still running despite TR having exited
taskStatus, err := rawexecDriver.InspectTask(taskID)
require.NoError(t, err)
require.Equal(t, drivers.TaskStateRunning, taskStatus.State)
// Kill the task so it fails to recover when restore is called
require.NoError(t, rawexecDriver.DestroyTask(taskID, true))
_, err = rawexecDriver.InspectTask(taskID)
require.EqualError(t, err, drivers.ErrTaskNotFound.Error())
// Create a new TaskRunner and Restore the task
conf.ServersContactedCh = make(chan struct{})
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)
// Assert the TR will not wait on servers, even though reattachment
// failed, because this is a system task.
require.NoError(t, newTR.Restore())
require.False(t, newTR.waitOnServers)
// Nothing should have closed the chan
select {
case <-conf.ServersContactedCh:
require.Fail(t, "serversContactedCh was closed but should not have been")
default:
}
testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
return ts == structs.TaskStateRunning, fmt.Errorf("expected task to be running but found %q", ts)
}, func(err error) {
require.NoError(t, err)
})
}
// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
// interpolated.
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {

View File

@@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
clientConf, cleanup := clientconfig.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, cleanup
}

View File

@@ -259,6 +259,11 @@ type Client struct {
// fpInitialized chan is closed when the first batch of fingerprints are
// applied to the node and the server is updated
fpInitialized chan struct{}
// serversContactedCh is closed when GetClientAllocs and runAllocs have
// successfully run once.
serversContactedCh chan struct{}
serversContactedOnce sync.Once
}
var (
@@ -309,6 +314,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
fpInitialized: make(chan struct{}),
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
}
c.batchNodeUpdates = newBatchNodeUpdates(
@@ -439,6 +446,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
}
// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)
// Restore the state
if err := c.restoreState(); err != nil {
logger.Error("failed to restore state", "error", err)
@@ -453,9 +463,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to restore state")
}
// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)
// Begin periodic snapshotting of state.
c.shutdownGroup.Go(c.periodicSnapshot)
@@ -2030,6 +2037,12 @@ func (c *Client) runAllocs(update *allocUpdates) {
}
}
// Mark servers as having been contacted so blocked tasks that failed
// to restore can now restart.
c.serversContactedOnce.Do(func() {
close(c.serversContactedCh)
})
// Trigger the GC once more now that new allocs are started that could
// have caused thresholds to be exceeded
c.garbageCollector.Trigger()
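runAllocs executes on every allocation sync, and a second close() of the same channel would panic, which is what the sync.Once above guards against. A minimal standalone sketch of the guard (illustrative only, not Nomad code):

package main

import "sync"

func main() {
	serversContactedCh := make(chan struct{})
	var once sync.Once

	markContacted := func() {
		once.Do(func() { close(serversContactedCh) }) // safe to call repeatedly
	}

	markContacted()
	markContacted() // a bare close() here would panic: close of closed channel

	<-serversContactedCh // channel is closed, so the receive returns immediately
}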

View File

@@ -555,7 +555,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
}
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
panic("not implemented")
h, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return h.TaskStatus(), nil
}
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
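With the panic replaced by a map lookup, an unknown task ID now surfaces as drivers.ErrTaskNotFound, the sentinel the restore tests above assert on after DestroyTask. A generic standalone sketch of that sentinel-error pattern (names here are mine, not the plugin API):

package main

import (
	"errors"
	"fmt"
)

// errTaskNotFound plays the role of drivers.ErrTaskNotFound.
var errTaskNotFound = errors.New("task not found for given id")

type taskStore map[string]struct{}

func (s taskStore) inspect(id string) error {
	if _, ok := s[id]; !ok {
		return errTaskNotFound // sentinel instead of a panic
	}
	return nil
}

func main() {
	s := taskStore{}
	err := s.inspect("missing")
	fmt.Println(errors.Is(err, errTaskNotFound)) // true: callers can branch on it
}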

View File

@@ -10,11 +10,15 @@ KillMode=process
KillSignal=SIGINT
LimitNOFILE=infinity
LimitNPROC=infinity
Restart=on-failure
RestartSec=2

# systemd>=230 prefers StartLimitIntervalSec,StartLimitBurst in Unit,
# however Ubuntu 16.04 only has systemd==229. Use these old-style settings
# as they will be supported by newer systemds.
StartLimitBurst=3
StartLimitInterval=10
TasksMax=infinity

[Install]
WantedBy=multi-user.target

View File

@@ -5971,6 +5971,10 @@ const (
// TaskHookFailed indicates that one of the hooks for a task failed.
TaskHookFailed = "Task hook failed"
// TaskRestoreFailed indicates Nomad was unable to reattach to a
// restored task.
TaskRestoreFailed = "Failed Restoring Task"
)
// TaskEvent is an event that affects the state of a task and contains metadata