2015-08-23 22:30:16 +00:00
|
|
|
package client
|
|
|
|
|
|
|
|
import (
|
2015-08-30 01:16:49 +00:00
|
|
|
"crypto/md5"
|
|
|
|
"encoding/hex"
|
2015-08-29 22:46:10 +00:00
|
|
|
"fmt"
|
2015-08-23 22:30:16 +00:00
|
|
|
"log"
|
2015-08-30 01:16:49 +00:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2015-08-23 22:30:16 +00:00
|
|
|
"sync"
|
2015-11-03 17:24:26 +00:00
|
|
|
"time"
|
2015-08-23 22:30:16 +00:00
|
|
|
|
2016-06-01 14:36:07 +00:00
|
|
|
"github.com/armon/go-metrics"
|
|
|
|
|
2016-02-07 01:08:12 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2015-08-30 01:16:49 +00:00
|
|
|
"github.com/hashicorp/nomad/client/config"
|
2015-08-23 23:49:48 +00:00
|
|
|
"github.com/hashicorp/nomad/client/driver"
|
2016-03-15 17:53:20 +00:00
|
|
|
"github.com/hashicorp/nomad/client/getter"
|
2016-05-09 14:57:26 +00:00
|
|
|
"github.com/hashicorp/nomad/client/stats"
|
2015-08-23 22:30:16 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
2015-11-14 06:07:13 +00:00
|
|
|
|
2016-04-12 01:46:16 +00:00
|
|
|
"github.com/hashicorp/nomad/client/driver/env"
|
2015-11-14 06:07:13 +00:00
|
|
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
2015-08-23 22:30:16 +00:00
|
|
|
)
|
|
|
|
|
2016-02-17 05:00:49 +00:00
|
|
|
const (
	// killBackoffBaseline is the starting delay of the exponential backoff
	// applied between attempts to kill a task.
	killBackoffBaseline = 5 * time.Second

	// killBackoffLimit is the upper bound of the exponential backoff applied
	// between attempts to kill a task.
	killBackoffLimit = 2 * time.Minute

	// killFailureLimit is the number of kill attempts made before giving up
	// on a task and potentially leaking its resources.
	killFailureLimit = 5
)
|
|
|
|
|
2016-05-09 16:53:00 +00:00
|
|
|
// TaskStatsReporter exposes APIs to query resource usage of a Task
|
2016-05-09 14:57:26 +00:00
|
|
|
type TaskStatsReporter interface {
|
2016-05-25 19:49:38 +00:00
|
|
|
// ResourceUsage returns the latest resource usage data point collected for
|
|
|
|
// the task
|
2016-05-27 21:15:51 +00:00
|
|
|
ResourceUsage() []*cstructs.TaskResourceUsage
|
2016-05-25 19:49:38 +00:00
|
|
|
|
|
|
|
// ResourceUsageTS returns all the resource usage data points since a given
|
|
|
|
// time
|
2016-05-27 21:15:51 +00:00
|
|
|
ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage
|
2016-05-09 14:57:26 +00:00
|
|
|
}
|
|
|
|
|
2015-08-23 22:32:46 +00:00
|
|
|
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
|
|
|
|
type TaskRunner struct {
|
2015-11-05 19:12:31 +00:00
|
|
|
config *config.Config
|
|
|
|
updater TaskStateUpdater
|
|
|
|
logger *log.Logger
|
|
|
|
ctx *driver.ExecContext
|
2015-12-14 22:53:49 +00:00
|
|
|
alloc *structs.Allocation
|
2015-12-18 20:17:13 +00:00
|
|
|
restartTracker *RestartTracker
|
2015-08-23 22:30:16 +00:00
|
|
|
|
2016-05-25 04:33:13 +00:00
|
|
|
resourceUsage *stats.RingBuff
|
|
|
|
resourceUsageLock sync.RWMutex
|
2016-04-29 18:06:19 +00:00
|
|
|
|
2016-02-10 21:44:53 +00:00
|
|
|
task *structs.Task
|
2016-04-12 01:46:16 +00:00
|
|
|
taskEnv *env.TaskEnvironment
|
2016-02-10 21:44:53 +00:00
|
|
|
updateCh chan *structs.Allocation
|
|
|
|
handle driver.DriverHandle
|
|
|
|
handleLock sync.Mutex
|
2015-08-23 22:30:16 +00:00
|
|
|
|
2016-03-15 17:53:20 +00:00
|
|
|
// artifactsDownloaded tracks whether the tasks artifacts have been
|
|
|
|
// downloaded
|
|
|
|
artifactsDownloaded bool
|
|
|
|
|
2015-08-23 22:30:16 +00:00
|
|
|
destroy bool
|
|
|
|
destroyCh chan struct{}
|
|
|
|
destroyLock sync.Mutex
|
2015-08-30 02:42:35 +00:00
|
|
|
waitCh chan struct{}
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
|
|
|
|
2015-08-30 01:16:49 +00:00
|
|
|
// taskRunnerState is used to snapshot the state of the task runner
|
|
|
|
type taskRunnerState struct {
|
2016-03-29 00:24:10 +00:00
|
|
|
Version string
|
|
|
|
Task *structs.Task
|
|
|
|
HandleID string
|
|
|
|
ArtifactDownloaded bool
|
2015-08-30 01:16:49 +00:00
|
|
|
}
|
|
|
|
|
2015-11-14 06:07:13 +00:00
|
|
|
// TaskStateUpdater is used to signal that tasks state has changed.
|
2016-02-02 19:09:29 +00:00
|
|
|
type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
|
2015-08-30 02:42:35 +00:00
|
|
|
|
2015-08-23 22:32:46 +00:00
|
|
|
// NewTaskRunner is used to create a new task context
|
2015-08-30 02:42:35 +00:00
|
|
|
func NewTaskRunner(logger *log.Logger, config *config.Config,
|
|
|
|
updater TaskStateUpdater, ctx *driver.ExecContext,
|
2016-03-23 19:19:19 +00:00
|
|
|
alloc *structs.Allocation, task *structs.Task) *TaskRunner {
|
2016-02-04 00:16:48 +00:00
|
|
|
|
2016-02-05 20:07:56 +00:00
|
|
|
// Merge in the task resources
|
|
|
|
task.Resources = alloc.TaskResources[task.Name]
|
|
|
|
|
2016-02-04 00:16:48 +00:00
|
|
|
// Build the restart tracker.
|
|
|
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
|
|
|
if tg == nil {
|
|
|
|
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
|
2015-11-03 17:24:26 +00:00
|
|
|
|
2016-05-25 05:30:10 +00:00
|
|
|
resourceUsage, err := stats.NewRingBuff(config.StatsDataPoints)
|
2016-05-09 14:57:26 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Printf("[ERR] client: can't create resource usage buffer: %v", err)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-08-23 22:32:46 +00:00
|
|
|
tc := &TaskRunner{
|
2015-11-05 19:12:31 +00:00
|
|
|
config: config,
|
|
|
|
updater: updater,
|
|
|
|
logger: logger,
|
2015-11-06 00:38:19 +00:00
|
|
|
restartTracker: restartTracker,
|
2016-05-09 14:57:26 +00:00
|
|
|
resourceUsage: resourceUsage,
|
2015-11-05 19:12:31 +00:00
|
|
|
ctx: ctx,
|
2015-12-14 22:53:49 +00:00
|
|
|
alloc: alloc,
|
2015-11-05 19:12:31 +00:00
|
|
|
task: task,
|
2016-03-03 00:27:01 +00:00
|
|
|
updateCh: make(chan *structs.Allocation, 64),
|
2015-11-05 19:12:31 +00:00
|
|
|
destroyCh: make(chan struct{}),
|
|
|
|
waitCh: make(chan struct{}),
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
2016-02-19 22:49:43 +00:00
|
|
|
|
2015-08-23 22:30:16 +00:00
|
|
|
return tc
|
|
|
|
}
|
|
|
|
|
2016-03-22 20:49:52 +00:00
|
|
|
// MarkReceived marks the task as received.
|
|
|
|
func (r *TaskRunner) MarkReceived() {
|
|
|
|
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
|
|
|
|
}
|
|
|
|
|
2015-08-29 22:46:10 +00:00
|
|
|
// WaitCh returns a channel to wait for termination
|
|
|
|
func (r *TaskRunner) WaitCh() <-chan struct{} {
|
|
|
|
return r.waitCh
|
|
|
|
}
|
|
|
|
|
2015-08-30 01:16:49 +00:00
|
|
|
// stateFilePath returns the path to our state file
|
|
|
|
func (r *TaskRunner) stateFilePath() string {
|
|
|
|
// Get the MD5 of the task name
|
|
|
|
hashVal := md5.Sum([]byte(r.task.Name))
|
|
|
|
hashHex := hex.EncodeToString(hashVal[:])
|
|
|
|
dirName := fmt.Sprintf("task-%s", hashHex)
|
|
|
|
|
|
|
|
// Generate the path
|
2015-12-14 22:53:49 +00:00
|
|
|
path := filepath.Join(r.config.StateDir, "alloc", r.alloc.ID,
|
2015-08-30 01:16:49 +00:00
|
|
|
dirName, "state.json")
|
|
|
|
return path
|
|
|
|
}
|
|
|
|
|
|
|
|
// RestoreState is used to restore our state
|
|
|
|
func (r *TaskRunner) RestoreState() error {
|
|
|
|
// Load the snapshot
|
|
|
|
var snap taskRunnerState
|
|
|
|
if err := restoreState(r.stateFilePath(), &snap); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Restore fields
|
|
|
|
r.task = snap.Task
|
2016-03-29 00:24:10 +00:00
|
|
|
r.artifactsDownloaded = snap.ArtifactDownloaded
|
2015-08-30 01:16:49 +00:00
|
|
|
|
2016-04-13 21:24:08 +00:00
|
|
|
if err := r.setTaskEnv(); err != nil {
|
|
|
|
err := fmt.Errorf("failed to create task environment for task %q in allocation %q: %v",
|
|
|
|
r.task.Name, r.alloc.ID, err)
|
|
|
|
r.logger.Printf("[ERR] client: %s", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-08-30 02:14:47 +00:00
|
|
|
// Restore the driver
|
|
|
|
if snap.HandleID != "" {
|
|
|
|
driver, err := r.createDriver()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
handle, err := driver.Open(r.ctx, snap.HandleID)
|
2015-11-09 23:55:31 +00:00
|
|
|
|
|
|
|
// In the case it fails, we relaunch the task in the Run() method.
|
2015-08-30 02:14:47 +00:00
|
|
|
if err != nil {
|
|
|
|
r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
|
2015-12-14 22:53:49 +00:00
|
|
|
r.task.Name, r.alloc.ID, err)
|
2015-11-09 23:55:31 +00:00
|
|
|
return nil
|
2015-08-30 02:14:47 +00:00
|
|
|
}
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Lock()
|
2015-08-30 02:14:47 +00:00
|
|
|
r.handle = handle
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Unlock()
|
2015-08-30 02:14:47 +00:00
|
|
|
}
|
2015-08-30 01:16:49 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SaveState is used to snapshot our state
|
|
|
|
func (r *TaskRunner) SaveState() error {
|
|
|
|
snap := taskRunnerState{
|
2016-03-29 00:24:10 +00:00
|
|
|
Task: r.task,
|
|
|
|
Version: r.config.Version,
|
|
|
|
ArtifactDownloaded: r.artifactsDownloaded,
|
2015-08-30 01:16:49 +00:00
|
|
|
}
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Lock()
|
2015-08-30 02:14:47 +00:00
|
|
|
if r.handle != nil {
|
|
|
|
snap.HandleID = r.handle.ID()
|
|
|
|
}
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Unlock()
|
2015-08-30 01:16:49 +00:00
|
|
|
return persistState(r.stateFilePath(), &snap)
|
|
|
|
}
|
|
|
|
|
|
|
|
// DestroyState is used to cleanup after ourselves
|
|
|
|
func (r *TaskRunner) DestroyState() error {
|
|
|
|
return os.RemoveAll(r.stateFilePath())
|
|
|
|
}
|
|
|
|
|
2015-11-14 06:07:13 +00:00
|
|
|
// setState is used to update the state of the task runner
|
|
|
|
func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
|
|
|
|
// Persist our state to disk.
|
2015-11-12 23:53:42 +00:00
|
|
|
if err := r.SaveState(); err != nil {
|
|
|
|
r.logger.Printf("[ERR] client: failed to save state of Task Runner: %v", r.task.Name)
|
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
|
|
|
|
// Indicate the task has been updated.
|
2016-02-02 19:09:29 +00:00
|
|
|
r.updater(r.task.Name, state, event)
|
2016-02-02 01:47:53 +00:00
|
|
|
}
|
|
|
|
|
2016-04-12 01:46:16 +00:00
|
|
|
// setTaskEnv sets the task environment. It returns an error if it could not be
|
|
|
|
// created.
|
|
|
|
func (r *TaskRunner) setTaskEnv() error {
|
2016-05-06 05:01:17 +00:00
|
|
|
taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task.Copy(), r.alloc)
|
2016-01-11 17:58:26 +00:00
|
|
|
if err != nil {
|
2016-04-12 01:46:16 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.taskEnv = taskEnv
|
|
|
|
return nil
|
|
|
|
}
|
2016-01-11 17:58:26 +00:00
|
|
|
|
2016-04-12 01:46:16 +00:00
|
|
|
// createDriver makes a driver for the task
|
|
|
|
func (r *TaskRunner) createDriver() (driver.Driver, error) {
|
|
|
|
if r.taskEnv == nil {
|
2016-04-13 21:24:08 +00:00
|
|
|
err := fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
|
|
|
|
return nil, err
|
2016-01-11 17:58:26 +00:00
|
|
|
}
|
|
|
|
|
2016-04-12 01:46:16 +00:00
|
|
|
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, r.taskEnv)
|
2015-09-10 01:06:23 +00:00
|
|
|
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
|
2015-08-23 23:49:48 +00:00
|
|
|
if err != nil {
|
2015-08-30 02:14:47 +00:00
|
|
|
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
|
2015-12-14 22:53:49 +00:00
|
|
|
r.task.Driver, r.alloc.ID, err)
|
2015-08-30 02:14:47 +00:00
|
|
|
r.logger.Printf("[ERR] client: %s", err)
|
2016-01-11 17:58:26 +00:00
|
|
|
return nil, err
|
2015-08-30 02:14:47 +00:00
|
|
|
}
|
|
|
|
return driver, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run is a long running routine used to manage the task
|
|
|
|
func (r *TaskRunner) Run() {
|
|
|
|
defer close(r.waitCh)
|
|
|
|
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
|
2015-12-14 22:53:49 +00:00
|
|
|
r.task.Name, r.alloc.ID)
|
2015-08-30 02:14:47 +00:00
|
|
|
|
2016-03-24 17:55:14 +00:00
|
|
|
if err := r.validateTask(); err != nil {
|
|
|
|
r.setState(
|
|
|
|
structs.TaskStateDead,
|
|
|
|
structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-04-12 01:46:16 +00:00
|
|
|
if err := r.setTaskEnv(); err != nil {
|
|
|
|
r.setState(
|
|
|
|
structs.TaskStateDead,
|
|
|
|
structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-11-16 20:26:11 +00:00
|
|
|
r.run()
|
2015-11-14 06:07:13 +00:00
|
|
|
return
|
|
|
|
}
|
2015-08-23 23:49:48 +00:00
|
|
|
|
2016-03-24 17:55:14 +00:00
|
|
|
// validateTask validates the fields of the task and returns an error if the
|
|
|
|
// task is invalid.
|
|
|
|
func (r *TaskRunner) validateTask() error {
|
|
|
|
var mErr multierror.Error
|
|
|
|
|
|
|
|
// Validate the user.
|
|
|
|
unallowedUsers := r.config.ReadStringListToMapDefault("user.blacklist", config.DefaultUserBlacklist)
|
|
|
|
checkDrivers := r.config.ReadStringListToMapDefault("user.checked_drivers", config.DefaultUserCheckedDrivers)
|
|
|
|
if _, driverMatch := checkDrivers[r.task.Driver]; driverMatch {
|
|
|
|
if _, unallowed := unallowedUsers[r.task.User]; unallowed {
|
|
|
|
mErr.Errors = append(mErr.Errors, fmt.Errorf("running as user %q is disallowed", r.task.User))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Validate the artifacts
|
|
|
|
for i, artifact := range r.task.Artifacts {
|
|
|
|
// Verify the artifact doesn't escape the task directory.
|
|
|
|
if err := artifact.Validate(); err != nil {
|
|
|
|
// If this error occurs there is potentially a server bug or
|
|
|
|
// mallicious, server spoofing.
|
|
|
|
r.logger.Printf("[ERR] client: allocation %q, task %v, artifact %#v (%v) fails validation: %v",
|
|
|
|
r.alloc.ID, r.task.Name, artifact, i, err)
|
|
|
|
mErr.Errors = append(mErr.Errors, fmt.Errorf("artifact (%d) failed validation: %v", i, err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(mErr.Errors) == 1 {
|
|
|
|
return mErr.Errors[0]
|
|
|
|
}
|
|
|
|
return mErr.ErrorOrNil()
|
|
|
|
}
|
|
|
|
|
2015-11-16 20:26:11 +00:00
|
|
|
func (r *TaskRunner) run() {
|
2016-03-15 17:53:20 +00:00
|
|
|
// Predeclare things so we an jump to the RESTART
|
|
|
|
var handleEmpty bool
|
2016-05-31 23:09:05 +00:00
|
|
|
var stopCollection chan struct{}
|
2016-03-15 17:53:20 +00:00
|
|
|
|
2015-08-23 22:30:16 +00:00
|
|
|
for {
|
2016-03-15 17:53:20 +00:00
|
|
|
// Download the task's artifacts
|
2016-03-15 20:28:25 +00:00
|
|
|
if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
|
2016-03-15 17:53:20 +00:00
|
|
|
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
|
|
|
|
taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
|
|
|
|
if !ok {
|
|
|
|
err := fmt.Errorf("task directory couldn't be found")
|
|
|
|
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
|
|
|
|
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
|
2016-03-22 02:59:58 +00:00
|
|
|
r.restartTracker.SetStartError(err)
|
|
|
|
goto RESTART
|
2016-03-15 17:53:20 +00:00
|
|
|
}
|
|
|
|
|
2016-03-24 17:55:14 +00:00
|
|
|
for _, artifact := range r.task.Artifacts {
|
2016-04-12 01:46:16 +00:00
|
|
|
if err := getter.GetArtifact(r.taskEnv, artifact, taskDir, r.logger); err != nil {
|
2016-03-15 17:53:20 +00:00
|
|
|
r.setState(structs.TaskStateDead,
|
|
|
|
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
|
|
|
|
r.restartTracker.SetStartError(cstructs.NewRecoverableError(err, true))
|
|
|
|
goto RESTART
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
r.artifactsDownloaded = true
|
|
|
|
}
|
|
|
|
|
2016-02-29 00:56:05 +00:00
|
|
|
// Start the task if not yet started or it is being forced. This logic
|
|
|
|
// is necessary because in the case of a restore the handle already
|
|
|
|
// exists.
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Lock()
|
2016-03-15 17:53:20 +00:00
|
|
|
handleEmpty = r.handle == nil
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Unlock()
|
2016-05-31 23:09:05 +00:00
|
|
|
|
2016-02-29 00:56:05 +00:00
|
|
|
if handleEmpty {
|
2016-05-31 23:09:05 +00:00
|
|
|
stopCollection = make(chan struct{})
|
|
|
|
startErr := r.startTask(stopCollection)
|
2016-02-29 00:56:05 +00:00
|
|
|
r.restartTracker.SetStartError(startErr)
|
|
|
|
if startErr != nil {
|
|
|
|
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr))
|
|
|
|
goto RESTART
|
2015-08-29 22:46:10 +00:00
|
|
|
}
|
2015-11-16 20:26:11 +00:00
|
|
|
}
|
|
|
|
|
2016-03-23 19:19:19 +00:00
|
|
|
// Mark the task as started
|
2016-02-29 00:56:05 +00:00
|
|
|
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
2015-11-18 08:50:45 +00:00
|
|
|
|
2015-11-16 20:26:11 +00:00
|
|
|
// Wait for updates
|
2016-02-29 00:56:05 +00:00
|
|
|
WAIT:
|
2015-11-16 20:26:11 +00:00
|
|
|
for {
|
|
|
|
select {
|
2016-02-29 00:56:05 +00:00
|
|
|
case waitRes := <-r.handle.WaitCh():
|
|
|
|
if waitRes == nil {
|
|
|
|
panic("nil wait")
|
|
|
|
}
|
|
|
|
|
2016-05-31 23:09:05 +00:00
|
|
|
// Stop collection the task's resource usage
|
|
|
|
close(stopCollection)
|
|
|
|
|
2016-02-29 00:56:05 +00:00
|
|
|
// Log whether the task was successful or not.
|
|
|
|
r.restartTracker.SetWaitResult(waitRes)
|
|
|
|
r.setState(structs.TaskStateDead, r.waitErrorToEvent(waitRes))
|
|
|
|
if !waitRes.Successful() {
|
|
|
|
r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes)
|
|
|
|
} else {
|
|
|
|
r.logger.Printf("[INFO] client: task %q for alloc %q completed successfully", r.task.Name, r.alloc.ID)
|
|
|
|
}
|
|
|
|
|
|
|
|
break WAIT
|
2015-11-16 20:26:11 +00:00
|
|
|
case update := <-r.updateCh:
|
2016-02-04 03:43:44 +00:00
|
|
|
if err := r.handleUpdate(update); err != nil {
|
|
|
|
r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
|
2015-11-16 20:26:11 +00:00
|
|
|
}
|
|
|
|
case <-r.destroyCh:
|
2016-02-17 05:00:49 +00:00
|
|
|
// Kill the task using an exponential backoff in-case of failures.
|
|
|
|
destroySuccess, err := r.handleDestroy()
|
|
|
|
if !destroySuccess {
|
|
|
|
// We couldn't successfully destroy the resource created.
|
|
|
|
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
|
2015-11-17 20:03:59 +00:00
|
|
|
}
|
|
|
|
|
2016-02-17 05:00:49 +00:00
|
|
|
// Store that the task has been destroyed and any associated error.
|
2016-02-29 00:56:05 +00:00
|
|
|
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
|
|
|
|
return
|
2015-08-29 22:46:10 +00:00
|
|
|
}
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
|
2016-02-29 00:56:05 +00:00
|
|
|
RESTART:
|
|
|
|
state, when := r.restartTracker.GetState()
|
2016-03-15 21:34:25 +00:00
|
|
|
r.restartTracker.SetStartError(nil).SetWaitResult(nil)
|
2016-03-24 22:43:55 +00:00
|
|
|
reason := r.restartTracker.GetReason()
|
2016-02-29 00:56:05 +00:00
|
|
|
switch state {
|
|
|
|
case structs.TaskNotRestarting, structs.TaskTerminated:
|
2015-12-14 22:53:49 +00:00
|
|
|
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
|
2016-02-29 00:56:05 +00:00
|
|
|
if state == structs.TaskNotRestarting {
|
2016-03-24 22:43:55 +00:00
|
|
|
r.setState(structs.TaskStateDead,
|
|
|
|
structs.NewTaskEvent(structs.TaskNotRestarting).
|
|
|
|
SetRestartReason(reason))
|
2016-02-29 00:56:05 +00:00
|
|
|
}
|
|
|
|
return
|
|
|
|
case structs.TaskRestarting:
|
|
|
|
r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
|
2016-03-24 22:43:55 +00:00
|
|
|
r.setState(structs.TaskStatePending,
|
|
|
|
structs.NewTaskEvent(structs.TaskRestarting).
|
|
|
|
SetRestartDelay(when).
|
|
|
|
SetRestartReason(reason))
|
2016-02-29 00:56:05 +00:00
|
|
|
default:
|
|
|
|
r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
|
2015-11-16 20:26:11 +00:00
|
|
|
return
|
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
|
2015-11-16 20:26:11 +00:00
|
|
|
// Sleep but watch for destroy events.
|
|
|
|
select {
|
|
|
|
case <-time.After(when):
|
|
|
|
case <-r.destroyCh:
|
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
|
2015-11-16 20:26:11 +00:00
|
|
|
// Destroyed while we were waiting to restart, so abort.
|
|
|
|
r.destroyLock.Lock()
|
2016-02-29 00:56:05 +00:00
|
|
|
destroyed := r.destroy
|
2015-11-16 20:26:11 +00:00
|
|
|
r.destroyLock.Unlock()
|
|
|
|
if destroyed {
|
|
|
|
r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name)
|
|
|
|
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
|
|
|
|
return
|
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
|
2016-02-29 00:56:05 +00:00
|
|
|
// Clear the handle so a new driver will be created.
|
2016-02-29 18:45:08 +00:00
|
|
|
r.handleLock.Lock()
|
2016-02-29 00:56:05 +00:00
|
|
|
r.handle = nil
|
2016-02-29 18:45:08 +00:00
|
|
|
r.handleLock.Unlock()
|
2015-11-16 20:26:11 +00:00
|
|
|
}
|
2015-11-14 06:07:13 +00:00
|
|
|
}
|
|
|
|
|
2016-05-31 23:09:05 +00:00
|
|
|
// startTask creates the driver and start the task. Resource usage is also
|
|
|
|
// collect in a launched goroutine. Collection ends when the passed channel is
|
|
|
|
// closed
|
|
|
|
func (r *TaskRunner) startTask(stopCollection <-chan struct{}) error {
|
2016-02-29 00:56:05 +00:00
|
|
|
// Create a driver
|
|
|
|
driver, err := r.createDriver()
|
|
|
|
if err != nil {
|
|
|
|
r.logger.Printf("[ERR] client: failed to create driver of task '%s' for alloc '%s': %v",
|
|
|
|
r.task.Name, r.alloc.ID, err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start the job
|
|
|
|
handle, err := driver.Start(r.ctx, r.task)
|
|
|
|
if err != nil {
|
|
|
|
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
|
|
|
|
r.task.Name, r.alloc.ID, err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
r.handleLock.Lock()
|
|
|
|
r.handle = handle
|
|
|
|
r.handleLock.Unlock()
|
2016-05-31 23:09:05 +00:00
|
|
|
go r.collectResourceUsageStats(stopCollection)
|
2016-02-29 00:56:05 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-05-31 23:09:05 +00:00
|
|
|
// collectResourceUsageStats starts collecting resource usage stats of a Task.
|
|
|
|
// Collection ends when the passed channel is closed
|
|
|
|
func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) {
|
2016-05-27 01:23:44 +00:00
|
|
|
// start collecting the stats right away and then start collecting every
|
|
|
|
// collection interval
|
|
|
|
next := time.NewTimer(0)
|
|
|
|
defer next.Stop()
|
2016-04-29 18:06:19 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-next.C:
|
|
|
|
ru, err := r.handle.Stats()
|
|
|
|
if err != nil {
|
2016-05-25 04:33:13 +00:00
|
|
|
r.logger.Printf("[DEBUG] client: error fetching stats of task %v: %v", r.task.Name, err)
|
2016-04-29 18:06:19 +00:00
|
|
|
}
|
2016-06-01 14:36:07 +00:00
|
|
|
if ru != nil {
|
2016-06-02 21:14:15 +00:00
|
|
|
r.resourceUsageLock.Lock()
|
|
|
|
r.resourceUsage.Enqueue(ru)
|
|
|
|
r.resourceUsageLock.Unlock()
|
2016-06-01 14:36:07 +00:00
|
|
|
r.emitStats(ru)
|
|
|
|
}
|
2016-05-25 05:30:10 +00:00
|
|
|
next.Reset(r.config.StatsCollectionInterval)
|
2016-05-31 23:09:05 +00:00
|
|
|
case <-stopCollection:
|
2016-04-29 18:06:19 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-09 16:53:00 +00:00
|
|
|
// TaskStatsReporter returns the stats reporter of the task
|
2016-05-09 14:57:26 +00:00
|
|
|
func (r *TaskRunner) StatsReporter() TaskStatsReporter {
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
2016-05-09 16:53:00 +00:00
|
|
|
// ResourceUsage returns the last resource utilization datapoint collected
|
2016-05-27 21:15:51 +00:00
|
|
|
func (r *TaskRunner) ResourceUsage() []*cstructs.TaskResourceUsage {
|
2016-04-29 18:06:19 +00:00
|
|
|
r.resourceUsageLock.RLock()
|
|
|
|
defer r.resourceUsageLock.RUnlock()
|
2016-05-09 14:57:26 +00:00
|
|
|
val := r.resourceUsage.Peek()
|
2016-05-09 19:11:41 +00:00
|
|
|
ru, _ := val.(*cstructs.TaskResourceUsage)
|
2016-05-27 21:15:51 +00:00
|
|
|
return []*cstructs.TaskResourceUsage{ru}
|
2016-05-09 14:57:26 +00:00
|
|
|
}
|
|
|
|
|
2016-05-09 16:53:00 +00:00
|
|
|
// ResourceUsageTS returns the list of all the resource utilization datapoints
|
|
|
|
// collected
|
2016-05-27 21:15:51 +00:00
|
|
|
func (r *TaskRunner) ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage {
|
2016-05-09 14:57:26 +00:00
|
|
|
r.resourceUsageLock.RLock()
|
|
|
|
defer r.resourceUsageLock.RUnlock()
|
2016-05-27 18:25:36 +00:00
|
|
|
|
2016-05-09 14:57:26 +00:00
|
|
|
values := r.resourceUsage.Values()
|
2016-05-27 18:25:36 +00:00
|
|
|
low := 0
|
|
|
|
high := len(values) - 1
|
|
|
|
var idx int
|
|
|
|
|
|
|
|
for {
|
2016-05-29 03:03:52 +00:00
|
|
|
mid := (low + high) / 2
|
2016-05-27 18:25:36 +00:00
|
|
|
midVal, _ := values[mid].(*cstructs.TaskResourceUsage)
|
2016-05-27 21:15:51 +00:00
|
|
|
if midVal.Timestamp < since {
|
2016-05-27 18:25:36 +00:00
|
|
|
low = mid + 1
|
2016-05-27 21:15:51 +00:00
|
|
|
} else if midVal.Timestamp > since {
|
2016-05-27 18:25:36 +00:00
|
|
|
high = mid - 1
|
2016-05-27 21:15:51 +00:00
|
|
|
} else if midVal.Timestamp == since {
|
2016-05-27 18:25:36 +00:00
|
|
|
idx = mid
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if low > high {
|
|
|
|
idx = low
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
values = values[idx:]
|
2016-05-09 14:57:26 +00:00
|
|
|
ts := make([]*cstructs.TaskResourceUsage, len(values))
|
|
|
|
for index, val := range values {
|
2016-05-09 19:11:41 +00:00
|
|
|
ru, _ := val.(*cstructs.TaskResourceUsage)
|
|
|
|
ts[index] = ru
|
2016-05-09 14:57:26 +00:00
|
|
|
}
|
|
|
|
return ts
|
2016-04-29 18:06:19 +00:00
|
|
|
}
|
|
|
|
|
2016-02-04 03:43:44 +00:00
|
|
|
// handleUpdate takes an updated allocation and updates internal state to
|
|
|
|
// reflect the new config for the task.
|
|
|
|
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
|
|
|
|
// Extract the task group from the alloc.
|
|
|
|
tg := update.Job.LookupTaskGroup(update.TaskGroup)
|
|
|
|
if tg == nil {
|
|
|
|
return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Extract the task.
|
2016-02-07 01:08:12 +00:00
|
|
|
var updatedTask *structs.Task
|
2016-02-04 03:43:44 +00:00
|
|
|
for _, t := range tg.Tasks {
|
|
|
|
if t.Name == r.task.Name {
|
2016-02-07 01:08:12 +00:00
|
|
|
updatedTask = t
|
2016-02-04 03:43:44 +00:00
|
|
|
}
|
|
|
|
}
|
2016-02-07 01:08:12 +00:00
|
|
|
if updatedTask == nil {
|
2016-02-04 03:43:44 +00:00
|
|
|
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
|
|
|
|
}
|
2016-02-05 20:07:56 +00:00
|
|
|
|
|
|
|
// Merge in the task resources
|
2016-02-07 01:08:12 +00:00
|
|
|
updatedTask.Resources = update.TaskResources[updatedTask.Name]
|
2016-02-04 03:43:44 +00:00
|
|
|
|
|
|
|
// Update will update resources and store the new kill timeout.
|
2016-02-07 01:08:12 +00:00
|
|
|
var mErr multierror.Error
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Lock()
|
2016-02-04 03:43:44 +00:00
|
|
|
if r.handle != nil {
|
2016-02-07 01:08:12 +00:00
|
|
|
if err := r.handle.Update(updatedTask); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
|
2016-02-04 03:43:44 +00:00
|
|
|
}
|
|
|
|
}
|
2016-02-10 21:44:53 +00:00
|
|
|
r.handleLock.Unlock()
|
2016-02-04 03:43:44 +00:00
|
|
|
|
|
|
|
// Update the restart policy.
|
|
|
|
if r.restartTracker != nil {
|
|
|
|
r.restartTracker.SetPolicy(tg.RestartPolicy)
|
|
|
|
}
|
|
|
|
|
2016-02-07 01:08:12 +00:00
|
|
|
// Store the updated alloc.
|
2016-02-04 03:43:44 +00:00
|
|
|
r.alloc = update
|
2016-02-07 01:08:12 +00:00
|
|
|
r.task = updatedTask
|
|
|
|
return mErr.ErrorOrNil()
|
2016-02-04 03:43:44 +00:00
|
|
|
}
|
|
|
|
|
2016-02-17 05:00:49 +00:00
|
|
|
// handleDestroy kills the task handle. In the case that killing fails,
|
|
|
|
// handleDestroy will retry with an exponential backoff and will give up at a
|
|
|
|
// given limit. It returns whether the task was destroyed and the error
|
|
|
|
// associated with the last kill attempt.
|
|
|
|
func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
|
|
|
|
// Cap the number of times we attempt to kill the task.
|
|
|
|
for i := 0; i < killFailureLimit; i++ {
|
|
|
|
if err = r.handle.Kill(); err != nil {
|
|
|
|
// Calculate the new backoff
|
|
|
|
backoff := (1 << (2 * uint64(i))) * killBackoffBaseline
|
|
|
|
if backoff > killBackoffLimit {
|
|
|
|
backoff = killBackoffLimit
|
|
|
|
}
|
|
|
|
|
|
|
|
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc %q. Retrying in %v: %v",
|
|
|
|
r.task.Name, r.alloc.ID, backoff, err)
|
|
|
|
time.Sleep(time.Duration(backoff))
|
|
|
|
} else {
|
|
|
|
// Kill was successful
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-11-14 06:07:13 +00:00
|
|
|
// Helper function for converting a WaitResult into a TaskTerminated event.
|
|
|
|
func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent {
|
2015-11-16 22:46:18 +00:00
|
|
|
return structs.NewTaskEvent(structs.TaskTerminated).
|
|
|
|
SetExitCode(res.ExitCode).
|
|
|
|
SetSignal(res.Signal).
|
|
|
|
SetExitMessage(res.Err)
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Update is used to update the task of the context
|
2016-02-04 03:43:44 +00:00
|
|
|
func (r *TaskRunner) Update(update *structs.Allocation) {
|
2015-08-23 22:30:16 +00:00
|
|
|
select {
|
2015-08-23 22:36:06 +00:00
|
|
|
case r.updateCh <- update:
|
2015-08-23 22:30:16 +00:00
|
|
|
default:
|
2015-08-29 22:46:10 +00:00
|
|
|
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
|
2016-02-04 03:43:44 +00:00
|
|
|
r.task.Name, r.alloc.ID)
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Destroy is used to indicate that the task context should be destroyed
|
2015-08-23 22:36:06 +00:00
|
|
|
func (r *TaskRunner) Destroy() {
|
|
|
|
r.destroyLock.Lock()
|
|
|
|
defer r.destroyLock.Unlock()
|
2015-08-23 22:30:16 +00:00
|
|
|
|
2015-08-23 22:36:06 +00:00
|
|
|
if r.destroy {
|
2015-08-23 22:30:16 +00:00
|
|
|
return
|
|
|
|
}
|
2015-08-23 22:36:06 +00:00
|
|
|
r.destroy = true
|
|
|
|
close(r.destroyCh)
|
2015-08-23 22:30:16 +00:00
|
|
|
}
|
2016-06-01 14:36:07 +00:00
|
|
|
|
|
|
|
// emitStats emits resource usage stats of tasks to remote metrics collector
|
|
|
|
// sinks
|
|
|
|
func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "max_usage"}, float32(ru.ResourceUsage.MemoryStats.MaxUsage))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
|
|
|
|
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "cpu", "percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "cpu", "throttled_time"}, float32(ru.ResourceUsage.CpuStats.ThrottledTime))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, "cpu", "throttled_periods"}, float32(ru.ResourceUsage.CpuStats.ThrottledPeriods))
|
|
|
|
|
|
|
|
for pid, pidStats := range ru.Pids {
|
|
|
|
// Not emitting max, kernel usages since we never get them on a per-pid
|
|
|
|
// basis
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "memory", "rss"}, float32(pidStats.MemoryStats.RSS))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "memory", "cache"}, float32(pidStats.MemoryStats.Cache))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "memory", "swap"}, float32(pidStats.MemoryStats.Swap))
|
|
|
|
|
|
|
|
// Not emitting throttled time and periods since we never get them on a
|
|
|
|
// per pid basis
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "cpu", "percent"}, float32(pidStats.CpuStats.Percent))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "cpu", "system"}, float32(pidStats.CpuStats.SystemMode))
|
|
|
|
metrics.EmitKey([]string{r.alloc.Job.Name, r.alloc.Name, r.alloc.ID, r.task.Name, pid, "cpu", "user"}, float32(pidStats.CpuStats.UserMode))
|
|
|
|
}
|
|
|
|
}
|