Client handles updates to KillTimeout and Restart Policy

This commit is contained in:
Alex Dadgar 2016-02-03 19:43:44 -08:00
parent b6f9e9c61c
commit 41e1174f72
11 changed files with 108 additions and 14 deletions

View file

@ -404,7 +404,7 @@ OUTER:
break FOUND
}
}
tr.Update(task)
tr.Update(update)
}
r.taskLock.RUnlock()

View file

@ -614,6 +614,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -105,7 +105,8 @@ type DriverHandle interface {
// WaitCh is used to return a channel used wait for task completion
WaitCh() chan *cstructs.WaitResult
// Update is used to update the task if possible
// Update is used to update the task if possible and update task related
// configurations.
Update(task *structs.Task) error
// Kill is used to stop the task

View file

@ -173,6 +173,9 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -221,6 +221,9 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *javaHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -262,6 +262,9 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *qemuHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -170,6 +170,9 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *rawExecHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -278,6 +278,9 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
}
func (h *rktHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = task.KillTimeout
// Update is not possible
return nil
}

View file

@ -2,6 +2,7 @@ package client
import (
"math/rand"
"sync"
"time"
"github.com/hashicorp/nomad/nomad/structs"
@ -29,9 +30,22 @@ type RestartTracker struct {
startTime time.Time // When the interval began
policy *structs.RestartPolicy
rand *rand.Rand
lock sync.Mutex
}
// SetPolicy updates the policy used to determine restarts.
func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.lock.Lock()
defer r.lock.Unlock()
r.policy = policy
}
// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()
// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
return false, 0

View file

@ -28,7 +28,7 @@ type TaskRunner struct {
consulService *ConsulService
task *structs.Task
updateCh chan *structs.Task
updateCh chan *structs.Allocation
handle driver.DriverHandle
destroy bool
@ -71,7 +71,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
ctx: ctx,
alloc: alloc,
task: task,
updateCh: make(chan *structs.Task, 8),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
@ -239,10 +239,8 @@ func (r *TaskRunner) run() {
case waitRes = <-r.handle.WaitCh():
break OUTER
case update := <-r.updateCh:
// Update
r.task = update
if err := r.handle.Update(update); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
if err := r.handleUpdate(update); err != nil {
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
}
case <-r.destroyCh:
// Avoid destroying twice
@ -311,6 +309,49 @@ func (r *TaskRunner) run() {
return
}
// handleUpdate takes an updated allocation and updates internal state to
// reflect the new config for the task.
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Extract the task group from the alloc.
tg := update.Job.LookupTaskGroup(update.TaskGroup)
if tg == nil {
return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup)
}
// Extract the task.
var task *structs.Task
for _, t := range tg.Tasks {
if t.Name == r.task.Name {
task = t
}
}
if task == nil {
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
}
r.task = task
// Update will update resources and store the new kill timeout.
if r.handle != nil {
if err := r.handle.Update(task); err != nil {
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
}
}
// Update the restart policy.
if r.restartTracker != nil {
r.restartTracker.SetPolicy(tg.RestartPolicy)
}
/* TODO
// Re-register the task to consul and store the updated alloc.
r.consulService.Deregister(r.task, r.alloc)
r.alloc = update
r.consulService.Register(r.task, r.alloc)
*/
return nil
}
// Helper function for converting a WaitResult into a TaskTerminated event.
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
return structs.NewTaskEvent(structs.TaskTerminated).
@ -320,12 +361,12 @@ func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEve
}
// Update is used to update the task of the context
func (r *TaskRunner) Update(update *structs.Task) {
func (r *TaskRunner) Update(update *structs.Allocation) {
select {
case r.updateCh <- update:
default:
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.alloc.ID)
r.task.Name, r.alloc.ID)
}
}

View file

@ -137,14 +137,34 @@ func TestTaskRunner_Update(t *testing.T) {
defer tr.ctx.AllocDir.Destroy()
// Update the task definition
newTask := new(structs.Task)
*newTask = *tr.task
updateAlloc := tr.alloc.Copy()
// Update the restart policy
newTG := updateAlloc.Job.TaskGroups[0]
newMode := "foo"
newTG.RestartPolicy.Mode = newMode
newTask := updateAlloc.Job.TaskGroups[0].Tasks[0]
newTask.Driver = "foobar"
tr.Update(newTask)
// Update the kill timeout
oldHandle := tr.handle.ID()
newTask.KillTimeout = time.Hour
tr.Update(updateAlloc)
// Wait for update to take place
testutil.WaitForResult(func() (bool, error) {
return tr.task == newTask, nil
if tr.task != newTask {
return false, fmt.Errorf("task not updated")
}
if tr.restartTracker.policy.Mode != newMode {
return false, fmt.Errorf("restart policy not updated")
}
if tr.handle.ID() == oldHandle {
return false, fmt.Errorf("handle not updated")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})