ar: refactor task killing into 1 method

Update comments and address some PR comments from #4775
This commit is contained in:
Michael Schurter 2018-10-15 20:38:12 -07:00
parent 21d78be961
commit cefbf00bf0
3 changed files with 63 additions and 79 deletions

View file

@ -42,7 +42,7 @@ type allocRunner struct {
// the goroutine is already processing a previous update.
taskStateUpdatedCh chan struct{}
// taskStateUpdateHandlerCh is closed when thte task state handling
// taskStateUpdateHandlerCh is closed when the task state handling
// goroutine exits. It is unsafe to destroy the local allocation state
// before this goroutine exits.
taskStateUpdateHandlerCh chan struct{}
@ -252,7 +252,7 @@ func (ar *allocRunner) Restore() error {
//
// The goroutine is used to compute changes to the alloc's ClientStatus and to
// update the server with the new state.
func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) {
func (ar *allocRunner) TaskStateUpdated() {
select {
case ar.taskStateUpdatedCh <- struct{}{}:
default:
@ -269,10 +269,6 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat
func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh)
//TODO allocdirs are being left in working dir not temp!
//TODO remove taskname and state from TaskStateUpdated
//TODO Ensure Client updates don't callback into AR
for done := false; !done; {
select {
case <-ar.taskStateUpdatedCh:
@ -335,22 +331,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}
// Kill live task runners
var leader *taskrunner.TaskRunner
for _, tr := range liveRunners {
if tr.IsLeader() {
// Capture the leader to kill last
leader = tr
continue
}
tr.Kill(context.Background(), killEvent)
}
// Kill leader last if it exists
if leader != nil {
leader.Kill(context.Background(), killEvent)
}
ar.killTasks()
}
// Get the client allocation
@ -364,6 +345,41 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}
// killTasks kills all task runners, leader (if there is one) first. Errors are
// logged except taskrunner.ErrTaskNotRunning which is ignored.
func (ar *allocRunner) killTasks() {
// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
continue
}
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}
break
}
// Kill the rest concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
if tr.IsLeader() {
continue
}
wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}
}(name, tr)
}
wg.Wait()
}
// clientAlloc takes in the task states and returns an Allocation populated
// with Client specific fields
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
@ -486,43 +502,24 @@ func (ar *allocRunner) Update(update *structs.Allocation) {
// Update ar.alloc
ar.setAlloc(update)
// Run hooks
if err := ar.update(update); err != nil {
ar.logger.Error("error running update hooks", "error", err)
}
// If alloc is being terminated, kill all tasks, leader first
if stopping {
// Kill leader first
for name, tr := range ar.tasks {
if !tr.IsLeader() {
continue
}
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}
break
// Run update hooks if not stopping or dead
if !update.TerminalStatus() {
if err := ar.update(update); err != nil {
ar.logger.Error("error running update hooks", "error", err)
}
// Kill the rest
for name, tr := range ar.tasks {
if tr.IsLeader() {
continue
}
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}
}
}
// Update task runners
for _, tr := range ar.tasks {
tr.Update(update)
}
// If alloc is being terminated, kill all tasks, leader first
if stopping {
ar.killTasks()
}
}
func (ar *allocRunner) Listener() *cstructs.AllocListener {
@ -543,13 +540,8 @@ func (ar *allocRunner) Destroy() {
}
defer ar.destroyedLock.Unlock()
// Stop tasks
for name, tr := range ar.tasks {
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("failed to kill task", "error", err, "task_name", name)
}
}
// Stop any running tasks
ar.killTasks()
// Wait for tasks to exit and postrun hooks to finish
<-ar.waitCh

View file

@ -2,8 +2,6 @@ package interfaces
import (
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
)
@ -24,8 +22,9 @@ type AllocRunner interface {
// TaskStateHandler exposes a handler to be called when a task's state changes
type TaskStateHandler interface {
// TaskStateUpdated is used to emit updated task state
TaskStateUpdated(task string, state *structs.TaskState)
// TaskStateUpdated is used to notify the alloc runner about task state
// changes.
TaskStateUpdated()
}
// AllocStatsReporter gives acess to the latest resource usage from the

View file

@ -610,26 +610,21 @@ func (tr *TaskRunner) Restore() error {
// UpdateState sets the task runners allocation state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
tr.logger.Debug("setting task state", "state", state, "event", event.Type)
tr.logger.Trace("setting task state", "state", state, "event", event.Type)
// Update the local state
stateCopy := tr.setStateLocal(state, event)
tr.setStateLocal(state, event)
// Notify the alloc runner of the transition
tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy)
tr.stateUpdater.TaskStateUpdated()
}
// setStateLocal updates the local in-memory state, persists a copy to disk and returns a
// copy of the task's state.
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState {
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
//XXX REMOVE ME AFTER TESTING
if state == "" {
panic("UpdateState must not be called with an empty state")
}
// Update the task state
oldState := tr.state.State
taskState := tr.state
@ -681,8 +676,6 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
// try to persist it again.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
return tr.state.Copy()
}
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
@ -692,6 +685,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
// logged. Use AppendEvent to simply add a new event.
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
tr.appendEvent(event)
@ -701,11 +695,8 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
tr.logger.Warn("error persisting event", "error", err, "event", event)
}
stateCopy := tr.state.Copy()
tr.stateLock.Unlock()
// Notify the alloc runner of the event
tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy)
tr.stateUpdater.TaskStateUpdated()
}
// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
@ -769,8 +760,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// Update tr.alloc
tr.setAlloc(update)
// Trigger update hooks
tr.triggerUpdateHooks()
// Trigger update hooks if not terminal
if !update.TerminalStatus() {
tr.triggerUpdateHooks()
}
}
// triggerUpdate if there isn't already an update pending. Should be called