diff --git a/api/tasks.go b/api/tasks.go index 27bbdca2d..2de8ea591 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -632,6 +632,7 @@ type DispatchPayloadConfig struct { const ( TaskLifecycleHookPrestart = "prestart" + TaskLifecycleHookPoststart = "poststart" ) type TaskLifecycle struct { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index a9d6dcf34..b64990e04 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -142,6 +142,100 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { }) } +// TestAllocRunner_Lifecycle_Poststart asserts that a service job with 2 +// poststart lifecycle hooks (1 sidecar, 1 ephemeral) starts all 3 tasks, only +// the ephemeral one finishes, and the other 2 exit when the alloc is stopped. +func TestAllocRunner_Lifecycle_Poststart(t *testing.T) { + alloc := mock.LifecycleAlloc() + + alloc.Job.Type = structs.JobTypeService + mainTask := alloc.Job.TaskGroups[0].Tasks[0] + mainTask.Config["run_for"] = "100s" + + sidecarTask := alloc.Job.TaskGroups[0].Tasks[1] + sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart + sidecarTask.Config["run_for"] = "100s" + + ephemeralTask := alloc.Job.TaskGroups[0].Tasks[2] + ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + upd := conf.StateUpdater.(*MockStateUpdater) + + // Wait for main and sidecar tasks to be running, and that the + // ephemeral task ran and exited. + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus) + } + + if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning { + return false, fmt.Errorf("expected main task to be running not %s", s) + } + + if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning { + return false, fmt.Errorf("expected sidecar task to be running not %s", s) + } + + if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected ephemeral task to be dead not %s", s) + } + + if last.TaskStates[ephemeralTask.Name].Failed { + return false, fmt.Errorf("expected ephemeral task to be successful not failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for initial state:\n%v", err) + }) + + // Tell the alloc to stop + stopAlloc := alloc.Copy() + stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(stopAlloc) + + // Wait for main and sidecar tasks to stop. + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + + if last.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus) + } + + if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected main task to be dead not %s", s) + } + + if last.TaskStates[mainTask.Name].Failed { + return false, fmt.Errorf("expected main task to be successful not failed") + } + + if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected sidecar task to be dead not %s", s) + } + + if last.TaskStates[sidecarTask.Name].Failed { + return false, fmt.Errorf("expected sidecar task to be successful not failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for initial state:\n%v", err) + }) +} + // TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the // entire task group is killed. func TestAllocRunner_TaskMain_KillTG(t *testing.T) { @@ -152,20 +246,34 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) { alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0 - // Create three tasks in the task group - sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy() - sidecar.Name = "sidecar" - sidecar.Driver = "mock_driver" - sidecar.KillTimeout = 10 * time.Millisecond - sidecar.Lifecycle = &structs.TaskLifecycleConfig{ + // Create four tasks in the task group + prestart := alloc.Job.TaskGroups[0].Tasks[0].Copy() + prestart.Name = "prestart-sidecar" + prestart.Driver = "mock_driver" + prestart.KillTimeout = 10 * time.Millisecond + prestart.Lifecycle = &structs.TaskLifecycleConfig{ Hook: structs.TaskLifecycleHookPrestart, Sidecar: true, } - sidecar.Config = map[string]interface{}{ + prestart.Config = map[string]interface{}{ "run_for": "100s", } + poststart := alloc.Job.TaskGroups[0].Tasks[0].Copy() + poststart.Name = "poststart-sidecar" + poststart.Driver = "mock_driver" + poststart.KillTimeout = 10 * time.Millisecond + poststart.Lifecycle = &structs.TaskLifecycleConfig{ + Hook: structs.TaskLifecycleHookPoststart, + Sidecar: true, + } + + poststart.Config = map[string]interface{}{ + "run_for": "100s", + } + + // these two main tasks have the same name, is that ok? main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy() main1.Name = "task2" main1.Driver = "mock_driver" @@ -180,11 +288,12 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) { "run_for": "2s", } - alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2} + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{prestart, poststart, main1, main2} alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{ - sidecar.Name: tr, - main1.Name: tr, - main2.Name: tr, + prestart.Name: tr, + poststart.Name: tr, + main1.Name: tr, + main2.Name: tr, } conf, cleanup := testAllocRunnerConfig(t, alloc) @@ -217,8 +326,30 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) { var state *structs.TaskState - // Task1 should be killed because Task2 exited - state = last.TaskStates[sidecar.Name] + // both sidecars should be killed because Task2 exited + state = last.TaskStates[prestart.Name] + if state == nil { + return false, fmt.Errorf("could not find state for task %s", prestart.Name) + } + if state.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead) + } + if state.FinishedAt.IsZero() || state.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + if len(state.Events) < 2 { + // At least have a received and destroyed + return false, fmt.Errorf("Unexpected number of events") + } + + if !hasTaskMainEvent(state) { + return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events) + } + + state = last.TaskStates[poststart.Name] + if state == nil { + return false, fmt.Errorf("could not find state for task %s", poststart.Name) + } if state.State != structs.TaskStateDead { return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead) } diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index 8f6d1c10b..7d69cea9f 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -18,23 +18,31 @@ type taskHookCoordinator struct { mainTaskCtx context.Context mainTaskCtxCancel func() + poststartTaskCtx context.Context + poststartTaskCtxCancel func() + prestartSidecar map[string]struct{} prestartEphemeral map[string]struct{} + mainTasksPending map[string]struct{} } func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator { closedCh := make(chan struct{}) close(closedCh) - mainTaskCtx, cancelFn := context.WithCancel(context.Background()) + mainTaskCtx, mainCancelFn := context.WithCancel(context.Background()) + poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background()) c := &taskHookCoordinator{ - logger: logger, - closedCh: closedCh, - mainTaskCtx: mainTaskCtx, - mainTaskCtxCancel: cancelFn, - prestartSidecar: map[string]struct{}{}, - prestartEphemeral: map[string]struct{}{}, + logger: logger, + closedCh: closedCh, + mainTaskCtx: mainTaskCtx, + mainTaskCtxCancel: mainCancelFn, + prestartSidecar: map[string]struct{}{}, + prestartEphemeral: map[string]struct{}{}, + mainTasksPending: map[string]struct{}{}, + poststartTaskCtx: poststartTaskCtx, + poststartTaskCtxCancel: poststartCancelFn, } c.setTasks(tasks) return c @@ -44,7 +52,7 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) { for _, task := range tasks { if task.Lifecycle == nil { - // move nothing + c.mainTasksPending[task.Name] = struct{}{} continue } @@ -55,9 +63,10 @@ func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) { } else { c.prestartEphemeral[task.Name] = struct{}{} } - + case structs.TaskLifecycleHookPoststart: + // Poststart hooks don't need to be tracked. default: - c.logger.Error("invalid lifecycle hook", "hook", task.Lifecycle.Hook) + c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook) } } @@ -70,22 +79,28 @@ func (c *taskHookCoordinator) hasPrestartTasks() bool { return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0 } +func (c *taskHookCoordinator) hasPendingMainTasks() bool { + return len(c.mainTasksPending) > 0 +} + func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} { - if task.Lifecycle != nil && task.Lifecycle.Hook == structs.TaskLifecycleHookPrestart { - return c.closedCh + if task.Lifecycle == nil { + return c.mainTaskCtx.Done() } - return c.mainTaskCtx.Done() - + switch task.Lifecycle.Hook { + case structs.TaskLifecycleHookPrestart: + // Prestart tasks start without checking status of other tasks + return c.closedCh + case structs.TaskLifecycleHookPoststart: + return c.poststartTaskCtx.Done() + default: + return c.mainTaskCtx.Done() + } } // This is not thread safe! This must only be called from one thread per alloc runner. func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) { - if c.mainTaskCtx.Err() != nil { - // nothing to do here - return - } - for task := range c.prestartSidecar { st := states[task] if st == nil || st.StartedAt.IsZero() { @@ -104,10 +119,23 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt delete(c.prestartEphemeral, task) } + for task := range c.mainTasksPending { + st := states[task] + if st == nil || st.StartedAt.IsZero() { + continue + } + + delete(c.mainTasksPending, task) + } + // everything well if !c.hasPrestartTasks() { c.mainTaskCtxCancel() } + + if !c.hasPendingMainTasks() { + c.poststartTaskCtxCancel() + } } // hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks diff --git a/client/allocrunner/task_hook_coordinator_test.go b/client/allocrunner/task_hook_coordinator_test.go index 91405fbad..e4343d915 100644 --- a/client/allocrunner/task_hook_coordinator_test.go +++ b/client/allocrunner/task_hook_coordinator_test.go @@ -224,6 +224,52 @@ func TestTaskHookCoordinator_SidecarNeverStarts(t *testing.T) { require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name) } +func TestTaskHookCoordinator_PoststartStartsAfterMain(t *testing.T) { + logger := testlog.HCLogger(t) + + alloc := mock.LifecycleAlloc() + tasks := alloc.Job.TaskGroups[0].Tasks + + mainTask := tasks[0] + sideTask := tasks[1] + postTask := tasks[2] + + // Make the the third task a poststart hook + postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart + + coord := newTaskHookCoordinator(logger, tasks) + postCh := coord.startConditionForTask(postTask) + sideCh := coord.startConditionForTask(sideTask) + mainCh := coord.startConditionForTask(mainTask) + + require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name) + require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name) + require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", postTask.Name) + + states := map[string]*structs.TaskState{ + postTask.Name: { + State: structs.TaskStatePending, + Failed: false, + }, + mainTask.Name: { + State: structs.TaskStateRunning, + Failed: false, + StartedAt: time.Now(), + }, + sideTask.Name: { + State: structs.TaskStateRunning, + Failed: false, + StartedAt: time.Now(), + }, + } + + coord.taskStateUpdated(states) + + require.Truef(t, isChannelClosed(postCh), "%s channel was open, should be closed", postTask.Name) + require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name) + require.Truef(t, isChannelClosed(mainCh), "%s channel was open, should be closed", mainTask.Name) +} + func isChannelClosed(ch <-chan struct{}) bool { select { case <-ch: diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go index f8aea3e61..6ee0056cc 100644 --- a/client/allocrunner/taskrunner/restarts/restarts.go +++ b/client/allocrunner/taskrunner/restarts/restarts.go @@ -21,11 +21,19 @@ const ( ) func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker { + // Batch jobs should not restart if they exit successfully onSuccess := jobType != structs.JobTypeBatch + + // Prestart sidecars should get restarted on success if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart { onSuccess = tlc.Sidecar } + // Poststart sidecars should get restarted on success + if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPoststart { + onSuccess = tlc.Sidecar + } + return &RestartTracker{ startTime: time.Now(), onSuccess: onSuccess, diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 0b0f024cf..f8d4032d3 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -17,6 +17,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/deployment" _ "github.com/hashicorp/nomad/e2e/example" _ "github.com/hashicorp/nomad/e2e/hostvolumes" + _ "github.com/hashicorp/nomad/e2e/lifecycle" _ "github.com/hashicorp/nomad/e2e/metrics" _ "github.com/hashicorp/nomad/e2e/nomad09upgrade" _ "github.com/hashicorp/nomad/e2e/nomadexec" diff --git a/e2e/lifecycle/inputs/batch.nomad b/e2e/lifecycle/inputs/batch.nomad new file mode 100644 index 000000000..744bcf7b6 --- /dev/null +++ b/e2e/lifecycle/inputs/batch.nomad @@ -0,0 +1,127 @@ +# lifecycle hook test job for batch jobs. touches, removes, and tests +# for the existence of files to assert the order of running tasks. +# all tasks should exit 0 and the alloc dir should contain the following +# files: ./init-ran, ./main-ran, ./poststart-run + +job "batch-lifecycle" { + + datacenters = ["dc1"] + + type = "batch" + + group "test" { + + task "init" { + + lifecycle { + hook = "prestart" + } + + driver = "docker" + + config { + image = "busybox:1" + command = "/bin/sh" + args = ["local/prestart.sh"] + } + + template { + data = <