Merge pull request #403 from hashicorp/f-snapshot-on-task-update
Updating snapshots of a TaskRunner when status of Task changes
This commit is contained in:
commit
4ff5d55453
|
@ -118,18 +118,12 @@ func (r *AllocRunner) RestoreState() error {
|
||||||
return mErr.ErrorOrNil()
|
return mErr.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveState is used to snapshot our state
|
// SaveState is used to snapshot the state of the alloc runner
|
||||||
|
// if the fullSync is marked as false only the state of the Alloc Runner
|
||||||
|
// is snapshotted. If fullSync is marked as true, we snapshot
|
||||||
|
// all the Task Runners associated with the Alloc
|
||||||
func (r *AllocRunner) SaveState() error {
|
func (r *AllocRunner) SaveState() error {
|
||||||
r.taskStatusLock.RLock()
|
if err := r.saveAllocRunnerState(); err != nil {
|
||||||
snap := allocRunnerState{
|
|
||||||
Alloc: r.alloc,
|
|
||||||
RestartPolicy: r.RestartPolicy,
|
|
||||||
TaskStatus: r.taskStatus,
|
|
||||||
Context: r.ctx,
|
|
||||||
}
|
|
||||||
err := persistState(r.stateFilePath(), &snap)
|
|
||||||
r.taskStatusLock.RUnlock()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,16 +131,35 @@ func (r *AllocRunner) SaveState() error {
|
||||||
r.taskLock.RLock()
|
r.taskLock.RLock()
|
||||||
defer r.taskLock.RUnlock()
|
defer r.taskLock.RUnlock()
|
||||||
var mErr multierror.Error
|
var mErr multierror.Error
|
||||||
for name, tr := range r.tasks {
|
for _, tr := range r.tasks {
|
||||||
if err := tr.SaveState(); err != nil {
|
if err := r.saveTaskRunnerState(tr); err != nil {
|
||||||
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
|
|
||||||
r.alloc.ID, name, err)
|
|
||||||
mErr.Errors = append(mErr.Errors, err)
|
mErr.Errors = append(mErr.Errors, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return mErr.ErrorOrNil()
|
return mErr.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *AllocRunner) saveAllocRunnerState() error {
|
||||||
|
r.taskStatusLock.RLock()
|
||||||
|
defer r.taskStatusLock.RUnlock()
|
||||||
|
snap := allocRunnerState{
|
||||||
|
Alloc: r.alloc,
|
||||||
|
RestartPolicy: r.RestartPolicy,
|
||||||
|
TaskStatus: r.taskStatus,
|
||||||
|
Context: r.ctx,
|
||||||
|
}
|
||||||
|
return persistState(r.stateFilePath(), &snap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
|
||||||
|
var err error
|
||||||
|
if err = tr.SaveState(); err != nil {
|
||||||
|
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
|
||||||
|
r.alloc.ID, tr.task.Name, err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// DestroyState is used to cleanup after ourselves
|
// DestroyState is used to cleanup after ourselves
|
||||||
func (r *AllocRunner) DestroyState() error {
|
func (r *AllocRunner) DestroyState() error {
|
||||||
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
|
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
|
||||||
|
@ -187,8 +200,10 @@ func (r *AllocRunner) dirtySyncState() {
|
||||||
// retrySyncState is used to retry the state sync until success
|
// retrySyncState is used to retry the state sync until success
|
||||||
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
|
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
|
||||||
for {
|
for {
|
||||||
err := r.syncStatus()
|
if err := r.syncStatus(); err == nil {
|
||||||
if err == nil {
|
// The Alloc State might have been re-computed so we are
|
||||||
|
// snapshoting only the alloc runner
|
||||||
|
r.saveAllocRunnerState()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
|
@ -257,6 +272,9 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
|
||||||
Description: desc,
|
Description: desc,
|
||||||
}
|
}
|
||||||
r.taskStatusLock.Unlock()
|
r.taskStatusLock.Unlock()
|
||||||
|
if tr, ok := r.tasks[taskName]; ok {
|
||||||
|
r.saveTaskRunnerState(tr)
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case r.dirtyCh <- struct{}{}:
|
case r.dirtyCh <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -32,6 +32,8 @@ type TaskRunner struct {
|
||||||
destroyCh chan struct{}
|
destroyCh chan struct{}
|
||||||
destroyLock sync.Mutex
|
destroyLock sync.Mutex
|
||||||
waitCh chan struct{}
|
waitCh chan struct{}
|
||||||
|
|
||||||
|
snapshotLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// taskRunnerState is used to snapshot the state of the task runner
|
// taskRunnerState is used to snapshot the state of the task runner
|
||||||
|
@ -112,6 +114,8 @@ func (r *TaskRunner) RestoreState() error {
|
||||||
|
|
||||||
// SaveState is used to snapshot our state
|
// SaveState is used to snapshot our state
|
||||||
func (r *TaskRunner) SaveState() error {
|
func (r *TaskRunner) SaveState() error {
|
||||||
|
r.snapshotLock.Lock()
|
||||||
|
defer r.snapshotLock.Unlock()
|
||||||
snap := taskRunnerState{
|
snap := taskRunnerState{
|
||||||
Task: r.task,
|
Task: r.task,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue