diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 6ef91546d..ac208e723 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -4,12 +4,13 @@ import ( "context" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/drivers" ) // Restart a task. Returns immediately if no task is running. Blocks until // existing task exits or passed-in context is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + tr.logger.Trace("Restart requested", "failure", failure) + // Grab the handle handle := tr.getDriverHandle() @@ -47,6 +48,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai } func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { + tr.logger.Trace("Signal requested", "signal", s) + // Grab the handle handle := tr.getDriverHandle() @@ -65,58 +68,28 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { // Kill a task. Blocks until task exits or context is canceled. State is set to // dead. func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { + tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason) + // Cancel the task runner to break out of restart delay or the main run // loop. tr.killCtxCancel() - // Grab the handle - handle := tr.getDriverHandle() - - // Check it is running - if handle == nil { - return ErrTaskNotRunning - } - - // Emit the event since it may take a long time to kill + // Emit kill event tr.EmitEvent(event) - // Run the hooks prior to killing the task - tr.killing() - - // Tell the restart tracker that the task has been killed so it doesn't - // attempt to restart it. - tr.restartTracker.SetKilled() - - // Kill the task using an exponential backoff in-case of failures. 
- killErr := tr.killTask(handle) - if killErr != nil { - // We couldn't successfully destroy the resource created. - tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr) - } - - // Block until task has exited. - waitCh, err := handle.WaitCh(ctx) - - // The error should be nil or TaskNotFound, if it's something else then a - // failure in the driver or transport layer occurred - if err != nil { - if err == drivers.ErrTaskNotFound { - return nil - } - tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err) - return err + // Check if the Run method has started yet. If it hasn't we return early, + // since the task hasn't even started so there is nothing to wait for. This + // is still correct since the Run method will no-op since the kill context has + // already been cancelled. + if !tr.hasRunLaunched() { + return nil } select { - case <-waitCh: + case <-tr.WaitCh(): case <-ctx.Done(): + return ctx.Err() } - if killErr != nil { - return killErr - } else if err := ctx.Err(); err != nil { - return err - } - - return nil + return tr.getKillErr() } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 3c749f1e3..55bd0b60e 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -199,6 +199,12 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices { // interpolateServices returns an interpolated copy of services and checks with // values from the task's environment. func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service { + // Guard against not having a valid taskEnv. This can be the case if the + // Killing or Exited hook is run before post-run. 
+ if taskEnv == nil || len(services) == 0 { + return nil + } + interpolated := make([]*structs.Service, len(services)) for i, origService := range services { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index abaab7842..4562c8f5c 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -85,6 +85,13 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // shutdownCtx is used to exit the TaskRunner *without* affecting task state. + shutdownCtx context.Context + + // shutdownCtxCancel causes the TaskRunner to exit immediately without + // affecting task state. Useful for testing or graceful agent shutdown. + shutdownCtxCancel context.CancelFunc + // killCtx is the task runner's context representing the tasks's lifecycle. // The context is canceled when the task is killed. killCtx context.Context @@ -92,12 +99,10 @@ type TaskRunner struct { // killCtxCancel is called when killing a task. killCtxCancel context.CancelFunc - // ctx is used to exit the TaskRunner *without* affecting task state. - ctx context.Context - - // ctxCancel causes the TaskRunner to exit immediately without - // affecting task state. Useful for testing or graceful agent shutdown. - ctxCancel context.CancelFunc + // killErr is populated when killing a task. Access should be done use the + // getter/setter + killErr error + killErrLock sync.Mutex // Logger is the logger for the task runner. logger log.Logger @@ -181,6 +186,11 @@ type TaskRunner struct { // driverManager is used to dispense driver plugins and register event // handlers driverManager drivermanager.Manager + + // runLaunched marks whether the Run goroutine has been started. 
It should + // be accessed via helpers + runLaunched bool + runLaunchedLock sync.Mutex } type Config struct { @@ -251,8 +261,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { deviceStatsReporter: config.DeviceStatsReporter, killCtx: killCtx, killCtxCancel: killCancel, - ctx: trCtx, - ctxCancel: trCancel, + shutdownCtx: trCtx, + shutdownCtxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), waitCh: make(chan struct{}), devicemanager: config.DeviceManager, @@ -360,6 +370,10 @@ func (tr *TaskRunner) initLabels() { // Run the TaskRunner. Starts the user's task or reattaches to a restored task. // Run closes WaitCh when it exits. Should be started in a goroutine. func (tr *TaskRunner) Run() { + // Mark that the run routine has been launched so that other functions can + // decide to use the wait channel or not. + tr.setRunLaunched() + defer close(tr.waitCh) var result *drivers.ExitResult @@ -372,8 +386,9 @@ MAIN: for { select { case <-tr.killCtx.Done(): + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return default: @@ -388,8 +403,9 @@ MAIN: select { case <-tr.killCtx.Done(): + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return default: @@ -421,7 +437,11 @@ MAIN: tr.logger.Error("wait task failed", "error", err) } else { select { - case <-tr.ctx.Done(): + case <-tr.killCtx.Done(): + // We can go through the normal should restart check since + // the restart tracker knows it is killed + tr.handleKill() + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return case result = <-resultCh: @@ -455,8 +475,9 @@ MAIN: case <-time.After(restartDelay): case <-tr.killCtx.Done(): tr.logger.Trace("task killed between restarts", "delay", restartDelay) + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit 
immediately return } } @@ -682,6 +703,50 @@ func (tr *TaskRunner) initDriver() error { return nil } +// handleKill is used to handle a request to kill a task. It will store any +// error in the task runner killErr value. +func (tr *TaskRunner) handleKill() { + // Run the hooks prior to killing the task + tr.killing() + + // Tell the restart tracker that the task has been killed so it doesn't + // attempt to restart it. + tr.restartTracker.SetKilled() + + // Check it is running + handle := tr.getDriverHandle() + if handle == nil { + return + } + + // Kill the task using an exponential backoff in case of failures. + killErr := tr.killTask(handle) + if killErr != nil { + // We couldn't successfully destroy the resource created. + tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr) + tr.setKillErr(killErr) + } + + // Block until task has exited. + waitCh, err := handle.WaitCh(tr.shutdownCtx) + + // The error should be nil or TaskNotFound, if it's something else then a + // failure in the driver or transport layer occurred + if err != nil { + if err == drivers.ErrTaskNotFound { + return + } + tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err) + tr.setKillErr(err) + return + } + + select { + case <-waitCh: + case <-tr.shutdownCtx.Done(): + } +} + // killTask kills the task handle. In the case that killing fails, // killTask will retry with an exponential backoff and will give up at a // given limit. Returns an error if the task could not be killed. @@ -1009,7 +1074,7 @@ func (tr *TaskRunner) triggerUpdateHooks() { // Shutdown blocks until the main Run loop exits. 
func (tr *TaskRunner) Shutdown() { tr.logger.Trace("shutting down") - tr.ctxCancel() + tr.shutdownCtxCancel() <-tr.WaitCh() diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index e38b9756b..9bb7d3246 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -85,3 +85,31 @@ func (tr *TaskRunner) clearDriverHandle() { } tr.handle = nil } + +// setKillErr stores any error that arose while killing the task +func (tr *TaskRunner) setKillErr(err error) { + tr.killErrLock.Lock() + defer tr.killErrLock.Unlock() + tr.killErr = err +} + +// getKillErr returns any error that arose while killing the task +func (tr *TaskRunner) getKillErr() error { + tr.killErrLock.Lock() + defer tr.killErrLock.Unlock() + return tr.killErr +} + +// setRunLaunched marks the fact that the Run loop has been started +func (tr *TaskRunner) setRunLaunched() { + tr.runLaunchedLock.Lock() + defer tr.runLaunchedLock.Unlock() + tr.runLaunched = true +} + +// hasRunLaunched returns whether the Run loop has been started +func (tr *TaskRunner) hasRunLaunched() bool { + tr.runLaunchedLock.Lock() + defer tr.runLaunchedLock.Unlock() + return tr.runLaunched +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index e1ec1d675..88b10cd57 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -256,7 +256,7 @@ func (tr *TaskRunner) poststart() error { // Pass the lazy handle to the hooks so even if the driver exits and we // launch a new one (external plugin), the handle will refresh. - lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger) + lazyHandle := NewLazyHandle(tr.shutdownCtx, tr.getDriverHandle, tr.logger) var merr multierror.Error for _, hook := range tr.runnerHooks {