Restore state + upgrade path

This commit is contained in:
Alex Dadgar 2017-05-02 13:31:56 -07:00
parent ec101b4760
commit e00f9c9413
6 changed files with 323 additions and 98 deletions

View File

@ -88,6 +88,7 @@ type AllocRunner struct {
allocDirPersisted bool
}
// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Version string
@ -149,8 +150,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
return ar
}
// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
@ -159,30 +162,73 @@ func (r *AllocRunner) stateFilePath() string {
// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
var snap allocRunnerState
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
if r.alloc != nil {
r.taskStates = snap.Alloc.TaskStates
}
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
snap.AllocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
snap.AllocDir.NewTaskDir(taskName)
}
}
// Delete the old state
os.RemoveAll(oldPath)
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}
// Get the state objects
var mutable allocRunnerMutableState
var immutable allocRunnerImmutableState
var allocDir allocdir.AllocDir
if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
}
// Populate the fields
r.alloc = immutable.Alloc
r.allocDir = &allocDir
r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
return nil
})
if err != nil {
return fmt.Errorf("failed to read allocation state: %v", err)
}
}
// XXX needs to be updated and handle the upgrade path
// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
var snapshotErrors multierror.Error
if r.alloc == nil {
snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil allocation"))
@ -194,11 +240,17 @@ func (r *AllocRunner) RestoreState() error {
return e
}
r.taskStates = snap.Alloc.TaskStates
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup)
}
// Restore the task runners
var mErr multierror.Error
for name, state := range r.taskStates {
for _, task := range tg.Tasks {
name := task.Name
state := r.taskStates[name]
// Mark the task as restored.
r.restored[name] = struct{}{}
@ -210,7 +262,6 @@ func (r *AllocRunner) RestoreState() error {
return err
}
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr
@ -231,11 +282,6 @@ func (r *AllocRunner) RestoreState() error {
return mErr.ErrorOrNil()
}
// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
@ -330,7 +376,12 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
})
}
// DestroyContext is used to destroy the context
@ -338,6 +389,11 @@ func (r *AllocRunner) DestroyContext() error {
return r.allocDir.Destroy()
}
// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
@ -543,7 +599,7 @@ func (r *AllocRunner) Run() {
go r.dirtySyncState()
// Find the task group to run in the allocation
alloc := r.alloc
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
@ -629,6 +685,10 @@ OUTER:
for _, tr := range runners {
tr.Update(update)
}
if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
}
case <-r.destroyCh:
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER

View File

@ -608,27 +608,56 @@ func (c *Client) restoreState() error {
return nil
}
// XXX Needs to be updated and handle the upgrade case
// COMPAT: Remove in 0.7.0
// 0.6.0 transistioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Allocs holds the IDs of the allocations being restored
var allocs []string
// Upgrading tracks whether this is a pre 0.6.0 upgrade path
var upgrading bool
// Scan the directory
list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc"))
if err != nil && os.IsNotExist(err) {
return nil
} else if err != nil {
allocDir := filepath.Join(c.config.StateDir, "alloc")
list, err := ioutil.ReadDir(allocDir)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to list alloc state: %v", err)
} else if err == nil && len(list) != 0 {
upgrading = true
for _, entry := range list {
allocs = append(allocs, entry.Name())
}
} else {
// Normal path
err := c.stateDB.View(func(tx *bolt.Tx) error {
allocs, err = getAllAllocationIDs(tx)
if err != nil {
return fmt.Errorf("failed to list allocations: %v", err)
}
return nil
})
if err != nil {
return err
}
}
// Load each alloc back
var mErr multierror.Error
for _, entry := range list {
id := entry.Name()
for _, id := range allocs {
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
c.allocLock.Unlock()
if err := ar.RestoreState(); err != nil {
c.logger.Printf("[ERR] client: failed to restore state for alloc %s: %v", id, err)
mErr.Errors = append(mErr.Errors, err)
@ -636,6 +665,14 @@ func (c *Client) restoreState() error {
go ar.Run()
}
}
// Delete all the entries
if upgrading {
if err := os.RemoveAll(allocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
@ -653,10 +690,9 @@ func (c *Client) saveState(blocking bool) error {
runners := c.getAllocRunners()
wg.Add(len(runners))
for id, ar := range c.getAllocRunners() {
go func() {
local := ar
err := local.SaveState()
for id, ar := range runners {
go func(id string, ar *AllocRunner) {
err := ar.SaveState()
if err != nil {
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err)
l.Lock()
@ -664,7 +700,7 @@ func (c *Client) saveState(blocking bool) error {
l.Unlock()
}
wg.Done()
}()
}(id, ar)
}
if blocking {
@ -1505,8 +1541,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Remove the old allocations
for _, remove := range diff.removed {
if err := c.removeAlloc(remove); err != nil {
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v",
remove.ID, err)
c.logger.Printf("[ERR] client: failed to remove alloc '%s': %v", remove.ID, err)
}
}
@ -1581,11 +1616,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}
// Persist our state
if err := c.saveState(false); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
}
// blockForRemoteAlloc blocks until the previous allocation of an allocation has
@ -1934,6 +1964,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %q failed: %v", alloc.ID, err)
}
go ar.Run()
// Store the alloc runner.

View File

@ -55,24 +55,54 @@ func putData(bkt *bolt.Bucket, key, value []byte) error {
return nil
}
// getAllocationBucket returns the bucket used to persist state about a
// particular allocation. If the root allocation bucket or the specific
// allocation bucket doesn't exist, it will be created.
func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
if !tx.Writable() {
return nil, fmt.Errorf("transaction must be writable")
func getObject(bkt *bolt.Bucket, key []byte, obj interface{}) error {
// Get the data
data := bkt.Get(key)
if data == nil {
return fmt.Errorf("no data at key %v", string(key))
}
// Deserialize the object
if err := codec.NewDecoderBytes(data, structs.MsgpackHandle).Decode(obj); err != nil {
return fmt.Errorf("failed to decode data into passed object: %v", err)
}
return nil
}
// getAllocationBucket returns the bucket used to persist state about a
// particular allocation. If the root allocation bucket or the specific
// allocation bucket doesn't exist, it will be created as long as the
// transaction is writable.
func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
var err error
w := tx.Writable()
// Retrieve the root allocations bucket
allocations, err := tx.CreateBucketIfNotExists(allocationsBucket)
if err != nil {
return nil, err
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
if !w {
return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable")
}
allocations, err = tx.CreateBucket(allocationsBucket)
if err != nil {
return nil, err
}
}
// Retrieve the specific allocations bucket
alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID))
if err != nil {
return nil, err
key := []byte(allocID)
alloc := allocations.Bucket(key)
if alloc == nil {
if !w {
return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable")
}
alloc, err = allocations.CreateBucket(key)
if err != nil {
return nil, err
}
}
return alloc, nil
@ -80,29 +110,94 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) {
// getTaskBucket returns the bucket used to persist state about a
// particular task. If the root allocation bucket, the specific
// allocation or task bucket doesn't exist, they will be created.
// allocation or task bucket doesn't exist, they will be created as long as the
// transaction is writable.
func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) {
if !tx.Writable() {
return nil, fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations, err := tx.CreateBucketIfNotExists(allocationsBucket)
if err != nil {
return nil, err
}
// Retrieve the specific allocations bucket
alloc, err := allocations.CreateBucketIfNotExists([]byte(allocID))
alloc, err := getAllocationBucket(tx, allocID)
if err != nil {
return nil, err
}
// Retrieve the specific task bucket
task, err := alloc.CreateBucketIfNotExists([]byte(taskName))
if err != nil {
return nil, err
w := tx.Writable()
key := []byte(taskName)
task := alloc.Bucket(key)
if task == nil {
if !w {
return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable")
}
task, err = alloc.CreateBucket(key)
if err != nil {
return nil, err
}
}
return task, nil
}
// deleteAllocationBucket is used to delete an allocation bucket if it exists.
func deleteAllocationBucket(tx *bolt.Tx, allocID string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Check if the bucket exists
key := []byte(allocID)
if allocBkt := allocations.Bucket(key); allocBkt == nil {
return nil
}
return allocations.DeleteBucket(key)
}
// deleteTaskBucket is used to delete a task bucket if it exists.
func deleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error {
if !tx.Writable() {
return fmt.Errorf("transaction must be writable")
}
// Retrieve the root allocations bucket
allocations := tx.Bucket(allocationsBucket)
if allocations == nil {
return nil
}
// Retrieve the specific allocations bucket
alloc := allocations.Bucket([]byte(allocID))
if alloc == nil {
return nil
}
// Check if the bucket exists
key := []byte(taskName)
if taskBkt := alloc.Bucket(key); taskBkt == nil {
return nil
}
return alloc.DeleteBucket(key)
}
func getAllAllocationIDs(tx *bolt.Tx) ([]string, error) {
allocationsBkt := tx.Bucket(allocationsBucket)
if allocationsBkt == nil {
return nil, nil
}
// Create a cursor for iteration.
var allocIDs []string
c := allocationsBkt.Cursor()
// Iterate over all the buckets
for k, _ := c.First(); k != nil; k, _ = c.Next() {
allocIDs = append(allocIDs, string(k))
}
return allocIDs, nil
}

View File

@ -249,34 +249,61 @@ func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
}
// stateFilePath returns the path to our state file
func (r *TaskRunner) stateFilePath() string {
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *TaskRunner) pre060StateFilePath() string {
// Get the MD5 of the task name
hashVal := md5.Sum([]byte(r.task.Name))
hashHex := hex.EncodeToString(hashVal[:])
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
dirName, "state.json")
return path
return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, dirName, "state.json")
}
// RestoreState is used to restore our state
func (r *TaskRunner) RestoreState() error {
// XXX needs to be updated and handle the upgrade path
// COMPAT: Remove in 0.7.0
// 0.6.0 transistioned from individual state files to a single bolt-db.
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Load the snapshot
var snap taskRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Delete the old state
os.RemoveAll(oldPath)
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
if err != nil {
return fmt.Errorf("failed to get task bucket: %v", err)
}
if err := getObject(bkt, taskRunnerStateAllKey, &snap); err != nil {
return fmt.Errorf("failed to read task runner state: %v", err)
}
return nil
})
if err != nil {
return err
}
}
// Restore fields
// Restore fields from the snapshot
r.artifactsDownloaded = snap.ArtifactDownloaded
r.taskDirBuilt = snap.TaskDirBuilt
r.payloadRendered = snap.PayloadRendered
r.setCreatedResources(snap.CreatedResources)
if err := r.setTaskEnv(); err != nil {
@ -333,14 +360,13 @@ func (r *TaskRunner) RestoreState() error {
r.running = true
r.runningLock.Unlock()
}
return nil
}
// SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error {
// XXX needs to be updated
r.persistLock.Lock()
snap := taskRunnerState{
Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded,
@ -355,6 +381,7 @@ func (r *TaskRunner) SaveState() error {
}
r.handleLock.Unlock()
// If nothing has changed avoid the write
h := snap.Hash()
if bytes.Equal(h, r.persistedHash) {
r.persistLock.Unlock()
@ -394,11 +421,21 @@ func (r *TaskRunner) DestroyState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
return os.RemoveAll(r.stateFilePath())
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteTaskBucket(tx, r.alloc.ID, r.task.Name); err != nil {
return fmt.Errorf("failed to delete task bucket: %v", err)
}
return nil
})
}
// setState is used to update the state of the task runner
func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
// Persist our state to disk.
if err := r.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err)
}
// Indicate the task has been updated.
r.updater(r.task.Name, state, event)
}

View File

@ -106,14 +106,12 @@ func persistState(path string, data interface{}) error {
return nil
}
// restoreState is used to read back in the persisted state
func restoreState(path string, data interface{}) error {
// pre060RestoreState is used to read back in the persisted state for pre v0.6.0
// state
func pre060RestoreState(path string, data interface{}) error {
buf, err := ioutil.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("failed to read state: %v", err)
return err
}
if err := json.Unmarshal(buf, data); err != nil {
return fmt.Errorf("failed to decode state: %v", err)

Binary file not shown.