Merge pull request #5168 from hashicorp/b-kill-race

Improve Kill handling on task runner
Alex Dadgar, 2019-01-09 12:05:10 -08:00, committed by GitHub
commit bd12e0b1f7
GPG key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
5 changed files with 129 additions and 57 deletions

View file

@ -4,12 +4,13 @@ import (
"context"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// Restart a task. Returns immediately if no task is running. Blocks until
// existing task exits or passed-in context is canceled.
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
tr.logger.Trace("Restart requested", "failure", failure)
// Grab the handle
handle := tr.getDriverHandle()
@ -47,6 +48,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
}
func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
tr.logger.Trace("Signal requested", "signal", s)
// Grab the handle
handle := tr.getDriverHandle()
@ -65,58 +68,28 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
// Kill a task. Blocks until task exits or context is canceled. State is set to
// dead.
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason)
// Cancel the task runner to break out of restart delay or the main run
// loop.
tr.killCtxCancel()
// Grab the handle
handle := tr.getDriverHandle()
// Check it is running
if handle == nil {
return ErrTaskNotRunning
}
// Emit the event since it may take a long time to kill
// Emit kill event
tr.EmitEvent(event)
// Run the hooks prior to killing the task
tr.killing()
// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()
// Kill the task using an exponential backoff in case of failures.
killErr := tr.killTask(handle)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
}
// Block until task has exited.
waitCh, err := handle.WaitCh(ctx)
// The error should be nil or TaskNotFound; if it's something else, then a
// failure in the driver or transport layer occurred
if err != nil {
if err == drivers.ErrTaskNotFound {
// Check if the Run method has started yet. If it hasn't, we return early,
// since the task hasn't even started so there is nothing to wait for. This
// is still correct since the Run method will no-op because the kill context
// has already been cancelled.
if !tr.hasRunLaunched() {
return nil
}
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
return err
}
select {
case <-waitCh:
case <-tr.WaitCh():
case <-ctx.Done():
return ctx.Err()
}
if killErr != nil {
return killErr
} else if err := ctx.Err(); err != nil {
return err
}
return nil
return tr.getKillErr()
}
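With this change Kill only cancels the kill context, emits the event, and waits, so callers can bound the wait with a context of their own. A minimal, hedged sketch of the calling side (tr is assumed to be a TaskRunner whose Run goroutine has already been started; structs.NewTaskEvent and structs.TaskKilling are Nomad's existing helpers, and the usual imports are assumed):

// Sketch: ask the runner to kill the task, but stop waiting after 30 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

event := structs.NewTaskEvent(structs.TaskKilling)
if err := tr.Kill(ctx, event); err != nil {
	// err may be ErrTaskNotRunning, the kill error stored by handleKill
	// (read via getKillErr), or ctx.Err() if the 30 second budget ran out.
}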

View file

@ -199,6 +199,12 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
// interpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service {
// Guard against not having a valid taskEnv. This can be the case if the
// Killing or Exited hook is run before post-run.
if taskEnv == nil || len(services) == 0 {
return nil
}
interpolated := make([]*structs.Service, len(services))
for i, origService := range services {

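The nil guard above is what makes this safe to call from the Killing or Exited hooks before the task environment has been built. A small hedged sketch of the behavior, with made-up Service values:

// Sketch: with no task environment yet, interpolation is skipped entirely.
var env *taskenv.TaskEnv // still nil before the environment has been built
services := []*structs.Service{{Name: "web", PortLabel: "http"}}

if interpolateServices(env, services) == nil {
	// A nil taskEnv (or an empty service list) means there is nothing to
	// register or deregister with Consul for this hook invocation.
}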
View file

@ -85,6 +85,13 @@ type TaskRunner struct {
// stateDB is for persisting localState and taskState
stateDB cstate.StateDB
// shutdownCtx is used to exit the TaskRunner *without* affecting task state.
shutdownCtx context.Context
// shutdownCtxCancel causes the TaskRunner to exit immediately without
// affecting task state. Useful for testing or graceful agent shutdown.
shutdownCtxCancel context.CancelFunc
// killCtx is the task runner's context representing the task's lifecycle.
// The context is canceled when the task is killed.
killCtx context.Context
@ -92,12 +99,10 @@ type TaskRunner struct {
// killCtxCancel is called when killing a task.
killCtxCancel context.CancelFunc
// ctx is used to exit the TaskRunner *without* affecting task state.
ctx context.Context
// ctxCancel causes the TaskRunner to exit immediately without
// affecting task state. Useful for testing or graceful agent shutdown.
ctxCancel context.CancelFunc
// killErr is populated when killing a task. Access should be done via the
// getter/setter.
killErr error
killErrLock sync.Mutex
// Logger is the logger for the task runner.
logger log.Logger
@ -181,6 +186,11 @@ type TaskRunner struct {
// driverManager is used to dispense driver plugins and register event
// handlers
driverManager drivermanager.Manager
// runLaunched marks whether the Run goroutine has been started. It should
// be accessed via helpers
runLaunched bool
runLaunchedLock sync.Mutex
}
type Config struct {
@ -251,8 +261,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
ctx: trCtx,
ctxCancel: trCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
devicemanager: config.DeviceManager,
@ -360,6 +370,10 @@ func (tr *TaskRunner) initLabels() {
// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
// Run closes WaitCh when it exits. Should be started in a goroutine.
func (tr *TaskRunner) Run() {
// Mark that the run routine has been launched so that other functions can
// decide whether to wait on the wait channel.
tr.setRunLaunched()
defer close(tr.waitCh)
var result *drivers.ExitResult
@ -372,8 +386,9 @@ MAIN:
for {
select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
default:
@ -388,8 +403,9 @@ MAIN:
select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
default:
@ -421,7 +437,11 @@ MAIN:
tr.logger.Error("wait task failed", "error", err)
} else {
select {
case <-tr.ctx.Done():
case <-tr.killCtx.Done():
// We can go through the normal should-restart check since
// the restart tracker knows it has been killed
tr.handleKill()
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
case result = <-resultCh:
@ -455,8 +475,9 @@ MAIN:
case <-time.After(restartDelay):
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
}
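As the Run doc comment above says, Run is meant to be launched in its own goroutine and closes WaitCh when it exits; a minimal hedged sketch of the calling side:

// Sketch: start the runner and block until it exits for any reason
// (task completed, task killed via killCtx, or agent shutdown via shutdownCtx).
go tr.Run()
<-tr.WaitCh()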
@ -682,6 +703,50 @@ func (tr *TaskRunner) initDriver() error {
return nil
}
// handleKill is used to handle a request to kill a task. It stores any
// error in the task runner's killErr value.
func (tr *TaskRunner) handleKill() {
// Run the hooks prior to killing the task
tr.killing()
// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()
// Check it is running
handle := tr.getDriverHandle()
if handle == nil {
return
}
// Kill the task using an exponential backoff in case of failures.
killErr := tr.killTask(handle)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
tr.setKillErr(killErr)
}
// Block until task has exited.
waitCh, err := handle.WaitCh(tr.shutdownCtx)
// The error should be nil or TaskNotFound; if it's something else, then a
// failure in the driver or transport layer occurred
if err != nil {
if err == drivers.ErrTaskNotFound {
return
}
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
tr.setKillErr(killErr)
return
}
select {
case <-waitCh:
case <-tr.shutdownCtx.Done():
}
}
// killTask kills the task handle. If killing fails, killTask retries with an
// exponential backoff and gives up after a set limit. Returns an error if the
// task could not be killed.
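killTask's body is unchanged and not shown in this diff; the following is only a hedged sketch of the exponential-backoff retry its comment describes. The function name and constants are illustrative rather than Nomad's actual killTask implementation, and it assumes the DriverHandle.Kill method used by the task runner:

// Sketch only: retry the driver kill with exponential backoff and give up
// after a fixed number of attempts. The limits are assumptions, not Nomad's.
func killWithBackoff(handle *DriverHandle) error {
	backoff := 1 * time.Second
	const limit = 30 * time.Second

	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if err = handle.Kill(); err == nil {
			return nil
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > limit {
			backoff = limit
		}
	}
	return err
}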
@ -1009,7 +1074,7 @@ func (tr *TaskRunner) triggerUpdateHooks() {
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
tr.logger.Trace("shutting down")
tr.ctxCancel()
tr.shutdownCtxCancel()
<-tr.WaitCh()
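For contrast with Kill above, Shutdown only cancels shutdownCtx, so recorded task state is left alone; a brief hedged usage sketch:

// Sketch: graceful agent shutdown. Only the TaskRunner goroutine is stopped;
// the task's recorded state is untouched.
go tr.Run()
// ... later, while the agent is exiting:
tr.Shutdown() // cancels shutdownCtx and blocks until Run closes WaitCh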

View file

@ -85,3 +85,31 @@ func (tr *TaskRunner) clearDriverHandle() {
}
tr.handle = nil
}
// setKillErr stores any error that arose while killing the task
func (tr *TaskRunner) setKillErr(err error) {
tr.killErrLock.Lock()
defer tr.killErrLock.Unlock()
tr.killErr = err
}
// getKillErr returns any error that arose while killing the task
func (tr *TaskRunner) getKillErr() error {
tr.killErrLock.Lock()
defer tr.killErrLock.Unlock()
return tr.killErr
}
// setRunLaunched marks the fact that the Run loop has been started
func (tr *TaskRunner) setRunLaunched() {
tr.runLaunchedLock.Lock()
defer tr.runLaunchedLock.Unlock()
tr.runLaunched = true
}
// hasRunLaunched returns whether the Run loop has been started
func (tr *TaskRunner) hasRunLaunched() bool {
tr.runLaunchedLock.Lock()
defer tr.runLaunchedLock.Unlock()
return tr.runLaunched
}
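These locked accessors exist because handleKill runs on the Run goroutine while Kill reads the result from the caller's goroutine once the wait channel closes. A tiny hedged sketch of the same pattern in isolation, using only the standard library:

// Sketch of the pattern: a mutex-guarded result shared between the goroutine
// doing the kill work and the goroutine waiting on it.
var (
	mu      sync.Mutex
	killErr error
	done    = make(chan struct{})
)

go func() { // stands in for the Run goroutine calling handleKill
	mu.Lock()
	killErr = errors.New("driver kill failed") // illustrative value
	mu.Unlock()
	close(done) // stands in for closing waitCh
}()

<-done // the caller side of Kill waits here
mu.Lock()
result := killErr // equivalent to getKillErr()
mu.Unlock()
_ = result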

View file

@ -256,7 +256,7 @@ func (tr *TaskRunner) poststart() error {
// Pass the lazy handle to the hooks so even if the driver exits and we
// launch a new one (external plugin), the handle will refresh.
lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger)
lazyHandle := NewLazyHandle(tr.shutdownCtx, tr.getDriverHandle, tr.logger)
var merr multierror.Error
for _, hook := range tr.runnerHooks {
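This is not Nomad's LazyHandle implementation, just a hedged sketch of the lazy-resolution idea the comment above describes: resolve the driver handle at call time so a restarted external plugin is picked up automatically. The type below is hypothetical; it assumes a DriverHandle.Signal(string) error method and reuses ErrTaskNotRunning from earlier in this diff.

// Sketch of the lazy-handle idea: re-fetch the current driver handle on every
// call instead of capturing it once at hook-registration time.
type lazySignaler struct {
	get func() *DriverHandle // e.g. tr.getDriverHandle
}

func (l *lazySignaler) Signal(s string) error {
	if h := l.get(); h != nil {
		return h.Signal(s)
	}
	return ErrTaskNotRunning
}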