Respond to comments

This commit is contained in:
Alex Dadgar 2017-05-09 10:50:24 -07:00
parent 9faa98e13b
commit 843bc26e5d
4 changed files with 41 additions and 55 deletions

View File

@ -163,12 +163,14 @@ func (r *AllocRunner) pre060StateFilePath() string {
// RestoreState is used to restore the state of the alloc runner // RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error { func (r *AllocRunner) RestoreState() error {
// COMPAT: Remove in 0.7.0
// Check if the old snapshot is there // Check if the old snapshot is there
oldPath := r.pre060StateFilePath() oldPath := r.pre060StateFilePath()
var snap allocRunnerState var snap allocRunnerState
var upgrading bool
if err := pre060RestoreState(oldPath, &snap); err == nil { if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields // Restore fields
r.logger.Printf("[DEBUG] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID) r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
r.alloc = snap.Alloc r.alloc = snap.Alloc
r.allocDir = snap.AllocDir r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus r.allocClientStatus = snap.AllocClientStatus
@ -178,6 +180,7 @@ func (r *AllocRunner) RestoreState() error {
r.taskStates = snap.Alloc.TaskStates r.taskStates = snap.Alloc.TaskStates
} }
// COMPAT: Remove in 0.7.0
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old // #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct // Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil { if snap.AllocDir == nil && snap.Context != nil {
@ -190,6 +193,7 @@ func (r *AllocRunner) RestoreState() error {
// Delete the old state // Delete the old state
os.RemoveAll(oldPath) os.RemoveAll(oldPath)
upgrading = true
} else if !os.IsNotExist(err) { } else if !os.IsNotExist(err) {
// Something corrupt in the old state file // Something corrupt in the old state file
return err return err
@ -222,6 +226,7 @@ func (r *AllocRunner) RestoreState() error {
r.allocClientStatus = mutable.AllocClientStatus r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates r.taskStates = mutable.TaskStates
r.alloc.ClientStatus = getClientStatus(r.taskStates)
return nil return nil
}) })
@ -277,6 +282,12 @@ func (r *AllocRunner) RestoreState() error {
} else if !r.alloc.TerminalStatus() { } else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status. // Only start if the alloc isn't in a terminal status.
go tr.Run() go tr.Run()
if upgrading {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
}
}
} }
} }
@ -437,10 +448,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Unlock() r.allocLock.Unlock()
// Scan the task states to determine the status of the alloc // Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock() r.taskStatusLock.RLock()
alloc.TaskStates = copyTaskStates(r.taskStates) alloc.TaskStates = copyTaskStates(r.taskStates)
for _, state := range r.taskStates { alloc.ClientStatus = getClientStatus(r.taskStates)
r.taskStatusLock.RUnlock()
return alloc
}
// getClientStatus takes in the task states for a given allocation and computes
// the client status
func getClientStatus(taskStates map[string]*structs.TaskState) string {
var pending, running, dead, failed bool
for _, state := range taskStates {
switch state.State { switch state.State {
case structs.TaskStateRunning: case structs.TaskStateRunning:
running = true running = true
@ -454,20 +474,19 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
} }
} }
} }
r.taskStatusLock.RUnlock()
// Determine the alloc status // Determine the alloc status
if failed { if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed return structs.AllocClientStatusFailed
} else if running { } else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning return structs.AllocClientStatusRunning
} else if pending { } else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending return structs.AllocClientStatusPending
} else if dead { } else if dead {
alloc.ClientStatus = structs.AllocClientStatusComplete return structs.AllocClientStatusComplete
} }
return alloc return ""
} }
// dirtySyncState is used to watch for state being marked dirty to sync // dirtySyncState is used to watch for state being marked dirty to sync

View File

@ -447,7 +447,7 @@ func (c *Client) Shutdown() error {
c.shutdown = true c.shutdown = true
close(c.shutdownCh) close(c.shutdownCh)
c.connPool.Shutdown() c.connPool.Shutdown()
return c.saveState(true) return c.saveState()
} }
// RPC is used to forward an RPC call to a nomad server, or fail if no servers. // RPC is used to forward an RPC call to a nomad server, or fail if no servers.
@ -663,6 +663,12 @@ func (c *Client) restoreState() error {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} else { } else {
go ar.Run() go ar.Run()
if upgrading {
if err := ar.SaveState(); err != nil {
c.logger.Printf("[WARN] client: initial save state for alloc %s failed: %v", id, err)
}
}
} }
} }
@ -676,10 +682,8 @@ func (c *Client) restoreState() error {
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
// saveState is used to snapshot our state into the data dir. If blocking is set // saveState is used to snapshot our state into the data dir.
// to true, the function will only return once state has been saved. If false, func (c *Client) saveState() error {
// the errors will be logged and state saving will be asyncronous
func (c *Client) saveState(blocking bool) error {
if c.config.DevMode { if c.config.DevMode {
return nil return nil
} }
@ -703,14 +707,10 @@ func (c *Client) saveState(blocking bool) error {
}(id, ar) }(id, ar)
} }
if blocking {
wg.Wait() wg.Wait()
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
return nil
}
// getAllocRunners returns a snapshot of the current set of alloc runners. // getAllocRunners returns a snapshot of the current set of alloc runners.
func (c *Client) getAllocRunners() map[string]*AllocRunner { func (c *Client) getAllocRunners() map[string]*AllocRunner {
c.allocLock.RLock() c.allocLock.RLock()
@ -1062,7 +1062,7 @@ func (c *Client) periodicSnapshot() {
select { select {
case <-snapshot: case <-snapshot:
snapshot = time.After(stateSnapshotIntv) snapshot = time.After(stateSnapshotIntv)
if err := c.saveState(false); err != nil { if err := c.saveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err) c.logger.Printf("[ERR] client: failed to save state: %v", err)
} }

View File

@ -367,6 +367,7 @@ func (r *TaskRunner) RestoreState() error {
// SaveState is used to snapshot our state // SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error { func (r *TaskRunner) SaveState() error {
r.persistLock.Lock() r.persistLock.Lock()
defer r.persistLock.Unlock()
snap := taskRunnerState{ snap := taskRunnerState{
Version: r.config.Version, Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded, ArtifactDownloaded: r.artifactsDownloaded,
@ -384,7 +385,6 @@ func (r *TaskRunner) SaveState() error {
// If nothing has changed avoid the write // If nothing has changed avoid the write
h := snap.Hash() h := snap.Hash()
if bytes.Equal(h, r.persistedHash) { if bytes.Equal(h, r.persistedHash) {
r.persistLock.Unlock()
return nil return nil
} }
@ -393,7 +393,6 @@ func (r *TaskRunner) SaveState() error {
if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil { if err := codec.NewEncoder(&buf, structs.MsgpackHandle).Encode(&snap); err != nil {
return fmt.Errorf("failed to serialize snapshot: %v", err) return fmt.Errorf("failed to serialize snapshot: %v", err)
} }
r.persistLock.Unlock()
// Start the transaction. // Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error { return r.stateDB.Batch(func(tx *bolt.Tx) error {

View File

@ -1,16 +1,12 @@
package client package client
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os"
"path/filepath"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
) )
type allocTuple struct { type allocTuple struct {
@ -78,34 +74,6 @@ func shuffleStrings(list []string) {
} }
} }
// persistState is used to help with saving state
func persistState(path string, data interface{}) error {
var buf bytes.Buffer
enc := codec.NewEncoder(&buf, structs.JsonHandlePretty)
if err := enc.Encode(data); err != nil {
return err
}
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return fmt.Errorf("failed to make dirs for %s: %v", path, err)
}
tmpPath := path + ".tmp"
if err := ioutil.WriteFile(tmpPath, buf.Bytes(), 0600); err != nil {
return fmt.Errorf("failed to save state to tmp: %v", err)
}
if err := os.Rename(tmpPath, path); err != nil {
return fmt.Errorf("failed to rename tmp to path: %v", err)
}
// Sanity check since users have reported empty state files on disk
if stat, err := os.Stat(path); err != nil {
return fmt.Errorf("unable to stat state file %s: %v", path, err)
} else if stat.Size() == 0 {
return fmt.Errorf("persisted invalid state file %s; see https://github.com/hashicorp/nomad/issues/1367", path)
}
return nil
}
// pre060RestoreState is used to read back in the persisted state for pre v0.6.0 // pre060RestoreState is used to read back in the persisted state for pre v0.6.0
// state // state
func pre060RestoreState(path string, data interface{}) error { func pre060RestoreState(path string, data interface{}) error {