Killing a driver handle is retried with an exponential backoff
parent 9301581bb4
commit 18d2d9c091

@@ -232,6 +232,9 @@ func (h *execHandle) Update(task *structs.Task) error {
 func (h *execHandle) Kill() error {
     if err := h.executor.ShutDown(); err != nil {
+        if h.pluginClient.Exited() {
+            return nil
+        }
         return fmt.Errorf("executor Shutdown failed: %v", err)
     }

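Each driver hunk in this commit applies the same rule: an error from ShutDown is only fatal if the executor plugin process is still running. Below is a minimal, self-contained sketch of that rule; fakeExecutor, fakePlugin, and handle are hypothetical stand-ins for illustration, not Nomad's real executor or plugin client types.

package main

import (
    "errors"
    "fmt"
)

// fakeExecutor is an illustrative stand-in for the executor RPC client.
type fakeExecutor struct{ gone bool }

// ShutDown fails when the executor process is already gone, mimicking a
// failed RPC to a dead plugin.
func (e *fakeExecutor) ShutDown() error {
    if e.gone {
        return errors.New("connection is shut down")
    }
    return nil
}

// fakePlugin is an illustrative stand-in for the plugin client.
type fakePlugin struct{ exited bool }

func (p *fakePlugin) Exited() bool { return p.exited }

type handle struct {
    executor     *fakeExecutor
    pluginClient *fakePlugin
}

// Kill mirrors the shape of the driver changes: a ShutDown error is only
// fatal if the plugin process is still alive; if it has already exited there
// is nothing left to kill, so the error is swallowed.
func (h *handle) Kill() error {
    if err := h.executor.ShutDown(); err != nil {
        if h.pluginClient.Exited() {
            return nil
        }
        return fmt.Errorf("executor Shutdown failed: %v", err)
    }
    return nil
}

func main() {
    h := &handle{executor: &fakeExecutor{gone: true}, pluginClient: &fakePlugin{exited: true}}
    fmt.Println(h.Kill()) // <nil>: the plugin already exited, so the RPC error is not fatal
}
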
@@ -282,12 +282,25 @@ func (h *javaHandle) Update(task *structs.Task) error {
 }

 func (h *javaHandle) Kill() error {
-    h.executor.ShutDown()
+    if err := h.executor.ShutDown(); err != nil {
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        return fmt.Errorf("executor Shutdown failed: %v", err)
+    }

     select {
     case <-h.doneCh:
         return nil
     case <-time.After(h.killTimeout):
-        return h.executor.Exit()
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        if err := h.executor.Exit(); err != nil {
+            return fmt.Errorf("executor Exit failed: %v", err)
+        }
+
+        return nil
     }
 }

@@ -305,12 +305,25 @@ func (h *qemuHandle) Update(task *structs.Task) error {
 // TODO: allow a 'shutdown_command' that can be executed over a ssh connection
 // to the VM
 func (h *qemuHandle) Kill() error {
-    h.executor.ShutDown()
+    if err := h.executor.ShutDown(); err != nil {
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        return fmt.Errorf("executor Shutdown failed: %v", err)
+    }

     select {
     case <-h.doneCh:
         return nil
     case <-time.After(h.killTimeout):
-        return h.executor.Exit()
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        if err := h.executor.Exit(); err != nil {
+            return fmt.Errorf("executor Exit failed: %v", err)
+        }
+
+        return nil
     }
 }

@@ -207,12 +207,25 @@ func (h *rawExecHandle) Update(task *structs.Task) error {
 }

 func (h *rawExecHandle) Kill() error {
-    h.executor.ShutDown()
+    if err := h.executor.ShutDown(); err != nil {
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        return fmt.Errorf("executor Shutdown failed: %v", err)
+    }

     select {
     case <-h.doneCh:
         return nil
     case <-time.After(h.killTimeout):
-        return h.executor.Exit()
+        if h.pluginClient.Exited() {
+            return nil
+        }
+        if err := h.executor.Exit(); err != nil {
+            return fmt.Errorf("executor Exit failed: %v", err)
+        }
+
+        return nil
     }
 }

@ -19,6 +19,20 @@ import (
|
|||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// killBackoffBaseline is the baseline time for exponential backoff while
|
||||
// killing a task.
|
||||
killBackoffBaseline = 5 * time.Second
|
||||
|
||||
// killBackoffLimit is the the limit of the exponential backoff for killing
|
||||
// the task.
|
||||
killBackoffLimit = 5 * time.Minute
|
||||
|
||||
// killFailureLimit is how many times we will attempt to kill a task before
|
||||
// giving up and potentially leaking resources.
|
||||
killFailureLimit = 10
|
||||
)
|
||||
|
||||
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
|
||||
type TaskRunner struct {
|
||||
config *config.Config
|
||||
|
@@ -258,17 +272,23 @@ func (r *TaskRunner) run() {
                 r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
             }
         case <-r.destroyCh:
             // Avoid destroying twice
             if destroyed {
                 continue
             }
+
+            // Kill the task using an exponential backoff in-case of failures.
+            destroySuccess, err := r.handleDestroy()
+            if !destroySuccess {
+                // We couldn't successfully destroy the resource created.
+                r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
+            } else {
+                // Wait for the task to exit but cap the time to ensure we don't block.
+                select {
+                case waitRes = <-r.handle.WaitCh():
+                case <-time.After(3 * time.Second):
+                }
+            }

-            // Send the kill signal, and use the WaitCh to block until complete
-            if err := r.handle.Kill(); err != nil {
-                r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
-                destroyErr = err
-            }
+            // Store that the task has been destroyed and any associated error.
             destroyed = true
+            destroyErr = err
             break OUTER
         }
     }

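When the destroy succeeds, run() still caps how long it waits for the task's wait result, so the event loop cannot block forever behind a task that never reports. The idiom is a select against time.After; the small sketch below uses a hypothetical waitOrTimeout helper (not part of Nomad's code) to show the shape.

package main

import (
    "fmt"
    "time"
)

// waitOrTimeout blocks on done, but gives up after timeout. It reports
// whether the completion signal arrived in time.
func waitOrTimeout(done <-chan struct{}, timeout time.Duration) bool {
    select {
    case <-done:
        return true
    case <-time.After(timeout):
        return false
    }
}

func main() {
    done := make(chan struct{})
    go func() {
        time.Sleep(50 * time.Millisecond)
        close(done)
    }()
    fmt.Println(waitOrTimeout(done, 3*time.Second))                       // true: finished in time
    fmt.Println(waitOrTimeout(make(chan struct{}), 100*time.Millisecond)) // false: timed out
}
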
@@ -382,6 +402,31 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
     return mErr.ErrorOrNil()
 }

+// handleDestroy kills the task handle. In the case that killing fails,
+// handleDestroy will retry with an exponential backoff and will give up at a
+// given limit. It returns whether the task was destroyed and the error
+// associated with the last kill attempt.
+func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
+    // Cap the number of times we attempt to kill the task.
+    for i := 0; i < killFailureLimit; i++ {
+        if err = r.handle.Kill(); err != nil {
+            // Calculate the new backoff
+            backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
+            if backoff > killBackoffLimit {
+                backoff = killBackoffLimit
+            }
+
+            r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc %q. Retrying in %v: %v",
+                r.task.Name, r.alloc.ID, backoff, err)
+            time.Sleep(time.Duration(backoff))
+        } else {
+            // Kill was successful
+            return true, nil
+        }
+    }
+    return
+}
+
 // Helper function for converting a WaitResult into a TaskTerminated event.
 func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
     return structs.NewTaskEvent(structs.TaskTerminated).

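With the constants added in this commit, the formula in handleDestroy (backoff = 4^attempt * baseline, capped at the limit) yields waits of 5s, 20s, 1m20s, and then 5m for every remaining attempt. The sketch below only reproduces that schedule; the constants are copied from the diff and nothing is actually killed.

package main

import (
    "fmt"
    "time"
)

// Constants copied from the task_runner hunk above.
const (
    killBackoffBaseline = 5 * time.Second
    killBackoffLimit    = 5 * time.Minute
    killFailureLimit    = 10
)

func main() {
    var total time.Duration
    for i := 0; i < killFailureLimit; i++ {
        // Same computation as handleDestroy: 4^i * baseline, capped at the limit.
        backoff := (1 << (2 * uint(i))) * killBackoffBaseline
        if backoff > killBackoffLimit {
            backoff = killBackoffLimit
        }
        total += backoff
        fmt.Printf("attempt %2d: wait %v\n", i+1, backoff)
    }
    fmt.Printf("worst case spent retrying: %v\n", total)
}

In the worst case that is 5s + 20s + 1m20s + 7×5m, roughly 36m45s of retrying before handleDestroy gives up and run() logs that resources may have been leaked.
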
@@ -413,7 +413,6 @@ func (w *Worker) shouldResubmit(err error) bool {
 // backoffErr is used to do an exponential back off on error. This is
 // maintained statefully for the worker. Returns if attempts should be
 // abandoneded due to shutdown.
-// be made or abandoned.
 func (w *Worker) backoffErr(base, limit time.Duration) bool {
     backoff := (1 << (2 * w.failures)) * base
     if backoff > limit {

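The scheduler worker's backoffErr uses the same 4^n growth, but keeps the failure count as state on the worker and can be abandoned on shutdown, which is what the surviving comment describes. A rough sketch of that shape follows; backoffState and its fields are hypothetical and not the Worker's actual implementation.

package main

import (
    "fmt"
    "time"
)

// backoffState is a hypothetical stand-in for a worker that tracks how many
// consecutive failures it has seen and can be told to shut down.
type backoffState struct {
    failures uint
    shutdown chan struct{}
}

// backoffErr waits 4^failures * base (capped at limit) and reports true if
// the wait was abandoned because shutdown was requested.
func (b *backoffState) backoffErr(base, limit time.Duration) bool {
    backoff := (1 << (2 * b.failures)) * base
    if backoff > limit {
        backoff = limit
    }
    b.failures++
    select {
    case <-time.After(backoff):
        return false
    case <-b.shutdown:
        return true
    }
}

func main() {
    b := &backoffState{shutdown: make(chan struct{})}
    for i := 0; i < 3; i++ {
        start := time.Now()
        abandoned := b.backoffErr(10*time.Millisecond, time.Second)
        fmt.Printf("attempt %d: waited ~%v, abandoned=%v\n",
            i+1, time.Since(start).Round(10*time.Millisecond), abandoned)
    }
}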