open-nomad/client/task_runner.go

238 lines
5.9 KiB
Go
Raw Normal View History

2015-08-23 22:30:16 +00:00
package client
import (
"crypto/md5"
"encoding/hex"
2015-08-29 22:46:10 +00:00
"fmt"
2015-08-23 22:30:16 +00:00
"log"
"os"
"path/filepath"
2015-08-23 22:30:16 +00:00
"sync"
"github.com/hashicorp/nomad/client/config"
2015-08-23 23:49:48 +00:00
"github.com/hashicorp/nomad/client/driver"
2015-08-23 22:30:16 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
config *config.Config
updater TaskStateUpdater
logger *log.Logger
ctx *driver.ExecContext
2015-08-29 22:46:10 +00:00
allocID string
2015-08-23 22:30:16 +00:00
task *structs.Task
2015-08-23 22:30:16 +00:00
updateCh chan *structs.Task
handle driver.DriverHandle
2015-08-23 22:30:16 +00:00
destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
waitCh chan struct{}
2015-08-23 22:30:16 +00:00
}
// taskRunnerState is used to snapshot the state of the task runner
type taskRunnerState struct {
2015-08-30 02:14:47 +00:00
Task *structs.Task
HandleID string
}
// TaskStateUpdater is used to update the status of a task
type TaskStateUpdater func(taskName, status, desc string)
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, ctx *driver.ExecContext,
allocID string, task *structs.Task) *TaskRunner {
tc := &TaskRunner{
config: config,
updater: updater,
logger: logger,
ctx: ctx,
allocID: allocID,
task: task,
updateCh: make(chan *structs.Task, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
2015-08-23 22:30:16 +00:00
}
return tc
}
2015-08-29 22:46:10 +00:00
// WaitCh returns a channel to wait for termination
func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
}
// stateFilePath returns the path to our state file
func (r *TaskRunner) stateFilePath() string {
// Get the MD5 of the task name
hashVal := md5.Sum([]byte(r.task.Name))
hashHex := hex.EncodeToString(hashVal[:])
dirName := fmt.Sprintf("task-%s", hashHex)
// Generate the path
path := filepath.Join(r.config.StateDir, "alloc", r.allocID,
dirName, "state.json")
return path
}
// RestoreState is used to restore our state
func (r *TaskRunner) RestoreState() error {
// Load the snapshot
var snap taskRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// Restore fields
r.task = snap.Task
2015-08-30 02:14:47 +00:00
// Restore the driver
if snap.HandleID != "" {
driver, err := r.createDriver()
if err != nil {
return err
}
handle, err := driver.Open(r.ctx, snap.HandleID)
if err != nil {
r.logger.Printf("[ERR] client: failed to open handle to task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
return err
}
r.handle = handle
}
return nil
}
// SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error {
snap := taskRunnerState{
Task: r.task,
}
2015-08-30 02:14:47 +00:00
if r.handle != nil {
snap.HandleID = r.handle.ID()
}
return persistState(r.stateFilePath(), &snap)
}
// DestroyState is used to cleanup after ourselves
func (r *TaskRunner) DestroyState() error {
return os.RemoveAll(r.stateFilePath())
}
2015-08-29 22:46:10 +00:00
// setStatus is used to update the status of the task runner
func (r *TaskRunner) setStatus(status, desc string) {
r.updater(r.task.Name, status, desc)
2015-08-23 23:49:48 +00:00
}
2015-08-30 02:14:47 +00:00
// createDriver makes a driver for the task
func (r *TaskRunner) createDriver() (driver.Driver, error) {
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
2015-08-23 23:49:48 +00:00
if err != nil {
2015-08-30 02:14:47 +00:00
err = fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.allocID, err)
r.logger.Printf("[ERR] client: %s", err)
}
return driver, err
}
// startTask is used to start the task if there is no handle
func (r *TaskRunner) startTask() error {
// Create a driver
driver, err := r.createDriver()
if err != nil {
r.setStatus(structs.AllocClientStatusFailed, err.Error())
return err
2015-08-23 23:49:48 +00:00
}
2015-08-23 22:30:16 +00:00
2015-08-23 23:49:48 +00:00
// Start the job
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
2015-08-29 22:46:10 +00:00
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
r.setStatus(structs.AllocClientStatusFailed,
fmt.Sprintf("failed to start: %v", err))
2015-08-30 02:14:47 +00:00
return err
2015-08-23 23:49:48 +00:00
}
2015-08-30 02:14:47 +00:00
r.handle = handle
2015-08-29 22:46:10 +00:00
r.setStatus(structs.AllocClientStatusRunning, "task started")
2015-08-30 02:14:47 +00:00
return nil
}
// Run is a long running routine used to manage the task
func (r *TaskRunner) Run() {
defer close(r.waitCh)
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.allocID)
// Start the task if not yet started
if r.handle == nil {
if err := r.startTask(); err != nil {
return
}
}
2015-08-23 23:49:48 +00:00
OUTER:
2015-08-23 23:49:48 +00:00
// Wait for updates
2015-08-23 22:30:16 +00:00
for {
select {
2015-08-30 02:14:47 +00:00
case err := <-r.handle.WaitCh():
2015-08-23 23:49:48 +00:00
if err != nil {
r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v",
2015-08-29 22:46:10 +00:00
r.task.Name, r.allocID, err)
r.setStatus(structs.AllocClientStatusDead,
fmt.Sprintf("task failed with: %v", err))
2015-08-23 23:49:48 +00:00
} else {
r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'",
2015-08-29 22:46:10 +00:00
r.task.Name, r.allocID)
r.setStatus(structs.AllocClientStatusDead,
"task completed")
2015-08-23 23:49:48 +00:00
}
break OUTER
2015-08-29 22:46:10 +00:00
2015-08-23 22:36:06 +00:00
case update := <-r.updateCh:
2015-08-29 22:46:10 +00:00
// Update
2015-08-23 22:36:06 +00:00
r.task = update
2015-08-30 02:14:47 +00:00
if err := r.handle.Update(update); err != nil {
2015-08-29 22:46:10 +00:00
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
}
2015-08-23 22:36:06 +00:00
case <-r.destroyCh:
2015-08-29 22:46:10 +00:00
// Send the kill signal, and use the WaitCh to block until complete
2015-08-30 02:14:47 +00:00
if err := r.handle.Kill(); err != nil {
2015-08-29 22:46:10 +00:00
r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v",
r.task.Name, r.allocID, err)
}
2015-08-23 22:30:16 +00:00
}
}
2015-08-30 02:14:47 +00:00
// Cleanup after ourselves
r.DestroyState()
2015-08-23 22:30:16 +00:00
}
// Update is used to update the task of the context
2015-08-23 22:36:06 +00:00
func (r *TaskRunner) Update(update *structs.Task) {
2015-08-23 22:30:16 +00:00
select {
2015-08-23 22:36:06 +00:00
case r.updateCh <- update:
2015-08-23 22:30:16 +00:00
default:
2015-08-29 22:46:10 +00:00
r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')",
update.Name, r.allocID)
2015-08-23 22:30:16 +00:00
}
}
// Destroy is used to indicate that the task context should be destroyed
2015-08-23 22:36:06 +00:00
func (r *TaskRunner) Destroy() {
r.destroyLock.Lock()
defer r.destroyLock.Unlock()
2015-08-23 22:30:16 +00:00
2015-08-23 22:36:06 +00:00
if r.destroy {
2015-08-23 22:30:16 +00:00
return
}
2015-08-23 22:36:06 +00:00
r.destroy = true
close(r.destroyCh)
2015-08-23 22:30:16 +00:00
}