client: ensure task is cleaned up when terminal

This commit is a significant change. TR.Run is now always executed, even
for terminal allocations. This was changed to allow TR.Run to clean up
(run stop hooks) if a handle was recovered.

This is intended to handle the case of Nomad receiving a
DesiredStatus=Stop allocation update, persisting it, but crashing before
stopping AR/TR.

The commit also renames the task runner hook data fields, as the old
names made it very easy to accidentally set state on Requests instead
of Responses.
Michael Schurter 2019-02-21 15:37:22 -08:00
parent 0f45d81e7c
commit ef8d284352
15 changed files with 174 additions and 103 deletions
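In outline, the change moves task cleanup below the run loop so it runs
unconditionally. The toy sketch below is not Nomad code (task, run, and
handle are invented stand-ins); it only models why a recovered handle
for an already-terminal task still gets killed and why stop hooks always
fire:

package main

import "fmt"

// task stands in for a TaskRunner: terminal mirrors
// Alloc().TerminalStatus(); handle mirrors a driver handle that Restore
// may have recovered from persisted state.
type task struct {
	terminal bool
	handle   *string
}

func run(t *task) {
	// The main loop is skipped entirely when the task is already
	// terminal, e.g. a DesiredStatus=Stop update was persisted just
	// before a crash.
	for !t.terminal {
		// start, wait, restart... (elided in this toy)
		t.terminal = true
	}

	// Cleanup now happens below the loop, so a recovered handle for a
	// terminal task is killed instead of leaked.
	if t.handle != nil {
		fmt.Println("killing recovered handle:", *t.handle)
		t.handle = nil
	}

	// Stop hooks run unconditionally, which is why they must be
	// idempotent and safe without prestart.
	fmt.Println("running stop hooks")
}

func main() {
	h := "recovered-after-crash"
	run(&task{terminal: true, handle: &h})
}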

View file

@@ -63,30 +63,26 @@ type allocRunner struct {
// vaultClient is used to manage Vault tokens
vaultClient vaultclient.VaultClient
// waitCh is closed when the Run() loop has exited
// waitCh is closed when the Run loop has exited
waitCh chan struct{}
// destroyed is true when the Run() loop has exited, postrun hooks have
// destroyed is true when the Run loop has exited, postrun hooks have
// run, and alloc runner has been destroyed. Must acquire destroyedLock
// to access.
destroyed bool
// destroyCh is closed when the Run() loop has exited, postrun hooks have
// destroyCh is closed when the Run loop has exited, postrun hooks have
// run, and alloc runner has been destroyed.
destroyCh chan struct{}
// shutdown is true when the Run() loop has exited, and shutdown hooks have
// shutdown is true when the Run loop has exited, and shutdown hooks have
// run. Must acquire destroyedLock to access.
shutdown bool
// shutdownCh is closed when the Run() loop has exited, and shutdown hooks
// shutdownCh is closed when the Run loop has exited, and shutdown hooks
// have run.
shutdownCh chan struct{}
// runnersLaunched is true if TaskRunners were Run. Must acquire
// destroyedLock to access.
runnersLaunched bool
// destroyLaunched is true if Destroy has been called. Must acquire
// destroyedLock to access.
destroyLaunched bool
@@ -95,8 +91,8 @@ type allocRunner struct {
// destroyedLock to access.
shutdownLaunched bool
// destroyedLock guards destroyed, runnersLaunched, destroyLaunched,
// shutdownLaunched, and serializes Shutdown/Destroy calls.
// destroyedLock guards destroyed, destroyLaunched, shutdownLaunched,
// and serializes Shutdown/Destroy calls.
destroyedLock sync.Mutex
// Alloc captures the allocation being run.
@@ -237,21 +233,6 @@ func (ar *allocRunner) Run() {
// Start the alloc update handler
go ar.handleAllocUpdates()
// If an alloc should not be run, ensure any restored task handles are
// destroyed and exit to wait for the AR to be GC'd by the client.
if !ar.shouldRun() {
ar.logger.Debug("not running terminal alloc")
// Ensure all tasks are cleaned up
ar.killTasks()
return
}
// Mark task runners as being run for Shutdown
ar.destroyedLock.Lock()
ar.runnersLaunched = true
ar.destroyedLock.Unlock()
// If task update chan has been closed, that means we've been shutdown.
select {
case <-ar.taskStateUpdateHandlerCh:
@@ -259,10 +240,12 @@ func (ar *allocRunner) Run() {
default:
}
// Run the prestart hooks
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST
// Run the prestart hooks if non-terminal
if ar.shouldRun() {
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST
}
}
// Run the runners (blocks until they exit)
@@ -270,7 +253,6 @@ func (ar *allocRunner) Run() {
POST:
// Run the postrun hooks
// XXX Equivalent to TR.Poststop hook
if err := ar.postrun(); err != nil {
ar.logger.Error("postrun failed", "error", err)
}
@@ -873,17 +855,15 @@ func (ar *allocRunner) Shutdown() {
ar.logger.Trace("shutting down")
// Shutdown tasks gracefully if they were run
if ar.runnersLaunched {
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
// Wait for Run to exit
<-ar.waitCh

View file

@@ -110,8 +110,6 @@ func (ar *allocRunner) prerun() error {
continue
}
//TODO Check hook state
name := pre.Name()
var start time.Time
if ar.logger.IsTrace() {
@@ -123,11 +121,9 @@ func (ar *allocRunner) prerun() error {
return fmt.Errorf("pre-run hook %q failed: %v", name, err)
}
//TODO Persist hook state locally
if ar.logger.IsTrace() {
end := time.Now()
ar.logger.Trace("finished pre-run hooks", "name", name, "end", end, "duration", end.Sub(start))
ar.logger.Trace("finished pre-run hook", "name", name, "end", end, "duration", end.Sub(start))
}
}

View file

@@ -12,21 +12,34 @@ type RunnerHook interface {
Name() string
}
// RunnerPrerunHooks are executed before calling TaskRunner.Run for
// non-terminal allocations. Terminal allocations do *not* call prerun.
type RunnerPrerunHook interface {
RunnerHook
Prerun(context.Context) error
}
// RunnerPostrunHooks are executed after calling TaskRunner.Run, even for
// terminal allocations. Therefore Postrun hooks must be safe to call without
// first calling Prerun hooks.
type RunnerPostrunHook interface {
RunnerHook
Postrun() error
}
// RunnerDestroyHooks are executed after AllocRunner.Run has exited and
// must make a best-effort attempt to clean up allocation resources.
// Destroy hooks must be safe to call without first calling Prerun.
type RunnerDestroyHook interface {
RunnerHook
Destroy() error
}
// RunnerUpdateHooks are executed when an allocation update is received from
// the server. Update is called concurrently with AllocRunner execution and
// therefore must be safe for concurrent access with other hook methods. Calls
// to Update are serialized so allocation updates will always be processed in
// order.
type RunnerUpdateHook interface {
RunnerHook
Update(*RunnerUpdateRequest) error
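As a concrete illustration of these contracts, the hypothetical sketch
below (cleanupHook and its directory are invented for this example, not
part of the change) shows a Prerun/Postrun pair where Postrun tolerates
Prerun never having run:

package example

import (
	"context"
	"os"
)

// cleanupHook sketches a RunnerPrerunHook and RunnerPostrunHook pair.
// Because Postrun runs even for terminal allocations, it must tolerate
// Prerun having been skipped.
type cleanupHook struct {
	dir string // set by Prerun; empty if Prerun was skipped
}

func (h *cleanupHook) Name() string { return "cleanup" }

func (h *cleanupHook) Prerun(ctx context.Context) error {
	h.dir = "/tmp/cleanup-example" // hypothetical resource
	return os.MkdirAll(h.dir, 0o755)
}

func (h *cleanupHook) Postrun() error {
	if h.dir == "" {
		// Prerun was skipped (terminal alloc); nothing to undo.
		return nil
	}
	return os.RemoveAll(h.dir)
}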

View file

@@ -43,8 +43,9 @@ type TaskHook interface {
}
type TaskPrestartRequest struct {
// HookData is previously set data by the hook
HookData map[string]string
// PreviousState is data previously set by the hook. It must be copied
// to State below to be maintained across restarts.
PreviousState map[string]string
// Task is the task to run
Task *structs.Task
@@ -72,18 +73,21 @@ type TaskPrestartResponse struct {
// Devices are the set of devices to mount into the task
Devices []*drivers.DeviceConfig
// HookData allows the hook to emit data to be passed in the next time it is
// run
HookData map[string]string
// State allows the hook to emit data to be passed in the next time it is
// run. Hooks must copy relevant PreviousState to State to maintain it
// across restarts.
State map[string]string
// Done lets the hook indicate that it should only be run once
// Done lets the hook indicate that it completed successfully and
// should not be run again.
Done bool
}
type TaskPrestartHook interface {
TaskHook
// Prestart is called before the task is started.
// Prestart is called before the task is started, including after every
// restart.
Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error
}
@@ -120,7 +124,8 @@ type TaskPreKillResponse struct{}
type TaskPreKillHook interface {
TaskHook
// PreKilling is called right before a task is going to be killed or restarted.
// PreKilling is called right before a task is going to be killed or
// restarted. It is called concurrently with TaskRunner.Run.
PreKilling(context.Context, *TaskPreKillRequest, *TaskPreKillResponse) error
}
@@ -130,7 +135,7 @@ type TaskExitedResponse struct{}
type TaskExitedHook interface {
TaskHook
// Exited is called when a task exits and may or may not be restarted.
// Exited is called after a task exits and may or may not be restarted.
Exited(context.Context, *TaskExitedRequest, *TaskExitedResponse) error
}
@@ -151,12 +156,23 @@ type TaskUpdateHook interface {
Update(context.Context, *TaskUpdateRequest, *TaskUpdateResponse) error
}
type TaskStopRequest struct{}
type TaskStopRequest struct {
// ExistingState is previously set hook data and should only be
// read. Stop hooks cannot alter state.
ExistingState map[string]string
}
type TaskStopResponse struct{}
type TaskStopHook interface {
TaskHook
// Stop is called after the task has exited and will not be started again.
// Stop is called after the task has exited and will not be started
// again. It is the only hook guaranteed to be executed whenever
// TaskRunner.Run is called (unless the runner is gracefully shut down
// first). Therefore it may be called even when prestart and the other
// hooks have not been.
//
// Stop hooks must be idempotent.
Stop(context.Context, *TaskStopRequest, *TaskStopResponse) error
}
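To make the renamed fields concrete, here is a hypothetical hook sketch
(pidFileHook and its "path" key are invented for illustration): Prestart
copies PreviousState into State so it survives restarts and sets Done on
success, while Stop reads the read-only ExistingState and stays
idempotent because it may run without Prestart ever having run:

package example

import (
	"context"
	"os"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

type pidFileHook struct{}

func (pidFileHook) Name() string { return "pidfile" }

func (pidFileHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
	// Carry prior state forward; anything not copied into State is lost.
	resp.State = make(map[string]string, len(req.PreviousState)+1)
	for k, v := range req.PreviousState {
		resp.State[k] = v
	}
	if resp.State["path"] == "" {
		resp.State["path"] = "/tmp/example.pid" // hypothetical artifact
	}
	resp.Done = true // completed successfully; skip on future restarts
	return nil
}

func (pidFileHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
	path := req.ExistingState["path"]
	if path == "" {
		return nil // prestart never ran (e.g. recovered terminal alloc)
	}
	// Tolerating an already-missing file keeps Stop idempotent.
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}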

View file

@@ -37,16 +37,16 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
return nil
}
// Initialize HookData to store download progress
resp.HookData = make(map[string]string, len(req.Task.Artifacts))
// Initialize hook state to store download progress
resp.State = make(map[string]string, len(req.Task.Artifacts))
h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
for _, artifact := range req.Task.Artifacts {
aid := artifact.Hash()
if req.HookData[aid] != "" {
if req.PreviousState[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
resp.HookData[aid] = req.HookData[aid]
resp.State[aid] = req.PreviousState[aid]
continue
}
@@ -65,7 +65,7 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.HookData[aid] = "1"
resp.State[aid] = "1"
}
resp.Done = true

View file

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
@@ -117,7 +118,7 @@ func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
require.NotNil(t, err)
require.True(t, structs.IsRecoverable(err))
require.Len(t, resp.HookData, 1)
require.Len(t, resp.State, 1)
require.False(t, resp.Done)
require.Len(t, me.events, 1)
require.Equal(t, structs.TaskDownloadingArtifacts, me.events[0].Type)
@@ -129,8 +130,8 @@ func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
file2 := filepath.Join(srcdir, "bar.txt")
require.NoError(t, ioutil.WriteFile(file2, []byte{'1'}, 0644))
// Mock TaskRunner by copying HookData from resp to req and reset resp.
req.HookData = resp.HookData
// Mock TaskRunner by copying state from resp to req and resetting resp.
req.PreviousState = helper.CopyMapStringString(resp.State)
resp = interfaces.TaskPrestartResponse{}
@@ -139,7 +140,7 @@ func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)
require.Len(t, resp.State, 2)
// Assert both files downloaded properly
files, err := filepath.Glob(filepath.Join(destdir, "*.txt"))
@@ -150,10 +151,10 @@ func TestTaskRunner_ArtifactHook_PartialDone(t *testing.T) {
// Stop the test server entirely and assert that re-running works
ts.Close()
req.HookData = resp.HookData
req.PreviousState = helper.CopyMapStringString(resp.State)
resp = interfaces.TaskPrestartResponse{}
err = artifactHook.Prestart(context.Background(), req, &resp)
require.NoError(t, err)
require.True(t, resp.Done)
require.Len(t, resp.HookData, 2)
require.Len(t, resp.State, 2)
}

View file

@@ -95,7 +95,7 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,
func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
reattachConfig, err := reattachConfigFromHookData(req.HookData)
reattachConfig, err := reattachConfigFromHookData(req.PreviousState)
if err != nil {
h.logger.Error("failed to load reattach config", "error", err)
return err
@@ -131,11 +131,20 @@ func (h *logmonHook) Prestart(ctx context.Context,
if err != nil {
return err
}
resp.HookData = map[string]string{logmonReattachKey: string(jsonCfg)}
resp.State = map[string]string{logmonReattachKey: string(jsonCfg)}
return nil
}
func (h *logmonHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error {
func (h *logmonHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
// It's possible that Stop was called without calling Prestart on agent
// restarts. Attempt to reattach to an existing logmon.
if h.logmon == nil || h.logmonPluginClient == nil {
if err := h.reattach(req); err != nil {
h.logger.Debug("error reattaching to logmon when stopping", "error", err)
}
}
if h.logmon != nil {
h.logmon.Stop()
}
@@ -145,3 +154,18 @@ func (h *logmonHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfa
return nil
}
// reattach to a running logmon if possible. Will not start a new logmon.
func (h *logmonHook) reattach(req *interfaces.TaskStopRequest) error {
reattachConfig, err := reattachConfigFromHookData(req.ExistingState)
if err != nil {
return err
}
// Give up if there's no reattach config
if reattachConfig == nil {
return nil
}
return h.launchLogMon(reattachConfig)
}

View file

@@ -10,6 +10,7 @@ import (
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
@@ -84,19 +85,22 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
defer hook.Stop(context.Background(), nil, nil)
require.False(t, resp.Done)
origHookData := resp.HookData[logmonReattachKey]
origHookData := resp.State[logmonReattachKey]
require.NotEmpty(t, origHookData)
// Running prestart again should effectively noop as it reattaches to
// the running logmon.
req.HookData = map[string]string{
req.PreviousState = map[string]string{
logmonReattachKey: origHookData,
}
require.NoError(t, hook.Prestart(context.Background(), &req, &resp))
require.False(t, resp.Done)
origHookData = resp.HookData[logmonReattachKey]
require.Equal(t, origHookData, req.HookData[logmonReattachKey])
origHookData = resp.State[logmonReattachKey]
require.Equal(t, origHookData, req.PreviousState[logmonReattachKey])
// Running stop should shutdown logmon
require.NoError(t, hook.Stop(context.Background(), nil, nil))
stopReq := interfaces.TaskStopRequest{
ExistingState: helper.CopyMapStringString(resp.State),
}
require.NoError(t, hook.Stop(context.Background(), &stopReq, nil))
}

View file

@@ -46,7 +46,7 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
require.NoError(t, hook.Prestart(context.Background(), &req, &resp))
defer hook.Stop(context.Background(), nil, nil)
origHookData := resp.HookData[logmonReattachKey]
origHookData := resp.State[logmonReattachKey]
require.NotEmpty(t, origHookData)
// Pluck the PID out of the reattach config and synthesize a crash
@@ -75,14 +75,14 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
// Running prestart again should return a recoverable error with no
// reattach config to cause the task to be restarted with a new logmon.
req.HookData = map[string]string{
req.PreviousState = map[string]string{
logmonReattachKey: origHookData,
}
resp = interfaces.TaskPrestartResponse{}
err = hook.Prestart(context.Background(), &req, &resp)
require.Error(t, err)
require.True(t, structs.IsRecoverable(err))
require.Empty(t, resp.HookData)
require.Empty(t, resp.State)
// Running stop should shutdown logmon
require.NoError(t, hook.Stop(context.Background(), nil, nil))

View file

@@ -41,8 +41,12 @@ func (s *LocalState) Canonicalize() {
}
}
// Copy should be called with the lock held
// Copy LocalState. Returns nil if nil.
func (s *LocalState) Copy() *LocalState {
if s == nil {
return nil
}
// Create a copy
c := &LocalState{
Hooks: make(map[string]*HookState, len(s.Hooks)),
@@ -50,7 +54,7 @@ func (s *LocalState) Copy() *LocalState {
TaskHandle: s.TaskHandle.Copy(),
}
// Copy the hooks
// Copy the hook state
for h, state := range s.Hooks {
c.Hooks[h] = state.Copy()
}
@@ -71,7 +75,12 @@ type HookState struct {
Env map[string]string
}
// Copy HookState. Returns nil if nil.
func (h *HookState) Copy() *HookState {
if h == nil {
return nil
}
c := new(HookState)
*c = *h
c.Data = helper.CopyMapStringString(c.Data)

View file

@@ -43,9 +43,9 @@ func (h *taskDirHook) Name() string {
func (h *taskDirHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
fsi := h.runner.driverCapabilities.FSIsolation
if v, ok := req.HookData[TaskDirHookIsDoneDataKey]; ok && v == "true" {
if v, ok := req.PreviousState[TaskDirHookIsDoneDataKey]; ok && v == "true" {
setEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig)
resp.HookData = map[string]string{
resp.State = map[string]string{
TaskDirHookIsDoneDataKey: "true",
}
return nil
@@ -68,7 +68,7 @@ func (h *taskDirHook) Prestart(ctx context.Context, req *interfaces.TaskPrestart
// Update the environment variables based on the built task directory
setEnvvars(h.runner.envBuilder, fsi, h.runner.taskDir, h.runner.clientConfig)
resp.HookData = map[string]string{
resp.State = map[string]string{
TaskDirHookIsDoneDataKey: "true",
}
return nil

View file

@@ -391,10 +391,9 @@ func (tr *TaskRunner) Run() {
go tr.handleUpdates()
MAIN:
for {
for !tr.Alloc().TerminalStatus() {
select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
@@ -411,7 +410,6 @@
select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
@@ -483,14 +481,21 @@
case <-time.After(restartDelay):
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
tr.handleKill()
break MAIN
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
tr.logger.Trace("gracefully shutting down during restart delay")
return
}
}
// Ensure handle is cleaned up. Restore could have recovered a task
// that should be terminal, so if the handle still exists we should
// kill it here.
if tr.getDriverHandle() != nil {
tr.handleKill()
}
// Mark the task as dead
tr.UpdateState(structs.TaskStateDead, nil)
@@ -709,8 +714,8 @@ func (tr *TaskRunner) initDriver() error {
}
// handleKill is used to handle a request to kill a task. It will return
//// the handle exit result if one is available and store any error in the task
//// runner killErr value.
// the handle exit result if one is available and store any error in the task
// runner killErr value.
func (tr *TaskRunner) handleKill() *drivers.ExitResult {
// Run the pre killing hooks
tr.preKill()
@@ -1039,9 +1044,9 @@ func (tr *TaskRunner) WaitCh() <-chan struct{} {
}
// Update the running allocation with a new version received from the server.
// Calls Update hooks asynchronously with Run().
// Calls Update hooks asynchronously with Run.
//
// This method is safe for calling concurrently with Run() and does not modify
// This method is safe for calling concurrently with Run and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
task := update.LookupTask(tr.taskName)

View file

@@ -1,6 +1,7 @@
package taskrunner
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -113,3 +114,16 @@ func (tr *TaskRunner) hasRunLaunched() bool {
defer tr.runLaunchedLock.Unlock()
return tr.runLaunched
}
// hookState returns the state for the given hook or nil if no state is
// persisted for the hook.
func (tr *TaskRunner) hookState(name string) *state.HookState {
tr.stateLock.RLock()
defer tr.stateLock.RUnlock()
var s *state.HookState
if tr.localState.Hooks != nil {
s = tr.localState.Hooks[name].Copy()
}
return s
}

View file

@@ -151,13 +151,7 @@ func (tr *TaskRunner) prestart() error {
TaskResources: tr.taskResources,
}
var origHookState *state.HookState
tr.stateLock.RLock()
if tr.localState.Hooks != nil {
origHookState = tr.localState.Hooks[name]
}
tr.stateLock.RUnlock()
origHookState := tr.hookState(name)
if origHookState != nil {
if origHookState.PrestartDone {
tr.logger.Trace("skipping done prestart hook", "name", pre.Name())
@@ -173,7 +167,7 @@ func (tr *TaskRunner) prestart() error {
}
// Give the hook its old data
req.HookData = origHookState.Data
req.PreviousState = origHookState.Data
}
req.VaultToken = tr.getVaultToken()
@@ -195,7 +189,7 @@ func (tr *TaskRunner) prestart() error {
// Store the hook state
{
hookState := &state.HookState{
Data: resp.HookData,
Data: resp.State,
PrestartDone: resp.Done,
Env: resp.Env,
}
@@ -361,13 +355,21 @@ func (tr *TaskRunner) stop() error {
}
req := interfaces.TaskStopRequest{}
origHookState := tr.hookState(name)
if origHookState != nil {
// Give the hook the data provided by prestart
req.ExistingState = origHookState.Data
}
var resp interfaces.TaskStopResponse
if err := post.Stop(tr.killCtx, &req, &resp); err != nil {
tr.emitHookError(err, name)
merr.Errors = append(merr.Errors, fmt.Errorf("stop hook %q failed: %v", name, err))
}
// No need to persist as TaskStopResponse is currently empty
// Stop hooks cannot alter state and must be idempotent, so
// unlike prestart there's no state to persist here.
if tr.logger.IsTrace() {
end := time.Now()

View file

@@ -1546,11 +1546,18 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
consul := conf.Consul.(*consulapi.MockConsulServiceClient)
consulOps := consul.GetOps()
require.Len(t, consulOps, 6)
// pattern: add followed by two removals
// Initial add
require.Equal(t, "add", consulOps[0].Op)
// Removing canary and non-canary entries on first exit
require.Equal(t, "remove", consulOps[1].Op)
require.Equal(t, "remove", consulOps[2].Op)
// Second add on retry
require.Equal(t, "add", consulOps[3].Op)
// Removing canary and non-canary entries on retry
require.Equal(t, "remove", consulOps[4].Op)
require.Equal(t, "remove", consulOps[5].Op)
}