diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1504900c1..b0dbde878 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -118,18 +118,12 @@ func (r *AllocRunner) RestoreState() error { return mErr.ErrorOrNil() } -// SaveState is used to snapshot our state +// SaveState is used to snapshot the state of the alloc runner +// if the fullSync is marked as false only the state of the Alloc Runner +// is snapshotted. If fullSync is marked as true, we snapshot +// all the Task Runners associated with the Alloc func (r *AllocRunner) SaveState() error { - r.taskStatusLock.RLock() - snap := allocRunnerState{ - Alloc: r.alloc, - RestartPolicy: r.RestartPolicy, - TaskStatus: r.taskStatus, - Context: r.ctx, - } - err := persistState(r.stateFilePath(), &snap) - r.taskStatusLock.RUnlock() - if err != nil { + if err := r.saveAllocRunnerState(); err != nil { return err } @@ -137,16 +131,35 @@ func (r *AllocRunner) SaveState() error { r.taskLock.RLock() defer r.taskLock.RUnlock() var mErr multierror.Error - for name, tr := range r.tasks { - if err := tr.SaveState(); err != nil { - r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", - r.alloc.ID, name, err) + for _, tr := range r.tasks { + if err := r.saveTaskRunnerState(tr); err != nil { mErr.Errors = append(mErr.Errors, err) } } return mErr.ErrorOrNil() } +func (r *AllocRunner) saveAllocRunnerState() error { + r.taskStatusLock.RLock() + defer r.taskStatusLock.RUnlock() + snap := allocRunnerState{ + Alloc: r.alloc, + RestartPolicy: r.RestartPolicy, + TaskStatus: r.taskStatus, + Context: r.ctx, + } + return persistState(r.stateFilePath(), &snap) +} + +func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { + var err error + if err = tr.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", + r.alloc.ID, tr.task.Name, err) + } + return err +} + // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { return os.RemoveAll(filepath.Dir(r.stateFilePath())) @@ -187,8 +200,10 @@ func (r *AllocRunner) dirtySyncState() { // retrySyncState is used to retry the state sync until success func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { for { - err := r.syncStatus() - if err == nil { + if err := r.syncStatus(); err == nil { + // The Alloc State might have been re-computed so we are + // snapshoting only the alloc runner + r.saveAllocRunnerState() return } select { @@ -257,6 +272,9 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { Description: desc, } r.taskStatusLock.Unlock() + if tr, ok := r.tasks[taskName]; ok { + r.saveTaskRunnerState(tr) + } select { case r.dirtyCh <- struct{}{}: default: diff --git a/client/task_runner.go b/client/task_runner.go index b54b7604b..88e746b1f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -32,6 +32,8 @@ type TaskRunner struct { destroyCh chan struct{} destroyLock sync.Mutex waitCh chan struct{} + + snapshotLock sync.Mutex } // taskRunnerState is used to snapshot the state of the task runner @@ -112,6 +114,8 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { + r.snapshotLock.Lock() + defer r.snapshotLock.Unlock() snap := taskRunnerState{ Task: r.task, }