diff --git a/client/allocrunner/taskrunner/lazy_handle.go b/client/allocrunner/taskrunner/lazy_handle.go new file mode 100644 index 000000000..5dfd1c39a --- /dev/null +++ b/client/allocrunner/taskrunner/lazy_handle.go @@ -0,0 +1,142 @@ +package taskrunner + +import ( + "fmt" + "sync" + "time" + + log "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" +) + +const ( + // retrieveBackoffBaseline is the baseline time for exponential backoff while + // retrieving a handle. + retrieveBackoffBaseline = 250 * time.Millisecond + + // retrieveBackoffLimit is the limit of the exponential backoff for + // retrieving a handle. + retrieveBackoffLimit = 5 * time.Second + + // retrieveFailureLimit is how many times we will attempt to retrieve a + // handle before giving up and potentially leaking resources. + retrieveFailureLimit = 5 +) + +// retrieveHandleFn is used to retrieve the latest driver handle +type retrieveHandleFn func() *DriverHandle + +// LazyHandle is used to front calls to a DriverHandle where it is expected the +// existing handle may no longer be valid because the backing plugin has +// shut down. LazyHandle detects the plugin shutting down and retrieves a new +// handle so that the consumer does not need to worry whether the handle is to +// the latest driver instance. 
+type LazyHandle struct { + // retrieveHandle is used to retrieve the latest handle + retrieveHandle retrieveHandleFn + + // h is the current handle and may be nil; getHandle and refreshHandle access it with the lock held + h *DriverHandle + + logger log.Logger + sync.Mutex +} + +// NewLazyHandle takes the function to retrieve the latest handle and a logger +// and returns a LazyHandle whose initial handle is the result of calling fn +func NewLazyHandle(fn retrieveHandleFn, logger log.Logger) *LazyHandle { + return &LazyHandle{ + retrieveHandle: fn, + h: fn(), + logger: logger.Named("lazy_handle"), + } +} + +// getHandle returns the current handle or retrieves a new one if it is nil +func (l *LazyHandle) getHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + + if l.h != nil { + return l.h, nil + } + + return l.refreshHandleLocked() +} + +// refreshHandle takes the lock and retrieves a new handle +func (l *LazyHandle) refreshHandle() (*DriverHandle, error) { + l.Lock() + defer l.Unlock() + return l.refreshHandleLocked() +} + +// refreshHandleLocked retrieves a new handle and should be called with the lock +// held. It will retry to give the client time to restart the driver and restore +// the handle. 
+func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) { + for i := 0; i < retrieveFailureLimit; i++ { + l.h = l.retrieveHandle() + if l.h != nil { + return l.h, nil + } + + // Calculate the new backoff: 4^i * retrieveBackoffBaseline, capped at retrieveBackoffLimit + backoff := (1 << (2 * uint64(i))) * retrieveBackoffBaseline + if backoff > retrieveBackoffLimit { + backoff = retrieveBackoffLimit + } + + l.logger.Debug("failed to retrieve handle", "backoff", backoff) + time.Sleep(backoff) + } + + return nil, fmt.Errorf("no driver handle") +} + +func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + h, err := l.getHandle() + if err != nil { + return nil, 0, err + } + + // Only retry once, after refreshing the handle when the plugin reports shutdown + first := true + +TRY: + out, c, err := h.Exec(timeout, cmd, args) + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, c, err +} + +func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) { + h, err := l.getHandle() + if err != nil { + return nil, err + } + + // Only retry once, after refreshing the handle when the plugin reports shutdown + first := true + +TRY: + out, err := h.Stats() + if err == bstructs.ErrPluginShutdown && first { + first = false + + h, err = l.refreshHandle() + if err == nil { + goto TRY + } + } + + return out, err +} diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 7343df5bc..31100f471 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -99,13 +99,15 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto return } - // We do not log when the plugin is shutdown as this is simply a - // race between the stopCollection channel being closed and calling - // Stats on the handle. + // We do not log when the plugin is shutdown as this is either: + // - A race between the stopCollection channel being closed and + // calling Stats on the handle. 
+ // - The driver plugin has unexpectedly exited + // + // In either case, sleeping and trying again or returning based + // on the stop channel is the correct behavior. if err != bstructs.ErrPluginShutdown { h.logger.Debug("error fetching stats of task", "error", err) - } else { - // TODO(alex) this breaks if the handle dies } continue diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 6aa0da23a..8063630ee 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error { handle := tr.getDriverHandle() net := handle.Network() + // Pass the lazy handle to the hooks so even if the driver exits and we + // launch a new one (external plugin), the handle will refresh. + lazyHandle := NewLazyHandle(tr.getDriverHandle, tr.logger) + var merr multierror.Error for _, hook := range tr.runnerHooks { post, ok := hook.(interfaces.TaskPoststartHook) @@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error { } req := interfaces.TaskPoststartRequest{ - DriverExec: handle, + DriverExec: lazyHandle, DriverNetwork: net, - DriverStats: handle, + DriverStats: lazyHandle, TaskEnv: tr.envBuilder.Build(), } var resp interfaces.TaskPoststartResponse