task runner: avoid re-running a task that is already terminal
This change fixes a bug where Nomad would re-run the tasks of an alloc that is client-terminal when the server's copy of the alloc on the client isn't marked as terminal yet.

We fix this by having the task runner consult allocRunner.shouldRun() instead of only checking the server-updated alloc.

We preserve the existing invariants, such that `tr.Run()` is always run, and we don't change the overall alloc runner and task runner lifecycles.

Fixes https://github.com/hashicorp/nomad/issues/5883
commit 3d89ae0f1e (parent 448b759578)
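For context, `allocRunner.shouldRun()` is exercised by the test below but its body isn't part of this diff. Here is a minimal sketch of the kind of gate it provides, using simplified stand-in types rather than Nomad's real structs; all names and fields below are illustrative only:

package main

import "fmt"

// Simplified stand-ins for Nomad's real types (nomad/structs,
// client/allocrunner); these are assumptions for illustration.
type Allocation struct {
    DesiredStatus string
    ClientStatus  string
}

// TerminalStatus approximates the server-side notion of a terminal alloc:
// either the desired status or the client status is a terminal one.
func (a *Allocation) TerminalStatus() bool {
    switch a.DesiredStatus {
    case "stop", "evict":
        return true
    }
    switch a.ClientStatus {
    case "complete", "failed", "lost":
        return true
    }
    return false
}

type allocRunner struct {
    serverAlloc *Allocation // latest copy of the alloc sent by the server
    localStatus string      // client status persisted in the local state DB
}

// shouldRun is the gate the commit message refers to: consult the locally
// persisted client state as well as the server's copy, so a batch alloc
// that already completed isn't re-run just because the server copy on the
// client hasn't been marked terminal yet.
func (ar *allocRunner) shouldRun() bool {
    if ar.serverAlloc.TerminalStatus() {
        return false
    }
    switch ar.localStatus {
    case "complete", "failed":
        return false
    }
    return true
}

func main() {
    // Server copy still looks runnable, but the client already ran the
    // alloc to completion before restarting.
    ar := &allocRunner{
        serverAlloc: &Allocation{DesiredStatus: "run", ClientStatus: "pending"},
        localStatus: "complete",
    }
    fmt.Println(ar.shouldRun()) // false: don't run the task again
}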
@@ -131,3 +131,87 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
 	require.Equal(t, events[2].Type, structs.TaskStarted)
 	require.Equal(t, events[3].Type, structs.TaskTerminated)
 }
+
+// TestAllocRunner_Restore_CompletedBatch asserts that restoring a completed
+// batch alloc doesn't run it again
+func TestAllocRunner_Restore_CompletedBatch(t *testing.T) {
+	t.Parallel()
+
+	// 1. Run task and wait for it to complete
+	// 2. Start new alloc runner
+	// 3. Assert task didn't run again
+
+	alloc := mock.Alloc()
+	alloc.Job.Type = structs.JobTypeBatch
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"run_for": "2ms",
+	}
+
+	conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
+	defer cleanup()
+
+	// Maintain state for subsequent run
+	conf.StateDB = state.NewMemDB(conf.Logger)
+
+	// Start and wait for task to be running
+	ar, err := NewAllocRunner(conf)
+	require.NoError(t, err)
+	go ar.Run()
+	defer destroy(ar)
+
+	testutil.WaitForResult(func() (bool, error) {
+		s := ar.AllocState()
+		if s.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("expected complete, got %s", s.ClientStatus)
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+
+	// once the job finishes, it shouldn't run again
+	require.False(t, ar.shouldRun())
+	initialRunEvents := ar.AllocState().TaskStates[task.Name].Events
+	require.Len(t, initialRunEvents, 4)
+
+	ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
+	require.NoError(t, err)
+	require.NotNil(t, ls)
+	require.Equal(t, structs.TaskStateDead, ts.State)
+
+	// Start a new alloc runner and assert it gets stopped
+	conf2, cleanup2 := testAllocRunnerConfig(t, alloc)
+	defer cleanup2()
+
+	// Use the original statedb to maintain hook state
+	conf2.StateDB = conf.StateDB
+
+	// Restore, start, and wait for the task to be killed
+	ar2, err := NewAllocRunner(conf2)
+	require.NoError(t, err)
+
+	require.NoError(t, ar2.Restore())
+
+	go ar2.Run()
+	defer destroy(ar2)
+
+	// AR waitCh must be closed even when the task doesn't run again
+	select {
+	case <-ar2.WaitCh():
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "alloc.waitCh wasn't closed")
+	}
+
+	// TR waitCh must be closed too!
+	select {
+	case <-ar2.tasks[task.Name].WaitCh():
+	case <-time.After(10 * time.Second):
+		require.Fail(t, "tr.waitCh wasn't closed")
+	}
+
+	// Assert that events are unmodified, which they would be if the task re-ran
+	events := ar2.AllocState().TaskStates[task.Name].Events
+	require.Equal(t, initialRunEvents, events)
+}
@@ -394,6 +394,22 @@ func (tr *TaskRunner) Run() {
 	defer close(tr.waitCh)
 	var result *drivers.ExitResult
 
+	tr.stateLock.RLock()
+	dead := tr.state.State == structs.TaskStateDead
+	tr.stateLock.RUnlock()
+
+	// if restoring a dead task, ensure that task is cleared and all post hooks
+	// are called without additional state updates
+	if dead {
+		// clear driver handle if it was successfully restored on an
+		// already dead task
+		tr.clearDriverHandle()
+		if err := tr.stop(); err != nil {
+			tr.logger.Error("stop failed on terminal task", "error", err)
+		}
+		return
+	}
+
 	// Updates are handled asynchronously with the other hooks but each
 	// triggered update - whether due to alloc updates or a new vault token
 	// - should be handled serially.
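One invariant worth noting in the hunk above: because `close(tr.waitCh)` is deferred, the dead-task early return still unblocks anything waiting on the task runner, which is exactly what the test's `WaitCh()` assertions check. A toy illustration of the pattern, independent of Nomad's types (everything below is a stand-in, not Nomad code):

package main

import (
    "fmt"
    "time"
)

type taskRunner struct {
    waitCh chan struct{}
    dead   bool // set when a dead task was restored
}

// Run mirrors the shape of the patched TaskRunner.Run: the deferred close
// fires on the early return too, so waiters are never leaked.
func (tr *taskRunner) Run() {
    defer close(tr.waitCh)
    if tr.dead {
        // restored an already-dead task: clean up and bail without running
        return
    }
    // ... normal task lifecycle would run here ...
}

func main() {
    tr := &taskRunner{waitCh: make(chan struct{}), dead: true}
    go tr.Run()

    select {
    case <-tr.waitCh:
        fmt.Println("waitCh closed even though the task never ran")
    case <-time.After(time.Second):
        fmt.Println("bug: a waiter would hang forever")
    }
}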
@@ -899,7 +915,7 @@ func (tr *TaskRunner) Restore() error {
 	}
 
 	alloc := tr.Alloc()
-	if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
+	if tr.state.State == structs.TaskStateDead || alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
 		return nil
 	}
 