open-nomad/client/allocrunner/taskrunner/task_runner.go

package taskrunner
import (
"context"
"fmt"
"sync"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// killBackoffBaseline is the baseline time for exponential backoff while
// killing a task.
killBackoffBaseline = 5 * time.Second
// killBackoffLimit is the limit of the exponential backoff for killing
// the task.
killBackoffLimit = 2 * time.Minute
// killFailureLimit is how many times we will attempt to kill a task before
// giving up and potentially leaking resources.
killFailureLimit = 5
// triggerUpdateChCap is the capacity for the triggerUpdateCh used for
// triggering updates. It should be exactly 1 as even if multiple
// updates have come in since the last one was handled, we only need to
// handle the last one.
triggerUpdateChCap = 1
)
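// TaskRunner manages the lifecycle of a single task within an allocation: it
// runs the task's lifecycle hooks, starts the task through its driver, waits
// for the task to exit, and applies the restart policy until the task reaches
// a terminal state.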
type TaskRunner struct {
// allocID and taskName are immutable so these fields may be accessed
// without locks
allocID string
taskName string
alloc *structs.Allocation
allocLock sync.Mutex
clientConfig *config.Config
// stateUpdater is used to emit updated task state
stateUpdater interfaces.TaskStateHandler
// state captures the state of the task for updating the allocation
state *structs.TaskState
stateLock sync.Mutex
// localState captures the node-local state of the task for when the
// Nomad agent restarts
localState *state.LocalState
localStateLock sync.RWMutex
// stateDB is for persisting localState and taskState
stateDB cstate.StateDB
// ctx is the task runner's context representing the task's lifecycle.
// Canceling the context will cause the task to be destroyed.
ctx context.Context
// ctxCancel is used to exit the task runner's Run loop without
// stopping the task. Shutdown hooks are run.
ctxCancel context.CancelFunc
// Logger is the logger for the task runner.
logger log.Logger
// triggerUpdateCh is ticked whenever update hooks need to be run and
// must be created with cap=1 to signal a pending update and prevent
// callers from deadlocking if the receiver has exited.
triggerUpdateCh chan struct{}
// waitCh is closed when the task runner has transitioned to a terminal
// state
waitCh chan struct{}
// driver is the driver for the task.
driver driver.Driver
// handleLock guards access to handle and handleResult
handleLock sync.Mutex
// handle to the running driver
handle driver.DriverHandle
// handleResult proxies wait results from drivers
handleResult *handleResult
// task is the task being run
task *structs.Task
taskLock sync.RWMutex
// taskDir is the directory structure for this task.
taskDir *allocdir.TaskDir
// envBuilder is used to build the task's environment
envBuilder *env.Builder
// restartTracker is used to decide if the task should be restarted.
restartTracker *restarts.RestartTracker
// runnerHooks are task runner lifecycle hooks that should be run on state
// transitions.
runnerHooks []interfaces.TaskHook
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
// vaultClient is the client to use to derive and renew Vault tokens
vaultClient vaultclient.VaultClient
// vaultToken is the current Vault token. It should be accessed with the
// getter.
vaultToken string
vaultTokenLock sync.Mutex
// baseLabels are used when emitting tagged metrics. All task runner metrics
// will have these tags, and optionally more.
baseLabels []metrics.Label
// logmonHookConfig is used to get the paths to the stdout and stderr fifos
// to be passed to the driver for task logging
logmonHookConfig *logmonHookConfig
// resourceUsage is written via UpdateStats and read via
// LatestResourceUsage. May be nil at any time.
resourceUsage *cstructs.TaskResourceUsage
resourceUsageLock sync.Mutex
}
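// Config is used to configure and construct a TaskRunner via NewTaskRunner.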
type Config struct {
Alloc *structs.Allocation
ClientConfig *config.Config
Consul consul.ConsulServiceAPI
Task *structs.Task
TaskDir *allocdir.TaskDir
Logger log.Logger
// VaultClient is the client to use to derive and renew Vault tokens
VaultClient vaultclient.VaultClient
// LocalState is optionally restored task state
LocalState *state.LocalState
// StateDB is used to store and restore state.
StateDB cstate.StateDB
// StateUpdater is used to emit updated task state
StateUpdater interfaces.TaskStateHandler
}
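// NewTaskRunner creates a TaskRunner from the given Config, initializing its
// task state, driver, lifecycle hooks, and base metric labels.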
func NewTaskRunner(config *Config) (*TaskRunner, error) {
// Create a context for the runner
trCtx, trCancel := context.WithCancel(context.Background())
// Initialize the environment builder
envBuilder := env.NewBuilder(
config.ClientConfig.Node,
config.Alloc,
config.Task,
config.ClientConfig.Region,
)
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
state: config.Alloc.TaskStates[config.Task.Name].Copy(),
localState: config.LocalState,
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
ctx: trCtx,
ctxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
}
// Create the logger based on the allocation ID
tr.logger = config.Logger.Named("task_runner").With("task", config.Task.Name)
// Build the restart tracker.
tg := tr.alloc.Job.LookupTaskGroup(tr.alloc.TaskGroup)
if tg == nil {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
}
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type)
// Initialize the task state
tr.initState()
// Get the driver
if err := tr.initDriver(); err != nil {
tr.logger.Error("failed to create driver", "error", err)
return nil, err
}
// Initialize the runners hooks.
tr.initHooks()
// Initialize base labels
tr.initLabels()
return tr, nil
}
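// initState initializes the task state and local state when they were not
// restored from a previous run.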
func (tr *TaskRunner) initState() {
if tr.state == nil {
tr.state = &structs.TaskState{
State: structs.TaskStatePending,
}
}
if tr.localState == nil {
tr.localState = state.NewLocalState()
}
}
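// initLabels builds the base metric labels (job, task group, alloc ID, and
// task name) attached to every tagged metric emitted by this task runner.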
func (tr *TaskRunner) initLabels() {
alloc := tr.Alloc()
tr.baseLabels = []metrics.Label{
{
Name: "job",
Value: alloc.Job.Name,
},
{
Name: "task_group",
Value: alloc.TaskGroup,
},
{
Name: "alloc_id",
Value: tr.allocID,
},
{
Name: "task",
Value: tr.taskName,
},
}
}
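// Run is the main loop of the task runner: it runs the prestart hooks, starts
// the task via the driver, runs the poststart hooks, waits for the task to
// exit, and restarts it according to the restart policy. It blocks until the
// task reaches a terminal state; callers can wait on WaitCh.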
func (tr *TaskRunner) Run() {
defer close(tr.waitCh)
var waitRes *dstructs.WaitResult
// Updates are handled asynchronously with the other hooks but each
// triggered update - whether due to alloc updates or a new vault token
// - should be handled serially.
go tr.handleUpdates()
MAIN:
for tr.ctx.Err() == nil {
// Run the prestart hooks
if err := tr.prestart(); err != nil {
tr.logger.Error("prestart failed", "error", err)
tr.restartTracker.SetStartError(err)
goto RESTART
}
if tr.ctx.Err() != nil {
break MAIN
}
// Run the task
if err := tr.runDriver(); err != nil {
tr.logger.Error("running driver failed", "error", err)
tr.restartTracker.SetStartError(err)
goto RESTART
}
// Run the poststart hooks
if err := tr.poststart(); err != nil {
tr.logger.Error("poststart failed", "error", err)
}
// Grab the result proxy and wait for task to exit
{
_, result := tr.getDriverHandle()
// Do *not* use tr.ctx here as it would cause Wait() to
// unblock before the task exits when Kill() is called.
waitRes = result.Wait(context.Background())
}
// Clear the handle
tr.clearDriverHandle()
// Store the wait result on the restart tracker
tr.restartTracker.SetWaitResult(waitRes)
if err := tr.exited(); err != nil {
tr.logger.Error("exited hooks failed", "error", err)
}
RESTART:
restart, restartDelay := tr.shouldRestart()
if !restart {
break MAIN
}
// Actually restart by sleeping and also watching for destroy events
select {
case <-time.After(restartDelay):
case <-tr.ctx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
break MAIN
}
}
// If task terminated, update server. All other exit conditions (eg
// killed or out of restarts) will perform their own server updates.
if waitRes != nil {
event := structs.NewTaskEvent(structs.TaskTerminated).
SetExitCode(waitRes.ExitCode).
SetSignal(waitRes.Signal).
SetExitMessage(waitRes.Err)
tr.UpdateState(structs.TaskStateDead, event)
}
// Run the stop hooks
if err := tr.stop(); err != nil {
tr.logger.Error("stop failed", "error", err)
}
tr.logger.Debug("task run loop exiting")
}
// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits
// when Run has returned. Should only be run in a goroutine from Run.
func (tr *TaskRunner) handleUpdates() {
for {
select {
case <-tr.triggerUpdateCh:
case <-tr.waitCh:
return
}
if tr.Alloc().TerminalStatus() {
// Terminal update: kill TaskRunner and let Run execute postrun hooks
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
tr.logger.Warn("error stopping task", "error", err)
}
continue
}
// Non-terminal update; run hooks
tr.updateHooks()
}
}
// shouldRestart determines whether the task should be restarted and updates
// the task state unless the task is killed or terminated.
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
// Determine if we should restart
state, when := tr.restartTracker.GetState()
reason := tr.restartTracker.GetReason()
switch state {
case structs.TaskKilled:
// Never restart an explicitly killed task. Kill method handles
// updating the server.
return false, 0
case structs.TaskNotRestarting, structs.TaskTerminated:
tr.logger.Info("not restarting task", "reason", reason)
if state == structs.TaskNotRestarting {
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
}
return false, 0
case structs.TaskRestarting:
tr.logger.Info("restarting task", "reason", reason, "delay", when)
tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
return true, 0
default:
tr.logger.Error("restart tracker returned unknown state", "state", state)
return true, when
}
}
// runDriver runs the driver and waits for it to exit
func (tr *TaskRunner) runDriver() error {
// Run prestart
ctx := driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
_, err := tr.driver.Prestart(ctx, tr.task)
if err != nil {
tr.logger.Error("driver pre-start failed", "error", err)
return err
}
// Create a new context for Start since the environment may have been updated.
ctx = driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
ctx.StdoutFifo = tr.logmonHookConfig.stdoutFifo
ctx.StderrFifo = tr.logmonHookConfig.stderrFifo
// Start the job
sresp, err := tr.driver.Start(ctx, tr.task)
if err != nil {
tr.logger.Warn("driver start failed", "error", err)
return err
}
// Store the driver handle and associated metadata
tr.setDriverHandle(sresp.Handle)
// Emit an event that we started
tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
return nil
}
// initDriver creates the driver for the task
func (tr *TaskRunner) initDriver() error {
// Create a task-specific event emitter callback to expose minimal
// state to drivers
//XXX Replace with EmitEvent -- no need for a shim
eventEmitter := func(m string, args ...interface{}) {
msg := fmt.Sprintf(m, args...)
tr.logger.Debug("driver event", "event", msg)
tr.EmitEvent(structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
alloc := tr.Alloc()
driverCtx := driver.NewDriverContext(
alloc.Job.Name,
alloc.TaskGroup,
tr.taskName,
tr.allocID,
tr.clientConfig, // XXX Why does it need this
tr.clientConfig.Node, // XXX THIS I NEED TO FIX
tr.logger.StandardLogger(nil), // XXX Should pass this through
eventEmitter)
driver, err := driver.NewDriver(tr.task.Driver, driverCtx)
if err != nil {
return err
}
tr.driver = driver
return nil
}
// handleDestroy kills the task handle. In the case that killing fails,
// handleDestroy will retry with an exponential backoff and will give up at a
// given limit. It returns whether the task was destroyed and the error
// associated with the last kill attempt.
func (tr *TaskRunner) handleDestroy(handle driver.DriverHandle) (destroyed bool, err error) {
// Cap the number of times we attempt to kill the task.
for i := 0; i < killFailureLimit; i++ {
if err = handle.Kill(); err != nil {
// Calculate the new backoff
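// The backoff quadruples on each failed attempt (5s, 20s, 80s, ...) and
// is capped at killBackoffLimit.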
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
if backoff > killBackoffLimit {
backoff = killBackoffLimit
}
tr.logger.Error("failed to kill task", "backoff", backoff, "error", err)
time.Sleep(backoff)
} else {
// Kill was successful
return true, nil
}
}
return
}
// persistLocalState persists local state to disk synchronously.
func (tr *TaskRunner) persistLocalState() error {
tr.localStateLock.Lock()
defer tr.localStateLock.Unlock()
return tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState)
}
// XXX If the objects don't exist because the client shut down before the task
// runner ever saved state, then we should treat it as a new task runner and not
// return an error
//
// Restore task runner state. Called by AllocRunner.Restore after NewTaskRunner
// but before Run so no locks need to be acquired.
func (tr *TaskRunner) Restore() error {
ls, ts, err := tr.stateDB.GetTaskRunnerState(tr.allocID, tr.taskName)
if err != nil {
return err
}
tr.localState = ls
tr.state = ts
return nil
}
// UpdateState sets the task runner's allocation state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
tr.logger.Debug("setting task state", "state", state, "event", event.Type)
// Update the local state
stateCopy := tr.setStateLocal(state, event)
// Notify the alloc runner of the transition
tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy)
}
// setStateLocal updates the local in-memory state, persists a copy to disk and returns a
// copy of the task's state.
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
//XXX REMOVE ME AFTER TESTING
if state == "" {
panic("UpdateState must not be called with an empty state")
}
// Update the task state
oldState := tr.state.State
taskState := tr.state
taskState.State = state
// Append the event
tr.appendEvent(event)
// Handle the state transition.
switch state {
case structs.TaskStateRunning:
// Capture the start time if it is just starting
if oldState != structs.TaskStateRunning {
taskState.StartedAt = time.Now().UTC()
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "running"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "running"}, 1)
//}
}
case structs.TaskStateDead:
// Capture the finished time if not already set
if taskState.FinishedAt.IsZero() {
taskState.FinishedAt = time.Now().UTC()
}
// Emitting metrics to indicate task complete and failures
if taskState.Failed {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "failed"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "failed"}, 1)
//}
} else {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "complete"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "complete"}, 1)
//}
}
}
// Persist the state and event
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState); err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
return tr.state.Copy()
}
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and sent to the server, but errors are simply
// logged. Use AppendEvent to add an event without updating the AllocRunner.
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
tr.appendEvent(event)
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Warn("error persisting event", "error", err, "event", event)
}
// Notify the alloc runner of the event
tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy())
}
// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
// transition states.
// Events are persisted locally and errors are simply logged. Use EmitEvent to
// also update the AllocRunner.
func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
tr.appendEvent(event)
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Warn("error persisting event", "error", err, "event", event)
}
}
// appendEvent to task's event slice. Caller must acquire stateLock.
func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
// Ensure the event is populated with human readable strings
event.PopulateEventDisplayMessage()
// Propagate failure from event to task state
if event.FailsTask {
tr.state.Failed = true
}
// XXX This seems like a super awkward spot for this? Why not shouldRestart?
// Update restart metrics
if event.Type == structs.TaskRestarting {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.IncrCounterWithLabels([]string{"client", "allocs", "restart"}, 1, tr.baseLabels)
}
//if r.config.BackwardsCompatibleMetrics {
//metrics.IncrCounter([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, taskName, "restart"}, 1)
//}
tr.state.Restarts++
tr.state.LastRestart = time.Unix(0, event.Time)
}
// Append event to slice
appendTaskEvent(tr.state, event)
return nil
}
// WaitCh is closed when TaskRunner.Run exits.
func (tr *TaskRunner) WaitCh() <-chan struct{} {
return tr.waitCh
}
// Update the running allocation with a new version received from the server.
// Calls Update hooks asynchronously with Run().
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
// Update tr.alloc
tr.setAlloc(update)
// Trigger update hooks
tr.triggerUpdateHooks()
}
// triggerUpdate if there isn't already an update pending. Should be called
// instead of calling updateHooks directly to serialize runs of update hooks.
// TaskRunner state should be updated prior to triggering update hooks.
//
// Does not block.
func (tr *TaskRunner) triggerUpdateHooks() {
select {
case tr.triggerUpdateCh <- struct{}{}:
default:
// already an update hook pending
}
}
// LatestResourceUsage returns the last resource utilization datapoint
// collected. May return nil if the task is not running or no resource
// utilization has been collected yet.
func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
tr.resourceUsageLock.Lock()
ru := tr.resourceUsage
tr.resourceUsageLock.Unlock()
return ru
}
// UpdateStats updates and emits the latest stats from the driver.
func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
tr.resourceUsageLock.Lock()
tr.resourceUsage = ru
tr.resourceUsageLock.Unlock()
if ru != nil {
tr.emitStats(ru)
}
}
//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"},
float32(ru.ResourceUsage.MemoryStats.Cache), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"},
float32(ru.ResourceUsage.MemoryStats.Swap), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"},
float32(ru.ResourceUsage.MemoryStats.MaxUsage), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelUsage), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), tr.baseLabels)
}
if tr.clientConfig.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "max_usage"}, float32(ru.ResourceUsage.MemoryStats.MaxUsage))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
}
}
//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
}
if tr.clientConfig.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_time"}, float32(ru.ResourceUsage.CpuStats.ThrottledTime))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_periods"}, float32(ru.ResourceUsage.CpuStats.ThrottledPeriods))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks))
}
}
// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.PublishAllocationMetrics {
return
}
if ru.ResourceUsage.MemoryStats != nil {
tr.setGaugeForMemory(ru)
}
if ru.ResourceUsage.CpuStats != nil {
tr.setGaugeForCPU(ru)
}
}
// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
const capacity = 10
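// Only the most recent `capacity` events are retained; once full, the
// oldest event is dropped to make room for the new one.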
if state.Events == nil {
state.Events = make([]*structs.TaskEvent, 1, capacity)
state.Events[0] = event
return
}
// If we hit capacity, then shift it.
if len(state.Events) == capacity {
old := state.Events
state.Events = make([]*structs.TaskEvent, 0, capacity)
state.Events = append(state.Events, old[1:]...)
}
state.Events = append(state.Events, event)
}