From e401c660e70d2fb9e66367c8b756087e20d7d345 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Mon, 16 Jul 2018 14:37:27 -0700
Subject: [PATCH] Implement lifecycle hooks on the task runner

---
 .../taskrunner/restarts/restarts.go           | 16 +++
 .../taskrunner/restarts/restarts_test.go      | 10 ++
 .../interfaces/task_lifecycle.go              | 28 ++++--
 client/allocrunnerv2/taskrunner/errors.go     | 11 +++
 .../taskrunner/interfaces/events.go           |  2 +-
 .../taskrunner/interfaces/lifecycle.go        | 15 +--
 client/allocrunnerv2/taskrunner/lifecycle.go  | 99 +++++++++++++++++--
 .../taskrunner/shutdown_delay_hook.go         | 36 +++++++
 .../allocrunnerv2/taskrunner/task_runner.go   | 95 ++++++++++++------
 .../taskrunner/task_runner_getters.go         | 17 +++-
 .../taskrunner/task_runner_hooks.go           | 48 ++++++++-
 .../taskrunner/template/template.go           | 44 ++++++---
 .../taskrunner/template/template_test.go      | 43 ++++----
 .../allocrunnerv2/taskrunner/template_hook.go |  5 +-
 client/allocrunnerv2/taskrunner/vault_hook.go | 32 ++++--
 nomad/structs/structs.go                      |  6 ++
 16 files changed, 414 insertions(+), 93 deletions(-)
 create mode 100644 client/allocrunnerv2/taskrunner/errors.go
 create mode 100644 client/allocrunnerv2/taskrunner/shutdown_delay_hook.go

diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go
index 05dd22958..ec7a2594d 100644
--- a/client/allocrunner/taskrunner/restarts/restarts.go
+++ b/client/allocrunner/taskrunner/restarts/restarts.go
@@ -36,6 +36,7 @@ func NewRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
 type RestartTracker struct {
 	waitRes          *dstructs.WaitResult
 	startErr         error
+	killed           bool // Whether the task has been killed
 	restartTriggered bool // Whether the task has been signalled to be restarted
 	failure          bool // Whether a failure triggered the restart
 	count            int  // Current number of attempts.
@@ -95,6 +96,14 @@ func (r *RestartTracker) SetRestartTriggered(failure bool) *RestartTracker {
 	return r
 }
 
+// SetKilled is used to mark that the task has been killed.
+func (r *RestartTracker) SetKilled() *RestartTracker {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.killed = true
+	return r
+}
+
 // GetReason returns a human-readable description for the last state returned by
 // GetState.
 func (r *RestartTracker) GetReason() string {
@@ -123,8 +132,15 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		r.waitRes = nil
 		r.restartTriggered = false
 		r.failure = false
+		r.killed = false
 	}()
 
+	// Hot path if the task was killed
+	if r.killed {
+		r.reason = ""
+		return structs.TaskKilled, 0
+	}
+
 	// Hot path if a restart was triggered
 	if r.restartTriggered {
 		r.reason = ""
diff --git a/client/allocrunner/taskrunner/restarts/restarts_test.go b/client/allocrunner/taskrunner/restarts/restarts_test.go
index 7b8e5ea36..a671df83f 100644
--- a/client/allocrunner/taskrunner/restarts/restarts_test.go
+++ b/client/allocrunner/taskrunner/restarts/restarts_test.go
@@ -117,6 +117,16 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
 	}
 }
 
+func TestClient_RestartTracker_TaskKilled(t *testing.T) {
+	t.Parallel()
+	p := testPolicy(true, structs.RestartPolicyModeFail)
+	p.Attempts = 0
+	rt := NewRestartTracker(p, structs.JobTypeService)
+	if state, when := rt.SetKilled().GetState(); state != structs.TaskKilled || when != 0 {
+		t.Fatalf("expect no restart; got %v %v", state, when)
+	}
+}
+
 func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
 	t.Parallel()
 	p := testPolicy(true, structs.RestartPolicyModeFail)
diff --git a/client/allocrunnerv2/interfaces/task_lifecycle.go b/client/allocrunnerv2/interfaces/task_lifecycle.go
index 95393b5f3..01d098f8a 100644
--- a/client/allocrunnerv2/interfaces/task_lifecycle.go
+++ b/client/allocrunnerv2/interfaces/task_lifecycle.go
@@ -8,14 +8,28 @@ import (
 )
 
 /*
-   [Old diagram: prestart -> run -> exited -> not restart -> garbage collect, with
-   poststart following run and a kill -> exited -> stop side path.]
+   [State diagram: a Pending task runs the *Prestart hooks and enters Running, where
+   the *Poststart hooks run on entry and *Update hooks may fire while it runs. When
+   the task exits, the *Exited hooks run and the task either Restarts back into
+   Running or, when no restart is allowed, moves to Exited and then through the
+   *Stop hooks to Terminal. *Kill forces the terminal state.]
+
+Link: http://stable.ascii-flow.appspot.com/#Draw4489375405966393064/1824429135
 */
 
 // TaskHook is a lifecycle hook into the life cycle of a task runner.
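The restarts.go and restarts_test.go hunks above add a killed flag so a killed task short-circuits the restart decision. A minimal sketch of how a caller would drive that path, assuming this patch is applied; the policy values below are illustrative only:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A policy that would normally allow several restarts.
	policy := &structs.RestartPolicy{
		Attempts: 3,
		Interval: 10 * time.Minute,
		Delay:    15 * time.Second,
		Mode:     structs.RestartPolicyModeFail,
	}
	rt := restarts.NewRestartTracker(policy, structs.JobTypeService)

	// Marking the tracker as killed takes the new hot path in GetState:
	// the task is reported as killed and no restart is scheduled.
	state, wait := rt.SetKilled().GetState()
	fmt.Println(state == structs.TaskKilled, wait) // true 0s
}
```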
diff --git a/client/allocrunnerv2/taskrunner/errors.go b/client/allocrunnerv2/taskrunner/errors.go new file mode 100644 index 000000000..bfe9b3dda --- /dev/null +++ b/client/allocrunnerv2/taskrunner/errors.go @@ -0,0 +1,11 @@ +package taskrunner + +import "errors" + +const ( + errTaskNotRunning = "Task not running" +) + +var ( + ErrTaskNotRunning = errors.New(errTaskNotRunning) +) diff --git a/client/allocrunnerv2/taskrunner/interfaces/events.go b/client/allocrunnerv2/taskrunner/interfaces/events.go index 3f908cedf..b9b623dff 100644 --- a/client/allocrunnerv2/taskrunner/interfaces/events.go +++ b/client/allocrunnerv2/taskrunner/interfaces/events.go @@ -4,5 +4,5 @@ import "github.com/hashicorp/nomad/nomad/structs" type EventEmitter interface { SetState(state string, event *structs.TaskEvent) - EmitEvent(source, message string) + EmitEvent(event *structs.TaskEvent) } diff --git a/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go b/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go index e440943a3..10e8edf82 100644 --- a/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go +++ b/client/allocrunnerv2/taskrunner/interfaces/lifecycle.go @@ -1,11 +1,14 @@ package interfaces -import "os" +import ( + "context" + "os" + + "github.com/hashicorp/nomad/nomad/structs" +) -// XXX These should probably all return an error and we should have predefined -// error types for the task not currently running type TaskLifecycle interface { - Restart(source, reason string, failure bool) - Signal(source, reason string, s os.Signal) error - Kill(source, reason string, fail bool) + Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error + Signal(event *structs.TaskEvent, s os.Signal) error + Kill(ctx context.Context, event *structs.TaskEvent) error } diff --git a/client/allocrunnerv2/taskrunner/lifecycle.go b/client/allocrunnerv2/taskrunner/lifecycle.go index 4d3f7bd0f..9de87f528 100644 --- a/client/allocrunnerv2/taskrunner/lifecycle.go +++ b/client/allocrunnerv2/taskrunner/lifecycle.go @@ -1,16 +1,99 @@ package taskrunner -import "os" +import ( + "context" + "os" -func (tr *TaskRunner) Restart(source, reason string, failure bool) { - // TODO -} + "github.com/hashicorp/nomad/nomad/structs" +) + +func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + // Grab the handle + handle := tr.getDriverHandle() + + // Check it is running + if handle == nil { + return ErrTaskNotRunning + } + + // Emit the event + tr.EmitEvent(event) + + // Tell the restart tracker that a restart triggered the exit + tr.restartTracker.SetRestartTriggered(failure) + + // Kill the task using an exponential backoff in-case of failures. + destroySuccess, err := tr.handleDestroy(handle) + if !destroySuccess { + // We couldn't successfully destroy the resource created. + tr.logger.Error("failed to kill task. 
Resources may have been leaked", "error", err) + } + + // Drain the wait channel or wait for the request context to be cancelled + select { + case <-handle.WaitCh(): + case <-ctx.Done(): + return ctx.Err() + } -func (tr *TaskRunner) Signal(source, reason string, s os.Signal) error { - // TODO return nil } -func (tr *TaskRunner) Kill(source, reason string, fail bool) { - // TODO +func (tr *TaskRunner) Signal(event *structs.TaskEvent, s os.Signal) error { + // Grab the handle + handle := tr.getDriverHandle() + + // Check it is running + if handle == nil { + return ErrTaskNotRunning + } + + // Emit the event + tr.EmitEvent(event) + + // Send the signal + return handle.Signal(s) +} + +func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { + // Grab the handle + handle := tr.getDriverHandle() + + // Check if the handle is running + if handle == nil { + return ErrTaskNotRunning + } + + // Emit the event + tr.EmitEvent(event) + + // Run the hooks prior to killing the task + tr.kill() + + // Tell the restart tracker that the task has been killed + tr.restartTracker.SetKilled() + + // Kill the task using an exponential backoff in-case of failures. + destroySuccess, destroyErr := tr.handleDestroy(handle) + if !destroySuccess { + // We couldn't successfully destroy the resource created. + tr.logger.Error("failed to kill task. Resources may have been leaked", "error", destroyErr) + } + + // Drain the wait channel or wait for the request context to be cancelled + select { + case <-handle.WaitCh(): + case <-ctx.Done(): + } + + // Store that the task has been destroyed and any associated error. + tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) + + if destroyErr != nil { + return destroyErr + } else if err := ctx.Err(); err != nil { + return err + } + + return nil } diff --git a/client/allocrunnerv2/taskrunner/shutdown_delay_hook.go b/client/allocrunnerv2/taskrunner/shutdown_delay_hook.go new file mode 100644 index 000000000..4bad97136 --- /dev/null +++ b/client/allocrunnerv2/taskrunner/shutdown_delay_hook.go @@ -0,0 +1,36 @@ +package taskrunner + +import ( + "context" + "time" + + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces" +) + +// shutdownDelayHook delays shutting down a task between deregistering it from +// Consul and actually killing it. +type shutdownDelayHook struct { + delay time.Duration + logger log.Logger +} + +func newShutdownDelayHook(delay time.Duration, logger log.Logger) *shutdownDelayHook { + h := &shutdownDelayHook{ + delay: delay, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*shutdownDelayHook) Name() string { + return "shutdown-delay" +} + +func (h *shutdownDelayHook) Kill(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error { + select { + case <-ctx.Done(): + case <-time.After(h.delay): + } + return nil +} diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index a6c57489e..e5fd74838 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -25,6 +25,20 @@ import ( "golang.org/x/crypto/blake2b" ) +const ( + // killBackoffBaseline is the baseline time for exponential backoff while + // killing a task. + killBackoffBaseline = 5 * time.Second + + // killBackoffLimit is the limit of the exponential backoff for killing + // the task. 
+ killBackoffLimit = 2 * time.Minute + + // killFailureLimit is how many times we will attempt to kill a task before + // giving up and potentially leaking resources. + killFailureLimit = 5 +) + var ( // taskRunnerStateAllKey holds all the task runners state. At the moment // there is no need to split it @@ -79,10 +93,8 @@ type TaskRunner struct { driver driver.Driver // handle is the handle to the currently running driver - handle driver.DriverHandle - //XXX(schmichael) I think the handle is only manipulated in the Restore - // and Run loops, so there's never concurrent access. - //handleLock sync.Mutex + handle driver.DriverHandle + handleLock sync.Mutex // task is the task being run task *structs.Task @@ -226,9 +238,10 @@ func (tr *TaskRunner) initLabels() { func (tr *TaskRunner) Run() { defer close(tr.waitCh) + var handle driver.DriverHandle MAIN: - for { + for tr.ctx.Err() == nil { // Run the prestart hooks if err := tr.prestart(); err != nil { tr.logger.Error("prestart failed", "error", err) @@ -236,6 +249,10 @@ MAIN: goto RESTART } + if tr.ctx.Err() != nil { + break MAIN + } + // Run the task if err := tr.runDriver(); err != nil { tr.logger.Error("running driver failed", "error", err) @@ -248,24 +265,27 @@ MAIN: tr.logger.Error("poststart failed", "error", err) } - WAIT: + // Grab the handle + handle = tr.getDriverHandle() + select { - case waitRes := <-tr.handle.WaitCh(): + case waitRes := <-handle.WaitCh(): + // Clear the handle + tr.setDriverHandle(nil) + + // Store the wait result on the restart tracker tr.restartTracker.SetWaitResult(waitRes) - case _ = <-tr.updateCh: - //XXX Need to copy handleUpdate over - tr.logger.Warn("update not handled") - goto WAIT case <-tr.ctx.Done(): - tr.logger.Debug("task runner cancelled") - break MAIN + tr.logger.Debug("task killed") } + // TODO Need to run exited hooks + RESTART: // Actually restart by sleeping and also watching for destroy events restart, restartWait := tr.shouldRestart() if !restart { - break + break MAIN } deadline := time.Now().Add(restartWait) @@ -273,9 +293,6 @@ MAIN: for time.Now().Before(deadline) { select { case <-timer.C: - case _ = <-tr.updateCh: - //XXX Need to copy handleUpdate over - tr.logger.Warn("update not handled") case <-tr.ctx.Done(): tr.logger.Debug("task runner cancelled") break MAIN @@ -297,6 +314,9 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { state, when := tr.restartTracker.GetState() reason := tr.restartTracker.GetReason() switch state { + case structs.TaskKilled: + // The task was killed. Nothing to do + return false, 0 case structs.TaskNotRestarting, structs.TaskTerminated: tr.logger.Info("not restarting task", "reason", reason) if state == structs.TaskNotRestarting { @@ -333,8 +353,9 @@ func (tr *TaskRunner) runDriver() error { return err } - // Wait on the handle - tr.handle = sresp.Handle + // Grab the handle + tr.setDriverHandle(sresp.Handle) + //XXX need to capture the driver network // Emit an event that we started @@ -372,6 +393,30 @@ func (tr *TaskRunner) initDriver() error { return nil } +// handleDestroy kills the task handle. In the case that killing fails, +// handleDestroy will retry with an exponential backoff and will give up at a +// given limit. It returns whether the task was destroyed and the error +// associated with the last kill attempt. +func (tr *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) { + // Cap the number of times we attempt to kill the task. 
+ for i := 0; i < killFailureLimit; i++ { + if err = handle.Kill(); err != nil { + // Calculate the new backoff + backoff := (1 << (2 * uint64(i))) * killBackoffBaseline + if backoff > killBackoffLimit { + backoff = killBackoffLimit + } + + tr.logger.Error("failed to kill task", "backoff", backoff, "error", err) + time.Sleep(backoff) + } else { + // Kill was successful + return true, nil + } + } + return +} + // persistLocalState persists local state to disk synchronously. func (tr *TaskRunner) persistLocalState() error { // buffer for writing to boltdb @@ -498,8 +543,7 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { //} } -func (tr *TaskRunner) EmitEvent(source, message string) { - event := structs.NewTaskEvent(source).SetMessage(message) +func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.SetState("", event) } @@ -522,15 +566,6 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { } } -// Shutdown the task runner. Does not stop the task or garbage collect a -// stopped task. -// -// This method is safe for calling concurently with Run(). Callers must -// receive on WaitCh() to block until Run() has exited. -func (tr *TaskRunner) Shutdown() { - tr.ctxCancel() -} - // appendTaskEvent updates the task status by appending the new event. func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) { capacity := 10 diff --git a/client/allocrunnerv2/taskrunner/task_runner_getters.go b/client/allocrunnerv2/taskrunner/task_runner_getters.go index 455cec467..3ee76dff8 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_getters.go +++ b/client/allocrunnerv2/taskrunner/task_runner_getters.go @@ -1,6 +1,9 @@ package taskrunner -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "github.com/hashicorp/nomad/client/driver" + "github.com/hashicorp/nomad/nomad/structs" +) func (tr *TaskRunner) Alloc() *structs.Allocation { tr.allocLock.Lock() @@ -25,3 +28,15 @@ func (tr *TaskRunner) setVaultToken(token string) { defer tr.vaultTokenLock.Unlock() tr.vaultToken = token } + +func (tr *TaskRunner) getDriverHandle() driver.DriverHandle { + tr.handleLock.Lock() + defer tr.handleLock.Unlock() + return tr.handle +} + +func (tr *TaskRunner) setDriverHandle(handle driver.DriverHandle) { + tr.handleLock.Lock() + defer tr.handleLock.Unlock() + tr.handle = handle +} diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index 415fb7f94..9323e683e 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -1,6 +1,7 @@ package taskrunner import ( + "context" "fmt" "time" @@ -12,16 +13,18 @@ import ( // initHooks intializes the tasks hooks. func (tr *TaskRunner) initHooks() { hookLogger := tr.logger.Named("task_hook") + task := tr.Task() // Create the task directory hook. This is run first to ensure the // directoy path exists for other hooks. 
tr.runnerHooks = []interfaces.TaskHook{ newTaskDirHook(tr, hookLogger), newArtifactHook(tr, hookLogger), + newShutdownDelayHook(task.ShutdownDelay, hookLogger), } // If Vault is enabled, add the hook - if task := tr.Task(); task.Vault != nil { + if task.Vault != nil { tr.runnerHooks = append(tr.runnerHooks, newVaultHook(&vaultHookConfig{ vaultStanza: task.Vault, client: tr.vaultClient, @@ -35,7 +38,7 @@ func (tr *TaskRunner) initHooks() { } // If there are templates is enabled, add the hook - if task := tr.Task(); len(task.Templates) != 0 { + if len(task.Templates) != 0 { tr.runnerHooks = append(tr.runnerHooks, newTemplateHook(&templateHookConfig{ logger: hookLogger, lifecycle: tr, @@ -268,6 +271,47 @@ func (tr *TaskRunner) updateHooks() { } } +// kill is used to run the runners kill hooks. +func (tr *TaskRunner) kill() { + if tr.logger.IsTrace() { + start := time.Now() + tr.logger.Trace("running kill hooks", "start", start) + defer func() { + end := time.Now() + tr.logger.Trace("finished kill hooks", "end", end, "duration", end.Sub(start)) + }() + } + + for _, hook := range tr.runnerHooks { + upd, ok := hook.(interfaces.TaskKillHook) + if !ok { + tr.logger.Trace("skipping non-kill hook", "name", hook.Name()) + continue + } + + name := upd.Name() + + // Time the update hook + var start time.Time + if tr.logger.IsTrace() { + start = time.Now() + tr.logger.Trace("running kill hook", "name", name, "start", start) + } + + // Run the update hook + req := interfaces.TaskKillRequest{} + var resp interfaces.TaskKillResponse + if err := upd.Kill(context.Background(), &req, &resp); err != nil { + tr.logger.Error("kill hook failed", "name", name, "error", err) + } + + if tr.logger.IsTrace() { + end := time.Now() + tr.logger.Trace("finished kill hooks", "name", name, "end", end, "duration", end.Sub(start)) + } + } +} + /* TR Hooks: diff --git a/client/allocrunnerv2/taskrunner/template/template.go b/client/allocrunnerv2/taskrunner/template/template.go index 27dba209f..0685d1459 100644 --- a/client/allocrunnerv2/taskrunner/template/template.go +++ b/client/allocrunnerv2/taskrunner/template/template.go @@ -1,6 +1,7 @@ package template import ( + "context" "fmt" "math/rand" "os" @@ -206,7 +207,10 @@ func (tm *TaskTemplateManager) run() { // Read environment variables from env templates before we unblock envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) if err != nil { - tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Template failed to read environment variables: %v", err))) return } tm.config.EnvBuilder.SetTemplateEnv(envMap) @@ -250,7 +254,10 @@ WAIT: continue } - tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). 
+ SetDisplayMessage(fmt.Sprintf("Template failed: %v", err))) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do events := tm.runner.RenderEvents() @@ -328,7 +335,7 @@ WAIT: } missingStr := strings.Join(missingSlice, ", ") - tm.config.Events.EmitEvent(consulTemplateSourceName, fmt.Sprintf("Missing: %s", missingStr)) + tm.config.Events.EmitEvent(structs.NewTaskEvent(consulTemplateSourceName).SetDisplayMessage(fmt.Sprintf("Missing: %s", missingStr))) } } } @@ -350,7 +357,10 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time continue } - tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Template failed: %v", err))) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do var handling []string @@ -375,14 +385,20 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time // Lookup the template and determine what to do tmpls, ok := tm.lookup[id] if !ok { - tm.config.Lifecycle.Kill(consulTemplateSourceName, fmt.Sprintf("template runner returned unknown template id %q", id), true) + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Template runner returned unknown template id %q", id))) return } // Read environment variables from templates envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) if err != nil { - tm.config.Lifecycle.Kill(consulTemplateSourceName, err.Error(), true) + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Template failed to read environment variables: %v", err))) return } tm.config.EnvBuilder.SetTemplateEnv(envMap) @@ -424,13 +440,15 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time } if restart { - const failure = false - tm.config.Lifecycle.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered", failure) + tm.config.Lifecycle.Restart(context.Background(), + structs.NewTaskEvent(structs.TaskRestarting). + SetDisplayMessage("Template with change_mode restart re-rendered"), false) } else if len(signals) != 0 { var mErr multierror.Error for signal := range signals { - err := tm.config.Lifecycle.Signal(consulTemplateSourceName, "template re-rendered", tm.signals[signal]) - if err != nil { + s := tm.signals[signal] + event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetDisplayMessage("Template re-rendered") + if err := tm.config.Lifecycle.Signal(event, s); err != nil { multierror.Append(&mErr, err) } } @@ -440,7 +458,11 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time for signal := range signals { flat = append(flat, tm.signals[signal]) } - tm.config.Lifecycle.Kill(consulTemplateSourceName, fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) + + tm.config.Lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). 
+ SetDisplayMessage(fmt.Sprintf("Template failed to send signals %v: %v", flat, err))) } } } diff --git a/client/allocrunnerv2/taskrunner/template/template_test.go b/client/allocrunnerv2/taskrunner/template/template_test.go index 102f6e745..f4fefeb8f 100644 --- a/client/allocrunnerv2/taskrunner/template/template_test.go +++ b/client/allocrunnerv2/taskrunner/template/template_test.go @@ -1,6 +1,7 @@ package template import ( + "context" "fmt" "io" "io/ioutil" @@ -41,10 +42,10 @@ type MockTaskHooks struct { UnblockCh chan struct{} - KillReason string - KillCh chan struct{} + KillEvent *structs.TaskEvent + KillCh chan struct{} - Events []string + Events []*structs.TaskEvent EmitEventCh chan struct{} } @@ -57,15 +58,16 @@ func NewMockTaskHooks() *MockTaskHooks { EmitEventCh: make(chan struct{}, 1), } } -func (m *MockTaskHooks) Restart(source, reason string, failure bool) { +func (m *MockTaskHooks) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { m.Restarts++ select { case m.RestartCh <- struct{}{}: default: } + return nil } -func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error { +func (m *MockTaskHooks) Signal(event *structs.TaskEvent, s os.Signal) error { m.Signals = append(m.Signals, s) select { case m.SignalCh <- struct{}{}: @@ -75,16 +77,17 @@ func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error { return m.SignalError } -func (m *MockTaskHooks) Kill(source, reason string, fail bool) { - m.KillReason = reason +func (m *MockTaskHooks) Kill(ctx context.Context, event *structs.TaskEvent) error { + m.KillEvent = event select { case m.KillCh <- struct{}{}: default: } + return nil } -func (m *MockTaskHooks) EmitEvent(source, message string) { - m.Events = append(m.Events, message) +func (m *MockTaskHooks) EmitEvent(event *structs.TaskEvent) { + m.Events = append(m.Events, event) select { case m.EmitEventCh <- struct{}{}: default: @@ -966,6 +969,8 @@ func TestTaskTemplateManager_Interpolate_Destination(t *testing.T) { func TestTaskTemplateManager_Signal_Error(t *testing.T) { t.Parallel() + require := require.New(t) + // Make a template that renders based on a key in Consul and sends SIGALRM key1 := "foo" content1 := "bar" @@ -1006,9 +1011,8 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) { t.Fatalf("Should have received a signals: %+v", harness.mockHooks) } - if !strings.Contains(harness.mockHooks.KillReason, "Sending signals") { - t.Fatalf("Unexpected error: %v", harness.mockHooks.KillReason) - } + require.NotNil(harness.mockHooks.KillEvent) + require.Contains(harness.mockHooks.KillEvent.DisplayMessage, "failed to send signals") } // TestTaskTemplateManager_Env asserts templates with the env flag set are read @@ -1287,6 +1291,8 @@ func TestTaskTemplateManager_Config_VaultGrace(t *testing.T) { func TestTaskTemplateManager_BlockedEvents(t *testing.T) { t.Parallel() + require := require.New(t) + // Make a template that will render based on a key in Consul var embedded string for i := 0; i < 5; i++ { @@ -1315,10 +1321,8 @@ func TestTaskTemplateManager_BlockedEvents(t *testing.T) { } // Check to see we got a correct message - event := harness.mockHooks.Events[0] - if !strings.Contains(event, "and 2 more") { - t.Fatalf("bad event: %q", event) - } + require.Len(harness.mockHooks.Events, 1) + require.Contains(harness.mockHooks.Events[0].DisplayMessage, "and 2 more") // Write 3 keys to Consul for i := 0; i < 3; i++ { @@ -1334,9 +1338,10 @@ func TestTaskTemplateManager_BlockedEvents(t *testing.T) { t.Fatalf("timeout") } + // 
TODO // Check to see we got a correct message - event = harness.mockHooks.Events[len(harness.mockHooks.Events)-1] - if !strings.Contains(event, "Missing") || strings.Contains(event, "more") { - t.Fatalf("bad event: %q", event) + eventMsg := harness.mockHooks.Events[len(harness.mockHooks.Events)-1].DisplayMessage + if !strings.Contains(eventMsg, "Missing") || strings.Contains(eventMsg, "more") { + t.Fatalf("bad event: %q", eventMsg) } } diff --git a/client/allocrunnerv2/taskrunner/template_hook.go b/client/allocrunnerv2/taskrunner/template_hook.go index 56957aa6b..4b3073d7c 100644 --- a/client/allocrunnerv2/taskrunner/template_hook.go +++ b/client/allocrunnerv2/taskrunner/template_hook.go @@ -148,7 +148,10 @@ func (h *templateHook) Update(ctx context.Context, req *interfaces.TaskUpdateReq if _, err := h.newManager(); err != nil { err := fmt.Errorf("failed to build template manager: %v", err) h.logger.Error("failed to build template manager", "error", err) - h.config.lifecycle.Kill(h.Name(), err.Error(), true) + h.config.lifecycle.Kill(context.Background(), + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Template update %v", err))) } return nil diff --git a/client/allocrunnerv2/taskrunner/vault_hook.go b/client/allocrunnerv2/taskrunner/vault_hook.go index 4fa0b6896..7b52e8b10 100644 --- a/client/allocrunnerv2/taskrunner/vault_hook.go +++ b/client/allocrunnerv2/taskrunner/vault_hook.go @@ -208,7 +208,10 @@ OUTER: if err := h.writeToken(token); err != nil { errorString := "failed to write Vault token to disk" h.logger.Error(errorString, "error", err) - h.lifecycle.Kill("vault", errorString, true) + h.lifecycle.Kill(h.ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Vault %v", errorString))) return } } @@ -232,18 +235,27 @@ OUTER: s, err := signals.Parse(h.vaultStanza.ChangeSignal) if err != nil { h.logger.Error("failed to parse signal", "error", err) - h.lifecycle.Kill("vault", fmt.Sprintf("failed to parse signal: %v", err), true) + h.lifecycle.Kill(h.ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Vault: failed to parse signal: %v", err))) return } - if err := h.lifecycle.Signal("vault", "new Vault token acquired", s); err != nil { + event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetDisplayMessage("Vault: new Vault token acquired") + if err := h.lifecycle.Signal(event, s); err != nil { h.logger.Error("failed to send signal", "error", err) - h.lifecycle.Kill("vault", fmt.Sprintf("failed to send signal: %v", err), true) + h.lifecycle.Kill(h.ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Vault: failed to send signal: %v", err))) return } case structs.VaultChangeModeRestart: const noFailure = false - h.lifecycle.Restart("vault", "new Vault token acquired", noFailure) + h.lifecycle.Restart(h.ctx, + structs.NewTaskEvent(structs.TaskRestarting). + SetDisplayMessage("Vault: new Vault token acquired"), false) case structs.VaultChangeModeNoop: fallthrough default: @@ -289,14 +301,20 @@ func (h *vaultHook) deriveVaultToken() (token string, exit bool) { // Check if this is a server side error if structs.IsServerSide(err) { h.logger.Error("failed to derive Vault token", "error", err, "server_side", true) - h.lifecycle.Kill("vault", fmt.Sprintf("server error deriving vault token: %v", err), true) + h.lifecycle.Kill(h.ctx, + structs.NewTaskEvent(structs.TaskKilling). 
+ SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Vault: server failed to derive vault token: %v", err))) return "", true } // Check if we can't recover from the error if !structs.IsRecoverable(err) { h.logger.Error("failed to derive Vault token", "error", err, "recoverable", false) - h.lifecycle.Kill("vault", fmt.Sprintf("failed to derive token: %v", err), true) + h.lifecycle.Kill(h.ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage(fmt.Sprintf("Vault: failed to derive vault token: %v", err))) return "", true } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c5cd06dde..ec0ec4b66 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5900,6 +5900,12 @@ func (te *TaskEvent) GoString() string { return fmt.Sprintf("%v - %v", te.Time, te.Type) } +// SetDisplayMessage sets the display message of TaskEvent +func (te *TaskEvent) SetDisplayMessage(msg string) *TaskEvent { + te.DisplayMessage = msg + return te +} + // SetMessage sets the message of TaskEvent func (te *TaskEvent) SetMessage(msg string) *TaskEvent { te.Message = msg
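handleDestroy in task_runner.go retries handle.Kill() with an exponential backoff of (1 << (2*i)) * 5s, capped at 2 minutes, and gives up after five failed attempts. A self-contained sketch of the same loop; destroyWithBackoff and the kill callback are hypothetical stand-ins, not part of the patch:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	killBackoffBaseline = 5 * time.Second
	killBackoffLimit    = 2 * time.Minute
	killFailureLimit    = 5
)

// destroyWithBackoff mirrors the patch's handleDestroy loop: it retries the
// kill function, sleeping (1 << (2*i)) * baseline between attempts (capped at
// killBackoffLimit), and gives up after killFailureLimit failures. It returns
// whether the kill succeeded and the error from the last attempt.
func destroyWithBackoff(kill func() error) (bool, error) {
	var err error
	for i := 0; i < killFailureLimit; i++ {
		if err = kill(); err == nil {
			return true, nil
		}
		backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
		if backoff > killBackoffLimit {
			backoff = killBackoffLimit
		}
		time.Sleep(backoff)
	}
	return false, err
}

func main() {
	attempts := 0
	ok, err := destroyWithBackoff(func() error {
		attempts++
		if attempts < 2 {
			return errors.New("driver busy")
		}
		return nil
	})
	// Succeeds on the second attempt after a single 5s backoff.
	fmt.Println(ok, err, attempts) // true <nil> 2
}
```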
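The new shutdown-delay hook simply waits for the task's configured ShutdownDelay or for the kill context to be cancelled, whichever comes first. A minimal sketch of that select; waitShutdownDelay is a made-up helper, and unlike the real hook (which always returns nil) it surfaces the context error so the cancellation path is visible:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitShutdownDelay blocks for delay unless ctx is cancelled first, mirroring
// the Kill method of the patch's shutdownDelayHook.
func waitShutdownDelay(ctx context.Context, delay time.Duration) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(delay):
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// The 5s shutdown delay is cut short because the kill context expires first.
	err := waitShutdownDelay(ctx, 5*time.Second)
	fmt.Println(err) // context deadline exceeded
}
```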
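TaskRunner.kill walks tr.runnerHooks and runs only the hooks that implement the kill interface, discovered with a type assertion. A simplified sketch of that dispatch pattern; the interface and hook types here are local stand-ins for the real ones in client/allocrunnerv2/interfaces:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// TaskHook is the minimal hook contract (simplified for this sketch).
type TaskHook interface {
	Name() string
}

// TaskKillHook is the optional interface the kill path looks for.
type TaskKillHook interface {
	TaskHook
	Kill(ctx context.Context) error
}

// artifactHook does not implement TaskKillHook and is skipped by kill.
type artifactHook struct{}

func (artifactHook) Name() string { return "artifacts" }

// shutdownDelayHook implements TaskKillHook, like the hook added in the patch.
type shutdownDelayHook struct{ delay time.Duration }

func (shutdownDelayHook) Name() string { return "shutdown-delay" }
func (h shutdownDelayHook) Kill(ctx context.Context) error {
	select {
	case <-ctx.Done():
	case <-time.After(h.delay):
	}
	return nil
}

func main() {
	hooks := []TaskHook{artifactHook{}, shutdownDelayHook{delay: 10 * time.Millisecond}}

	// Mirror TaskRunner.kill: only hooks that assert to TaskKillHook are run.
	for _, h := range hooks {
		kh, ok := h.(TaskKillHook)
		if !ok {
			fmt.Println("skipping non-kill hook:", h.Name())
			continue
		}
		fmt.Println("running kill hook:", kh.Name())
		if err := kh.Kill(context.Background()); err != nil {
			fmt.Println("kill hook failed:", kh.Name(), err)
		}
	}
}
```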
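The reworked TaskLifecycle interface takes *structs.TaskEvent values built with chained setters instead of bare source/reason strings. A small sketch of building the kill and signal events the way the template and Vault hooks now do, assuming this patch (which adds SetDisplayMessage) is applied:

```go
package main

import (
	"fmt"
	"syscall"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A kill event that also fails the task, as used on template errors.
	kill := structs.NewTaskEvent(structs.TaskKilling).
		SetFailsTask().
		SetDisplayMessage("Template failed: permission denied")
	fmt.Println(kill.Type, kill.FailsTask, kill.DisplayMessage)

	// A signal event for a change_mode = "signal" template re-render.
	sig := structs.NewTaskEvent(structs.TaskSignaling).
		SetTaskSignal(syscall.SIGHUP).
		SetDisplayMessage("Template re-rendered")
	fmt.Println(sig.Type, sig.TaskSignal, sig.DisplayMessage)
}
```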
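Direct access to tr.handle is replaced by getDriverHandle/setDriverHandle guarded by handleLock, so the Run loop and the lifecycle methods (Kill, Signal, Restart) can touch the handle concurrently. A stripped-down sketch of that plumbing; driverHandle and stubHandle are stand-ins for driver.DriverHandle:

```go
package main

import (
	"fmt"
	"sync"
)

// driverHandle stands in for driver.DriverHandle.
type driverHandle interface {
	ID() string
}

type stubHandle struct{ id string }

func (s stubHandle) ID() string { return s.id }

// taskRunner shows only the handle plumbing: every reader and writer goes
// through the lock so the Run loop and the lifecycle methods never race.
type taskRunner struct {
	handle     driverHandle
	handleLock sync.Mutex
}

func (tr *taskRunner) getDriverHandle() driverHandle {
	tr.handleLock.Lock()
	defer tr.handleLock.Unlock()
	return tr.handle
}

func (tr *taskRunner) setDriverHandle(h driverHandle) {
	tr.handleLock.Lock()
	defer tr.handleLock.Unlock()
	tr.handle = h
}

func main() {
	tr := &taskRunner{}
	tr.setDriverHandle(stubHandle{id: "task-1"})

	// A lifecycle method first grabs the handle and bails out when the task
	// is not running, which is what ErrTaskNotRunning signals in the patch.
	if h := tr.getDriverHandle(); h != nil {
		fmt.Println("killing", h.ID())
	} else {
		fmt.Println("task not running")
	}
}
```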