diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index d6486734e..c4725c6e4 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -336,7 +336,8 @@ func (r *AllocRunner) RestoreState() error {
 			// Restart task runner if RestoreState gave a reason
 			if restartReason != "" {
 				r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
-				tr.Restart("upgrade", restartReason)
+				const failure = false
+				tr.Restart("upgrade", restartReason, failure)
 			}
 		} else {
 			tr.Destroy(taskDestroyEvent)
diff --git a/client/consul_template.go b/client/consul_template.go
index 2e1b1ac60..2f7a62935 100644
--- a/client/consul_template.go
+++ b/client/consul_template.go
@@ -49,7 +49,7 @@ var (
 // TaskHooks is an interface which provides hooks into the tasks life-cycle
 type TaskHooks interface {
 	// Restart is used to restart the task
-	Restart(source, reason string)
+	Restart(source, reason string, failure bool)
 
 	// Signal is used to signal the task
 	Signal(source, reason string, s os.Signal) error
@@ -439,7 +439,8 @@ func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time
 		}
 
 		if restart {
-			tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered")
+			const failure = false
+			tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered", failure)
 		} else if len(signals) != 0 {
 			var mErr multierror.Error
 			for signal := range signals {
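The TaskHooks.Restart change above means every implementation of the interface (including any test fakes) now has to accept the failure flag. A minimal sketch of such a fake, written for illustration here rather than taken from the repository:

// restartRecordingHooks is a hypothetical test double for the widened
// TaskHooks.Restart signature; it only records its arguments.
type restartRecordingHooks struct {
	source, reason string
	failure        bool
}

func (h *restartRecordingHooks) Restart(source, reason string, failure bool) {
	h.source, h.reason, h.failure = source, reason, failure
}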
diff --git a/client/restarts.go b/client/restarts.go
index b6e49e31c..93f2a8fd0 100644
--- a/client/restarts.go
+++ b/client/restarts.go
@@ -37,6 +37,7 @@ type RestartTracker struct {
 	waitRes          *dstructs.WaitResult
 	startErr         error
 	restartTriggered bool      // Whether the task has been signalled to be restarted
+	failure          bool      // Whether a failure triggered the restart
 	count            int       // Current number of attempts.
 	onSuccess        bool      // Whether to restart on successful exit code.
 	startTime        time.Time // When the interval began
@@ -79,6 +80,15 @@ func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
 	return r
 }
 
+// SetFailure is used to mark that a task should be restarted due to failure
+// such as a failed Consul healthcheck.
+func (r *RestartTracker) SetFailure() *RestartTracker {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.failure = true
+	return r
+}
+
 // GetReason returns a human-readable description for the last state returned by
 // GetState.
 func (r *RestartTracker) GetReason() string {
@@ -106,6 +116,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		r.startErr = nil
 		r.waitRes = nil
 		r.restartTriggered = false
+		r.failure = false
 	}()
 
 	// Hot path if a restart was triggered
@@ -138,6 +149,8 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		return r.handleStartError()
 	} else if r.waitRes != nil {
 		return r.handleWaitResult()
+	} else if r.failure {
+		return r.handleFailure()
 	}
 
 	return "", 0
@@ -196,6 +209,25 @@ func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
 	return structs.TaskRestarting, r.jitter()
 }
 
+// handleFailure returns the new state and potential wait duration for
+// restarting the task due to a failure like an unhealthy Consul check.
+func (r *RestartTracker) handleFailure() (string, time.Duration) {
+	if r.count > r.policy.Attempts {
+		if r.policy.Mode == structs.RestartPolicyModeFail {
+			r.reason = fmt.Sprintf(
+				`Exceeded allowed attempts %d in interval %v and mode is "fail"`,
+				r.policy.Attempts, r.policy.Interval)
+			return structs.TaskNotRestarting, 0
+		} else {
+			r.reason = ReasonDelay
+			return structs.TaskRestarting, r.getDelay()
+		}
+	}
+
+	r.reason = ReasonWithinPolicy
+	return structs.TaskRestarting, r.jitter()
+}
+
 // getDelay returns the delay time to enter the next interval.
 func (r *RestartTracker) getDelay() time.Duration {
 	end := r.startTime.Add(r.policy.Interval)
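How the new failure path is meant to be driven, as an in-package sketch; the policy values below are invented for the example and are not taken from this patch:

policy := &structs.RestartPolicy{
	Attempts: 2,
	Interval: 30 * time.Minute,
	Delay:    15 * time.Second,
	Mode:     structs.RestartPolicyModeFail,
}
rt := newRestartTracker(policy, structs.JobTypeService)

// Unlike SetRestartTriggered, SetFailure routes GetState through
// handleFailure, so the restart is delayed/jittered and is subject to the
// policy's Attempts limit.
state, wait := rt.SetFailure().GetState()
// state is structs.TaskRestarting while within the policy, and
// structs.TaskNotRestarting once Attempts is exceeded in mode "fail".
_, _ = state, wait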
diff --git a/client/task_runner.go b/client/task_runner.go
index 363264489..e31371026 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -65,13 +65,29 @@ var (
 	taskRunnerStateAllKey = []byte("simple-all")
 )
 
+// taskRestartEvent wraps a TaskEvent with additional metadata to control
+// restart behavior.
+type taskRestartEvent struct {
+	// taskEvent to report
+	taskEvent *structs.TaskEvent
+
+	// if false, don't count against restart count
+	failure bool
+}
+
+func newTaskRestartEvent(reason string, failure bool) *taskRestartEvent {
+	return &taskRestartEvent{
+		taskEvent: structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reason),
+		failure:   failure,
+	}
+}
+
 // TaskRunner is used to wrap a task within an allocation and provide the execution context.
 type TaskRunner struct {
 	stateDB *bolt.DB
 	config  *config.Config
 	updater TaskStateUpdater
 	logger  *log.Logger
-	alloc   *structs.Allocation
 
 	restartTracker *RestartTracker
 	consul         ConsulServiceAPI
@@ -82,9 +98,14 @@ type TaskRunner struct {
 	resourceUsage     *cstructs.TaskResourceUsage
 	resourceUsageLock sync.RWMutex
 
+	alloc   *structs.Allocation
 	task    *structs.Task
 	taskDir *allocdir.TaskDir
 
+	// Synchronizes access to alloc and task since the main Run loop may
+	// update them concurrently with reads in exported methods.
+	allocLock sync.Mutex
+
 	// envBuilder is used to build the task's environment
 	envBuilder *env.Builder
 
@@ -139,7 +160,7 @@ type TaskRunner struct {
 	unblockLock sync.Mutex
 
 	// restartCh is used to restart a task
-	restartCh chan *structs.TaskEvent
+	restartCh chan *taskRestartEvent
 
 	// lastStart tracks the last time this task was started or restarted
 	lastStart time.Time
@@ -251,7 +272,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		waitCh:    make(chan struct{}),
 		startCh:   make(chan struct{}, 1),
 		unblockCh: make(chan struct{}),
-		restartCh: make(chan *structs.TaskEvent),
+		restartCh: make(chan *taskRestartEvent),
 		signalCh:  make(chan SignalEvent),
 	}
 
@@ -776,7 +797,8 @@ OUTER:
 				return
 			}
 		case structs.VaultChangeModeRestart:
-			r.Restart("vault", "new Vault token acquired")
+			const failure = false
+			r.Restart("vault", "new Vault token acquired", failure)
 		case structs.VaultChangeModeNoop:
 			fallthrough
 		default:
@@ -1141,7 +1163,7 @@ func (r *TaskRunner) run() {
				res := r.handle.Signal(se.s)
				se.result <- res

-			case event := <-r.restartCh:
+			case restartEvent := <-r.restartCh:
				r.runningLock.Lock()
				running := r.running
				r.runningLock.Unlock()
@@ -1151,8 +1173,8 @@
					continue
				}

-				r.logger.Printf("[DEBUG] client: restarting %s: %v", common, event.RestartReason)
-				r.setState(structs.TaskStateRunning, event, false)
+				r.logger.Printf("[DEBUG] client: restarting %s: %v", common, restartEvent.taskEvent.RestartReason)
+				r.setState(structs.TaskStateRunning, restartEvent.taskEvent, false)
				r.killTask(nil)

				close(stopCollection)
@@ -1161,9 +1183,13 @@
					<-handleWaitCh
				}

-				// Since the restart isn't from a failure, restart immediately
-				// and don't count against the restart policy
-				r.restartTracker.SetRestartTriggered()
+				if restartEvent.failure {
+					r.restartTracker.SetFailure()
+				} else {
+					// Since the restart isn't from a failure, restart immediately
+					// and don't count against the restart policy
+					r.restartTracker.SetRestartTriggered()
+				}
				break WAIT

			case <-r.destroyCh:
@@ -1593,6 +1619,7 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
 	for _, t := range tg.Tasks {
 		if t.Name == r.task.Name {
 			updatedTask = t.Copy()
+			break
 		}
 	}
 	if updatedTask == nil {
@@ -1678,20 +1705,10 @@ func (r *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool,
 	return
 }
 
-// Restart will restart the task
-func (r *TaskRunner) Restart(source, reason string) {
-	r.lastStartMu.Lock()
-	defer r.lastStartMu.Unlock()
-
-	r.restart(source, reason)
-}
-
-// restart is the internal task restart method. Callers must hold lastStartMu.
-func (r *TaskRunner) restart(source, reason string) {
-	r.lastStart = time.Now()
-
+// Restart will restart the task.
+func (r *TaskRunner) Restart(source, reason string, failure bool) {
 	reasonStr := fmt.Sprintf("%s: %s", source, reason)
-	event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reasonStr)
+	event := newTaskRestartEvent(reasonStr, failure)
 
 	select {
 	case r.restartCh <- event:
@@ -1699,23 +1716,10 @@
 	}
 }
 
-// RestartBy deadline. Restarts a task iff the last time it was started was
-// before the deadline. Returns true if restart occurs; false if skipped.
-func (r *TaskRunner) RestartBy(deadline time.Time, source, reason string) {
-	r.lastStartMu.Lock()
-	defer r.lastStartMu.Unlock()
-
-	if r.lastStart.Before(deadline) {
-		r.restart(source, reason)
-	}
-}
-
-// LastStart returns the last time this task was started (including restarts).
-func (r *TaskRunner) LastStart() time.Time {
-	r.lastStartMu.Lock()
-	ls := r.lastStart
-	r.lastStartMu.Unlock()
-	return ls
+// RestartDelay returns the value of the delay for this task's restart policy
+// for use by the healthcheck watcher.
+func (r *TaskRunner) RestartDelay() time.Duration {
+	return r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup).RestartPolicy.Delay
 }
 
 // Signal will send a signal to the task
@@ -1742,6 +1746,9 @@ func (r *TaskRunner) Signal(source, reason string, s os.Signal) error {
 // Kill will kill a task and store the error, no longer restarting the task. If
 // fail is set, the task is marked as having failed.
 func (r *TaskRunner) Kill(source, reason string, fail bool) {
+	r.allocLock.Lock()
+	defer r.allocLock.Unlock()
+
 	reasonStr := fmt.Sprintf("%s: %s", source, reason)
 	event := structs.NewTaskEvent(structs.TaskKilling).SetKillReason(reasonStr)
 	if fail {
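Seen from a caller, the failure flag is what separates the two kinds of restart wired up above. The reason strings below are the ones used at the call sites in this patch; tr (a *TaskRunner) and checkName are placeholder names for the sketch:

// A template- or Vault-driven restart is not a failure: the task is
// restarted immediately and the restart policy is not consulted.
tr.Restart("vault", "new Vault token acquired", false)

// A health-check-driven restart is a failure: the tracker's SetFailure path
// applies the restart policy's delay, jitter and attempt limit.
tr.Restart("healthcheck", fmt.Sprintf("check %q unhealthy", checkName), true)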
c.logger.Printf("[DEBUG] consul.health: restarting alloc %q task %q due to unhealthy check %q", c.allocID, c.taskName, c.checkName) - c.task.RestartBy(now, "healthcheck", fmt.Sprintf("check %q unhealthy", c.checkName)) + + // Tell TaskRunner to restart due to failure + const failure = true + c.task.Restart("healthcheck", fmt.Sprintf("check %q unhealthy", c.checkName), failure) + + // Reset grace time to grace + restart.delay + (restart.delay * 25%) (the max jitter) + c.graceUntil = now.Add(c.grace + c.restartDelay + time.Duration(float64(c.restartDelay)*0.25)) + c.unhealthyStart = time.Time{} } } @@ -190,7 +202,10 @@ func (w *checkWatcher) Run(ctx context.Context) { for cid, check := range checks { result, ok := results[cid] if !ok { - w.logger.Printf("[WARN] consul.health: watched check %q (%s) not found in Consul", check.checkName, cid) + // Only warn if outside grace period to avoid races with check registration + if now.After(check.graceUntil) { + w.logger.Printf("[WARN] consul.health: watched check %q (%s) not found in Consul", check.checkName, cid) + } continue } @@ -209,16 +224,18 @@ func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.S } c := checkRestart{ - allocID: allocID, - taskName: taskName, - checkID: checkID, - checkName: check.Name, - task: restarter, - interval: check.Interval, - grace: check.CheckRestart.Grace, - timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1), - warning: check.CheckRestart.OnWarning, - logger: w.logger, + allocID: allocID, + taskName: taskName, + checkID: checkID, + checkName: check.Name, + task: restarter, + restartDelay: restarter.RestartDelay(), + interval: check.Interval, + grace: check.CheckRestart.Grace, + graceUntil: time.Now().Add(check.CheckRestart.Grace), + timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1), + warning: check.CheckRestart.OnWarning, + logger: w.logger, } select {