package taskrunner

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/boltdb/bolt"
	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
	"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/driver"
	"github.com/hashicorp/nomad/client/driver/env"
	oldstate "github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/ugorji/go/codec"
	"golang.org/x/crypto/blake2b"
)

var (
	// taskRunnerStateAllKey holds all of the task runner's state. At the
	// moment there is no need to split it.
	taskRunnerStateAllKey = []byte("simple-all")
)
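
// TaskRunner manages the lifecycle of a single task within an allocation: it
// builds the task environment, starts the task through its driver, tracks
// restarts, and persists node-local task state.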
type TaskRunner struct {
	// allocID and taskName are immutable so store a copy to access without
	// locks
	allocID  string
	taskName string

	alloc     *structs.Allocation
	allocLock sync.Mutex

	clientConfig *config.Config

	// state captures the state of the task for updating the allocation
	state *structs.TaskState

	// localState captures the node-local state of the task for when the
	// Nomad agent restarts
	localState     *state.LocalState
	localStateLock sync.RWMutex

	// stateDB is for persisting localState
	stateDB *bolt.DB

	// persistedHash is the hash of the last persisted state for skipping
	// unnecessary writes
	persistedHash []byte

	// ctx is the task runner's context and is done when the task runner
	// should exit. Shutdown hooks are run.
	ctx context.Context

	// ctxCancel is used to exit the task runner's Run loop without
	// stopping the task. Shutdown hooks are run.
	ctxCancel context.CancelFunc

	// logger is the logger for the task runner.
	logger log.Logger

	// updateCh receives Alloc updates
	updateCh chan *structs.Allocation

	// waitCh is closed when the task runner has transitioned to a terminal
	// state
	waitCh chan struct{}

	// driver is the driver for the task.
	driver driver.Driver

	// handle is the handle to the currently running driver
	handle driver.DriverHandle
	//XXX(schmichael) I think the handle is only manipulated in the Restore
	// and Run loops, so there's never concurrent access.
	//handleLock sync.Mutex

	// task is the task being run
	task     *structs.Task
	taskLock sync.RWMutex

	// taskDir is the directory structure for this task.
	taskDir *allocdir.TaskDir

	// envBuilder is used to build the task's environment
	envBuilder *env.Builder

	// restartTracker is used to decide if the task should be restarted.
	restartTracker *restarts.RestartTracker

	// runnerHooks are task runner lifecycle hooks that should be run on state
	// transitions.
	runnerHooks []interfaces.TaskHook

	// vaultClient is the client to use to derive and renew Vault tokens
	vaultClient vaultclient.VaultClient

	// vaultToken is the current Vault token. It should be accessed with the
	// getter.
	vaultToken     string
	vaultTokenLock sync.Mutex

	// baseLabels are used when emitting tagged metrics. All task runner metrics
	// will have these tags, and optionally more.
	baseLabels []metrics.Label
}
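
// Config holds the dependencies and initial state used to construct a
// TaskRunner.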
type Config struct {
	Alloc        *structs.Allocation
	ClientConfig *config.Config
	Task         *structs.Task
	TaskDir      *allocdir.TaskDir
	Logger       log.Logger

	// VaultClient is the client to use to derive and renew Vault tokens
	VaultClient vaultclient.VaultClient

	// LocalState is optionally restored task state
	LocalState *state.LocalState

	// StateDB is used to store and restore state.
	StateDB *bolt.DB
}
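
// NewTaskRunner creates a TaskRunner from the given Config, building its task
// environment, restart tracker, driver, hooks, and base metric labels.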
func NewTaskRunner(config *Config) (*TaskRunner, error) {
	// Create a context for the runner
	trCtx, trCancel := context.WithCancel(context.Background())

	// Initialize the environment builder
	envBuilder := env.NewBuilder(
		config.ClientConfig.Node,
		config.Alloc,
		config.Task,
		config.ClientConfig.Region,
	)

	tr := &TaskRunner{
		alloc:        config.Alloc,
		allocID:      config.Alloc.ID,
		clientConfig: config.ClientConfig,
		task:         config.Task,
		taskDir:      config.TaskDir,
		taskName:     config.Task.Name,
		envBuilder:   envBuilder,
		vaultClient:  config.VaultClient,
		//XXX Make a Copy to avoid races?
		state:      config.Alloc.TaskStates[config.Task.Name],
		localState: config.LocalState,
		stateDB:    config.StateDB,
		ctx:        trCtx,
		ctxCancel:  trCancel,
		updateCh:   make(chan *structs.Allocation),
		waitCh:     make(chan struct{}),
	}

	// Create a task-specific logger
	tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)

	// Build the restart tracker.
	tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
	if tg == nil {
		tr.logger.Error("alloc missing task group")
		return nil, fmt.Errorf("alloc missing task group")
	}
	tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type)

	// Initialize the task state
	tr.initState()

	// Get the driver
	if err := tr.initDriver(); err != nil {
		tr.logger.Error("failed to create driver", "error", err)
		return nil, err
	}

	// Initialize the runner's hooks.
	tr.initHooks()

	// Initialize base labels
	tr.initLabels()

	return tr, nil
}
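
// initState initializes the task state and local state if they were not
// restored from a previous run.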
func (tr *TaskRunner) initState() {
	if tr.state == nil {
		tr.state = &structs.TaskState{
			State: structs.TaskStatePending,
		}
	}
	if tr.localState == nil {
		tr.localState = state.NewLocalState()
	}
}
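
// initLabels builds the base labels (job, task group, alloc ID, and task name)
// attached to all tagged metrics emitted by the task runner.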
func (tr *TaskRunner) initLabels() {
	alloc := tr.Alloc()
	tr.baseLabels = []metrics.Label{
		{
			Name:  "job",
			Value: alloc.Job.Name,
		},
		{
			Name:  "task_group",
			Value: alloc.TaskGroup,
		},
		{
			Name:  "alloc_id",
			Value: tr.allocID,
		},
		{
			Name:  "task",
			Value: tr.taskName,
		},
	}
}
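
// Run is the task runner's main loop: it runs the prerun hooks, starts the
// task via the driver, waits for it to exit, and restarts it according to the
// restart policy. Shutdown hooks run and waitCh is closed before Run returns.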
func (tr *TaskRunner) Run() {
	defer close(tr.waitCh)

MAIN:
	for {
		// Run the prerun hooks
		if err := tr.prerun(); err != nil {
			tr.logger.Error("prerun failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		// Run the task
		if err := tr.runDriver(); err != nil {
			tr.logger.Error("running driver failed", "error", err)
			tr.restartTracker.SetStartError(err)
			goto RESTART
		}

		// Run the postrun hooks
		if err := tr.postrun(); err != nil {
			tr.logger.Error("postrun failed", "error", err)
		}

	WAIT:
		select {
		case waitRes := <-tr.handle.WaitCh():
			tr.restartTracker.SetWaitResult(waitRes)
		case <-tr.updateCh:
			//XXX Need to copy handleUpdate over
			tr.logger.Warn("update not handled")
			goto WAIT
		case <-tr.ctx.Done():
			tr.logger.Debug("task runner cancelled")
			break MAIN
		}

	RESTART:
		// Actually restart by sleeping and also watching for destroy events
		restart, restartWait := tr.shouldRestart()
		if !restart {
			break
		}

		deadline := time.Now().Add(restartWait)
		timer := time.NewTimer(restartWait)
		for time.Now().Before(deadline) {
			select {
			case <-timer.C:
			case <-tr.updateCh:
				//XXX Need to copy handleUpdate over
				tr.logger.Warn("update not handled")
			case <-tr.ctx.Done():
				tr.logger.Debug("task runner cancelled")
				break MAIN
			}
		}
		timer.Stop()
	}

	// Run the shutdown hooks
	if err := tr.shutdown(); err != nil {
		tr.logger.Error("shutdown hooks failed", "error", err)
	}

	tr.logger.Debug("task run loop exiting")
}
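
// shouldRestart consults the restart tracker and reports whether the task
// should restart and how long to wait before doing so, recording the
// resulting task state transition.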
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
	// Determine if we should restart
	state, when := tr.restartTracker.GetState()
	reason := tr.restartTracker.GetReason()
	switch state {
	case structs.TaskNotRestarting, structs.TaskTerminated:
		tr.logger.Info("not restarting task", "reason", reason)
		if state == structs.TaskNotRestarting {
			tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
		}
		return false, 0
	case structs.TaskRestarting:
		tr.logger.Info("restarting task", "reason", reason, "delay", when)
		tr.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
		return true, when
	default:
		tr.logger.Error("restart tracker returned unknown state", "state", state)
		return true, when
	}
}

// runDriver invokes the driver's Prestart and Start for the task and stores
// the resulting handle.
func (tr *TaskRunner) runDriver() error {
	// Run prestart
	ctx := driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
	_, err := tr.driver.Prestart(ctx, tr.task)
	if err != nil {
		tr.logger.Error("driver pre-start failed", "error", err)
		return err
	}

	// Create a new context for Start since the environment may have been updated.
	ctx = driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())

	// Start the task
	sresp, err := tr.driver.Start(ctx, tr.task)
	if err != nil {
		tr.logger.Warn("driver start failed", "error", err)
		return err
	}

	// Store the handle so the Run loop can wait on it
	tr.handle = sresp.Handle
	//XXX need to capture the driver network

	// Emit an event that we started
	tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
	return nil
}

// initDriver creates the driver for the task
func (tr *TaskRunner) initDriver() error {
	// Create a task-specific event emitter callback to expose minimal
	// state to drivers
	eventEmitter := func(m string, args ...interface{}) {
		msg := fmt.Sprintf(m, args...)
		tr.logger.Debug("driver event", "event", msg)
		tr.SetState("", structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
	}

	alloc := tr.Alloc()
	driverCtx := driver.NewDriverContext(
		alloc.Job.Name,
		alloc.TaskGroup,
		tr.taskName,
		tr.allocID,
		tr.clientConfig,               // XXX Why does it need this
		tr.clientConfig.Node,          // XXX THIS I NEED TO FIX
		tr.logger.StandardLogger(nil), // XXX Should pass this through
		eventEmitter)

	driver, err := driver.NewDriver(tr.task.Driver, driverCtx)
	if err != nil {
		return err
	}

	tr.driver = driver
	return nil
}

// persistLocalState persists local state to disk synchronously.
func (tr *TaskRunner) persistLocalState() error {
	// buffer for writing to boltdb
	var buf bytes.Buffer

	// Hash for skipping unnecessary writes
	h, err := blake2b.New256(nil)
	if err != nil {
		// Programming error that should never happen!
		return err
	}

	// Multiplex writes to both
	w := io.MultiWriter(h, &buf)

	// Encode as msgpack value
	tr.localStateLock.Lock()
	err = codec.NewEncoder(w, structs.MsgpackHandle).Encode(&tr.localState)
	tr.localStateLock.Unlock()
	if err != nil {
		return fmt.Errorf("failed to serialize snapshot: %v", err)
	}

	// If the hashes are equal, skip the write
	hashVal := h.Sum(nil)
	if bytes.Equal(hashVal, tr.persistedHash) {
		return nil
	}

	// Start the transaction.
	return tr.stateDB.Batch(func(tx *bolt.Tx) error {
		// Grab the task bucket
		//XXX move into new state pkg
		taskBkt, err := oldstate.GetTaskBucket(tx, tr.allocID, tr.taskName)
		if err != nil {
			return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
		}

		if err := oldstate.PutData(taskBkt, taskRunnerStateAllKey, buf.Bytes()); err != nil {
			return fmt.Errorf("failed to write task_runner state: %v", err)
		}

		// Store the hash that was persisted
		tx.OnCommit(func() {
			tr.persistedHash = hashVal
		})

		return nil
	})
}

// SetState sets the task runner's task state and records the given event.
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
	task := tr.state

	// Capture the existing state so transitions can be detected below.
	oldState := task.State

	// Update the state of the task
	if state != "" {
		task.State = state
	}

	// Handle the event
	if event != nil {
		// Ensure the event is populated with human readable strings
		event.PopulateEventDisplayMessage()

		if event.FailsTask {
			task.Failed = true
		}

		if event.Type == structs.TaskRestarting {
			if !tr.clientConfig.DisableTaggedMetrics {
				metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
			}
			//if r.config.BackwardsCompatibleMetrics {
			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
			//}
			task.Restarts++
			task.LastRestart = time.Unix(0, event.Time)
		}

		appendTaskEvent(task, event)
	}

	// Handle the state transition.
	switch state {
	case structs.TaskStateRunning:
		// Capture the start time if the task is just starting
		if oldState != structs.TaskStateRunning {
			task.StartedAt = time.Now().UTC()
			if !tr.clientConfig.DisableTaggedMetrics {
				metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
			}
			//if r.config.BackwardsCompatibleMetrics {
			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "running"}, 1)
			//}
		}
	case structs.TaskStateDead:
		// Capture the finished time if not already set
		if task.FinishedAt.IsZero() {
			task.FinishedAt = time.Now().UTC()
		}

		// Emitting metrics to indicate task complete and failures
		if task.Failed {
			if !tr.clientConfig.DisableTaggedMetrics {
				metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
			}
			//if r.config.BackwardsCompatibleMetrics {
			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
			//}
		} else {
			if !tr.clientConfig.DisableTaggedMetrics {
				metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
			}
			//if r.config.BackwardsCompatibleMetrics {
			//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "complete"}, 1)
			//}
		}
	}

	// Create a copy and notify the alloc runner of the transition
	//FIXME
	//if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil {
	//tr.logger.Error("failed to save state", "error", err)
	//}
}
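
// EmitEvent appends an event with the given source and message to the task's
// state without changing the task's lifecycle state.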
func (tr *TaskRunner) EmitEvent(source, message string) {
	event := structs.NewTaskEvent(source).SetMessage(message)
	tr.SetState("", event)
}

// WaitCh is closed when TaskRunner.Run exits.
func (tr *TaskRunner) WaitCh() <-chan struct{} {
	return tr.waitCh
}

// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
	select {
	case tr.updateCh <- update:
	case <-tr.WaitCh():
		//XXX Do we log here like we used to? If we're just
		//shutting down it's not an error to drop the update as
		//it will be applied on startup
	}
}

// Shutdown the task runner. Does not stop the task or garbage collect a
// stopped task.
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until Run() has exited.
func (tr *TaskRunner) Shutdown() {
	tr.ctxCancel()
}

// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
	capacity := 10
	if state.Events == nil {
		state.Events = make([]*structs.TaskEvent, 0, capacity)
	}

	// If we hit capacity, then shift it.
	if len(state.Events) == capacity {
		old := state.Events
		state.Events = make([]*structs.TaskEvent, 0, capacity)
		state.Events = append(state.Events, old[1:]...)
	}

	state.Events = append(state.Events, event)
}