From 149dec21697789b04369bea2e9157b39d7c0221e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 8 Jan 2019 16:42:26 -0800 Subject: [PATCH 1/3] Improve Kill handling on task runner This PR improves how killing a task is handled. Before the kill function directly orchestrated the killing and was only valid while the task was running. The new behavior is to mark the desired state and wait for the task runner to converge to that state. --- client/allocrunner/taskrunner/lifecycle.go | 57 +++--------- client/allocrunner/taskrunner/service_hook.go | 4 + client/allocrunner/taskrunner/task_runner.go | 91 ++++++++++++++++--- .../taskrunner/task_runner_getters.go | 28 ++++++ .../taskrunner/task_runner_hooks.go | 2 +- 5 files changed, 125 insertions(+), 57 deletions(-) diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 6ef91546d..6ca43b6c7 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -4,12 +4,13 @@ import ( "context" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/plugins/drivers" ) // Restart a task. Returns immediately if no task is running. Blocks until // existing task exits or passed-in context is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + tr.logger.Trace("Restart requested", "failure", failure) + // Grab the handle handle := tr.getDriverHandle() @@ -47,6 +48,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai } func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { + tr.logger.Trace("Signal requested", "signal", s) + // Grab the handle handle := tr.getDriverHandle() @@ -65,58 +68,26 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { // Kill a task. Blocks until task exits or context is canceled. State is set to // dead. func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { + tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason) + // Cancel the task runner to break out of restart delay or the main run // loop. tr.killCtxCancel() - // Grab the handle - handle := tr.getDriverHandle() - - // Check it is running - if handle == nil { - return ErrTaskNotRunning - } - - // Emit the event since it may take a long time to kill + // Emit kill event tr.EmitEvent(event) - // Run the hooks prior to killing the task - tr.killing() - - // Tell the restart tracker that the task has been killed so it doesn't - // attempt to restart it. - tr.restartTracker.SetKilled() - - // Kill the task using an exponential backoff in-case of failures. - killErr := tr.killTask(handle) - if killErr != nil { - // We couldn't successfully destroy the resource created. - tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr) - } - - // Block until task has exited. - waitCh, err := handle.WaitCh(ctx) - - // The error should be nil or TaskNotFound, if it's something else then a - // failure in the driver or transport layer occurred - if err != nil { - if err == drivers.ErrTaskNotFound { - return nil - } - tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err) - return err + // Check if the Run method has started yet. If it hasn't we return early, + // since the task hasn't even started so there is nothing to wait for + if !tr.hasRunLaunched() { + return nil } select { - case <-waitCh: + case <-tr.WaitCh(): case <-ctx.Done(): + return ctx.Err() } - if killErr != nil { - return killErr - } else if err := ctx.Err(); err != nil { - return err - } - - return nil + return tr.getKillErr() } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 0c1d7368a..fdfa3d2c6 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -199,6 +199,10 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices { // interpolateServices returns an interpolated copy of services and checks with // values from the task's environment. func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service { + if taskEnv == nil || len(services) == 0 { + return nil + } + interpolated := make([]*structs.Service, len(services)) for i, origService := range services { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 364594eef..21d913e4b 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -85,6 +85,13 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // shutdownCtx is used to exit the TaskRunner *without* affecting task state. + shutdownCtx context.Context + + // shutdownCtxCancel causes the TaskRunner to exit immediately without + // affecting task state. Useful for testing or graceful agent shutdown. + shutdownCtxCancel context.CancelFunc + // killCtx is the task runner's context representing the tasks's lifecycle. // The context is canceled when the task is killed. killCtx context.Context @@ -92,12 +99,10 @@ type TaskRunner struct { // killCtxCancel is called when killing a task. killCtxCancel context.CancelFunc - // ctx is used to exit the TaskRunner *without* affecting task state. - ctx context.Context - - // ctxCancel causes the TaskRunner to exit immediately without - // affecting task state. Useful for testing or graceful agent shutdown. - ctxCancel context.CancelFunc + // killErr is populatd when killing a task. Access should be done use the + // getter/setter + killErr error + killErrLock sync.Mutex // Logger is the logger for the task runner. logger log.Logger @@ -181,6 +186,11 @@ type TaskRunner struct { // driverManager is used to dispense driver plugins and register event // handlers driverManager drivermanager.Manager + + // runLaunched marks whether the Run goroutine has been started. It should + // be accessed via helpers + runLaunched bool + runLaunchedLock sync.Mutex } type Config struct { @@ -251,8 +261,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { deviceStatsReporter: config.DeviceStatsReporter, killCtx: killCtx, killCtxCancel: killCancel, - ctx: trCtx, - ctxCancel: trCancel, + shutdownCtx: trCtx, + shutdownCtxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), waitCh: make(chan struct{}), devicemanager: config.DeviceManager, @@ -360,6 +370,10 @@ func (tr *TaskRunner) initLabels() { // Run the TaskRunner. Starts the user's task or reattaches to a restored task. // Run closes WaitCh when it exits. Should be started in a goroutine. func (tr *TaskRunner) Run() { + // Mark that the run routine has been launched so that other functions can + // decide to use the wait channel or not. + tr.setRunLaunched() + defer close(tr.waitCh) var result *drivers.ExitResult @@ -372,8 +386,9 @@ MAIN: for { select { case <-tr.killCtx.Done(): + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return default: @@ -388,8 +403,9 @@ MAIN: select { case <-tr.killCtx.Done(): + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return default: @@ -421,7 +437,11 @@ MAIN: tr.logger.Error("wait task failed", "error", err) } else { select { - case <-tr.ctx.Done(): + case <-tr.killCtx.Done(): + // We can go through the normal should restart check since + // the restart tracker knowns it is killed + tr.handleKill() + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return case result = <-resultCh: @@ -455,8 +475,9 @@ MAIN: case <-time.After(restartDelay): case <-tr.killCtx.Done(): tr.logger.Trace("task killed between restarts", "delay", restartDelay) + tr.handleKill() break MAIN - case <-tr.ctx.Done(): + case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return } @@ -682,6 +703,50 @@ func (tr *TaskRunner) initDriver() error { return nil } +// handleKill is used to handle the a request to kill a task. It will store any +// error in the task runner killErr value. +func (tr *TaskRunner) handleKill() { + // Run the hooks prior to killing the task + tr.killing() + + // Tell the restart tracker that the task has been killed so it doesn't + // attempt to restart it. + tr.restartTracker.SetKilled() + + // Check it is running + handle := tr.getDriverHandle() + if handle == nil { + return + } + + // Kill the task using an exponential backoff in-case of failures. + killErr := tr.killTask(handle) + if killErr != nil { + // We couldn't successfully destroy the resource created. + tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr) + tr.setKillErr(killErr) + } + + // Block until task has exited. + waitCh, err := handle.WaitCh(tr.shutdownCtx) + + // The error should be nil or TaskNotFound, if it's something else then a + // failure in the driver or transport layer occurred + if err != nil { + if err == drivers.ErrTaskNotFound { + return + } + tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err) + tr.setKillErr(killErr) + return + } + + select { + case <-waitCh: + case <-tr.shutdownCtx.Done(): + } +} + // killTask kills the task handle. In the case that killing fails, // killTask will retry with an exponential backoff and will give up at a // given limit. Returns an error if the task could not be killed. @@ -1009,7 +1074,7 @@ func (tr *TaskRunner) triggerUpdateHooks() { // Shutdown blocks until the main Run loop exits. func (tr *TaskRunner) Shutdown() { tr.logger.Trace("shutting down") - tr.ctxCancel() + tr.shutdownCtxCancel() <-tr.WaitCh() diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index e38b9756b..9bb7d3246 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -85,3 +85,31 @@ func (tr *TaskRunner) clearDriverHandle() { } tr.handle = nil } + +// setKillErr stores any error that arouse while killing the task +func (tr *TaskRunner) setKillErr(err error) { + tr.killErrLock.Lock() + defer tr.killErrLock.Unlock() + tr.killErr = err +} + +// getKillErr returns any error that arouse while killing the task +func (tr *TaskRunner) getKillErr() error { + tr.killErrLock.Lock() + defer tr.killErrLock.Unlock() + return tr.killErr +} + +// setRunLaunched marks the fact that the Run loop has been started +func (tr *TaskRunner) setRunLaunched() { + tr.runLaunchedLock.Lock() + defer tr.runLaunchedLock.Unlock() + tr.runLaunched = true +} + +// hasRunLaunched returns whether the Run loop has been started +func (tr *TaskRunner) hasRunLaunched() bool { + tr.runLaunchedLock.Lock() + defer tr.runLaunchedLock.Unlock() + return tr.runLaunched +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index e1ec1d675..88b10cd57 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -256,7 +256,7 @@ func (tr *TaskRunner) poststart() error { // Pass the lazy handle to the hooks so even if the driver exits and we // launch a new one (external plugin), the handle will refresh. - lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger) + lazyHandle := NewLazyHandle(tr.shutdownCtx, tr.getDriverHandle, tr.logger) var merr multierror.Error for _, hook := range tr.runnerHooks { From e5ddff861c5d92328f40d88672acd28ef1ce6bce Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 9 Jan 2019 11:42:40 -0800 Subject: [PATCH 2/3] Spelling fix Co-Authored-By: dadgar --- client/allocrunner/taskrunner/task_runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 21d913e4b..0c68aef7e 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -99,7 +99,7 @@ type TaskRunner struct { // killCtxCancel is called when killing a task. killCtxCancel context.CancelFunc - // killErr is populatd when killing a task. Access should be done use the + // killErr is populated when killing a task. Access should be done use the // getter/setter killErr error killErrLock sync.Mutex From 069e181e8f3d611a8957c1ca2bd8e4e3aad85b26 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 9 Jan 2019 12:04:22 -0800 Subject: [PATCH 3/3] add more comments --- client/allocrunner/taskrunner/lifecycle.go | 4 +++- client/allocrunner/taskrunner/service_hook.go | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index 6ca43b6c7..ac208e723 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -78,7 +78,9 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error tr.EmitEvent(event) // Check if the Run method has started yet. If it hasn't we return early, - // since the task hasn't even started so there is nothing to wait for + // since the task hasn't even started so there is nothing to wait for. This + // is still correct since the Run method no-op since the kill context has + // already been cancelled. if !tr.hasRunLaunched() { return nil } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index fdfa3d2c6..bc23e9cbc 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -199,6 +199,8 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices { // interpolateServices returns an interpolated copy of services and checks with // values from the task's environment. func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service { + // Guard against not having a valid taskEnv. This can be the case if the + // Killing or Exited hook is run before post-run. if taskEnv == nil || len(services) == 0 { return nil }