Fix hooks
This commit is contained in:
parent
c9825a9c36
commit
44dca19012
142
client/allocrunner/taskrunner/lazy_handle.go
Normal file
142
client/allocrunner/taskrunner/lazy_handle.go
Normal file
|
@ -0,0 +1,142 @@
|
|||
package taskrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// retrieveBackoffBaseline is the baseline time for exponential backoff while
|
||||
// retrieving a handle.
|
||||
retrieveBackoffBaseline = 250 * time.Millisecond
|
||||
|
||||
// retrieveBackoffLimit is the limit of the exponential backoff for
|
||||
// retrieving a handle.
|
||||
retrieveBackoffLimit = 5 * time.Second
|
||||
|
||||
// retrieveFailureLimit is how many times we will attempt to retrieve a
|
||||
// handle before giving up and potentially leaking resources.
|
||||
retrieveFailureLimit = 5
|
||||
)
|
||||
|
||||
// retrieveHandleFn is used to retrieve the latest driver handle. LazyHandle
// treats a nil result as no handle being available.
|
||||
type retrieveHandleFn func() *DriverHandle
|
||||
|
||||
// LazyHandle is used to front calls to a DriverHandle where it is expected the
|
||||
// existing handle may no longer be valid because the backing plugin has
|
||||
// shutdown. LazyHandle detects the plugin shutting down and retrieves a new
|
||||
// handle so that the consumer does not need to worry whether the handle is to
|
||||
// the latest driver instance.
|
||||
type LazyHandle struct {
|
||||
// retrieveHandle is used to retrieve the latest handle
|
||||
retrieveHandle retrieveHandleFn
|
||||
|
||||
// h is the current handle and may be nil
|
||||
h *DriverHandle
|
||||
|
||||
logger log.Logger
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// NewLazyHandle takes the function to recieve the latest handle and a logger
|
||||
// and returns a LazyHandle
|
||||
func NewLazyHandle(fn retrieveHandleFn, logger log.Logger) *LazyHandle {
|
||||
return &LazyHandle{
|
||||
retrieveHandle: fn,
|
||||
h: fn(),
|
||||
logger: logger.Named("lazy_handle"),
|
||||
}
|
||||
}
|
||||
|
||||
// getHandle returns the current handle or retrieves a new one
|
||||
func (l *LazyHandle) getHandle() (*DriverHandle, error) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if l.h != nil {
|
||||
return l.h, nil
|
||||
}
|
||||
|
||||
return l.refreshHandleLocked()
|
||||
}
|
||||
|
||||
// refreshHandle retrieves a new handle
|
||||
func (l *LazyHandle) refreshHandle() (*DriverHandle, error) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
return l.refreshHandleLocked()
|
||||
}
|
||||
|
||||
// refreshHandleLocked retrieves a new handle and should be called with the lock
|
||||
// held. It will retry to give the client time to restart the driver and restore
|
||||
// the handle.
|
||||
func (l *LazyHandle) refreshHandleLocked() (*DriverHandle, error) {
|
||||
for i := 0; i < retrieveFailureLimit; i++ {
|
||||
l.h = l.retrieveHandle()
|
||||
if l.h != nil {
|
||||
return l.h, nil
|
||||
}
|
||||
|
||||
// Calculate the new backoff
|
||||
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
|
||||
if backoff > killBackoffLimit {
|
||||
backoff = killBackoffLimit
|
||||
}
|
||||
|
||||
l.logger.Debug("failed to retrieve handle", "backoff", backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no driver handle")
|
||||
}
|
||||
|
||||
func (l *LazyHandle) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) {
|
||||
h, err := l.getHandle()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Only retry once
|
||||
first := true
|
||||
|
||||
TRY:
|
||||
out, c, err := h.Exec(timeout, cmd, args)
|
||||
if err == bstructs.ErrPluginShutdown && first {
|
||||
first = false
|
||||
|
||||
h, err = l.refreshHandle()
|
||||
if err == nil {
|
||||
goto TRY
|
||||
}
|
||||
}
|
||||
|
||||
return out, c, err
|
||||
}
|
||||
|
||||
func (l *LazyHandle) Stats() (*cstructs.TaskResourceUsage, error) {
|
||||
h, err := l.getHandle()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only retry once
|
||||
first := true
|
||||
|
||||
TRY:
|
||||
out, err := h.Stats()
|
||||
if err == bstructs.ErrPluginShutdown && first {
|
||||
first = false
|
||||
|
||||
h, err = l.refreshHandle()
|
||||
if err == nil {
|
||||
goto TRY
|
||||
}
|
||||
}
|
||||
|
||||
return out, err
|
||||
}
|
|
@ -99,13 +99,15 @@ func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, sto
|
|||
return
|
||||
}
|
||||
|
||||
// We do not log when the plugin is shutdown as this is simply a
|
||||
// race between the stopCollection channel being closed and calling
|
||||
// Stats on the handle.
|
||||
// We do not log when the plugin is shutdown as this is either:
|
||||
// - A race between the stopCollection channel being closed and
|
||||
// calling Stats on the handle.
|
||||
// - The driver plugin has unexpectedly exited
|
||||
//
|
||||
// In either case sleeping and trying again or returning based
|
||||
// on the stop channel is the correct behavior
|
||||
if err != bstructs.ErrPluginShutdown {
|
||||
h.logger.Debug("error fetching stats of task", "error", err)
|
||||
} else {
|
||||
// TODO(alex) this breaks if the handle dies
|
||||
}
|
||||
|
||||
continue
|
||||
|
|
|
@ -254,6 +254,10 @@ func (tr *TaskRunner) poststart() error {
|
|||
handle := tr.getDriverHandle()
|
||||
net := handle.Network()
|
||||
|
||||
// Pass the lazy handle to the hooks so even if the driver exits and we
|
||||
// launch a new one (external plugin), the handle will refresh.
|
||||
lazyHandle := NewLazyHandle(tr.getDriverHandle, tr.logger)
|
||||
|
||||
var merr multierror.Error
|
||||
for _, hook := range tr.runnerHooks {
|
||||
post, ok := hook.(interfaces.TaskPoststartHook)
|
||||
|
@ -269,9 +273,9 @@ func (tr *TaskRunner) poststart() error {
|
|||
}
|
||||
|
||||
req := interfaces.TaskPoststartRequest{
|
||||
DriverExec: handle,
|
||||
DriverExec: lazyHandle,
|
||||
DriverNetwork: net,
|
||||
DriverStats: handle,
|
||||
DriverStats: lazyHandle,
|
||||
TaskEnv: tr.envBuilder.Build(),
|
||||
}
|
||||
var resp interfaces.TaskPoststartResponse
|
||||
|
|
Loading…
Reference in a new issue