client: support graceful shutdowns
Client.Shutdown now blocks until all AllocRunners and TaskRunners have exited their Run loops. Tasks are left running.
This commit is contained in:
parent
4d92603340
commit
5bd744ac3d
|
@ -37,7 +37,7 @@ type allocRunner struct {
|
|||
// stateUpdater is used to emit updated alloc state
|
||||
stateUpdater cinterfaces.AllocStateHandler
|
||||
|
||||
// taskStateUpdateCh is ticked whenever task state as changed. Must
|
||||
// taskStateUpdatedCh is ticked whenever task state as changed. Must
|
||||
// have len==1 to allow nonblocking notification of state updates while
|
||||
// the goroutine is already processing a previous update.
|
||||
taskStateUpdatedCh chan struct{}
|
||||
|
@ -62,12 +62,12 @@ type allocRunner struct {
|
|||
// to access.
|
||||
destroyed bool
|
||||
|
||||
// runLaunched is true if Run() has been called. If this is false
|
||||
// Destroy() does not wait on tasks to shutdown as they are not
|
||||
// running. Must acquire destroyedLock to access.
|
||||
runLaunched bool
|
||||
// runnersLaunched is true if TaskRunners were Run. Must acquire
|
||||
// destroyedLock to access.
|
||||
runnersLaunched bool
|
||||
|
||||
// destroyedLock guards destroyed, ran, and serializes Destroy() calls.
|
||||
// destroyedLock guards destroyed, runnersLaunched, and serializes
|
||||
// Shutdown/Destroy calls.
|
||||
destroyedLock sync.Mutex
|
||||
|
||||
// Alloc captures the allocation being run.
|
||||
|
@ -178,38 +178,50 @@ func (ar *allocRunner) WaitCh() <-chan struct{} {
|
|||
|
||||
// Run is the main goroutine that executes all the tasks.
|
||||
func (ar *allocRunner) Run() {
|
||||
ar.destroyedLock.Lock()
|
||||
defer ar.destroyedLock.Unlock()
|
||||
// Close the wait channel on return
|
||||
defer close(ar.waitCh)
|
||||
|
||||
// Run should not be called after Destroy is called. This is a
|
||||
// programming error.
|
||||
if ar.destroyed {
|
||||
ar.logger.Error("alloc destroyed; cannot run")
|
||||
return
|
||||
}
|
||||
// Start the task state update handler
|
||||
go ar.handleTaskStateUpdates()
|
||||
|
||||
// If an alloc should not be run, ensure any restored task handles are
|
||||
// destroyed and exit to wait for the AR to be GC'd by the client.
|
||||
if !ar.shouldRun() {
|
||||
ar.logger.Debug("not running terminal alloc")
|
||||
|
||||
// Cleanup and sync state
|
||||
states := ar.killTasks()
|
||||
|
||||
// Get the client allocation
|
||||
calloc := ar.clientAlloc(states)
|
||||
|
||||
// Update the server
|
||||
ar.stateUpdater.AllocStateUpdated(calloc)
|
||||
|
||||
// Broadcast client alloc to listeners
|
||||
ar.allocBroadcaster.Send(calloc)
|
||||
// Ensure all tasks are cleaned up
|
||||
ar.killTasks()
|
||||
return
|
||||
}
|
||||
|
||||
// Run! (and mark as having been run to ensure Destroy cleans up properly)
|
||||
ar.runLaunched = true
|
||||
go ar.runImpl()
|
||||
// Mark task runners as being run for Shutdown
|
||||
ar.destroyedLock.Lock()
|
||||
ar.runnersLaunched = true
|
||||
ar.destroyedLock.Unlock()
|
||||
|
||||
// If task update chan has been closed, that means we've been shutdown.
|
||||
select {
|
||||
case <-ar.taskStateUpdateHandlerCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Run the prestart hooks
|
||||
if err := ar.prerun(); err != nil {
|
||||
ar.logger.Error("prerun failed", "error", err)
|
||||
goto POST
|
||||
}
|
||||
|
||||
// Run the runners and block until they exit
|
||||
<-ar.runTasks()
|
||||
|
||||
POST:
|
||||
// Run the postrun hooks
|
||||
// XXX Equivalent to TR.Poststop hook
|
||||
if err := ar.postrun(); err != nil {
|
||||
ar.logger.Error("postrun failed", "error", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// shouldRun returns true if the alloc is in a state that the alloc runner
|
||||
|
@ -236,30 +248,6 @@ func (ar *allocRunner) shouldRun() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (ar *allocRunner) runImpl() {
|
||||
// Close the wait channel on return
|
||||
defer close(ar.waitCh)
|
||||
|
||||
// Start the task state update handler
|
||||
go ar.handleTaskStateUpdates()
|
||||
|
||||
// Run the prestart hooks
|
||||
if err := ar.prerun(); err != nil {
|
||||
ar.logger.Error("prerun failed", "error", err)
|
||||
goto POST
|
||||
}
|
||||
|
||||
// Run the runners and block until they exit
|
||||
<-ar.runTasks()
|
||||
|
||||
POST:
|
||||
// Run the postrun hooks
|
||||
// XXX Equivalent to TR.Poststop hook
|
||||
if err := ar.postrun(); err != nil {
|
||||
ar.logger.Error("postrun failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// runTasks is used to run the task runners.
|
||||
func (ar *allocRunner) runTasks() <-chan struct{} {
|
||||
for _, task := range ar.tasks {
|
||||
|
@ -328,7 +316,7 @@ func (ar *allocRunner) TaskStateUpdated() {
|
|||
}
|
||||
|
||||
// handleTaskStateUpdates must be run in goroutine as it monitors
|
||||
// taskStateUpdateCh for task state update notifications and processes task
|
||||
// taskStateUpdatedCh for task state update notifications and processes task
|
||||
// states.
|
||||
//
|
||||
// Processing task state updates must be done in a goroutine as it may have to
|
||||
|
@ -340,11 +328,13 @@ func (ar *allocRunner) handleTaskStateUpdates() {
|
|||
select {
|
||||
case <-ar.taskStateUpdatedCh:
|
||||
case <-ar.waitCh:
|
||||
// Tasks have exited, run once more to ensure final
|
||||
// Run has exited, sync once more to ensure final
|
||||
// states are collected.
|
||||
done = true
|
||||
}
|
||||
|
||||
ar.logger.Trace("handling task state update", "done", done)
|
||||
|
||||
// Set with the appropriate event if task runners should be
|
||||
// killed.
|
||||
var killEvent *structs.TaskEvent
|
||||
|
@ -620,12 +610,11 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
|
|||
// exit (thus closing WaitCh).
|
||||
func (ar *allocRunner) Destroy() {
|
||||
ar.destroyedLock.Lock()
|
||||
defer ar.destroyedLock.Unlock()
|
||||
if ar.destroyed {
|
||||
// Only destroy once
|
||||
ar.destroyedLock.Unlock()
|
||||
return
|
||||
}
|
||||
defer ar.destroyedLock.Unlock()
|
||||
|
||||
// Stop any running tasks and persist states in case the client is
|
||||
// shutdown before Destroy finishes.
|
||||
|
@ -633,10 +622,8 @@ func (ar *allocRunner) Destroy() {
|
|||
calloc := ar.clientAlloc(states)
|
||||
ar.stateUpdater.AllocStateUpdated(calloc)
|
||||
|
||||
// Wait for tasks to exit and postrun hooks to finish (if they ran at all)
|
||||
if ar.runLaunched {
|
||||
// Wait for tasks to exit and postrun hooks to finish
|
||||
<-ar.waitCh
|
||||
}
|
||||
|
||||
// Run destroy hooks
|
||||
if err := ar.destroy(); err != nil {
|
||||
|
@ -645,9 +632,7 @@ func (ar *allocRunner) Destroy() {
|
|||
|
||||
// Wait for task state update handler to exit before removing local
|
||||
// state if Run() ran at all.
|
||||
if ar.runLaunched {
|
||||
<-ar.taskStateUpdateHandlerCh
|
||||
}
|
||||
|
||||
// Cleanup state db
|
||||
if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil {
|
||||
|
@ -678,6 +663,43 @@ func (ar *allocRunner) IsWaiting() bool {
|
|||
return ar.prevAllocWatcher.IsWaiting()
|
||||
}
|
||||
|
||||
// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners.
|
||||
// Tasks are unaffected and may be restored.
|
||||
func (ar *allocRunner) Shutdown() {
|
||||
ar.destroyedLock.Lock()
|
||||
defer ar.destroyedLock.Unlock()
|
||||
|
||||
// Destroy is a superset of Shutdown so there's nothing to do if this
|
||||
// has already been destroyed.
|
||||
if ar.destroyed {
|
||||
return
|
||||
}
|
||||
|
||||
ar.logger.Trace("shutting down")
|
||||
|
||||
// Shutdown tasks gracefully if they were run
|
||||
if ar.runnersLaunched {
|
||||
wg := sync.WaitGroup{}
|
||||
for _, tr := range ar.tasks {
|
||||
wg.Add(1)
|
||||
go func(tr *taskrunner.TaskRunner) {
|
||||
tr.Shutdown()
|
||||
wg.Done()
|
||||
}(tr)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Wait for Run to exit
|
||||
<-ar.waitCh
|
||||
|
||||
// Run shutdown hooks
|
||||
ar.shutdownHooks()
|
||||
|
||||
// Wait for updater to finish its final run
|
||||
<-ar.taskStateUpdateHandlerCh
|
||||
}
|
||||
|
||||
// IsMigrating returns true if the alloc runner is migrating data from its
|
||||
// previous allocation.
|
||||
//
|
||||
|
|
|
@ -241,3 +241,27 @@ func (ar *allocRunner) destroy() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// shutdownHooks calls graceful shutdown hooks for when the agent is exiting.
|
||||
func (ar *allocRunner) shutdownHooks() {
|
||||
for _, hook := range ar.runnerHooks {
|
||||
sh, ok := hook.(interfaces.ShutdownHook)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
name := sh.Name()
|
||||
var start time.Time
|
||||
if ar.logger.IsTrace() {
|
||||
start = time.Now()
|
||||
ar.logger.Trace("running shutdown hook", "name", name, "start", start)
|
||||
}
|
||||
|
||||
sh.Shutdown()
|
||||
|
||||
if ar.logger.IsTrace() {
|
||||
end := time.Now()
|
||||
ar.logger.Trace("finished shutdown hooks", "name", name, "end", end, "duration", end.Sub(start))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,7 +124,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
|
|||
ar, err := NewAllocRunner(conf)
|
||||
require.NoError(t, err)
|
||||
defer ar.Destroy()
|
||||
ar.Run()
|
||||
go ar.Run()
|
||||
|
||||
// Wait for all tasks to be killed
|
||||
upd := conf.StateUpdater.(*MockStateUpdater)
|
||||
|
@ -214,7 +214,7 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
|
|||
ar, err := NewAllocRunner(conf)
|
||||
require.NoError(t, err)
|
||||
defer ar.Destroy()
|
||||
ar.Run()
|
||||
go ar.Run()
|
||||
|
||||
// Wait for tasks to start
|
||||
upd := conf.StateUpdater.(*MockStateUpdater)
|
||||
|
@ -308,7 +308,6 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
|
|||
|
||||
ar, err := NewAllocRunner(conf)
|
||||
require.NoError(t, err)
|
||||
defer ar.Destroy()
|
||||
|
||||
// Mimic Nomad exiting before the leader stopping is able to stop other tasks.
|
||||
ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
|
||||
|
|
|
@ -188,6 +188,11 @@ func (h *allocHealthWatcherHook) Destroy() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *allocHealthWatcherHook) Shutdown() {
|
||||
// Same as Destroy
|
||||
h.Destroy()
|
||||
}
|
||||
|
||||
// watchHealth watches alloc health until it is set, the alloc is stopped, or
|
||||
// the context is canceled. watchHealth will be canceled and restarted on
|
||||
// Updates so calls are serialized with a lock.
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
var _ interfaces.RunnerPrerunHook = (*allocHealthWatcherHook)(nil)
|
||||
var _ interfaces.RunnerUpdateHook = (*allocHealthWatcherHook)(nil)
|
||||
var _ interfaces.RunnerDestroyHook = (*allocHealthWatcherHook)(nil)
|
||||
var _ interfaces.ShutdownHook = (*allocHealthWatcherHook)(nil)
|
||||
|
||||
// allocHealth is emitted to a chan whenever SetHealth is called
|
||||
type allocHealth struct {
|
||||
|
|
|
@ -42,3 +42,11 @@ type HookTarget interface {
|
|||
// State retrieves a copy of the target alloc runners state.
|
||||
State() *state.State
|
||||
}
|
||||
|
||||
// ShutdownHook may be implemented by AllocRunner or TaskRunner hooks and will
|
||||
// be called when the agent process is being shutdown gracefully.
|
||||
type ShutdownHook interface {
|
||||
RunnerHook
|
||||
|
||||
Shutdown()
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ type statsHook struct {
|
|||
updater StatsUpdater
|
||||
interval time.Duration
|
||||
|
||||
// stopCh is closed by Exited
|
||||
// stopCh is closed by Exited or Canceled
|
||||
stopCh chan struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
|
@ -118,3 +118,19 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *statsHook) Shutdown() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if h.stopCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-h.stopCh:
|
||||
// Already closed
|
||||
default:
|
||||
close(h.stopCh)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
// Statically assert the stats hook implements the expected interfaces
|
||||
var _ interfaces.TaskPoststartHook = (*statsHook)(nil)
|
||||
var _ interfaces.TaskExitedHook = (*statsHook)(nil)
|
||||
var _ interfaces.ShutdownHook = (*statsHook)(nil)
|
||||
|
||||
type mockStatsUpdater struct {
|
||||
// Ch is sent task resource usage updates if not nil
|
||||
|
|
|
@ -67,13 +67,16 @@ type TaskRunner struct {
|
|||
stateUpdater interfaces.TaskStateHandler
|
||||
|
||||
// state captures the state of the task for updating the allocation
|
||||
// Must acquire stateLock to access.
|
||||
state *structs.TaskState
|
||||
stateLock sync.Mutex
|
||||
|
||||
// localState captures the node-local state of the task for when the
|
||||
// Nomad agent restarts
|
||||
// Nomad agent restarts.
|
||||
// Must acquire stateLock to access.
|
||||
localState *state.LocalState
|
||||
localStateLock sync.RWMutex
|
||||
|
||||
// stateLock must be acquired when accessing state or localState.
|
||||
stateLock sync.RWMutex
|
||||
|
||||
// stateDB is for persisting localState and taskState
|
||||
stateDB cstate.StateDB
|
||||
|
@ -498,7 +501,7 @@ func (tr *TaskRunner) runDriver() error {
|
|||
return fmt.Errorf("driver start failed: %v", err)
|
||||
}
|
||||
|
||||
tr.localStateLock.Lock()
|
||||
tr.stateLock.Lock()
|
||||
tr.localState.TaskHandle = handle
|
||||
tr.localState.DriverNetwork = net
|
||||
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
|
||||
|
@ -508,7 +511,7 @@ func (tr *TaskRunner) runDriver() error {
|
|||
tr.logger.Warn("error persisting local task state; may be unable to restore after a Nomad restart",
|
||||
"error", err, "task_id", handle.Config.ID)
|
||||
}
|
||||
tr.localStateLock.Unlock()
|
||||
tr.stateLock.Unlock()
|
||||
|
||||
tr.setDriverHandle(NewDriverHandle(tr.driver, taskConfig.ID, tr.Task(), net))
|
||||
|
||||
|
@ -612,8 +615,8 @@ func (tr *TaskRunner) killTask(handle *DriverHandle) error {
|
|||
|
||||
// persistLocalState persists local state to disk synchronously.
|
||||
func (tr *TaskRunner) persistLocalState() error {
|
||||
tr.localStateLock.Lock()
|
||||
defer tr.localStateLock.Unlock()
|
||||
tr.stateLock.RLock()
|
||||
defer tr.stateLock.RUnlock()
|
||||
|
||||
return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState)
|
||||
}
|
||||
|
@ -673,7 +676,12 @@ func (tr *TaskRunner) restoreHandle(taskHandle *drivers.TaskHandle, net *cstruct
|
|||
}
|
||||
|
||||
if err := tr.driver.RecoverTask(taskHandle); err != nil {
|
||||
tr.logger.Error("error recovering task; destroying and restarting",
|
||||
if tr.TaskState().State != structs.TaskStateRunning {
|
||||
// RecoverTask should fail if the Task wasn't running
|
||||
return
|
||||
}
|
||||
|
||||
tr.logger.Error("error recovering task; cleaning up",
|
||||
"error", err, "task_id", taskHandle.Config.ID)
|
||||
|
||||
// Try to cleanup any existing task state in the plugin before restarting
|
||||
|
@ -846,8 +854,21 @@ func (tr *TaskRunner) WaitCh() <-chan struct{} {
|
|||
// This method is safe for calling concurrently with Run() and does not modify
|
||||
// the passed in allocation.
|
||||
func (tr *TaskRunner) Update(update *structs.Allocation) {
|
||||
task := update.LookupTask(tr.taskName)
|
||||
if task == nil {
|
||||
// This should not happen and likely indicates a bug in the
|
||||
// server or client.
|
||||
tr.logger.Error("allocation update is missing task; killing",
|
||||
"group", update.TaskGroup)
|
||||
te := structs.NewTaskEvent(structs.TaskKilled).
|
||||
SetKillReason("update missing task").
|
||||
SetFailsTask()
|
||||
tr.Kill(context.Background(), te)
|
||||
return
|
||||
}
|
||||
|
||||
// Update tr.alloc
|
||||
tr.setAlloc(update)
|
||||
tr.setAlloc(update, task)
|
||||
|
||||
// Trigger update hooks if not terminal
|
||||
if !update.TerminalStatus() {
|
||||
|
@ -868,6 +889,21 @@ func (tr *TaskRunner) triggerUpdateHooks() {
|
|||
}
|
||||
}
|
||||
|
||||
// Shutdown TaskRunner gracefully without affecting the state of the task.
|
||||
// Shutdown blocks until the main Run loop exits.
|
||||
func (tr *TaskRunner) Shutdown() {
|
||||
tr.logger.Trace("shutting down")
|
||||
tr.ctxCancel()
|
||||
|
||||
<-tr.WaitCh()
|
||||
|
||||
// Run shutdown hooks to cleanup
|
||||
tr.shutdownHooks()
|
||||
|
||||
// Persist once more
|
||||
tr.persistLocalState()
|
||||
}
|
||||
|
||||
// LatestResourceUsage returns the last resource utilization datapoint
|
||||
// collected. May return nil if the task is not running or no resource
|
||||
// utilization has been collected yet.
|
||||
|
|
|
@ -10,10 +10,16 @@ func (tr *TaskRunner) Alloc() *structs.Allocation {
|
|||
return tr.alloc
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) setAlloc(updated *structs.Allocation) {
|
||||
// setAlloc and task on TaskRunner
|
||||
func (tr *TaskRunner) setAlloc(updated *structs.Allocation, task *structs.Task) {
|
||||
tr.allocLock.Lock()
|
||||
defer tr.allocLock.Unlock()
|
||||
|
||||
tr.taskLock.Lock()
|
||||
defer tr.taskLock.Unlock()
|
||||
|
||||
tr.alloc = updated
|
||||
tr.allocLock.Unlock()
|
||||
tr.task = task
|
||||
}
|
||||
|
||||
// IsLeader returns true if this task is the leader of its task group.
|
||||
|
|
|
@ -101,11 +101,11 @@ func (tr *TaskRunner) prestart() error {
|
|||
}
|
||||
|
||||
var origHookState *state.HookState
|
||||
tr.localStateLock.RLock()
|
||||
tr.stateLock.RLock()
|
||||
if tr.localState.Hooks != nil {
|
||||
origHookState = tr.localState.Hooks[name]
|
||||
}
|
||||
tr.localStateLock.RUnlock()
|
||||
tr.stateLock.RUnlock()
|
||||
if origHookState != nil && origHookState.PrestartDone {
|
||||
tr.logger.Trace("skipping done prestart hook", "name", pre.Name())
|
||||
continue
|
||||
|
@ -135,9 +135,9 @@ func (tr *TaskRunner) prestart() error {
|
|||
|
||||
// Store and persist local state if the hook state has changed
|
||||
if !hookState.Equal(origHookState) {
|
||||
tr.localStateLock.Lock()
|
||||
tr.stateLock.Lock()
|
||||
tr.localState.Hooks[name] = hookState
|
||||
tr.localStateLock.Unlock()
|
||||
tr.stateLock.Unlock()
|
||||
|
||||
if err := tr.persistLocalState(); err != nil {
|
||||
return err
|
||||
|
@ -360,12 +360,12 @@ func (tr *TaskRunner) killing() {
|
|||
}
|
||||
|
||||
for _, hook := range tr.runnerHooks {
|
||||
upd, ok := hook.(interfaces.TaskKillHook)
|
||||
killHook, ok := hook.(interfaces.TaskKillHook)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
name := upd.Name()
|
||||
name := killHook.Name()
|
||||
|
||||
// Time the update hook
|
||||
var start time.Time
|
||||
|
@ -374,10 +374,10 @@ func (tr *TaskRunner) killing() {
|
|||
tr.logger.Trace("running kill hook", "name", name, "start", start)
|
||||
}
|
||||
|
||||
// Run the update hook
|
||||
// Run the kill hook
|
||||
req := interfaces.TaskKillRequest{}
|
||||
var resp interfaces.TaskKillResponse
|
||||
if err := upd.Killing(context.Background(), &req, &resp); err != nil {
|
||||
if err := killHook.Killing(context.Background(), &req, &resp); err != nil {
|
||||
tr.logger.Error("kill hook failed", "name", name, "error", err)
|
||||
}
|
||||
|
||||
|
@ -389,3 +389,30 @@ func (tr *TaskRunner) killing() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shutdownHooks is called when the TaskRunner is gracefully shutdown but the
|
||||
// task is not being stopped or garbage collected.
|
||||
func (tr *TaskRunner) shutdownHooks() {
|
||||
for _, hook := range tr.runnerHooks {
|
||||
sh, ok := hook.(interfaces.ShutdownHook)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
name := sh.Name()
|
||||
|
||||
// Time the update hook
|
||||
var start time.Time
|
||||
if tr.logger.IsTrace() {
|
||||
start = time.Now()
|
||||
tr.logger.Trace("running shutdown hook", "name", name, "start", start)
|
||||
}
|
||||
|
||||
sh.Shutdown()
|
||||
|
||||
if tr.logger.IsTrace() {
|
||||
end := time.Now()
|
||||
tr.logger.Trace("finished shutdown hook", "name", name, "end", end, "duration", end.Sub(start))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,8 +127,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
|
|||
})
|
||||
|
||||
// Cause TR to exit without shutting down task
|
||||
origTR.ctxCancel()
|
||||
<-origTR.WaitCh()
|
||||
origTR.Shutdown()
|
||||
|
||||
// Start a new TaskRunner and make sure it does not rerun the task
|
||||
newTR, err := NewTaskRunner(conf)
|
||||
|
|
|
@ -162,6 +162,10 @@ func (h *vaultHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, r
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *vaultHook) Shutdown() {
|
||||
h.cancel()
|
||||
}
|
||||
|
||||
// run should be called in a go-routine and manages the derivation, renewal and
|
||||
// handling of errors with the Vault token. The optional parameter allows
|
||||
// setting the initial Vault token. This is useful when the Vault token is
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package taskrunner
|
||||
|
||||
import "github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||
|
||||
// Statically assert the stats hook implements the expected interfaces
|
||||
var _ interfaces.TaskPrestartHook = (*vaultHook)(nil)
|
||||
var _ interfaces.TaskStopHook = (*vaultHook)(nil)
|
||||
var _ interfaces.ShutdownHook = (*vaultHook)(nil)
|
|
@ -106,6 +106,7 @@ type AllocRunner interface {
|
|||
Alloc() *structs.Allocation
|
||||
AllocState() *arstate.State
|
||||
Destroy()
|
||||
Shutdown()
|
||||
GetAllocDir() *allocdir.AllocDir
|
||||
IsDestroyed() bool
|
||||
IsMigrating() bool
|
||||
|
@ -186,10 +187,19 @@ type Client struct {
|
|||
// HostStatsCollector collects host resource usage stats
|
||||
hostStatsCollector *stats.HostStatsCollector
|
||||
|
||||
// shutdown is true when the Client has been shutdown. Must hold
|
||||
// shutdownLock to access.
|
||||
shutdown bool
|
||||
|
||||
// shutdownCh is closed to signal the Client is shutting down.
|
||||
shutdownCh chan struct{}
|
||||
|
||||
shutdownLock sync.Mutex
|
||||
|
||||
// shutdownGroup are goroutines that exit when shutdownCh is closed.
|
||||
// Shutdown() blocks on Wait() after closing shutdownCh.
|
||||
shutdownGroup group
|
||||
|
||||
// vaultClient is used to interact with Vault for token and secret renewals
|
||||
vaultClient vaultclient.VaultClient
|
||||
|
||||
|
@ -332,7 +342,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
|
||||
// Setup Consul discovery if enabled
|
||||
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
|
||||
go c.consulDiscovery()
|
||||
c.shutdownGroup.Go(c.consulDiscovery)
|
||||
if c.servers.NumServers() == 0 {
|
||||
// No configured servers; trigger discovery manually
|
||||
c.triggerDiscoveryCh <- struct{}{}
|
||||
|
@ -359,19 +369,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
}
|
||||
|
||||
// Register and then start heartbeating to the servers.
|
||||
go c.registerAndHeartbeat()
|
||||
c.shutdownGroup.Go(c.registerAndHeartbeat)
|
||||
|
||||
// Begin periodic snapshotting of state.
|
||||
go c.periodicSnapshot()
|
||||
c.shutdownGroup.Go(c.periodicSnapshot)
|
||||
|
||||
// Begin syncing allocations to the server
|
||||
go c.allocSync()
|
||||
c.shutdownGroup.Go(c.allocSync)
|
||||
|
||||
// Start the client!
|
||||
// Start the client! Don't use the shutdownGroup as run handles
|
||||
// shutdowns manually to prevent updates from being applied during
|
||||
// shutdown.
|
||||
go c.run()
|
||||
|
||||
// Start collecting stats
|
||||
go c.emitStats()
|
||||
c.shutdownGroup.Go(c.emitStats)
|
||||
|
||||
c.logger.Info("started client", "node_id", c.NodeID())
|
||||
return c, nil
|
||||
|
@ -533,20 +545,14 @@ func (c *Client) RPCMinorVersion() int {
|
|||
|
||||
// Shutdown is used to tear down the client
|
||||
func (c *Client) Shutdown() error {
|
||||
c.logger.Info("shutting down")
|
||||
c.shutdownLock.Lock()
|
||||
defer c.shutdownLock.Unlock()
|
||||
|
||||
if c.shutdown {
|
||||
c.logger.Info("already shutdown")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Defer closing the database
|
||||
defer func() {
|
||||
if err := c.stateDB.Close(); err != nil {
|
||||
c.logger.Error("error closing state database on shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
c.logger.Info("shutting down")
|
||||
|
||||
// Shutdown the device manager
|
||||
c.devicemanager.Shutdown()
|
||||
|
@ -559,20 +565,39 @@ func (c *Client) Shutdown() error {
|
|||
// Stop Garbage collector
|
||||
c.garbageCollector.Stop()
|
||||
|
||||
// Destroy all the running allocations.
|
||||
if c.config.DevMode {
|
||||
// In DevMode destroy all the running allocations.
|
||||
for _, ar := range c.getAllocRunners() {
|
||||
ar.Destroy()
|
||||
}
|
||||
for _, ar := range c.getAllocRunners() {
|
||||
<-ar.WaitCh()
|
||||
}
|
||||
} else {
|
||||
// In normal mode call shutdown
|
||||
wg := sync.WaitGroup{}
|
||||
for _, ar := range c.getAllocRunners() {
|
||||
wg.Add(1)
|
||||
go func(ar AllocRunner) {
|
||||
ar.Shutdown()
|
||||
wg.Done()
|
||||
}(ar)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
c.shutdown = true
|
||||
close(c.shutdownCh)
|
||||
|
||||
// Must close connection pool to unblock alloc watcher
|
||||
c.connPool.Shutdown()
|
||||
return nil
|
||||
|
||||
// Wait for goroutines to stop
|
||||
c.shutdownGroup.Wait()
|
||||
|
||||
// One final save state
|
||||
c.saveState()
|
||||
return c.stateDB.Close()
|
||||
}
|
||||
|
||||
// Stats is used to return statistics for debugging and insight
|
||||
|
@ -829,7 +854,7 @@ func (c *Client) restoreState() error {
|
|||
// All allocs restored successfully, run them!
|
||||
c.allocLock.Lock()
|
||||
for _, ar := range c.allocs {
|
||||
ar.Run()
|
||||
go ar.Run()
|
||||
}
|
||||
c.allocLock.Unlock()
|
||||
|
||||
|
@ -1194,10 +1219,10 @@ func (c *Client) registerAndHeartbeat() {
|
|||
c.retryRegisterNode()
|
||||
|
||||
// Start watching changes for node changes
|
||||
go c.watchNodeUpdates()
|
||||
c.shutdownGroup.Go(c.watchNodeUpdates)
|
||||
|
||||
// Start watching for emitting node events
|
||||
go c.watchNodeEvents()
|
||||
c.shutdownGroup.Go(c.watchNodeEvents)
|
||||
|
||||
// Setup the heartbeat timer, for the initial registration
|
||||
// we want to do this quickly. We want to do it extra quickly
|
||||
|
@ -1311,7 +1336,7 @@ func (c *Client) periodicSnapshot() {
|
|||
}
|
||||
}
|
||||
|
||||
// run is a long lived goroutine used to run the client
|
||||
// run is a long lived goroutine used to run the client. Shutdown() stops it first
|
||||
func (c *Client) run() {
|
||||
// Watch for changes in allocations
|
||||
allocUpdates := make(chan *allocUpdates, 8)
|
||||
|
@ -1320,7 +1345,17 @@ func (c *Client) run() {
|
|||
for {
|
||||
select {
|
||||
case update := <-allocUpdates:
|
||||
// Don't apply updates while shutting down.
|
||||
c.shutdownLock.Lock()
|
||||
if c.shutdown {
|
||||
c.shutdownLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Apply updates inside lock to prevent a concurrent
|
||||
// shutdown.
|
||||
c.runAllocs(update)
|
||||
c.shutdownLock.Unlock()
|
||||
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
|
@ -1785,6 +1820,7 @@ OUTER:
|
|||
pulled: pulledAllocs,
|
||||
migrateTokens: resp.MigrateTokens,
|
||||
}
|
||||
|
||||
select {
|
||||
case updates <- update:
|
||||
case <-c.shutdownCh:
|
||||
|
@ -1974,7 +2010,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
|||
// Store the alloc runner.
|
||||
c.allocs[alloc.ID] = ar
|
||||
|
||||
ar.Run()
|
||||
go ar.Run()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2561,3 +2597,21 @@ func (c *Client) allAllocs() map[string]*structs.Allocation {
|
|||
}
|
||||
return allocs
|
||||
}
|
||||
|
||||
// group wraps a func() in a goroutine and provides a way to block until it
|
||||
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
|
||||
type group struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (g *group) Go(f func()) {
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.wg.Done()
|
||||
f()
|
||||
}()
|
||||
}
|
||||
|
||||
func (g *group) Wait() {
|
||||
g.wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue