Refactored Save State of Alloc runner

This commit is contained in:
Diptanu Choudhury 2015-11-09 16:15:11 -08:00
parent 5ef34f7560
commit 38a047ec6d
2 changed files with 23 additions and 16 deletions

View file

@ -118,8 +118,11 @@ func (r *AllocRunner) RestoreState() error {
return mErr.ErrorOrNil()
}
// SaveState is used to snapshot our state
func (r *AllocRunner) SaveState(taskName string) error {
// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
// all the Task Runners associated with the Alloc
func (r *AllocRunner) SaveState(fullSync bool) error {
r.taskStatusLock.RLock()
snap := allocRunnerState{
Alloc: r.alloc,
@ -133,30 +136,29 @@ func (r *AllocRunner) SaveState(taskName string) error {
return err
}
if !fullSync {
return nil
}
// Save state for each task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
var mErr multierror.Error
if taskName != "" {
if tr, ok := r.tasks[taskName]; ok {
r.saveTaskRunnerState(tr, &mErr)
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name))
}
return mErr.ErrorOrNil()
}
for _, tr := range r.tasks {
r.saveTaskRunnerState(tr, &mErr)
if err := r.saveTaskRunnerState(tr); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner, mErr *multierror.Error) {
if err := tr.SaveState(); err != nil {
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
var err error
if err = tr.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
r.alloc.ID, tr.task.Name, err)
mErr.Errors = append(mErr.Errors, err)
}
return err
}
// DestroyState is used to cleanup after ourselves
@ -201,6 +203,9 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
err := r.syncStatus()
if err == nil {
// The Alloc State might have been re-computed so we are
// snapshoting only the alloc runner
r.SaveState(false)
return
}
select {
@ -269,7 +274,9 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
Description: desc,
}
r.taskStatusLock.Unlock()
r.SaveState(taskName)
if tr, ok := r.tasks[taskName]; ok {
r.saveTaskRunnerState(tr)
}
select {
case r.dirtyCh <- struct{}{}:
default:

View file

@ -351,7 +351,7 @@ func (c *Client) saveState() error {
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for id, ar := range c.allocs {
if err := ar.SaveState(""); err != nil {
if err := ar.SaveState(true); err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
id, err)
mErr.Errors = append(mErr.Errors, err)