Add AllocRunner.allocID for ease-of-use

Since the AllocRunner.alloc struct can be mutated, most of AllocRunner
needs to acquire a lock to get the alloc's ID. Log lines always need to
include the alloc ID, so we often skipped acquiring a lock just to grab
the ID and accepted the race.

Let's make the race detector a little happier by storing the ID in a
single assignment field.
This commit is contained in:
Michael Schurter 2017-07-17 14:11:06 -07:00
parent 181fda825a
commit cdb2e96d99
1 changed files with 33 additions and 22 deletions

View File

@ -51,6 +51,11 @@ type AllocRunner struct {
updater AllocStateUpdater
logger *log.Logger
// allocID is the ID of this runner's allocation. Since it does not
// change for the lifetime of the AllocRunner it is safe to read
// without acquiring a lock (unlike alloc).
allocID string
alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
@ -151,6 +156,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
updater: updater,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(0),
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
@ -174,7 +180,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
path := filepath.Join(r.config.StateDir, "alloc", r.allocID, "state.json")
return path
}
@ -188,7 +194,7 @@ func (r *AllocRunner) RestoreState() error {
var upgrading bool
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.allocID)
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
@ -202,7 +208,7 @@ func (r *AllocRunner) RestoreState() error {
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.allocID)
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
for taskName := range snap.Context.AllocDir.TaskDirs {
r.allocDir.NewTaskDir(taskName)
@ -218,7 +224,7 @@ func (r *AllocRunner) RestoreState() error {
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
bkt, err := getAllocationBucket(tx, r.allocID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}
@ -300,7 +306,7 @@ func (r *AllocRunner) RestoreState() error {
}
if restartReason, err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err)
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.allocID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
@ -308,13 +314,13 @@ func (r *AllocRunner) RestoreState() error {
if upgrading {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.allocID, name, err)
}
}
// Restart task runner if RestoreState gave a reason
if restartReason != "" {
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason)
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.allocID, name, restartReason)
tr.Restart("upgrade", restartReason)
}
}
@ -360,7 +366,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
return r.stateDB.Batch(func(tx *bolt.Tx) error {
// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
allocBkt, err := getAllocationBucket(tx, r.allocID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
@ -429,7 +435,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
if err := tr.SaveState(); err != nil {
return fmt.Errorf("failed to save state for alloc %s task '%s': %v",
r.alloc.ID, tr.task.Name, err)
r.allocID, tr.task.Name, err)
}
return nil
}
@ -437,7 +443,7 @@ func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
if err := deleteAllocationBucket(tx, r.allocID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
@ -551,7 +557,11 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.syncStatus()
if err := r.syncStatus(); err != nil {
// Only WARN instead of ERR because we continue on
r.logger.Printf("[WARN] client: error persisting alloc %q state: %v",
r.allocID, err)
}
case <-r.ctx.Done():
return
}
@ -687,7 +697,7 @@ func (r *AllocRunner) Run() {
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
r.logger.Printf("[ERR] client: alloc %q for missing task group %q", r.allocID, alloc.TaskGroup)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
return
}
@ -704,7 +714,7 @@ func (r *AllocRunner) Run() {
if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err)
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", r.allocID, err)
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
@ -716,9 +726,9 @@ func (r *AllocRunner) Run() {
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.allocID)
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
return
}
@ -727,7 +737,7 @@ func (r *AllocRunner) Run() {
go r.watchHealth(wCtx)
// Start the task runners
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.allocID)
r.taskLock.Lock()
for _, task := range tg.Tasks {
if _, ok := r.restored[task.Name]; ok {
@ -784,7 +794,8 @@ OUTER:
}
if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
r.allocID, err)
}
case <-r.ctx.Done():
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
@ -801,7 +812,7 @@ OUTER:
// Free up the context. It has likely exited already
watcherCancel()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
}
// SetPreviousAllocDir sets the previous allocation directory of the current
@ -849,16 +860,16 @@ func (r *AllocRunner) handleDestroy() {
case <-r.ctx.Done():
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
r.allocID, err)
}
if err := r.DestroyState(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
r.alloc.ID, err)
r.allocID, err)
}
return
case <-r.updateCh:
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.alloc.ID)
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.allocID)
}
}
}
@ -904,7 +915,7 @@ func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResour
tr, ok := r.tasks[taskFilter]
r.taskLock.RUnlock()
if !ok {
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
return nil, fmt.Errorf("allocation %q has no task %q", r.allocID, taskFilter)
}
l := tr.LatestResourceUsage()
if l != nil {