diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d81fca216..584a031e1 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -404,7 +404,7 @@ OUTER: break FOUND } } - tr.Update(task) + tr.Update(update) } r.taskLock.RUnlock() diff --git a/client/driver/docker.go b/client/driver/docker.go index 631babdfd..ba2d49407 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -614,6 +614,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { } func (h *DockerHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/driver.go b/client/driver/driver.go index 5fd1238b3..f92b84fad 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -105,7 +105,8 @@ type DriverHandle interface { // WaitCh is used to return a channel used wait for task completion WaitCh() chan *cstructs.WaitResult - // Update is used to update the task if possible + // Update is used to update the task if possible and update task related + // configurations. Update(task *structs.Task) error // Kill is used to stop the task diff --git a/client/driver/exec.go b/client/driver/exec.go index efa846619..fb24f021f 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -173,6 +173,9 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { } func (h *execHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/java.go b/client/driver/java.go index 2a7cb5a30..3cb32679c 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -221,6 +221,9 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { } func (h *javaHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 76cd5aa42..df36b4cd3 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -262,6 +262,9 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { } func (h *qemuHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 376d0b626..5c870e80f 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -170,6 +170,9 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { } func (h *rawExecHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 764e17fb1..2b0f6218a 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -278,6 +278,9 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { } func (h *rktHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/restarts.go b/client/restarts.go index a7ac78f34..8e6af3bf4 100644 --- a/client/restarts.go +++ b/client/restarts.go @@ -2,6 +2,7 @@ package client import ( "math/rand" + "sync" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -29,9 +30,22 @@ type RestartTracker struct { startTime time.Time // When the interval began policy *structs.RestartPolicy rand *rand.Rand + lock sync.Mutex } +// SetPolicy updates the policy used to determine restarts. +func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) { + r.lock.Lock() + defer r.lock.Unlock() + r.policy = policy +} + +// NextRestart takes the exit code from the last attempt and returns whether the +// task should be restarted and the duration to wait. func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) { + r.lock.Lock() + defer r.lock.Unlock() + // Hot path if no attempts are expected if r.policy.Attempts == 0 { return false, 0 diff --git a/client/task_runner.go b/client/task_runner.go index aa185c51e..3d2b5c7c2 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -28,7 +28,7 @@ type TaskRunner struct { consulService *ConsulService task *structs.Task - updateCh chan *structs.Task + updateCh chan *structs.Allocation handle driver.DriverHandle destroy bool @@ -71,7 +71,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, alloc: alloc, task: task, - updateCh: make(chan *structs.Task, 8), + updateCh: make(chan *structs.Allocation, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), } @@ -239,10 +239,8 @@ func (r *TaskRunner) run() { case waitRes = <-r.handle.WaitCh(): break OUTER case update := <-r.updateCh: - // Update - r.task = update - if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) + if err := r.handleUpdate(update); err != nil { + r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err) } case <-r.destroyCh: // Avoid destroying twice @@ -311,6 +309,49 @@ func (r *TaskRunner) run() { return } +// handleUpdate takes an updated allocation and updates internal state to +// reflect the new config for the task. +func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { + // Extract the task group from the alloc. + tg := update.Job.LookupTaskGroup(update.TaskGroup) + if tg == nil { + return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup) + } + + // Extract the task. + var task *structs.Task + for _, t := range tg.Tasks { + if t.Name == r.task.Name { + task = t + } + } + if task == nil { + return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name) + } + r.task = task + + // Update will update resources and store the new kill timeout. + if r.handle != nil { + if err := r.handle.Update(task); err != nil { + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) + } + } + + // Update the restart policy. + if r.restartTracker != nil { + r.restartTracker.SetPolicy(tg.RestartPolicy) + } + + /* TODO + // Re-register the task to consul and store the updated alloc. + r.consulService.Deregister(r.task, r.alloc) + r.alloc = update + r.consulService.Register(r.task, r.alloc) + */ + + return nil +} + // Helper function for converting a WaitResult into a TaskTerminated event. func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { return structs.NewTaskEvent(structs.TaskTerminated). @@ -320,12 +361,12 @@ func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEve } // Update is used to update the task of the context -func (r *TaskRunner) Update(update *structs.Task) { +func (r *TaskRunner) Update(update *structs.Allocation) { select { case r.updateCh <- update: default: r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", - update.Name, r.alloc.ID) + r.task.Name, r.alloc.ID) } } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 3b0bc3086..9b009e035 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -137,14 +137,34 @@ func TestTaskRunner_Update(t *testing.T) { defer tr.ctx.AllocDir.Destroy() // Update the task definition - newTask := new(structs.Task) - *newTask = *tr.task + updateAlloc := tr.alloc.Copy() + + // Update the restart policy + newTG := updateAlloc.Job.TaskGroups[0] + newMode := "foo" + newTG.RestartPolicy.Mode = newMode + + newTask := updateAlloc.Job.TaskGroups[0].Tasks[0] newTask.Driver = "foobar" - tr.Update(newTask) + + // Update the kill timeout + oldHandle := tr.handle.ID() + newTask.KillTimeout = time.Hour + + tr.Update(updateAlloc) // Wait for update to take place testutil.WaitForResult(func() (bool, error) { - return tr.task == newTask, nil + if tr.task != newTask { + return false, fmt.Errorf("task not updated") + } + if tr.restartTracker.policy.Mode != newMode { + return false, fmt.Errorf("restart policy not updated") + } + if tr.handle.ID() == oldHandle { + return false, fmt.Errorf("handle not updated") + } + return true, nil }, func(err error) { t.Fatalf("err: %v", err) })