open-nomad/client/allocrunnerv2/alloc_runner.go

172 lines
4.1 KiB
Go
Raw Normal View History

2018-06-22 00:35:07 +00:00
package allocrunnerv2
import (
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunnerv2/config"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"
)
// allocRunner is used to run all the tasks in a given allocation. It owns one
// TaskRunner per task and relays allocation updates to them.
type allocRunner struct {
// logger is the logger for the alloc runner. NewAllocRunner derives it from
// the configured logger and tags it with the allocation ID.
logger log.Logger
// config is the configuration the alloc runner was constructed with.
config *config.Config
// waitCh is closed when the alloc runner has transitioned to a terminal
// state, i.e. when Run returns.
waitCh chan struct{}
// alloc captures the allocation being run. It is initially taken from
// config.Allocation.
alloc *structs.Allocation
// allocLock is presumably meant to guard alloc; it is not used anywhere in
// this file yet. NOTE(review): confirm intended usage.
allocLock sync.Mutex
// state captures the state of the alloc runner.
state *state.State
// allocDir is used to build the allocations directory structure.
allocDir *allocdir.AllocDir
// runnerHooks are alloc runner lifecycle hooks that should be run on state
// transitions.
runnerHooks []interfaces.RunnerHook
// tasks is the set of task runners, keyed by task name.
2018-06-28 00:27:03 +00:00
tasks map[string]*taskrunner.TaskRunner
// updateCh receives allocation updates: Update sends on it and Run
// receives from it.
updateCh chan *structs.Allocation
2018-06-22 00:35:07 +00:00
}
// NewAllocRunner returns a new allocation runner.
2018-06-28 00:27:03 +00:00
func NewAllocRunner(config *config.Config) (*allocRunner, error) {
2018-06-22 00:35:07 +00:00
ar := &allocRunner{
2018-06-28 00:27:03 +00:00
config: config,
alloc: config.Allocation,
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
2018-06-22 00:35:07 +00:00
}
// Create the logger based on the allocation ID
ar.logger = config.Logger.With("alloc_id", ar.ID())
// Initialize the runners hooks.
ar.initRunnerHooks()
return ar, nil
}
// WaitCh returns a receive-only view of the channel that Run closes when it
// returns.
func (ar *allocRunner) WaitCh() <-chan struct{} {
return ar.waitCh
}
// XXX How does alloc Restart work
// Run is the main go-routine that executes all the tasks.
func (ar *allocRunner) Run() {
// Close the wait channel
defer close(ar.waitCh)
2018-06-28 00:27:03 +00:00
var err error
var taskWaitCh <-chan struct{}
2018-06-22 00:35:07 +00:00
// Run the prerun hooks
// XXX Equivalent to TR.Prerun hook
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST
}
// Run the runners
2018-06-28 00:27:03 +00:00
taskWaitCh, err = ar.runImpl()
if err != nil {
2018-06-22 00:35:07 +00:00
ar.logger.Error("starting tasks failed", "error", err)
}
2018-06-28 00:27:03 +00:00
for {
select {
case <-taskWaitCh:
// TaskRunners have all exited
case updated := <-ar.updateCh:
// Updated alloc received
//XXX Update hooks
//XXX Update ar.alloc
for _, tr := range ar.tasks {
tr.Update(updated)
}
}
}
2018-06-22 00:35:07 +00:00
POST:
// Run the postrun hooks
// XXX Equivalent to TR.Poststop hook
if err := ar.postrun(); err != nil {
ar.logger.Error("postrun failed", "error", err)
}
}
// runImpl is used to run the runners.
2018-06-28 00:27:03 +00:00
func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
2018-06-22 00:35:07 +00:00
// Grab the task group
tg := ar.alloc.Job.LookupTaskGroup(ar.alloc.TaskGroup)
if tg == nil {
// XXX Fail and exit
ar.logger.Error("failed to lookup task group", "task_group", ar.alloc.TaskGroup)
2018-06-28 00:27:03 +00:00
return nil, fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup)
2018-06-22 00:35:07 +00:00
}
for _, task := range tg.Tasks {
if err := ar.runTask(task); err != nil {
2018-06-28 00:27:03 +00:00
return nil, err
2018-06-22 00:35:07 +00:00
}
}
2018-06-28 00:27:03 +00:00
// Return a combined WaitCh that is closed when all task runners have
// exited.
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
for _, task := range ar.tasks {
2018-06-22 00:35:07 +00:00
<-task.WaitCh()
2018-06-28 00:27:03 +00:00
}
}()
2018-06-22 00:35:07 +00:00
2018-06-28 00:27:03 +00:00
return waitCh, nil
2018-06-22 00:35:07 +00:00
}
// runTask creates a task runner for task, starts it on its own goroutine and
// records it in ar.tasks under the task's name. It returns an error only when
// the task runner cannot be constructed, in which case nothing is started or
// stored.
func (ar *allocRunner) runTask(task *structs.Task) error {
	// Build the runner, handing it a shim back to this alloc runner.
	runner, err := taskrunner.NewTaskRunner(&taskrunner.Config{
		Parent: &allocRunnerShim{ar},
		Task:   task,
		Logger: ar.logger,
	})
	if err != nil {
		return err
	}

	// Launch the runner, then register it.
	go runner.Run()
	ar.tasks[task.Name] = runner

	return nil
}
2018-06-28 00:27:03 +00:00
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation. The update is handed to Run over an unbuffered
// channel, so the call blocks until Run receives it. If the alloc runner has
// already reached its terminal state (waitCh is closed) the update is dropped
// instead of blocking the caller forever.
func (ar *allocRunner) Update(update *structs.Allocation) {
	select {
	case ar.updateCh <- update:
	case <-ar.waitCh:
		// Run has returned and nobody will receive from updateCh again.
	}
}