open-nomad/client/alloc_runner.go

334 lines
8.1 KiB
Go
Raw Normal View History

2015-08-23 22:06:47 +00:00
package client
2015-08-23 22:15:48 +00:00
import (
2015-08-29 22:46:10 +00:00
"encoding/json"
"fmt"
2015-08-23 22:15:48 +00:00
"log"
"os"
"path/filepath"
2015-08-23 22:15:48 +00:00
"sync"
2015-08-29 22:46:10 +00:00
"time"
2015-08-23 22:15:48 +00:00
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
2015-08-23 23:49:48 +00:00
"github.com/hashicorp/nomad/client/driver"
2015-08-23 22:15:48 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
2015-08-23 22:06:47 +00:00
2015-08-29 22:46:10 +00:00
const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second
)
// taskStatus is used to track the status of a task
type taskStatus struct {
Status string
Description string
}
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
config *config.Config
2015-08-23 22:06:47 +00:00
client *Client
2015-08-23 22:15:48 +00:00
logger *log.Logger
alloc *structs.Allocation
2015-08-29 22:46:10 +00:00
dirtyCh chan struct{}
ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskLock sync.RWMutex
2015-08-23 23:49:48 +00:00
2015-08-29 22:46:10 +00:00
taskStatus map[string]taskStatus
taskStatusLock sync.RWMutex
2015-08-23 22:15:48 +00:00
updateCh chan *structs.Allocation
destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
2015-08-23 22:06:47 +00:00
}
// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Alloc *structs.Allocation
TaskStatus map[string]taskStatus
2015-08-30 02:14:47 +00:00
Context *driver.ExecContext
}
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(config *config.Config, client *Client, alloc *structs.Allocation) *AllocRunner {
2015-08-30 02:14:47 +00:00
ar := &AllocRunner{
config: config,
2015-08-29 22:46:10 +00:00
client: client,
logger: client.logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStatus: make(map[string]taskStatus),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
2015-08-23 22:06:47 +00:00
}
2015-08-30 02:14:47 +00:00
return ar
2015-08-23 22:06:47 +00:00
}
// stateFilePath returns the path to our state file
func (r *AllocRunner) stateFilePath() string {
return filepath.Join(r.config.StateDir, "alloc", r.alloc.ID, "state.json")
}
// RestoreState is used to restore the state of the alloc runner
func (r *AllocRunner) RestoreState() error {
// Load the snapshot
var snap allocRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
return err
}
// Restore fields
r.alloc = snap.Alloc
r.taskStatus = snap.TaskStatus
2015-08-30 02:14:47 +00:00
r.ctx = snap.Context
// Restore the task runners
var mErr multierror.Error
for name := range r.taskStatus {
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.config, r, r.ctx, task)
r.tasks[name] = tr
if err := tr.RestoreState(); err != nil {
r.logger.Printf("[ERR] client: failed to restore state for alloc %s task '%s': %v", r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
} else {
go tr.Run()
}
}
return mErr.ErrorOrNil()
}
// SaveState is used to snapshot our state
func (r *AllocRunner) SaveState() error {
r.taskStatusLock.RLock()
snap := allocRunnerState{
Alloc: r.alloc,
TaskStatus: r.taskStatus,
2015-08-30 02:14:47 +00:00
Context: r.ctx,
}
err := persistState(r.stateFilePath(), &snap)
r.taskStatusLock.RUnlock()
if err != nil {
return err
}
// Save state for each task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
var mErr multierror.Error
for name, tr := range r.tasks {
if err := tr.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
r.alloc.ID, name, err)
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// DestroyState is used to cleanup after ourselves
func (r *AllocRunner) DestroyState() error {
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
}
2015-08-23 22:06:47 +00:00
// Alloc returns the associated allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
2015-08-23 22:06:47 +00:00
}
2015-08-29 22:46:10 +00:00
// setAlloc is used to update the allocation of the runner
// we preserve the existing client status and description
func (r *AllocRunner) setAlloc(alloc *structs.Allocation) {
if r.alloc != nil {
alloc.ClientStatus = r.alloc.ClientStatus
alloc.ClientDescription = r.alloc.ClientDescription
}
r.alloc = alloc
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() {
var retryCh <-chan time.Time
for {
select {
case <-retryCh:
case <-r.dirtyCh:
case <-r.destroyCh:
return
}
// Scan the task status to termine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Attempt to update the status
if err := r.client.updateAllocStatus(r.alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
retryCh = time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv))
}
retryCh = nil
}
}
// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
r.alloc.ClientStatus = status
r.alloc.ClientDescription = desc
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
// setTaskStatus is used to set the status of a task
func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
r.taskStatus[taskName] = taskStatus{
Status: status,
Description: desc,
}
select {
case r.dirtyCh <- struct{}{}:
default:
}
}
2015-08-23 22:06:47 +00:00
// Run is a long running goroutine used to manage an allocation
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Run() {
2015-08-29 22:46:10 +00:00
go r.syncStatus()
2015-08-23 22:15:48 +00:00
2015-08-29 22:46:10 +00:00
// Check if the allocation is in a terminal status
2015-08-23 23:49:48 +00:00
alloc := r.alloc
2015-08-29 22:46:10 +00:00
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID)
return
}
r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID)
// Find the task group to run in the allocation
2015-08-30 02:14:47 +00:00
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
2015-08-23 23:49:48 +00:00
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
2015-08-29 22:46:10 +00:00
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("missing task group '%s'", alloc.TaskGroup))
2015-08-23 23:49:48 +00:00
return
}
// Create the execution context
2015-08-30 02:14:47 +00:00
if r.ctx == nil {
r.ctx = driver.NewExecContext()
}
2015-08-23 23:49:48 +00:00
// Start the task runners
r.taskLock.Lock()
2015-08-23 23:49:48 +00:00
for _, task := range tg.Tasks {
2015-08-30 02:14:47 +00:00
// Skip tasks that were restored
if _, ok := r.tasks[task.Name]; ok {
continue
}
tr := NewTaskRunner(r.config, r, r.ctx, task)
2015-08-23 23:49:48 +00:00
r.tasks[task.Name] = tr
go tr.Run()
}
r.taskLock.Unlock()
2015-08-23 23:49:48 +00:00
2015-08-29 22:46:10 +00:00
OUTER:
2015-08-23 23:49:48 +00:00
// Wait for updates
2015-08-23 22:06:47 +00:00
for {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case update := <-r.updateCh:
2015-08-29 22:46:10 +00:00
// Check if we're in a terminal status
if update.TerminalStatus() {
r.setAlloc(update)
break OUTER
}
// Update the task groups
r.taskLock.RLock()
2015-08-29 22:46:10 +00:00
for _, task := range tg.Tasks {
tr := r.tasks[task.Name]
tr.Update(task)
}
r.taskLock.RUnlock()
2015-08-29 22:46:10 +00:00
2015-08-23 22:36:06 +00:00
case <-r.destroyCh:
2015-08-29 22:46:10 +00:00
break OUTER
2015-08-23 22:15:48 +00:00
}
2015-08-23 22:06:47 +00:00
}
2015-08-29 22:46:10 +00:00
// Destroy each sub-task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
2015-08-29 22:46:10 +00:00
for _, tr := range r.tasks {
tr.Destroy()
}
// Wait for termination of the task runners
for _, tr := range r.tasks {
<-tr.WaitCh()
}
// Check if we should destroy our state
if r.destroy {
r.DestroyState()
}
2015-08-29 22:46:10 +00:00
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
2015-08-23 22:06:47 +00:00
}
// Update is used to update the allocation of the context
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Update(update *structs.Allocation) {
2015-08-23 22:15:48 +00:00
select {
2015-08-23 22:36:06 +00:00
case r.updateCh <- update:
2015-08-23 22:15:48 +00:00
default:
2015-08-23 22:36:06 +00:00
r.logger.Printf("[ERR] client: dropping update to alloc '%s'", update.ID)
2015-08-23 22:15:48 +00:00
}
2015-08-23 22:06:47 +00:00
}
// Destroy is used to indicate that the allocation context should be destroyed
2015-08-23 22:36:06 +00:00
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
defer r.destroyLock.Unlock()
2015-08-23 22:15:48 +00:00
2015-08-23 22:36:06 +00:00
if r.destroy {
2015-08-23 22:15:48 +00:00
return
}
2015-08-23 22:36:06 +00:00
r.destroy = true
close(r.destroyCh)
2015-08-23 22:06:47 +00:00
}