tr: cleanup main loop and shutdown hook impl
This commit is contained in:
parent
561260d6fe
commit
c5504bd939
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
dstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
|
@ -34,12 +33,11 @@ type TaskRunner struct {
|
|||
state *state.State
|
||||
|
||||
// ctx is the task runner's context and is done whe the task runner
|
||||
// should exit. If a task runner is destroyed it will exit regardless
|
||||
// of whether the context is done.
|
||||
// should exit. Shutdown hooks are run.
|
||||
ctx context.Context
|
||||
|
||||
// ctxCancel is used to exit the task runner's Run loop (without
|
||||
// stopping or destroying the task)
|
||||
// ctxCancel is used to exit the task runner's Run loop without
|
||||
// stopping the task. Shutdown hooks are run.
|
||||
ctxCancel context.CancelFunc
|
||||
|
||||
// Logger is the logger for the task runner.
|
||||
|
@ -56,8 +54,10 @@ type TaskRunner struct {
|
|||
driver driver.Driver
|
||||
|
||||
// handle is the handle to the currently running driver
|
||||
handle driver.DriverHandle
|
||||
handleLock sync.Mutex
|
||||
handle driver.DriverHandle
|
||||
//XXX(schmichael) I think the handle is only manipulated in the Restore
|
||||
// and Run loops, so there's never concurrent access.
|
||||
//handleLock sync.Mutex
|
||||
|
||||
// task is the task beign run
|
||||
task *structs.Task
|
||||
|
@ -184,57 +184,69 @@ func (tr *TaskRunner) initLabels() {
|
|||
func (tr *TaskRunner) Run() {
|
||||
defer close(tr.waitCh)
|
||||
|
||||
var err error
|
||||
var restart bool
|
||||
var restartWait time.Duration
|
||||
var waitRes *dstructs.WaitResult
|
||||
MAIN:
|
||||
for {
|
||||
// Run the prerun hooks
|
||||
if err = tr.prerun(); err != nil {
|
||||
if err := tr.prerun(); err != nil {
|
||||
tr.logger.Error("prerun failed", "error", err)
|
||||
tr.restartTracker.SetStartError(err)
|
||||
goto RESTART
|
||||
}
|
||||
|
||||
// Run the task
|
||||
waitRes, err = tr.runDriver()
|
||||
if err != nil {
|
||||
if err := tr.runDriver(); err != nil {
|
||||
tr.logger.Error("running driver failed", "error", err)
|
||||
tr.restartTracker.SetStartError(err)
|
||||
goto RESTART
|
||||
}
|
||||
tr.restartTracker.SetWaitResult(waitRes)
|
||||
|
||||
// Run the postrun hooks
|
||||
if err = tr.postrun(); err != nil {
|
||||
if err := tr.postrun(); err != nil {
|
||||
tr.logger.Error("postrun failed", "error", err)
|
||||
}
|
||||
|
||||
// Check if the context is closed already and go straight to destroy
|
||||
if err := tr.ctx.Err(); err != nil {
|
||||
goto DESTROY
|
||||
WAIT:
|
||||
select {
|
||||
case waitRes := <-tr.handle.WaitCh():
|
||||
tr.restartTracker.SetWaitResult(waitRes)
|
||||
case _ = <-tr.updateCh:
|
||||
//XXX Need to copy handleUpdate over
|
||||
tr.logger.Warn("update not handled")
|
||||
goto WAIT
|
||||
case <-tr.ctx.Done():
|
||||
tr.logger.Debug("task runner cancelled")
|
||||
break MAIN
|
||||
}
|
||||
|
||||
RESTART:
|
||||
// Actually restart by sleeping and also watching for destroy events
|
||||
restart, restartWait = tr.shouldRestart()
|
||||
if restart {
|
||||
restart, restartWait := tr.shouldRestart()
|
||||
if !restart {
|
||||
break
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(restartWait)
|
||||
timer := time.NewTimer(restartWait)
|
||||
for time.Now().Before(deadline) {
|
||||
select {
|
||||
case <-time.After(restartWait):
|
||||
continue
|
||||
case <-timer.C:
|
||||
case _ = <-tr.updateCh:
|
||||
//XXX Need to copy handleUpdate over
|
||||
tr.logger.Warn("update not handled")
|
||||
case <-tr.ctx.Done():
|
||||
tr.logger.Debug("task runner cancelled")
|
||||
break MAIN
|
||||
}
|
||||
}
|
||||
|
||||
DESTROY:
|
||||
// Run the destroy hooks
|
||||
if err = tr.destroy(); err != nil {
|
||||
tr.logger.Error("postrun failed", "error", err)
|
||||
}
|
||||
|
||||
tr.logger.Debug("task run loop exiting")
|
||||
return
|
||||
timer.Stop()
|
||||
}
|
||||
|
||||
// Run the shutdown hooks
|
||||
if err := tr.shutdown(); err != nil {
|
||||
tr.logger.Error("postrun failed", "error", err)
|
||||
}
|
||||
|
||||
tr.logger.Debug("task run loop exiting")
|
||||
}
|
||||
|
||||
func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
|
||||
|
@ -259,13 +271,13 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
|
|||
}
|
||||
|
||||
// runDriver runs the driver and waits for it to exit
|
||||
func (tr *TaskRunner) runDriver() (*dstructs.WaitResult, error) {
|
||||
func (tr *TaskRunner) runDriver() error {
|
||||
// Run prestart
|
||||
ctx := driver.NewExecContext(tr.taskDir, tr.envBuilder.Build())
|
||||
_, err := tr.driver.Prestart(ctx, tr.task)
|
||||
if err != nil {
|
||||
tr.logger.Error("driver pre-start failed", "error", err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new context for Start since the environment may have been updated.
|
||||
|
@ -275,21 +287,16 @@ func (tr *TaskRunner) runDriver() (*dstructs.WaitResult, error) {
|
|||
sresp, err := tr.driver.Start(ctx, tr.task)
|
||||
if err != nil {
|
||||
tr.logger.Warn("driver start failed", "error", err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait on the handle
|
||||
tr.handleLock.Lock()
|
||||
handle := sresp.Handle
|
||||
tr.handle = handle
|
||||
tr.handleLock.Unlock()
|
||||
tr.handle = sresp.Handle
|
||||
//XXX need to capture the driver network
|
||||
|
||||
// Emit an event that we started
|
||||
tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
||||
|
||||
// Wait for the task to exit
|
||||
waitRes := <-handle.WaitCh()
|
||||
return waitRes, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// initDriver creates the driver for the task
|
||||
|
|
|
@ -54,12 +54,6 @@ func (tr *TaskRunner) prerun() error {
|
|||
}
|
||||
|
||||
name := pre.Name()
|
||||
var start time.Time
|
||||
if tr.logger.IsTrace() {
|
||||
start = time.Now()
|
||||
tr.logger.Trace("running pre-run hook", "name", name, "start", start)
|
||||
}
|
||||
|
||||
// Build the request
|
||||
req := interfaces.TaskPrerunRequest{
|
||||
Task: tr.Task(),
|
||||
|
@ -78,6 +72,13 @@ func (tr *TaskRunner) prerun() error {
|
|||
req.VaultToken = tr.state.VaultToken
|
||||
tr.state.RUnlock()
|
||||
|
||||
// Time the prerun hook
|
||||
var start time.Time
|
||||
if tr.logger.IsTrace() {
|
||||
start = time.Now()
|
||||
tr.logger.Trace("running pre-run hook", "name", name, "start", start)
|
||||
}
|
||||
|
||||
// Run the pre-run hook
|
||||
var resp interfaces.TaskPrerunResponse
|
||||
if err := pre.Prerun(&req, &resp); err != nil {
|
||||
|
@ -159,15 +160,14 @@ func (tr *TaskRunner) postrun() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// destroy is used to run the runners destroy hooks.
|
||||
// XXX Naming change
|
||||
func (tr *TaskRunner) destroy() error {
|
||||
// shutdown is used to run the shutdown hooks.
|
||||
func (tr *TaskRunner) shutdown() error {
|
||||
if tr.logger.IsTrace() {
|
||||
start := time.Now()
|
||||
tr.logger.Trace("running destroy hooks", "start", start)
|
||||
tr.logger.Trace("running poststop hooks", "start", start)
|
||||
defer func() {
|
||||
end := time.Now()
|
||||
tr.logger.Trace("finished destroy hooks", "end", end, "duration", end.Sub(start))
|
||||
tr.logger.Trace("finished poststop hooks", "end", end, "duration", end.Sub(start))
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue