drivers: Capture exit code when task is killed (#10494)
This commit ensures Nomad captures the task code more reliably even when the task is killed. This issue affect to `raw_exec` driver, as noted in https://github.com/hashicorp/nomad/issues/10430 . We fix this issue by ensuring that the TaskRunner only calls `driver.WaitTask` once. The TaskRunner monitors the completion of the task by calling `driver.WaitTask` which should return the task exit code on completion. However, it also could return a "context canceled" error if the agent/executor is shutdown. Previously, when a task is to be stopped, the killTask path makes two WaitTask calls, and the second returns "context canceled" occasionally because of a "race" in task shutting down and depending on driver, and how fast it shuts down after task completes. By having a single WaitTask call and consistently waiting for the task, we ensure we capture the exit code reliably before the executor is shutdown or the contexts expired. I opted to change the TaskRunner implementation to avoid changing the driver interface or requiring 3rd party drivers to update. Additionally, the PR ensures that attempts to kill the task terminate when the task "naturally" dies. Without this change, if the task dies at the right moment, the `killTask` call may retry to kill an already-dead task for up to 5 minutes before giving up.
This commit is contained in:
parent
a86477021f
commit
067fd86a8c
|
@ -36,7 +36,7 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
|
|||
}
|
||||
|
||||
// Kill the task using an exponential backoff in-case of failures.
|
||||
if err := tr.killTask(handle); err != nil {
|
||||
if _, err := tr.killTask(handle, waitCh); err != nil {
|
||||
// We couldn't successfully destroy the resource created.
|
||||
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
|
||||
}
|
||||
|
|
|
@ -569,7 +569,7 @@ MAIN:
|
|||
case <-tr.killCtx.Done():
|
||||
// We can go through the normal should restart check since
|
||||
// the restart tracker knowns it is killed
|
||||
result = tr.handleKill()
|
||||
result = tr.handleKill(resultCh)
|
||||
case <-tr.shutdownCtx.Done():
|
||||
// TaskRunner was told to exit immediately
|
||||
return
|
||||
|
@ -616,7 +616,7 @@ MAIN:
|
|||
// that should be terminal, so if the handle still exists we should
|
||||
// kill it here.
|
||||
if tr.getDriverHandle() != nil {
|
||||
if result = tr.handleKill(); result != nil {
|
||||
if result = tr.handleKill(nil); result != nil {
|
||||
tr.emitExitResultEvent(result)
|
||||
}
|
||||
|
||||
|
@ -883,7 +883,7 @@ func (tr *TaskRunner) initDriver() error {
|
|||
// handleKill is used to handle the a request to kill a task. It will return
|
||||
// the handle exit result if one is available and store any error in the task
|
||||
// runner killErr value.
|
||||
func (tr *TaskRunner) handleKill() *drivers.ExitResult {
|
||||
func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.ExitResult {
|
||||
// Run the pre killing hooks
|
||||
tr.preKill()
|
||||
|
||||
|
@ -892,7 +892,12 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
|
|||
// before waiting to kill task
|
||||
if delay := tr.Task().ShutdownDelay; delay != 0 {
|
||||
tr.logger.Debug("waiting before killing task", "shutdown_delay", delay)
|
||||
time.Sleep(delay)
|
||||
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
return result
|
||||
case <-time.After(delay):
|
||||
}
|
||||
}
|
||||
|
||||
// Tell the restart tracker that the task has been killed so it doesn't
|
||||
|
@ -900,21 +905,33 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
|
|||
tr.restartTracker.SetKilled()
|
||||
|
||||
// Check it is running
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
return result
|
||||
default:
|
||||
}
|
||||
|
||||
handle := tr.getDriverHandle()
|
||||
if handle == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kill the task using an exponential backoff in-case of failures.
|
||||
killErr := tr.killTask(handle)
|
||||
result, killErr := tr.killTask(handle, resultCh)
|
||||
if killErr != nil {
|
||||
// We couldn't successfully destroy the resource created.
|
||||
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
|
||||
tr.setKillErr(killErr)
|
||||
}
|
||||
|
||||
if result != nil {
|
||||
return result
|
||||
}
|
||||
|
||||
// Block until task has exited.
|
||||
waitCh, err := handle.WaitCh(tr.shutdownCtx)
|
||||
if resultCh == nil {
|
||||
var err error
|
||||
resultCh, err = handle.WaitCh(tr.shutdownCtx)
|
||||
|
||||
// The error should be nil or TaskNotFound, if it's something else then a
|
||||
// failure in the driver or transport layer occurred
|
||||
|
@ -926,9 +943,10 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
|
|||
tr.setKillErr(killErr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case result := <-waitCh:
|
||||
case result := <-resultCh:
|
||||
return result
|
||||
case <-tr.shutdownCtx.Done():
|
||||
return nil
|
||||
|
@ -938,14 +956,14 @@ func (tr *TaskRunner) handleKill() *drivers.ExitResult {
|
|||
// killTask kills the task handle. In the case that killing fails,
|
||||
// killTask will retry with an exponential backoff and will give up at a
|
||||
// given limit. Returns an error if the task could not be killed.
|
||||
func (tr *TaskRunner) killTask(handle *DriverHandle) error {
|
||||
func (tr *TaskRunner) killTask(handle *DriverHandle, resultCh <-chan *drivers.ExitResult) (*drivers.ExitResult, error) {
|
||||
// Cap the number of times we attempt to kill the task.
|
||||
var err error
|
||||
for i := 0; i < killFailureLimit; i++ {
|
||||
if err = handle.Kill(); err != nil {
|
||||
if err == drivers.ErrTaskNotFound {
|
||||
tr.logger.Warn("couldn't find task to kill", "task_id", handle.ID())
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
// Calculate the new backoff
|
||||
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
|
||||
|
@ -954,13 +972,17 @@ func (tr *TaskRunner) killTask(handle *DriverHandle) error {
|
|||
}
|
||||
|
||||
tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
|
||||
time.Sleep(backoff)
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
return result, nil
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
} else {
|
||||
// Kill was successful
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// persistLocalState persists local state to disk synchronously.
|
||||
|
|
|
@ -202,6 +202,55 @@ func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestTaskRunner_Stop_ExitCode asserts that the exit code is captured on a task, even if it's stopped
|
||||
func TestTaskRunner_Stop_ExitCode(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
t.Parallel()
|
||||
|
||||
alloc := mock.BatchAlloc()
|
||||
alloc.Job.TaskGroups[0].Count = 1
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.KillSignal = "SIGTERM"
|
||||
task.Driver = "raw_exec"
|
||||
task.Config = map[string]interface{}{
|
||||
"command": "/bin/sleep",
|
||||
"args": []string{"1000"},
|
||||
}
|
||||
|
||||
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
|
||||
defer cleanup()
|
||||
|
||||
// Run the first TaskRunner
|
||||
tr, err := NewTaskRunner(conf)
|
||||
require.NoError(t, err)
|
||||
go tr.Run()
|
||||
|
||||
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||
|
||||
// Wait for it to be running
|
||||
testWaitForTaskToStart(t, tr)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = tr.Kill(ctx, structs.NewTaskEvent("shutdown"))
|
||||
require.NoError(t, err)
|
||||
|
||||
var exitEvent *structs.TaskEvent
|
||||
state := tr.TaskState()
|
||||
for _, e := range state.Events {
|
||||
if e.Type == structs.TaskTerminated {
|
||||
exitEvent = e
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NotNilf(t, exitEvent, "exit event not found: %v", state.Events)
|
||||
|
||||
require.Equal(t, 143, exitEvent.ExitCode)
|
||||
require.Equal(t, 15, exitEvent.Signal)
|
||||
|
||||
}
|
||||
|
||||
// TestTaskRunner_Restore_Running asserts restoring a running task does not
|
||||
// rerun the task.
|
||||
func TestTaskRunner_Restore_Running(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue