open-nomad/client/allocrunnerv2/alloc_runner.go

293 lines
7.8 KiB
Go
Raw Normal View History

2018-06-22 00:35:07 +00:00
package allocrunnerv2
import (
2018-07-17 20:57:57 +00:00
"context"
2018-06-22 00:35:07 +00:00
"fmt"
"path/filepath"
2018-06-22 00:35:07 +00:00
"sync"
2018-07-11 04:21:12 +00:00
"github.com/boltdb/bolt"
2018-06-22 00:35:07 +00:00
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner"
2018-06-22 00:35:07 +00:00
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/allocrunnerv2/state"
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner"
2018-07-13 16:45:29 +00:00
"github.com/hashicorp/nomad/client/config"
clientstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
2018-07-13 16:45:29 +00:00
"github.com/hashicorp/nomad/client/vaultclient"
2018-06-22 00:35:07 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
// Logger is the logger for the alloc runner.
logger log.Logger
2018-07-13 16:45:29 +00:00
clientConfig *config.Config
// vaultClient is the used to manage Vault tokens
vaultClient vaultclient.VaultClient
2018-06-22 00:35:07 +00:00
// waitCh is closed when the alloc runner has transitioned to a terminal
// state
waitCh chan struct{}
// Alloc captures the allocation being run.
alloc *structs.Allocation
allocLock sync.RWMutex
2018-06-22 00:35:07 +00:00
2018-07-11 04:21:12 +00:00
//XXX implement for local state
2018-06-22 00:35:07 +00:00
// state captures the state of the alloc runner
state *state.State
2018-07-11 04:21:12 +00:00
stateDB *bolt.DB
2018-06-22 00:35:07 +00:00
// allocDir is used to build the allocations directory structure.
allocDir *allocdir.AllocDir
// runnerHooks are alloc runner lifecycle hooks that should be run on state
// transistions.
runnerHooks []interfaces.RunnerHook
// tasks are the set of task runners
2018-06-28 00:27:03 +00:00
tasks map[string]*taskrunner.TaskRunner
// updateCh receives allocation updates via the Update method
2018-06-28 00:27:03 +00:00
updateCh chan *structs.Allocation
2018-06-22 00:35:07 +00:00
}
// NewAllocRunner returns a new allocation runner.
func NewAllocRunner(config *Config) (*allocRunner, error) {
alloc := config.Alloc
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
}
2018-06-22 00:35:07 +00:00
ar := &allocRunner{
alloc: alloc,
2018-07-13 16:45:29 +00:00
clientConfig: config.ClientConfig,
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
2018-07-13 16:45:29 +00:00
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
stateDB: config.StateDB,
2018-06-22 00:35:07 +00:00
}
// Create alloc dir
//XXX update AllocDir to hc log
ar.allocDir = allocdir.NewAllocDir(nil, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))
2018-06-22 00:35:07 +00:00
// Create the logger based on the allocation ID
ar.logger = config.Logger.With("alloc_id", alloc.ID)
2018-06-22 00:35:07 +00:00
// Initialize the runners hooks.
ar.initRunnerHooks()
// Create the TaskRunners
if err := ar.initTaskRunners(tg.Tasks); err != nil {
return nil, err
}
return ar, nil
}
// initTaskRunners creates task runners but does *not* run them.
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
for _, task := range tasks {
config := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
VaultClient: ar.vaultClient,
}
// Create, but do not Run, the task runner
tr, err := taskrunner.NewTaskRunner(config)
if err != nil {
return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err)
}
ar.tasks[task.Name] = tr
}
return nil
2018-06-22 00:35:07 +00:00
}
// WaitCh returns a receive-only view of the channel that Run closes when it
// returns.
func (ar *allocRunner) WaitCh() <-chan struct{} {
	var done <-chan struct{} = ar.waitCh
	return done
}
// XXX How does alloc Restart work
// Run is the main go-routine that executes all the tasks.
func (ar *allocRunner) Run() {
// Close the wait channel
defer close(ar.waitCh)
2018-06-28 00:27:03 +00:00
var taskWaitCh <-chan struct{}
2018-07-17 00:19:56 +00:00
// Run the prestart hooks
// XXX Equivalent to TR.Prestart hook
2018-06-22 00:35:07 +00:00
if err := ar.prerun(); err != nil {
ar.logger.Error("prerun failed", "error", err)
goto POST
}
// Run the runners
taskWaitCh = ar.runImpl()
2018-06-22 00:35:07 +00:00
2018-07-17 20:57:57 +00:00
MAIN:
2018-06-28 00:27:03 +00:00
for {
select {
case <-taskWaitCh:
// TaskRunners have all exited
2018-07-17 20:57:57 +00:00
break MAIN
2018-06-28 00:27:03 +00:00
case updated := <-ar.updateCh:
// Updated alloc received
//XXX Update hooks
//XXX Update ar.alloc
for _, tr := range ar.tasks {
tr.Update(updated)
}
}
}
2018-06-22 00:35:07 +00:00
POST:
// Run the postrun hooks
// XXX Equivalent to TR.Poststop hook
if err := ar.postrun(); err != nil {
ar.logger.Error("postrun failed", "error", err)
}
}
// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() <-chan struct{} {
for _, task := range ar.tasks {
go task.Run()
2018-06-22 00:35:07 +00:00
}
2018-06-28 00:27:03 +00:00
// Return a combined WaitCh that is closed when all task runners have
// exited.
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
for _, task := range ar.tasks {
2018-06-22 00:35:07 +00:00
<-task.WaitCh()
2018-06-28 00:27:03 +00:00
}
}()
2018-06-22 00:35:07 +00:00
return waitCh
2018-06-22 00:35:07 +00:00
}
2018-06-28 00:27:03 +00:00
// Alloc returns the allocation currently held by this runner. The pointer is
// read while holding allocLock for reading.
//XXX how do we handle mutate the state saving stuff
func (ar *allocRunner) Alloc() *structs.Allocation {
	ar.allocLock.RLock()
	current := ar.alloc
	ar.allocLock.RUnlock()
	return current
}
// SaveState does all the state related stuff. Who knows. FIXME
//XXX do we need to do periodic syncing? if Saving is only called *before* Run
// *and* within Run -- *and* Updates are applid within Run -- we may be able to
// skip quite a bit of locking? maybe?
func (ar *allocRunner) SaveState() error {
2018-07-18 18:43:08 +00:00
// XXX Do we move this to the client
return ar.stateDB.Update(func(tx *bolt.Tx) error {
2018-07-18 18:43:08 +00:00
//XXX Track AllocModifyIndex to only write alloc on change?
// Write the allocation
return clientstate.PutAllocation(tx, ar.Alloc())
})
}
// Restore asks every task runner to restore itself from the state database,
// all within one read-only bolt transaction. The first task runner error
// stops the iteration and is returned. Must be called after NewAllocRunner
// but before Run.
func (ar *allocRunner) Restore() error {
	restoreAll := func(tx *bolt.Tx) error {
		for _, runner := range ar.tasks {
			err := runner.Restore(tx)
			if err != nil {
				return err
			}
		}
		return nil
	}

	return ar.stateDB.View(restoreAll)
}
// TaskStateUpdated is called when a task's state has been updated. The hook
// is meant to compute changes to the alloc's ClientStatus and to update the
// server with the new state; at present it is a stub that ignores its
// arguments and always returns nil.
func (ar *allocRunner) TaskStateUpdated(task string, state *structs.TaskState) error {
	var err error
	return err
}
2018-06-28 00:27:03 +00:00
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
//
// The call blocks until Run has received the update. If the alloc runner has
// already stopped (waitCh is closed) nothing will ever receive from updateCh
// again, so the update is dropped and the call returns instead of blocking
// forever.
func (ar *allocRunner) Update(update *structs.Allocation) {
	select {
	case ar.updateCh <- update:
	case <-ar.waitCh:
		// Run has exited; there is no receiver left for the update.
	}
}
// Destroy the alloc runner by stopping it if it is still running and cleaning
// up all of its resources.
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until alloc runner has stopped and been
// destroyed.
//XXX TODO
func (ar *allocRunner) Destroy() {
//TODO
2018-07-17 20:57:57 +00:00
for _, tr := range ar.tasks {
tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskKilled))
}
}
// IsDestroyed is meant to report whether the alloc runner has been destroyed
// (stopped and garbage collected). It is not implemented yet and always
// returns false; it reads no runner state.
//XXX TODO
func (ar *allocRunner) IsDestroyed() bool {
	const destroyed = false
	return destroyed
}
// IsWaiting is meant to report whether the alloc runner is waiting for its
// previous allocation to terminate. It is not implemented yet and always
// returns false; it reads no runner state.
//XXX TODO
func (ar *allocRunner) IsWaiting() bool {
	const waiting = false
	return waiting
}
// IsMigrating is meant to report whether the alloc runner is migrating data
// from its previous allocation. It is not implemented yet and always returns
// false; it reads no runner state.
//XXX TODO
func (ar *allocRunner) IsMigrating() bool {
	const migrating = false
	return migrating
}
// StatsReporter returns the stats reporter for this alloc runner. Real
// reporting still needs implementing: a noopStatsReporter is always returned.
//XXX
func (ar *allocRunner) StatsReporter() allocrunner.AllocStatsReporter {
	var reporter noopStatsReporter
	return reporter
}
// noopStatsReporter is a placeholder stats reporter that reports nothing.
//FIXME implement
type noopStatsReporter struct{}

// LatestAllocStats ignores taskFilter and always returns a nil usage together
// with a "not implemented" error.
func (noopStatsReporter) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
	err := fmt.Errorf("not implemented")
	return nil, err
}