Merge pull request #2838 from hashicorp/b-alloc-dir-fixes
Ensure allocDir is never nil and persisted safely
This commit is contained in:
commit
2585d68382
|
@ -51,6 +51,11 @@ type AllocRunner struct {
|
|||
updater AllocStateUpdater
|
||||
logger *log.Logger
|
||||
|
||||
// allocID is the ID of this runner's allocation. Since it does not
|
||||
// change for the lifetime of the AllocRunner it is safe to read
|
||||
// without acquiring a lock (unlike alloc).
|
||||
allocID string
|
||||
|
||||
alloc *structs.Allocation
|
||||
allocClientStatus string // Explicit status of allocation. Set when there are failures
|
||||
allocClientDescription string
|
||||
|
@ -151,8 +156,10 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
|
|||
updater: updater,
|
||||
logger: logger,
|
||||
alloc: alloc,
|
||||
allocID: alloc.ID,
|
||||
allocBroadcast: cstructs.NewAllocBroadcaster(0),
|
||||
dirtyCh: make(chan struct{}, 1),
|
||||
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
|
||||
tasks: make(map[string]*TaskRunner),
|
||||
taskStates: copyTaskStates(alloc.TaskStates),
|
||||
restored: make(map[string]struct{}),
|
||||
|
@ -173,7 +180,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
|
|||
func (r *AllocRunner) pre060StateFilePath() string {
|
||||
r.allocLock.Lock()
|
||||
defer r.allocLock.Unlock()
|
||||
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
|
||||
path := filepath.Join(r.config.StateDir, "alloc", r.allocID, "state.json")
|
||||
return path
|
||||
}
|
||||
|
||||
|
@ -187,7 +194,7 @@ func (r *AllocRunner) RestoreState() error {
|
|||
var upgrading bool
|
||||
if err := pre060RestoreState(oldPath, &snap); err == nil {
|
||||
// Restore fields
|
||||
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
|
||||
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID)
|
||||
r.alloc = snap.Alloc
|
||||
r.allocDir = snap.AllocDir
|
||||
r.allocClientStatus = snap.AllocClientStatus
|
||||
|
@ -201,7 +208,7 @@ func (r *AllocRunner) RestoreState() error {
|
|||
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
|
||||
// Context struct to new AllocDir struct
|
||||
if snap.AllocDir == nil && snap.Context != nil {
|
||||
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID)
|
||||
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
|
||||
for taskName := range snap.Context.AllocDir.TaskDirs {
|
||||
r.allocDir.NewTaskDir(taskName)
|
||||
|
@ -217,7 +224,7 @@ func (r *AllocRunner) RestoreState() error {
|
|||
} else {
|
||||
// We are doing a normal restore
|
||||
err := r.stateDB.View(func(tx *bolt.Tx) error {
|
||||
bkt, err := getAllocationBucket(tx, r.alloc.ID)
|
||||
bkt, err := getAllocationBucket(tx, r.allocID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get allocation bucket: %v", err)
|
||||
}
|
||||
|
@ -283,10 +290,11 @@ func (r *AllocRunner) RestoreState() error {
|
|||
|
||||
td, ok := r.allocDir.TaskDirs[name]
|
||||
if !ok {
|
||||
err := fmt.Errorf("failed to find task dir metadata for alloc %q task %q",
|
||||
r.alloc.ID, name)
|
||||
r.logger.Printf("[ERR] client: %v", err)
|
||||
return err
|
||||
// Create the task dir metadata if it doesn't exist.
|
||||
// Since task dirs are created during r.Run() the
|
||||
// client may save state and exit before all task dirs
|
||||
// are created
|
||||
td = r.allocDir.NewTaskDir(name)
|
||||
}
|
||||
|
||||
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
|
||||
|
@ -298,7 +306,7 @@ func (r *AllocRunner) RestoreState() error {
|
|||
}
|
||||
|
||||
if restartReason, err := tr.RestoreState(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err)
|
||||
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
} else if !r.alloc.TerminalStatus() {
|
||||
// Only start if the alloc isn't in a terminal status.
|
||||
|
@ -306,13 +314,13 @@ func (r *AllocRunner) RestoreState() error {
|
|||
|
||||
if upgrading {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
|
||||
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Restart task runner if RestoreState gave a reason
|
||||
if restartReason != "" {
|
||||
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason)
|
||||
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
|
||||
tr.Restart("upgrade", restartReason)
|
||||
}
|
||||
}
|
||||
|
@ -351,14 +359,14 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
|||
r.allocLock.Unlock()
|
||||
|
||||
r.allocDirLock.Lock()
|
||||
allocDir := r.allocDir
|
||||
allocDir := r.allocDir.Copy()
|
||||
r.allocDirLock.Unlock()
|
||||
|
||||
// Start the transaction.
|
||||
return r.stateDB.Batch(func(tx *bolt.Tx) error {
|
||||
|
||||
// Grab the allocation bucket
|
||||
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
|
||||
allocBkt, err := getAllocationBucket(tx, r.allocID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
|
||||
}
|
||||
|
@ -399,7 +407,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
|||
}
|
||||
|
||||
// Write the alloc dir data if it hasn't been written before and it exists.
|
||||
if !r.allocDirPersisted && r.allocDir != nil {
|
||||
if !r.allocDirPersisted && allocDir != nil {
|
||||
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
|
||||
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
|
||||
}
|
||||
|
@ -427,7 +435,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
|
|||
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
return fmt.Errorf("failed to save state for alloc %s task '%s': %v",
|
||||
r.alloc.ID, tr.task.Name, err)
|
||||
r.allocID, tr.task.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -435,7 +443,7 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
|
|||
// DestroyState is used to cleanup after ourselves
|
||||
func (r *AllocRunner) DestroyState() error {
|
||||
return r.stateDB.Update(func(tx *bolt.Tx) error {
|
||||
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
|
||||
if err := deleteAllocationBucket(tx, r.allocID); err != nil {
|
||||
return fmt.Errorf("failed to delete allocation bucket: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -549,7 +557,11 @@ func (r *AllocRunner) dirtySyncState() {
|
|||
for {
|
||||
select {
|
||||
case <-r.dirtyCh:
|
||||
r.syncStatus()
|
||||
if err := r.syncStatus(); err != nil {
|
||||
// Only WARN instead of ERR because we continue on
|
||||
r.logger.Printf("[WARN] client: error persisting alloc %q state: %v",
|
||||
r.allocID, err)
|
||||
}
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
@ -685,30 +697,27 @@ func (r *AllocRunner) Run() {
|
|||
alloc := r.Alloc()
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
|
||||
r.logger.Printf("[ERR] client: alloc %q for missing task group %q", r.allocID, alloc.TaskGroup)
|
||||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
|
||||
return
|
||||
}
|
||||
|
||||
// Create the execution context
|
||||
r.allocDirLock.Lock()
|
||||
if r.allocDir == nil {
|
||||
// Build allocation directory
|
||||
r.allocDir = allocdir.NewAllocDir(r.logger, filepath.Join(r.config.AllocDir, r.alloc.ID))
|
||||
if err := r.allocDir.Build(); err != nil {
|
||||
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
|
||||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
|
||||
r.allocDirLock.Unlock()
|
||||
return
|
||||
}
|
||||
// Build allocation directory (idempotent)
|
||||
if err := r.allocDir.Build(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to build task directories: %v", err)
|
||||
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
|
||||
r.allocDirLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if r.otherAllocDir != nil {
|
||||
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
|
||||
r.logger.Printf("[ERROR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err)
|
||||
}
|
||||
if err := r.otherAllocDir.Destroy(); err != nil {
|
||||
r.logger.Printf("[ERROR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
|
||||
}
|
||||
if r.otherAllocDir != nil {
|
||||
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err)
|
||||
}
|
||||
if err := r.otherAllocDir.Destroy(); err != nil {
|
||||
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
|
||||
}
|
||||
}
|
||||
r.allocDirLock.Unlock()
|
||||
|
@ -717,9 +726,9 @@ func (r *AllocRunner) Run() {
|
|||
// start any of the task runners and directly wait for the destroy signal to
|
||||
// clean up the allocation.
|
||||
if alloc.TerminalStatus() {
|
||||
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID)
|
||||
r.handleDestroy()
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -728,7 +737,7 @@ func (r *AllocRunner) Run() {
|
|||
go r.watchHealth(wCtx)
|
||||
|
||||
// Start the task runners
|
||||
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.allocID)
|
||||
r.taskLock.Lock()
|
||||
for _, task := range tg.Tasks {
|
||||
if _, ok := r.restored[task.Name]; ok {
|
||||
|
@ -785,7 +794,8 @@ OUTER:
|
|||
}
|
||||
|
||||
if err := r.syncStatus(); err != nil {
|
||||
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
|
||||
r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
|
||||
r.allocID, err)
|
||||
}
|
||||
case <-r.ctx.Done():
|
||||
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
|
||||
|
@ -802,7 +812,7 @@ OUTER:
|
|||
// Free up the context. It has likely exited already
|
||||
watcherCancel()
|
||||
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
|
||||
}
|
||||
|
||||
// SetPreviousAllocDir sets the previous allocation directory of the current
|
||||
|
@ -850,16 +860,16 @@ func (r *AllocRunner) handleDestroy() {
|
|||
case <-r.ctx.Done():
|
||||
if err := r.DestroyContext(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
|
||||
r.alloc.ID, err)
|
||||
r.allocID, err)
|
||||
}
|
||||
if err := r.DestroyState(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
|
||||
r.alloc.ID, err)
|
||||
r.allocID, err)
|
||||
}
|
||||
|
||||
return
|
||||
case <-r.updateCh:
|
||||
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.alloc.ID)
|
||||
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.allocID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -905,7 +915,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour
|
|||
tr, ok := r.tasks[taskFilter]
|
||||
r.taskLock.RUnlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
|
||||
return nil, fmt.Errorf("allocation %q has no task %q", r.allocID, taskFilter)
|
||||
}
|
||||
l := tr.LatestResourceUsage()
|
||||
if l != nil {
|
||||
|
|
|
@ -98,6 +98,24 @@ func NewAllocDir(logger *log.Logger, allocDir string) *AllocDir {
|
|||
}
|
||||
}
|
||||
|
||||
// Copy an AllocDir and all of its TaskDirs. Returns nil if AllocDir is
|
||||
// nil.
|
||||
func (d *AllocDir) Copy() *AllocDir {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
dcopy := &AllocDir{
|
||||
AllocDir: d.AllocDir,
|
||||
SharedDir: d.SharedDir,
|
||||
TaskDirs: make(map[string]*TaskDir, len(d.TaskDirs)),
|
||||
logger: d.logger,
|
||||
}
|
||||
for k, v := range d.TaskDirs {
|
||||
dcopy.TaskDirs[k] = v.Copy()
|
||||
}
|
||||
return dcopy
|
||||
}
|
||||
|
||||
// NewTaskDir creates a new TaskDir and adds it to the AllocDirs TaskDirs map.
|
||||
func (d *AllocDir) NewTaskDir(name string) *TaskDir {
|
||||
td := newTaskDir(d.logger, d.AllocDir, name)
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/testutil"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/kr/pretty"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -50,7 +51,10 @@ var (
|
|||
)
|
||||
|
||||
func testLogger() *log.Logger {
|
||||
return log.New(os.Stderr, "", log.LstdFlags)
|
||||
if testing.Verbose() {
|
||||
return log.New(os.Stderr, "", log.LstdFlags)
|
||||
}
|
||||
return log.New(ioutil.Discard, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Test that AllocDir.Build builds just the alloc directory.
|
||||
|
@ -409,6 +413,25 @@ func TestAllocDir_CreateDir(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestAllocDir_Copy asserts that AllocDir.Copy does a deep copy of itself and
|
||||
// all TaskDirs.
|
||||
func TestAllocDir_Copy(t *testing.T) {
|
||||
a := NewAllocDir(testLogger(), "foo")
|
||||
a.NewTaskDir("bar")
|
||||
a.NewTaskDir("baz")
|
||||
|
||||
b := a.Copy()
|
||||
if diff := pretty.Diff(a, b); len(diff) > 0 {
|
||||
t.Errorf("differences between copies: %# v", pretty.Formatter(diff))
|
||||
}
|
||||
|
||||
// Make sure TaskDirs map is copied
|
||||
a.NewTaskDir("new")
|
||||
if b.TaskDirs["new"] != nil {
|
||||
t.Errorf("TaskDirs map shared between copied")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPathFuncs(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "nomadtest-pathfuncs")
|
||||
if err != nil {
|
||||
|
|
|
@ -57,6 +57,14 @@ func newTaskDir(logger *log.Logger, allocDir, taskName string) *TaskDir {
|
|||
}
|
||||
}
|
||||
|
||||
// Copy a TaskDir. Panics if TaskDir is nil as TaskDirs should never be nil.
|
||||
func (t *TaskDir) Copy() *TaskDir {
|
||||
// No nested structures other than the logger which is safe to share,
|
||||
// so just copy the struct
|
||||
tcopy := *t
|
||||
return &tcopy
|
||||
}
|
||||
|
||||
// Build default directories and permissions in a task directory. chrootCreated
|
||||
// allows skipping chroot creation if the caller knows it has already been
|
||||
// done.
|
||||
|
|
Loading…
Reference in a new issue