Don't share task state with the alloc in the task runner

This commit is contained in:
Alex Dadgar 2016-02-01 17:47:53 -08:00
parent b5260fc14e
commit a72d39bd04
3 changed files with 61 additions and 28 deletions

View file

@ -1,7 +1,6 @@
package client
import (
"encoding/json"
"fmt"
"log"
"os"
@ -113,8 +112,7 @@ func (r *AllocRunner) RestoreState() error {
task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[name] = tr
// Skip tasks in terminal states.
@ -189,7 +187,7 @@ func (r *AllocRunner) DestroyContext() error {
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc
return r.alloc.Copy()
}
// dirtySyncState is used to watch for state being marked dirty to sync
@ -223,11 +221,17 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc.
alloc := r.Alloc()
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
for _, tr := range r.tasks {
state := tr.state
for name, tr := range r.tasks {
// Store the state of each task in the copied alloc.
state := tr.getState()
alloc.TaskStates[name] = state
switch state.State {
case structs.TaskStateRunning:
running = true
@ -245,26 +249,18 @@ func (r *AllocRunner) syncStatus() error {
r.taskStatusLock.RUnlock()
// Determine the alloc status
r.allocLock.Lock()
defer r.allocLock.Unlock()
if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
alloc.ClientStatus = structs.AllocClientStatusDead
}
// Attempt to update the status
if err := r.updater(r.alloc); err != nil {
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
@ -334,8 +330,7 @@ func (r *AllocRunner) Run() {
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}

View file

@ -27,10 +27,11 @@ type TaskRunner struct {
restartTracker *RestartTracker
consulService *ConsulService
task *structs.Task
state *structs.TaskState
updateCh chan *structs.Task
handle driver.DriverHandle
task *structs.Task
state *structs.TaskState
stateLock sync.RWMutex
updateCh chan *structs.Task
handle driver.DriverHandle
destroy bool
destroyCh chan struct{}
@ -43,6 +44,7 @@ type TaskRunner struct {
// taskRunnerState is used to snapshot the state of the task runner
type taskRunnerState struct {
Task *structs.Task
State *structs.TaskState
HandleID string
}
@ -52,7 +54,7 @@ type TaskStateUpdater func(taskName string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
alloc *structs.Allocation, task *structs.Task,
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
tc := &TaskRunner{
@ -64,7 +66,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
ctx: ctx,
alloc: alloc,
task: task,
state: state,
state: new(structs.TaskState),
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
@ -100,6 +102,7 @@ func (r *TaskRunner) RestoreState() error {
// Restore fields
r.task = snap.Task
r.state = snap.State
// Restore the driver
if snap.HandleID != "" {
@ -126,7 +129,8 @@ func (r *TaskRunner) SaveState() error {
r.snapshotLock.Lock()
defer r.snapshotLock.Unlock()
snap := taskRunnerState{
Task: r.task,
Task: r.task,
State: r.getState(),
}
if r.handle != nil {
snap.HandleID = r.handle.ID()
@ -158,8 +162,10 @@ func (r *TaskRunner) appendEvent(event *structs.TaskEvent) {
// setState is used to update the state of the task runner
func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
// Update the task.
r.stateLock.Lock()
r.state.State = state
r.appendEvent(event)
r.stateLock.Unlock()
// Persist our state to disk.
if err := r.SaveState(); err != nil {
@ -170,6 +176,13 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
r.updater(r.task.Name)
}
// Get state returns a copy of our state.
func (r *TaskRunner) getState() *structs.TaskState {
r.stateLock.RLock()
r.stateLock.RUnlock()
return r.state.Copy()
}
// createDriver makes a driver for the task
func (r *TaskRunner) createDriver() (driver.Driver, error) {
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task)

View file

@ -1448,6 +1448,16 @@ type TaskState struct {
Events []*TaskEvent
}
func (ts *TaskState) Copy() *TaskState {
copy := new(TaskState)
copy.State = ts.State
copy.Events = make([]*TaskEvent, len(ts.Events))
for i, e := range ts.Events {
copy.Events[i] = e.Copy()
}
return copy
}
const (
// A Driver failure indicates that the task could not be started due to a
// failure in the driver.
@ -1482,6 +1492,12 @@ type TaskEvent struct {
KillError string // Error killing the task.
}
func (te *TaskEvent) Copy() *TaskEvent {
copy := new(TaskEvent)
*copy = *te
return copy
}
func NewTaskEvent(event string) *TaskEvent {
return &TaskEvent{
Type: event,
@ -1664,6 +1680,15 @@ type Allocation struct {
AllocModifyIndex uint64
}
func (a *Allocation) Copy() *Allocation {
i, err := copystructure.Copy(a)
if err != nil {
panic(err)
}
return i.(*Allocation)
}
// TerminalStatus returns if the desired or actual status is terminal and
// will no longer transition.
func (a *Allocation) TerminalStatus() bool {