open-nomad/client/alloc_runner.go

964 lines
28 KiB
Go
Raw Normal View History

2015-08-23 22:06:47 +00:00
package client
2015-08-23 22:15:48 +00:00
import (
2017-07-03 22:03:42 +00:00
"context"
2015-08-29 22:46:10 +00:00
"fmt"
2015-08-23 22:15:48 +00:00
"log"
"os"
"path/filepath"
2015-08-23 22:15:48 +00:00
"sync"
2015-08-29 22:46:10 +00:00
"time"
2015-08-23 22:15:48 +00:00
2017-04-29 22:43:23 +00:00
"github.com/boltdb/bolt"
2016-02-20 00:31:04 +00:00
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
2016-09-14 20:30:01 +00:00
"github.com/hashicorp/nomad/client/vaultclient"
2017-07-03 04:49:56 +00:00
"github.com/hashicorp/nomad/helper"
2015-08-23 22:15:48 +00:00
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
2015-08-23 22:15:48 +00:00
)
2015-08-23 22:06:47 +00:00
2015-08-29 22:46:10 +00:00
const (
2016-02-20 00:02:48 +00:00
// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
2016-05-15 16:41:34 +00:00
// send that the task was received to the server because another transition
2016-02-20 00:02:48 +00:00
// to running or failed is likely to occur immediately after and a single
2016-05-15 16:41:34 +00:00
// update will transfer all past state information. If not other transition
// has occurred up to this limit, we will send to the server.
2016-02-19 23:49:32 +00:00
taskReceivedSyncLimit = 30 * time.Second
2015-08-29 22:46:10 +00:00
)
2017-04-29 22:43:23 +00:00
var (
// The following are the key paths written to the state database
allocRunnerStateAllocKey = []byte("alloc")
2017-04-29 22:43:23 +00:00
allocRunnerStateImmutableKey = []byte("immutable")
allocRunnerStateMutableKey = []byte("mutable")
allocRunnerStateAllocDirKey = []byte("alloc-dir")
)
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)
type AllocStatsReporter interface {
LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error)
}
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
updater AllocStateUpdater
logger *log.Logger
2015-08-23 22:15:48 +00:00
2016-02-10 21:44:53 +00:00
alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
2017-07-03 04:49:56 +00:00
allocHealth *bool // Whether the allocation is healthy
allocBroadcast *cstructs.AllocBroadcaster
2016-02-10 21:44:53 +00:00
allocLock sync.Mutex
2015-08-29 22:46:10 +00:00
dirtyCh chan struct{}
allocDir *allocdir.AllocDir
allocDirLock sync.Mutex
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
taskLock sync.RWMutex
2015-08-23 23:49:48 +00:00
2016-03-22 02:59:58 +00:00
taskStatusLock sync.RWMutex
2015-08-29 22:46:10 +00:00
2015-08-23 22:15:48 +00:00
updateCh chan *structs.Allocation
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI
2016-09-14 20:30:01 +00:00
otherAllocDir *allocdir.AllocDir
2017-07-03 22:03:42 +00:00
ctx context.Context
exitFn context.CancelFunc
waitCh chan struct{}
2017-04-29 22:43:23 +00:00
// State related fields
// stateDB is used to store the alloc runners state
stateDB *bolt.DB
2017-06-23 00:32:20 +00:00
// persistedEval is the last persisted evaluation ID. Since evaluation
// IDs change on every allocation update we only need to persist the
// allocation when its eval ID != the last persisted eval ID.
persistedEvalLock sync.Mutex
persistedEval string
2017-04-29 22:43:23 +00:00
// immutablePersisted and allocDirPersisted are used to track whether the
// immutable data and the alloc dir have been persisted. Once persisted we
// can lower write volume by not re-writing these values
immutablePersisted bool
allocDirPersisted bool
2015-08-23 22:06:47 +00:00
}
2017-05-02 20:31:56 +00:00
// COMPAT: Remove in 0.7.0
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Version string
Alloc *structs.Allocation
AllocDir *allocdir.AllocDir
AllocClientStatus string
AllocClientDescription string
2017-01-06 18:18:38 +00:00
// COMPAT: Remove in 0.7.0: removing will break upgrading directly from
// 0.5.2, so don't remove in the 0.6 series.
// Context is deprecated and only used to migrate from older releases.
// It will be removed in the future.
Context *struct {
AllocID string // unused; included for completeness
AllocDir struct {
AllocDir string
SharedDir string // unused; included for completeness
TaskDirs map[string]string
}
} `json:"Context,omitempty"`
}
// allocRunnerAllocState is state that only has to be written when the alloc
// changes.
type allocRunnerAllocState struct {
Alloc *structs.Allocation
}
// allocRunnerImmutableState is state that only has to be written once.
2017-04-29 22:43:23 +00:00
type allocRunnerImmutableState struct {
Version string
}
// allocRunnerMutableState is state that has to be written on each save as it
// changes over the life-cycle of the alloc_runner.
type allocRunnerMutableState struct {
AllocClientStatus string
AllocClientDescription string
TaskStates map[string]*structs.TaskState
}
// NewAllocRunner is used to create a new allocation context
2017-04-29 22:43:23 +00:00
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
2015-08-30 02:14:47 +00:00
ar := &AllocRunner{
2017-07-03 04:49:56 +00:00
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocBroadcast: cstructs.NewAllocBroadcaster(0),
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
2017-07-03 04:49:56 +00:00
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
2015-08-23 22:06:47 +00:00
}
2017-07-03 22:03:42 +00:00
// TODO Should be passed a context
ar.ctx, ar.exitFn = context.WithCancel(context.TODO())
2015-08-30 02:14:47 +00:00
return ar
2015-08-23 22:06:47 +00:00
}
2017-05-02 20:31:56 +00:00
// pre060StateFilePath returns the path to our state file that would have been
// written pre v0.6.0
// COMPAT: Remove in 0.7.0
func (r *AllocRunner) pre060StateFilePath() string {
r.allocLock.Lock()
defer r.allocLock.Unlock()
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
return path
}
// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
2017-05-02 20:31:56 +00:00
2017-05-09 17:50:24 +00:00
// COMPAT: Remove in 0.7.0
2017-05-02 20:31:56 +00:00
// Check if the old snapshot is there
oldPath := r.pre060StateFilePath()
var snap allocRunnerState
2017-05-09 17:50:24 +00:00
var upgrading bool
2017-05-02 20:31:56 +00:00
if err := pre060RestoreState(oldPath, &snap); err == nil {
// Restore fields
2017-05-09 17:50:24 +00:00
r.logger.Printf("[INFO] client: restoring pre v0.6.0 alloc runner state for alloc %q", r.alloc.ID)
2017-05-02 20:31:56 +00:00
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
if r.alloc != nil {
r.taskStates = snap.Alloc.TaskStates
}
2017-05-09 17:50:24 +00:00
// COMPAT: Remove in 0.7.0
2017-05-02 20:31:56 +00:00
// #2132 Upgrade path: if snap.AllocDir is nil, try to convert old
// Context struct to new AllocDir struct
if snap.AllocDir == nil && snap.Context != nil {
r.logger.Printf("[DEBUG] client: migrating state snapshot for alloc %q", r.alloc.ID)
2017-05-03 18:15:30 +00:00
r.allocDir = allocdir.NewAllocDir(r.logger, snap.Context.AllocDir.AllocDir)
2017-05-02 20:31:56 +00:00
for taskName := range snap.Context.AllocDir.TaskDirs {
2017-05-03 18:15:30 +00:00
r.allocDir.NewTaskDir(taskName)
2017-05-02 20:31:56 +00:00
}
}
2017-05-02 20:31:56 +00:00
// Delete the old state
os.RemoveAll(oldPath)
2017-05-09 17:50:24 +00:00
upgrading = true
2017-05-02 20:31:56 +00:00
} else if !os.IsNotExist(err) {
// Something corrupt in the old state file
return err
} else {
// We are doing a normal restore
err := r.stateDB.View(func(tx *bolt.Tx) error {
bkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to get allocation bucket: %v", err)
}
2017-04-29 22:43:23 +00:00
2017-05-02 20:31:56 +00:00
// Get the state objects
var mutable allocRunnerMutableState
var immutable allocRunnerImmutableState
var allocState allocRunnerAllocState
2017-05-02 20:31:56 +00:00
var allocDir allocdir.AllocDir
if err := getObject(bkt, allocRunnerStateAllocKey, &allocState); err != nil {
return fmt.Errorf("failed to read alloc runner alloc state: %v", err)
}
2017-05-02 20:31:56 +00:00
if err := getObject(bkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to read alloc runner immutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to read alloc runner mutable state: %v", err)
}
if err := getObject(bkt, allocRunnerStateAllocDirKey, &allocDir); err != nil {
return fmt.Errorf("failed to read alloc runner alloc_dir state: %v", err)
}
// Populate the fields
r.alloc = allocState.Alloc
2017-05-02 20:31:56 +00:00
r.allocDir = &allocDir
r.allocClientStatus = mutable.AllocClientStatus
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
2017-05-09 17:50:24 +00:00
r.alloc.ClientStatus = getClientStatus(r.taskStates)
2017-05-02 20:31:56 +00:00
return nil
})
if err != nil {
return fmt.Errorf("failed to read allocation state: %v", err)
}
}
2016-06-17 21:58:53 +00:00
var snapshotErrors multierror.Error
if r.alloc == nil {
snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil allocation"))
}
if r.allocDir == nil {
snapshotErrors.Errors = append(snapshotErrors.Errors, fmt.Errorf("alloc_runner snapshot includes a nil alloc dir"))
}
2016-06-17 21:58:53 +00:00
if e := snapshotErrors.ErrorOrNil(); e != nil {
return e
}
2017-05-02 20:31:56 +00:00
tg := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if tg == nil {
return fmt.Errorf("restored allocation doesn't contain task group %q", r.alloc.TaskGroup)
}
// Restore the task runners
var mErr multierror.Error
2017-05-02 20:31:56 +00:00
for _, task := range tg.Tasks {
name := task.Name
state := r.taskStates[name]
// Mark the task as restored.
r.restored[name] = struct{}{}
td, ok := r.allocDir.TaskDirs[name]
if !ok {
// Create the task dir metadata if it doesn't exist.
// Since task dirs are created during r.Run() the
// client may save state and exit before all task dirs
// are created
td = r.allocDir.NewTaskDir(name)
}
2017-04-29 22:43:23 +00:00
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr
2015-11-09 23:55:31 +00:00
// Skip tasks in terminal states.
if state.State == structs.TaskStateDead {
2015-11-09 23:55:31 +00:00
continue
}
if restartReason, err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task %q: %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
2016-02-19 23:49:32 +00:00
} else if !r.alloc.TerminalStatus() {
// Only start if the alloc isn't in a terminal status.
go tr.Run()
2017-05-09 17:50:24 +00:00
if upgrading {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[WARN] client: initial save state for alloc %s task %s failed: %v", r.alloc.ID, name, err)
}
}
2017-05-09 18:06:05 +00:00
// Restart task runner if RestoreState gave a reason
if restartReason != "" {
2017-05-09 18:20:35 +00:00
r.logger.Printf("[INFO] client: restarting alloc %s task %s: %v", r.alloc.ID, name, restartReason)
tr.Restart("upgrade", restartReason)
}
}
}
2016-09-14 20:30:01 +00:00
return mErr.ErrorOrNil()
}
2015-11-10 00:15:11 +00:00
// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
// all the Task Runners associated with the Alloc
func (r *AllocRunner) SaveState() error {
if err := r.saveAllocRunnerState(); err != nil {
return err
}
2015-11-10 00:15:11 +00:00
// Save state for each task
2016-07-21 20:41:01 +00:00
runners := r.getTaskRunners()
var mErr multierror.Error
2016-07-21 20:41:01 +00:00
for _, tr := range runners {
2015-11-10 00:15:11 +00:00
if err := r.saveTaskRunnerState(tr); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
func (r *AllocRunner) saveAllocRunnerState() error {
2017-04-29 22:43:23 +00:00
// Grab all the relevant data
2016-02-10 21:44:53 +00:00
alloc := r.Alloc()
r.allocLock.Lock()
2016-02-10 21:44:53 +00:00
allocClientStatus := r.allocClientStatus
allocClientDescription := r.allocClientDescription
r.allocLock.Unlock()
r.allocDirLock.Lock()
allocDir := r.allocDir.Copy()
r.allocDirLock.Unlock()
2016-02-10 21:44:53 +00:00
2017-05-01 18:09:08 +00:00
// Start the transaction.
return r.stateDB.Batch(func(tx *bolt.Tx) error {
2017-04-29 22:43:23 +00:00
2017-05-01 18:09:08 +00:00
// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
2017-04-29 22:43:23 +00:00
}
2017-06-23 00:32:20 +00:00
// Write the allocation if the eval has changed
r.persistedEvalLock.Lock()
lastPersisted := r.persistedEval
r.persistedEvalLock.Unlock()
if alloc.EvalID != lastPersisted {
allocState := &allocRunnerAllocState{
Alloc: alloc,
2017-05-01 18:09:08 +00:00
}
2017-04-29 22:43:23 +00:00
if err := putObject(allocBkt, allocRunnerStateAllocKey, &allocState); err != nil {
return fmt.Errorf("failed to write alloc_runner alloc state: %v", err)
2017-05-01 18:09:08 +00:00
}
tx.OnCommit(func() {
2017-06-23 00:32:20 +00:00
r.persistedEvalLock.Lock()
r.persistedEval = alloc.EvalID
r.persistedEvalLock.Unlock()
2017-05-01 18:09:08 +00:00
})
2017-04-29 22:43:23 +00:00
}
// Write immutable data iff it hasn't been written yet
2017-05-01 18:09:08 +00:00
if !r.immutablePersisted {
immutable := &allocRunnerImmutableState{
Version: r.config.Version,
}
2017-04-29 22:43:23 +00:00
2017-05-01 18:09:08 +00:00
if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to write alloc_runner immutable state: %v", err)
}
tx.OnCommit(func() {
r.immutablePersisted = true
})
2017-04-29 22:43:23 +00:00
}
2017-05-01 18:09:08 +00:00
// Write the alloc dir data if it hasn't been written before and it exists.
if !r.allocDirPersisted && allocDir != nil {
2017-05-01 18:09:08 +00:00
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
}
2017-04-29 22:43:23 +00:00
2017-05-01 18:09:08 +00:00
tx.OnCommit(func() {
r.allocDirPersisted = true
})
}
2017-04-29 22:43:23 +00:00
2017-05-01 18:09:08 +00:00
// Write the mutable state every time
mutable := &allocRunnerMutableState{
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: alloc.TaskStates,
}
if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to write alloc_runner mutable state: %v", err)
}
2017-04-29 22:43:23 +00:00
2017-05-01 18:09:08 +00:00
return nil
})
}
2015-11-10 00:15:11 +00:00
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
2016-08-08 23:57:21 +00:00
if err := tr.SaveState(); err != nil {
2016-08-10 22:17:32 +00:00
return fmt.Errorf("failed to save state for alloc %s task '%s': %v",
r.alloc.ID, tr.task.Name, err)
}
2016-08-08 23:57:21 +00:00
return nil
}
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
2017-05-02 20:31:56 +00:00
return r.stateDB.Update(func(tx *bolt.Tx) error {
if err := deleteAllocationBucket(tx, r.alloc.ID); err != nil {
return fmt.Errorf("failed to delete allocation bucket: %v", err)
}
return nil
})
}
2015-08-31 00:35:58 +00:00
// DestroyContext is used to destroy the context
func (r *AllocRunner) DestroyContext() error {
return r.allocDir.Destroy()
2015-08-31 00:35:58 +00:00
}
2017-05-02 20:31:56 +00:00
// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
2016-02-10 21:44:53 +00:00
// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
for task, state := range states {
copy[task] = state.Copy()
}
return copy
}
2015-08-23 22:06:47 +00:00
// Alloc returns the associated allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
// Don't do a deep copy of the job
alloc := r.alloc.CopySkipJob()
2016-05-15 16:41:34 +00:00
// The status has explicitly been set.
2016-02-10 21:44:53 +00:00
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
// Copy over the task states so we don't lose them
r.taskStatusLock.RLock()
alloc.TaskStates = copyTaskStates(r.taskStates)
r.taskStatusLock.RUnlock()
2016-02-10 21:44:53 +00:00
r.allocLock.Unlock()
return alloc
}
2017-07-03 04:49:56 +00:00
// The health has been set
if r.allocHealth != nil {
if alloc.DeploymentStatus == nil {
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{}
}
alloc.DeploymentStatus.Healthy = helper.BoolToPtr(*r.allocHealth)
}
r.allocLock.Unlock()
// Scan the task states to determine the status of the alloc
r.taskStatusLock.RLock()
2016-02-10 21:44:53 +00:00
alloc.TaskStates = copyTaskStates(r.taskStates)
2017-05-09 17:50:24 +00:00
alloc.ClientStatus = getClientStatus(r.taskStates)
r.taskStatusLock.RUnlock()
// If the client status is failed and we are part of a deployment, mark the
// alloc as unhealthy. This guards against the watcher not be started.
r.allocLock.Lock()
if alloc.ClientStatus == structs.AllocClientStatusFailed &&
alloc.DeploymentID != "" && !alloc.DeploymentStatus.IsUnhealthy() {
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}
}
r.allocLock.Unlock()
2017-05-09 17:50:24 +00:00
return alloc
}
// getClientStatus takes in the task states for a given allocation and computes
// the client status
func getClientStatus(taskStates map[string]*structs.TaskState) string {
var pending, running, dead, failed bool
for _, state := range taskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
if state.Failed {
failed = true
} else {
dead = true
}
}
}
// Determine the alloc status
if failed {
2017-05-09 17:50:24 +00:00
return structs.AllocClientStatusFailed
} else if running {
2017-05-09 17:50:24 +00:00
return structs.AllocClientStatusRunning
} else if pending {
2017-05-09 17:50:24 +00:00
return structs.AllocClientStatusPending
} else if dead {
2017-05-09 17:50:24 +00:00
return structs.AllocClientStatusComplete
}
2017-05-09 17:50:24 +00:00
return ""
2015-08-23 22:06:47 +00:00
}
2015-08-31 00:10:17 +00:00
// dirtySyncState is used to watch for state being marked dirty to sync
func (r *AllocRunner) dirtySyncState() {
2015-08-29 22:46:10 +00:00
for {
select {
case <-r.dirtyCh:
r.syncStatus()
2017-07-03 22:03:42 +00:00
case <-r.ctx.Done():
2015-08-29 22:46:10 +00:00
return
}
2015-08-31 00:10:17 +00:00
}
}
2015-08-29 22:46:10 +00:00
2015-08-31 00:10:17 +00:00
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()
r.updater(alloc)
2017-07-03 04:49:56 +00:00
r.allocBroadcast.Send(alloc)
return r.saveAllocRunnerState()
2015-08-29 22:46:10 +00:00
}
// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
2016-02-10 21:44:53 +00:00
r.allocLock.Lock()
r.allocClientStatus = status
r.allocClientDescription = desc
r.allocLock.Unlock()
2015-08-29 22:46:10 +00:00
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
2016-11-04 23:57:24 +00:00
// setTaskState is used to set the status of a task. If state is empty then the
// event is appended but not synced with the server. The event may be omitted
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
if !ok {
2016-03-01 22:09:25 +00:00
taskState = &structs.TaskState{}
r.taskStates[taskName] = taskState
}
// Set the tasks state.
2016-11-04 23:57:24 +00:00
if event != nil {
if event.FailsTask {
taskState.Failed = true
}
2017-07-03 04:49:56 +00:00
if event.Type == structs.TaskRestarting {
taskState.Restarts++
taskState.LastRestart = time.Unix(0, event.Time)
2017-07-03 04:49:56 +00:00
}
2016-11-04 23:57:24 +00:00
r.appendTaskEvent(taskState, event)
}
2016-11-04 23:57:24 +00:00
if state == "" {
return
}
switch state {
case structs.TaskStateRunning:
// Capture the start time if it is just starting
if taskState.State != structs.TaskStateRunning {
taskState.StartedAt = time.Now().UTC()
}
case structs.TaskStateDead:
// Capture the finished time. If it has never started there is no finish
// time
if !taskState.StartedAt.IsZero() {
taskState.FinishedAt = time.Now().UTC()
}
2017-02-11 01:55:19 +00:00
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
var otherTaskRunners []*TaskRunner
var otherTaskNames []string
leader := false
for task, tr := range r.tasks {
if task != taskName {
otherTaskRunners = append(otherTaskRunners, tr)
otherTaskNames = append(otherTaskNames, task)
} else if tr.task.Leader {
leader = true
}
}
2016-09-14 22:04:25 +00:00
// If the task failed, we should kill all the other tasks in the task group.
if taskState.Failed {
2017-02-11 01:55:19 +00:00
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
2016-09-14 22:04:25 +00:00
}
2017-02-11 01:55:19 +00:00
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames)
}
} else if leader {
// If the task was a leader task we should kill all the other tasks.
2017-02-11 01:55:19 +00:00
for _, tr := range otherTaskRunners {
tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead))
}
if len(otherTaskRunners) > 0 {
r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames)
2016-09-14 22:04:25 +00:00
}
}
2016-02-19 22:49:43 +00:00
}
// Store the new state
taskState.State = state
2015-08-29 22:46:10 +00:00
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
// appendTaskEvent updates the task status by appending the new event.
func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
capacity := 10
if state.Events == nil {
state.Events = make([]*structs.TaskEvent, 0, capacity)
}
// If we hit capacity, then shift it.
if len(state.Events) == capacity {
old := state.Events
state.Events = make([]*structs.TaskEvent, 0, capacity)
state.Events = append(state.Events, old[1:]...)
}
state.Events = append(state.Events, event)
}
2015-08-23 22:06:47 +00:00
// Run is a long running goroutine used to manage an allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Run() {
2015-10-04 20:36:03 +00:00
defer close(r.waitCh)
2015-08-31 00:10:17 +00:00
go r.dirtySyncState()
2017-07-03 22:03:42 +00:00
2015-08-29 22:46:10 +00:00
// Find the task group to run in the allocation
2017-05-02 20:31:56 +00:00
alloc := r.Alloc()
2015-08-30 02:14:47 +00:00
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
2015-08-23 23:49:48 +00:00
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
2015-08-29 22:46:10 +00:00
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
2015-08-23 23:49:48 +00:00
return
}
// Create the execution context
r.allocDirLock.Lock()
// Build allocation directory (idempotent)
if err := r.allocDir.Build(); err != nil {
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
r.allocDirLock.Unlock()
return
}
if r.otherAllocDir != nil {
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERROR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err)
}
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERROR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
}
2015-08-30 02:14:47 +00:00
}
r.allocDirLock.Unlock()
2015-08-23 23:49:48 +00:00
// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
2016-02-04 22:19:27 +00:00
return
}
// Start the watcher
wCtx, watcherCancel := context.WithCancel(r.ctx)
go r.watchHealth(wCtx)
2015-08-23 23:49:48 +00:00
// Start the task runners
2016-02-04 22:19:27 +00:00
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.taskLock.Lock()
2015-08-23 23:49:48 +00:00
for _, task := range tg.Tasks {
if _, ok := r.restored[task.Name]; ok {
2015-08-30 02:14:47 +00:00
continue
}
r.allocDirLock.Lock()
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()
2017-04-29 22:43:23 +00:00
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
2015-08-23 23:49:48 +00:00
r.tasks[task.Name] = tr
tr.MarkReceived()
2016-09-14 20:30:01 +00:00
2015-08-23 23:49:48 +00:00
go tr.Run()
}
r.taskLock.Unlock()
2015-08-23 23:49:48 +00:00
// taskDestroyEvent contains an event that caused the destroyment of a task
// in the allocation.
var taskDestroyEvent *structs.TaskEvent
2015-08-29 22:46:10 +00:00
OUTER:
2015-08-23 23:49:48 +00:00
// Wait for updates
2015-08-23 22:06:47 +00:00
for {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
// If the deployment ids have changed clear the health
if r.alloc.DeploymentID != update.DeploymentID {
r.allocHealth = nil
}
r.alloc = update
r.allocLock.Unlock()
// Create a new watcher
watcherCancel()
wCtx, watcherCancel = context.WithCancel(r.ctx)
go r.watchHealth(wCtx)
2015-08-29 22:46:10 +00:00
// Check if we're in a terminal status
if update.TerminalStatus() {
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
2015-08-29 22:46:10 +00:00
break OUTER
}
// Update the task groups
2016-07-21 20:41:01 +00:00
runners := r.getTaskRunners()
for _, tr := range runners {
tr.Update(update)
2015-08-29 22:46:10 +00:00
}
2017-05-02 20:31:56 +00:00
if err := r.syncStatus(); err != nil {
r.logger.Printf("[WARN] client: failed to sync status upon receiving alloc update: %v", err)
}
2017-07-03 22:03:42 +00:00
case <-r.ctx.Done():
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
2015-08-29 22:46:10 +00:00
break OUTER
2015-08-23 22:15:48 +00:00
}
2015-08-23 22:06:47 +00:00
}
2015-08-29 22:46:10 +00:00
2016-09-15 18:37:20 +00:00
// Kill the task runners
r.destroyTaskRunners(taskDestroyEvent)
// Block until we should destroy the state of the alloc
r.handleDestroy()
// Free up the context. It has likely exited already
watcherCancel()
2016-09-15 18:37:20 +00:00
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}
// SetPreviousAllocDir sets the previous allocation directory of the current
// allocation
func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
r.otherAllocDir = allocDir
}
2016-09-15 18:37:20 +00:00
// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
2016-07-21 20:41:01 +00:00
runners := r.getTaskRunners()
// First destroy the leader
for _, tr := range runners {
if tr.task.Leader {
r.logger.Printf("[DEBUG] client: destroying leader task %q of task group %q first", tr.task.Name, tr.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
}
}
// Then destroy non-leader tasks concurrently
2016-07-21 20:41:01 +00:00
for _, tr := range runners {
if !tr.task.Leader {
tr.Destroy(destroyEvent)
}
2015-08-29 22:46:10 +00:00
}
// Wait for termination of the task runners
2016-07-21 20:41:01 +00:00
for _, tr := range runners {
2015-08-29 22:46:10 +00:00
<-tr.WaitCh()
}
}
2015-08-31 00:10:17 +00:00
// handleDestroy blocks till the AllocRunner should be destroyed and does the
// necessary cleanup.
func (r *AllocRunner) handleDestroy() {
// Final state sync. We do this to ensure that the server has the correct
// state as we wait for a destroy.
r.syncStatus()
2017-03-10 05:05:34 +00:00
for {
select {
2017-07-03 22:03:42 +00:00
case <-r.ctx.Done():
2017-03-10 05:05:34 +00:00
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
}
if err := r.DestroyState(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
r.alloc.ID, err)
}
return
case <-r.updateCh:
r.logger.Printf("[DEBUG] client: dropping update to terminal alloc '%s'", r.alloc.ID)
2015-08-31 00:35:58 +00:00
}
}
2015-08-23 22:06:47 +00:00
}
// Update is used to update the allocation of the context
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Update(update *structs.Allocation) {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case r.updateCh <- update:
2015-08-23 22:15:48 +00:00
default:
2015-08-23 22:36:06 +00:00
r.logger.Printf("[ERR] client: dropping update to alloc '%s'", update.ID)
2015-08-23 22:15:48 +00:00
}
2015-08-23 22:06:47 +00:00
}
// StatsReporter returns an interface to query resource usage statistics of an
// allocation
func (r *AllocRunner) StatsReporter() AllocStatsReporter {
return r
}
2016-07-21 20:41:01 +00:00
// getTaskRunners is a helper that returns a copy of the task runners list using
// the taskLock.
func (r *AllocRunner) getTaskRunners() []*TaskRunner {
// Get the task runners
r.taskLock.RLock()
defer r.taskLock.RUnlock()
runners := make([]*TaskRunner, 0, len(r.tasks))
for _, tr := range r.tasks {
runners = append(runners, tr)
}
return runners
}
// LatestAllocStats returns the latest allocation stats. If the optional taskFilter is set
// the allocation stats will only include the given task.
func (r *AllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
astat := &cstructs.AllocResourceUsage{
Tasks: make(map[string]*cstructs.TaskResourceUsage),
}
var flat []*cstructs.TaskResourceUsage
if taskFilter != "" {
2016-06-20 17:19:06 +00:00
r.taskLock.RLock()
tr, ok := r.tasks[taskFilter]
2016-06-20 17:19:06 +00:00
r.taskLock.RUnlock()
if !ok {
return nil, fmt.Errorf("allocation %q has no task %q", r.alloc.ID, taskFilter)
}
l := tr.LatestResourceUsage()
if l != nil {
astat.Tasks[taskFilter] = l
flat = []*cstructs.TaskResourceUsage{l}
astat.Timestamp = l.Timestamp
}
} else {
2016-06-20 17:19:06 +00:00
// Get the task runners
2016-07-21 20:41:01 +00:00
runners := r.getTaskRunners()
2016-06-20 17:19:06 +00:00
for _, tr := range runners {
l := tr.LatestResourceUsage()
if l != nil {
2016-06-20 17:19:06 +00:00
astat.Tasks[tr.task.Name] = l
flat = append(flat, l)
if l.Timestamp > astat.Timestamp {
astat.Timestamp = l.Timestamp
}
}
}
}
astat.ResourceUsage = sumTaskResourceUsage(flat)
return astat, nil
}
// sumTaskResourceUsage takes a set of task resources and sums their resources
func sumTaskResourceUsage(usages []*cstructs.TaskResourceUsage) *cstructs.ResourceUsage {
summed := &cstructs.ResourceUsage{
MemoryStats: &cstructs.MemoryStats{},
CpuStats: &cstructs.CpuStats{},
}
for _, usage := range usages {
summed.Add(usage.ResourceUsage)
}
return summed
}
// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc.AllocModifyIndex < serverIndex
}
2015-08-23 22:06:47 +00:00
// Destroy is used to indicate that the allocation context should be destroyed
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Destroy() {
2017-07-03 22:03:42 +00:00
r.exitFn()
2017-07-03 04:49:56 +00:00
r.allocBroadcast.Close()
2015-08-23 22:06:47 +00:00
}
2015-10-04 20:36:03 +00:00
// WaitCh returns a channel to wait for termination
func (r *AllocRunner) WaitCh() <-chan struct{} {
return r.waitCh
}