Merge pull request #6843 from hashicorp/lifecycle

Task Lifecycle
Mahmood Ali 2020-03-21 19:15:09 -04:00 committed by GitHub
commit 2bb9eb4e78
24 changed files with 1300 additions and 102 deletions


@ -609,11 +609,26 @@ type DispatchPayloadConfig struct {
File string
}
const (
TaskLifecycleHookPrestart = "prestart"
)
type TaskLifecycle struct {
Hook string `mapstructure:"hook"`
Sidecar bool `mapstructure:"sidecar"`
}
// Empty returns true when the lifecycle has no user-supplied values
func (l *TaskLifecycle) Empty() bool {
return l == nil || (l.Hook == "")
}
// Task is a single process in a task group.
type Task struct {
Name string
Driver string
User string
Lifecycle *TaskLifecycle
Config map[string]interface{}
Constraints []*Constraint
Affinities []*Affinity
@ -665,6 +680,9 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
for _, vm := range t.VolumeMounts {
vm.Canonicalize()
}
if t.Lifecycle.Empty() {
t.Lifecycle = nil
}
}
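For illustration only (not part of this diff), here is how an API consumer could populate the new field; the task name and driver below are made up, and Canonicalize drops a lifecycle block whose hook is unset:

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/api"
)

// Builds a prestart sidecar task through the api package; values are
// illustrative only.
func main() {
    task := &api.Task{
        Name:   "log-shipper",
        Driver: "docker",
        Lifecycle: &api.TaskLifecycle{
            Hook:    api.TaskLifecycleHookPrestart,
            Sidecar: true,
        },
    }
    fmt.Println(task.Lifecycle.Empty()) // false: a hook is set, so it survives Canonicalize
}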
// TaskArtifact is used to download artifacts before running a task.


@ -376,6 +376,36 @@ func TestTask_VolumeMount(t *testing.T) {
require.Equal(t, *vm.PropagationMode, "private")
}
func TestTask_Canonicalize_TaskLifecycle(t *testing.T) {
testCases := []struct {
name string
expected *TaskLifecycle
task *Task
}{
{
name: "empty",
task: &Task{
Lifecycle: &TaskLifecycle{},
},
expected: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tg := &TaskGroup{
Name: stringToPtr("foo"),
}
j := &Job{
ID: stringToPtr("test"),
}
tc.task.Canonicalize(tg, j)
require.Equal(t, tc.expected, tc.task.Lifecycle)
})
}
}
// Ensures no regression on https://github.com/hashicorp/nomad/issues/3132
func TestTaskGroup_Canonicalize_Update(t *testing.T) {
// Job with an Empty() Update


@ -146,6 +146,8 @@ type allocRunner struct {
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}
taskHookCoordinator *taskHookCoordinator
}
// NewAllocRunner returns a new allocation runner.
@ -190,6 +192,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
// Create alloc dir
ar.allocDir = allocdir.NewAllocDir(ar.logger, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))
ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks)
// Initialize the runners hooks.
if err := ar.initRunnerHooks(config.ClientConfig); err != nil {
return nil, err
@ -207,20 +211,21 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
for _, task := range tasks {
config := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
ConsulSI: ar.sidsClient,
Vault: ar.vaultClient,
DeviceStatsReporter: ar.deviceStatsReporter,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
}
// Create, but do not Run, the task runner
@ -357,13 +362,18 @@ func (ar *allocRunner) Restore() error {
ar.state.DeploymentStatus = ds
ar.stateLock.Unlock()
states := make(map[string]*structs.TaskState)
// Restore task runners
for _, tr := range ar.tasks {
if err := tr.Restore(); err != nil {
return err
}
states[tr.Task().Name] = tr.TaskState()
}
ar.taskHookCoordinator.taskStateUpdated(states)
return nil
}
@ -488,6 +498,8 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
}
ar.taskHookCoordinator.taskStateUpdated(states)
// Get the client allocation
calloc := ar.clientAlloc(states)


@ -423,6 +423,7 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
// Wait for tasks to be stopped because leader is dead
testutil.WaitForResult(func() (bool, error) {
alloc := ar2.Alloc()
// TODO: this test does not test anything!!! alloc.TaskStates is an empty map
for task, state := range alloc.TaskStates {
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("Task %q should be dead: %v", task, state.State)
@ -444,6 +445,44 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
}
}
func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) {
t.Parallel()
alloc := mock.LifecycleAlloc()
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
// Use a memory backed statedb
conf.StateDB = state.NewMemDB(conf.Logger)
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
// All tasks with prestart hooks should be unblocked from running:
// i.e. the "init" and "side" task hook coordinator channels are closed
require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["init"].Task())), "init channel was open, should be closed")
require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["side"].Task())), "side channel was open, should be closed")
// Mimic client dies while init task running, and client restarts after init task finished
ar.tasks["init"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskTerminated))
ar.tasks["side"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
// Create a new AllocRunner to test RestoreState and Run
ar2, err := NewAllocRunner(conf)
require.NoError(t, err)
if err := ar2.Restore(); err != nil {
t.Fatalf("error restoring state: %v", err)
}
// We want to see Restore resume execution with correct hook ordering:
// i.e. we should see the "web" main task hook coordinator channel is closed
require.Truef(t, isChannelClosed(ar2.taskHookCoordinator.startConditionForTask(ar.tasks["web"].Task())), "web channel was open, should be closed")
}
func TestAllocRunner_Update_Semantics(t *testing.T) {
t.Parallel()
require := require.New(t)


@ -0,0 +1,110 @@
package allocrunner
import (
"context"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
// taskHookCoordinator helps coordinate when main tasks can launch, namely
// after all prestart sidecar tasks have started and all prestart ephemeral
// tasks have completed successfully
type taskHookCoordinator struct {
logger hclog.Logger
closedCh chan struct{}
mainTaskCtx context.Context
mainTaskCtxCancel func()
prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
}
func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
closedCh := make(chan struct{})
close(closedCh)
mainTaskCtx, cancelFn := context.WithCancel(context.Background())
c := &taskHookCoordinator{
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: cancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
}
c.setTasks(tasks)
return c
}
func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
for _, task := range tasks {
if task.Lifecycle == nil {
// tasks without a lifecycle are main tasks; nothing to coordinate
continue
}
switch task.Lifecycle.Hook {
case structs.TaskLifecycleHookPrestart:
if task.Lifecycle.Sidecar {
c.prestartSidecar[task.Name] = struct{}{}
} else {
c.prestartEphemeral[task.Name] = struct{}{}
}
default:
c.logger.Error("invalid lifecycle hook", "hook", task.Lifecycle.Hook)
}
}
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}
}
func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}
func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} {
if task.Lifecycle != nil && task.Lifecycle.Hook == structs.TaskLifecycleHookPrestart {
return c.closedCh
}
return c.mainTaskCtx.Done()
}
// This is not thread safe! This must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
if c.mainTaskCtx.Err() != nil {
// nothing to do here
return
}
for task := range c.prestartSidecar {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
continue
}
delete(c.prestartSidecar, task)
}
for task := range c.prestartEphemeral {
st := states[task]
if st == nil || !st.Successful() {
continue
}
delete(c.prestartEphemeral, task)
}
// if all prestart tasks are done, unblock the main tasks
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}
}
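As an illustrative sketch (not part of this commit) of the coordinator contract: a main task's start channel stays open until the prestart sidecar has been reported started and the prestart ephemeral task has exited successfully. It assumes package allocrunner and mock.LifecycleAlloc, whose task group contains "web" (main), "side" (prestart sidecar) and "init" (prestart ephemeral):

package allocrunner

import (
    "time"

    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/nomad/nomad/mock"
    "github.com/hashicorp/nomad/nomad/structs"
)

// exampleCoordinatorContract reports whether the main task has been
// unblocked after the prestart tasks are reported started/finished.
// Illustrative only; not part of the commit.
func exampleCoordinatorContract() bool {
    tasks := mock.LifecycleAlloc().Job.TaskGroups[0].Tasks // "web", "side", "init"
    coord := newTaskHookCoordinator(hclog.NewNullLogger(), tasks)

    mainCh := coord.startConditionForTask(tasks[0]) // still open: prestart tasks outstanding

    coord.taskStateUpdated(map[string]*structs.TaskState{
        "side": {State: structs.TaskStateRunning, StartedAt: time.Now()},
        "init": {State: structs.TaskStateDead, StartedAt: time.Now(), FinishedAt: time.Now()},
    })

    select {
    case <-mainCh: // closed once no prestart tasks remain outstanding
        return true
    default:
        return false
    }
}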


@ -0,0 +1,232 @@
package allocrunner
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
)
func TestTaskHookCoordinator_OnlyMainApp(t *testing.T) {
alloc := mock.Alloc()
tasks := alloc.Job.TaskGroups[0].Tasks
task := tasks[0]
logger := testlog.HCLogger(t)
coord := newTaskHookCoordinator(logger, tasks)
ch := coord.startConditionForTask(task)
require.Truef(t, isChannelClosed(ch), "%s channel was open, should be closed", task.Name)
}
func TestTaskHookCoordinator_PrestartRunsBeforeMain(t *testing.T) {
logger := testlog.HCLogger(t)
alloc := mock.LifecycleAlloc()
tasks := alloc.Job.TaskGroups[0].Tasks
mainTask := tasks[0]
sideTask := tasks[1]
initTask := tasks[2]
coord := newTaskHookCoordinator(logger, tasks)
initCh := coord.startConditionForTask(initTask)
sideCh := coord.startConditionForTask(sideTask)
mainCh := coord.startConditionForTask(mainTask)
require.Truef(t, isChannelClosed(initCh), "%s channel was open, should be closed", initTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
}
func TestTaskHookCoordinator_MainRunsAfterPrestart(t *testing.T) {
logger := testlog.HCLogger(t)
alloc := mock.LifecycleAlloc()
tasks := alloc.Job.TaskGroups[0].Tasks
mainTask := tasks[0]
sideTask := tasks[1]
initTask := tasks[2]
coord := newTaskHookCoordinator(logger, tasks)
initCh := coord.startConditionForTask(initTask)
sideCh := coord.startConditionForTask(sideTask)
mainCh := coord.startConditionForTask(mainTask)
require.Truef(t, isChannelClosed(initCh), "%s channel was open, should be closed", initTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
states := map[string]*structs.TaskState{
mainTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
initTask.Name: {
State: structs.TaskStateDead,
Failed: false,
StartedAt: time.Now(),
FinishedAt: time.Now(),
},
sideTask.Name: {
State: structs.TaskStateRunning,
Failed: false,
StartedAt: time.Now(),
},
}
coord.taskStateUpdated(states)
require.Truef(t, isChannelClosed(initCh), "%s channel was open, should be closed", initTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Truef(t, isChannelClosed(mainCh), "%s channel was open, should be closed", mainTask.Name)
}
func TestTaskHookCoordinator_MainRunsAfterManyInitTasks(t *testing.T) {
logger := testlog.HCLogger(t)
alloc := mock.LifecycleAlloc()
alloc.Job = mock.VariableLifecycleJob(structs.Resources{CPU: 100, MemoryMB: 256}, 1, 2, 0)
tasks := alloc.Job.TaskGroups[0].Tasks
mainTask := tasks[0]
init1Task := tasks[1]
init2Task := tasks[2]
coord := newTaskHookCoordinator(logger, tasks)
mainCh := coord.startConditionForTask(mainTask)
init1Ch := coord.startConditionForTask(init1Task)
init2Ch := coord.startConditionForTask(init2Task)
require.Truef(t, isChannelClosed(init1Ch), "%s channel was open, should be closed", init1Task.Name)
require.Truef(t, isChannelClosed(init2Ch), "%s channel was open, should be closed", init2Task.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
states := map[string]*structs.TaskState{
mainTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
init1Task.Name: {
State: structs.TaskStateDead,
Failed: false,
StartedAt: time.Now(),
FinishedAt: time.Now(),
},
init2Task.Name: {
State: structs.TaskStateDead,
Failed: false,
StartedAt: time.Now(),
},
}
coord.taskStateUpdated(states)
require.Truef(t, isChannelClosed(init1Ch), "%s channel was open, should be closed", init1Task.Name)
require.Truef(t, isChannelClosed(init2Ch), "%s channel was open, should be closed", init2Task.Name)
require.Truef(t, isChannelClosed(mainCh), "%s channel was open, should be closed", mainTask.Name)
}
func TestTaskHookCoordinator_FailedInitTask(t *testing.T) {
logger := testlog.HCLogger(t)
alloc := mock.LifecycleAlloc()
alloc.Job = mock.VariableLifecycleJob(structs.Resources{CPU: 100, MemoryMB: 256}, 1, 2, 0)
tasks := alloc.Job.TaskGroups[0].Tasks
mainTask := tasks[0]
init1Task := tasks[1]
init2Task := tasks[2]
coord := newTaskHookCoordinator(logger, tasks)
mainCh := coord.startConditionForTask(mainTask)
init1Ch := coord.startConditionForTask(init1Task)
init2Ch := coord.startConditionForTask(init2Task)
require.Truef(t, isChannelClosed(init1Ch), "%s channel was open, should be closed", init1Task.Name)
require.Truef(t, isChannelClosed(init2Ch), "%s channel was open, should be closed", init2Task.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
states := map[string]*structs.TaskState{
mainTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
init1Task.Name: {
State: structs.TaskStateDead,
Failed: false,
StartedAt: time.Now(),
FinishedAt: time.Now(),
},
init2Task.Name: {
State: structs.TaskStateDead,
Failed: true,
StartedAt: time.Now(),
},
}
coord.taskStateUpdated(states)
require.Truef(t, isChannelClosed(init1Ch), "%s channel was open, should be closed", init1Task.Name)
require.Truef(t, isChannelClosed(init2Ch), "%s channel was open, should be closed", init2Task.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
}
func TestTaskHookCoordinator_SidecarNeverStarts(t *testing.T) {
logger := testlog.HCLogger(t)
alloc := mock.LifecycleAlloc()
tasks := alloc.Job.TaskGroups[0].Tasks
mainTask := tasks[0]
sideTask := tasks[1]
initTask := tasks[2]
coord := newTaskHookCoordinator(logger, tasks)
initCh := coord.startConditionForTask(initTask)
sideCh := coord.startConditionForTask(sideTask)
mainCh := coord.startConditionForTask(mainTask)
require.Truef(t, isChannelClosed(initCh), "%s channel was open, should be closed", initTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
states := map[string]*structs.TaskState{
mainTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
initTask.Name: {
State: structs.TaskStateDead,
Failed: false,
StartedAt: time.Now(),
FinishedAt: time.Now(),
},
sideTask.Name: {
State: structs.TaskStatePending,
Failed: false,
},
}
coord.taskStateUpdated(states)
require.Truef(t, isChannelClosed(initCh), "%s channel was open, should be closed", initTask.Name)
require.Truef(t, isChannelClosed(sideCh), "%s channel was open, should be closed", sideTask.Name)
require.Falsef(t, isChannelClosed(mainCh), "%s channel was closed, should be open", mainTask.Name)
}
func isChannelClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}


@ -20,11 +20,12 @@ const (
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)
func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
onSuccess := jobType != structs.JobTypeBatch
// Prestart ephemeral (non-sidecar) tasks are expected to exit, so a
// successful exit is not restarted; prestart sidecars restart like any
// other long-running task.
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
onSuccess = tlc.Sidecar
}
return &RestartTracker{
startTime: time.Now(),
onSuccess: onSuccess,


@ -5,6 +5,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
@ -34,7 +36,7 @@ func testExitResult(exit int) *drivers.ExitResult {
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := NewRestartTracker(p, structs.JobTypeService, nil)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetExitResult(testExitResult(127)).GetState()
if state != structs.TaskRestarting {
@ -60,7 +62,7 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) {
func TestClient_RestartTracker_ModeFail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := NewRestartTracker(p, structs.JobTypeSystem, nil)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetExitResult(testExitResult(127)).GetState()
if state != structs.TaskRestarting {
@ -80,7 +82,7 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := NewRestartTracker(p, structs.JobTypeBatch, nil)
if state, _ := rt.SetExitResult(testExitResult(0)).GetState(); state != structs.TaskTerminated {
t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated)
}
@ -92,28 +94,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
p.Attempts = 0
// Test with a non-zero exit code
rt := NewRestartTracker(p, structs.JobTypeService, nil)
if state, when := rt.SetExitResult(testExitResult(1)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
}
// Even with a zero (successful) exit code non-batch jobs should exit
// with TaskNotRestarting
rt = NewRestartTracker(p, structs.JobTypeService, nil)
if state, when := rt.SetExitResult(testExitResult(0)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
}
// Batch jobs with a zero exit code and 0 attempts *do* exit cleanly
// with Terminated
rt = NewRestartTracker(p, structs.JobTypeBatch, nil)
if state, when := rt.SetExitResult(testExitResult(0)).GetState(); state != structs.TaskTerminated {
t.Fatalf("expect terminated, got restart/delay: %v/%v", state, when)
}
// Batch jobs with a non-zero exit code and 0 attempts exit with
// TaskNotRestarting
rt = NewRestartTracker(p, structs.JobTypeBatch, nil)
if state, when := rt.SetExitResult(testExitResult(1)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("expect no restart, got restart/delay: %v/%v", state, when)
}
@ -123,7 +125,7 @@ func TestClient_RestartTracker_TaskKilled(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := NewRestartTracker(p, structs.JobTypeService, nil)
if state, when := rt.SetKilled().GetState(); state != structs.TaskKilled && when != 0 {
t.Fatalf("expect no restart; got %v %v", state, when)
}
@ -133,7 +135,7 @@ func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := NewRestartTracker(p, structs.JobTypeService, nil)
if state, when := rt.SetRestartTriggered(false).GetState(); state != structs.TaskRestarting && when != 0 {
t.Fatalf("expect restart immediately, got %v %v", state, when)
}
@ -143,7 +145,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 1
rt := NewRestartTracker(p, structs.JobTypeService, nil)
if state, when := rt.SetRestartTriggered(true).GetState(); state != structs.TaskRestarting || when == 0 {
t.Fatalf("expect restart got %v %v", state, when)
}
@ -155,7 +157,7 @@ func TestClient_RestartTracker_RestartTriggered_Failure(t *testing.T) {
func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := NewRestartTracker(p, structs.JobTypeSystem, nil)
recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetStartError(recErr).GetState()
@ -176,7 +178,7 @@ func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
func TestClient_RestartTracker_StartError_Recoverable_Delay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := NewRestartTracker(p, structs.JobTypeSystem, nil)
recErr := structs.NewRecoverableError(fmt.Errorf("foo"), true)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetStartError(recErr).GetState()
@ -197,3 +199,118 @@ func TestClient_RestartTracker_StartError_Recoverable_Delay(t *testing.T) {
t.Fatalf("NextRestart() returned %v; want > %v and <= %v", when, p.Delay, p.Interval)
}
}
func TestClient_RestartTracker_Lifecycle(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
taskLifecycleConfig *structs.TaskLifecycleConfig
jobType string
shouldRestartOnSuccess bool
shouldRestartOnFailure bool
}{
{
name: "system job no lifecycle",
taskLifecycleConfig: nil,
jobType: structs.JobTypeSystem,
shouldRestartOnSuccess: true,
shouldRestartOnFailure: true,
},
{
name: "service job no lifecycle",
taskLifecycleConfig: nil,
jobType: structs.JobTypeService,
shouldRestartOnSuccess: true,
shouldRestartOnFailure: true,
},
{
name: "batch job no lifecycle",
taskLifecycleConfig: nil,
jobType: structs.JobTypeBatch,
shouldRestartOnSuccess: false,
shouldRestartOnFailure: true,
},
{
name: "system job w/ ephemeral prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
jobType: structs.JobTypeSystem,
shouldRestartOnSuccess: false,
shouldRestartOnFailure: true,
},
{
name: "system job w/ sidecar prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
jobType: structs.JobTypeSystem,
shouldRestartOnSuccess: true,
shouldRestartOnFailure: true,
},
{
name: "service job w/ ephemeral prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
jobType: structs.JobTypeService,
shouldRestartOnSuccess: false,
shouldRestartOnFailure: true,
},
{
name: "service job w/ sidecar prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
jobType: structs.JobTypeService,
shouldRestartOnSuccess: true,
shouldRestartOnFailure: true,
},
{
name: "batch job w/ ephemeral prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
jobType: structs.JobTypeBatch,
shouldRestartOnSuccess: false,
shouldRestartOnFailure: true,
},
{
name: "batch job w/ sidecar prestart hook",
taskLifecycleConfig: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
jobType: structs.JobTypeBatch,
shouldRestartOnSuccess: true,
shouldRestartOnFailure: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
restartPolicy := testPolicy(true, testCase.jobType)
restartTracker := NewRestartTracker(restartPolicy, testCase.jobType, testCase.taskLifecycleConfig)
state, _ := restartTracker.SetExitResult(testExitResult(0)).GetState()
if !testCase.shouldRestartOnSuccess {
require.Equal(t, structs.TaskTerminated, state)
} else {
require.Equal(t, structs.TaskRestarting, state)
}
state, _ = restartTracker.SetExitResult(testExitResult(127)).GetState()
if !testCase.shouldRestartOnFailure {
require.Equal(t, structs.TaskTerminated, state)
} else {
require.Equal(t, structs.TaskRestarting, state)
}
})
}
}


@ -202,6 +202,9 @@ type TaskRunner struct {
// GetClientAllocs has been called in case of a failed restore.
serversContactedCh <-chan struct{}
// startConditionMetCtx is closed when the task runner may start the task
startConditionMetCtx <-chan struct{}
// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
@ -247,6 +250,9 @@ type Config struct {
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
// StartConditionMetCtx is closed when the task runner may start the task
StartConditionMetCtx <-chan struct{}
}
func NewTaskRunner(config *Config) (*TaskRunner, error) {
@ -271,32 +277,33 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
}
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
consulClient: config.Consul,
siClient: config.ConsulSI,
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
}
// Create the logger based on the allocation ID
@ -320,7 +327,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr.logger.Error("alloc missing task group")
return nil, fmt.Errorf("alloc missing task group")
}
tr.restartTracker = restarts.NewRestartTracker(tg.RestartPolicy, tr.alloc.Job.Type, config.Task.Lifecycle)
// Get the driver
if err := tr.initDriver(); err != nil {
@ -454,6 +461,14 @@ func (tr *TaskRunner) Run() {
}
}
select {
case <-tr.startConditionMetCtx:
// yay proceed
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
}
MAIN:
for !tr.Alloc().TerminalStatus() {
select {


@ -96,20 +96,26 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
cleanup()
}
// Create a closed channel to mock TaskHookCoordinator.startConditionForTask.
// Closed channel indicates this task is not blocked on prestart hooks.
closedCh := make(chan struct{})
close(closedCh)
conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
ConsulSI: consulapi.NewMockServiceIdentitiesClient(),
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
StartConditionMetCtx: closedCh,
}
return conf, trCleanup
}


@ -143,19 +143,25 @@ func TestConsul_Integration(t *testing.T) {
close(consulRan)
}()
// Create a closed channel to mock TaskHookCoordinator.startConditionForTask.
// Closed channel indicates this task is not blocked on prestart hooks.
closedCh := make(chan struct{})
close(closedCh)
// Build the config
config := &taskrunner.Config{
Alloc: alloc,
ClientConfig: conf,
Consul: serviceClient,
Task: task,
TaskDir: taskDir,
Logger: logger,
Vault: vclient,
StateDB: state.NoopDB{},
StateUpdater: logUpdate,
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
StartConditionMetCtx: closedCh,
}
tr, err := taskrunner.NewTaskRunner(config)


@ -924,6 +924,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
File: apiTask.DispatchPayload.File,
}
}
if apiTask.Lifecycle != nil {
structsTask.Lifecycle = &structs.TaskLifecycleConfig{
Hook: apiTask.Lifecycle.Hook,
Sidecar: apiTask.Lifecycle.Sidecar,
}
}
}
func ApiResourcesToStructs(in *api.Resources) *structs.Resources {


@ -184,16 +184,20 @@ func (c *AllocStatusCommand) Run(args []string) int {
}
if short {
c.Ui.Output(formatAllocShortInfo(alloc, client))
} else {
output, err := formatAllocBasicInfo(alloc, client, length, verbose)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
c.Ui.Output(output)
if alloc.AllocatedResources != nil && len(alloc.AllocatedResources.Shared.Networks) > 0 && alloc.AllocatedResources.Shared.Networks[0].HasPorts() {
c.Ui.Output("")
c.Ui.Output(formatAllocNetworkInfo(alloc))
}
}
if short {
@ -222,6 +226,20 @@ func (c *AllocStatusCommand) Run(args []string) int {
return 0
}
func formatAllocShortInfo(alloc *api.Allocation, client *api.Client) string {
formattedCreateTime := prettyTimeDiff(time.Unix(0, alloc.CreateTime), time.Now())
formattedModifyTime := prettyTimeDiff(time.Unix(0, alloc.ModifyTime), time.Now())
basic := []string{
fmt.Sprintf("ID|%s", alloc.ID),
fmt.Sprintf("Name|%s", alloc.Name),
fmt.Sprintf("Created|%s", formattedCreateTime),
fmt.Sprintf("Modified|%s", formattedModifyTime),
}
return formatKV(basic)
}
func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength int, verbose bool) (string, error) {
var formattedCreateTime, formattedModifyTime string
@ -649,7 +667,21 @@ func (c *AllocStatusCommand) outputVerboseResourceUsage(task string, resourceUsa
// shortTaskStatus prints out the current state of each task.
func (c *AllocStatusCommand) shortTaskStatus(alloc *api.Allocation) {
tasks := make([]string, 0, len(alloc.TaskStates)+1)
tasks = append(tasks, "Name|State|Last Event|Time")
tasks = append(tasks, "Name|State|Last Event|Time|Lifecycle")
taskLifecycles := map[string]string{}
for _, t := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks {
lc := "main"
if t.Lifecycle != nil {
sidecar := ""
if t.Lifecycle.Sidecar {
sidecar = "sidecar"
}
lc = fmt.Sprintf("%s %s", t.Lifecycle.Hook, sidecar)
}
taskLifecycles[t.Name] = lc
}
for task := range c.sortedTaskStateIterator(alloc.TaskStates) {
state := alloc.TaskStates[task]
lastState := state.State
@ -662,8 +694,8 @@ func (c *AllocStatusCommand) shortTaskStatus(alloc *api.Allocation) {
lastTime = formatUnixNanoTime(last.Time)
}
tasks = append(tasks, fmt.Sprintf("%s|%s|%s|%s|%s",
task, lastState, lastEvent, lastTime, taskLifecycles[task]))
}
c.Ui.Output(c.Colorize().Color("\n[bold]Tasks[reset]"))


@ -58,6 +58,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
"constraint",
"affinity",
"dispatch_payload",
"lifecycle",
"driver",
"env",
"kill_timeout",
@ -87,6 +88,7 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
delete(m, "constraint")
delete(m, "affinity")
delete(m, "dispatch_payload")
delete(m, "lifecycle")
delete(m, "env")
delete(m, "logs")
delete(m, "meta")
@ -275,6 +277,33 @@ func parseTask(item *ast.ObjectItem) (*api.Task, error) {
}
}
// If we have a lifecycle block, parse it
if o := listVal.Filter("lifecycle"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return nil, fmt.Errorf("only one lifecycle block is allowed in a task. Number of lifecycle blocks found: %d", len(o.Items))
}
var m map[string]interface{}
lifecycleBlock := o.Items[0]
// Check for invalid keys
valid := []string{
"hook",
"sidecar",
}
if err := helper.CheckHCLKeys(lifecycleBlock.Val, valid); err != nil {
return nil, multierror.Prefix(err, "lifecycle ->")
}
if err := hcl.DecodeObject(&m, lifecycleBlock.Val); err != nil {
return nil, err
}
t.Lifecycle = &api.TaskLifecycle{}
if err := mapstructure.WeakDecode(m, t.Lifecycle); err != nil {
return nil, err
}
}
return &t, nil
}
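As a usage sketch (not part of this change), the new stanza can be exercised through the package's existing Parse entry point; the job body below is made up:

package main

import (
    "fmt"
    "strings"

    "github.com/hashicorp/nomad/jobspec"
)

// Parses a minimal job that uses the lifecycle stanza; illustrative only.
func main() {
    src := `
job "example" {
  group "g" {
    task "init" {
      driver = "exec"

      lifecycle {
        hook    = "prestart"
        sidecar = false
      }

      config {
        command = "/bin/true"
      }
    }
  }
}`

    job, err := jobspec.Parse(strings.NewReader(src))
    if err != nil {
        panic(err)
    }

    lc := job.TaskGroups[0].Tasks[0].Lifecycle
    fmt.Println(lc.Hook, lc.Sidecar) // prestart false
}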


@ -339,6 +339,10 @@ func TestParse(t *testing.T) {
Name: "storagelocker",
Driver: "docker",
User: "",
Lifecycle: &api.TaskLifecycle{
Hook: "prestart",
Sidecar: true,
},
Config: map[string]interface{}{
"image": "hashicorp/storagelocker",
},


@ -292,6 +292,11 @@ job "binstore-storagelocker" {
task "storagelocker" {
driver = "docker"
lifecycle {
hook = "prestart"
sidecar = true
}
config {
image = "hashicorp/storagelocker"
}


@ -270,6 +270,246 @@ func Job() *structs.Job {
return job
}
func LifecycleSideTask(resources structs.Resources, i int) *structs.Task {
return &structs.Task{
Name: fmt.Sprintf("side-%d", i),
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &resources,
}
}
func LifecycleInitTask(resources structs.Resources, i int) *structs.Task {
return &structs.Task{
Name: fmt.Sprintf("init-%d", i),
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &resources,
}
}
func LifecycleMainTask(resources structs.Resources, i int) *structs.Task {
return &structs.Task{
Name: fmt.Sprintf("main-%d", i),
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &resources,
}
}
func VariableLifecycleJob(resources structs.Resources, main int, init int, side int) *structs.Job {
tasks := []*structs.Task{}
for i := 0; i < main; i++ {
tasks = append(tasks, LifecycleMainTask(resources, i))
}
for i := 0; i < init; i++ {
tasks = append(tasks, LifecycleInitTask(resources, i))
}
for i := 0; i < side; i++ {
tasks = append(tasks, LifecycleSideTask(resources, i))
}
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{
{
Name: "web",
Count: 1,
Tasks: tasks,
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}
func LifecycleJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeBatch,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{
{
Name: "web",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 0,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeFail,
},
Tasks: []*structs.Task{
{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
{
Name: "side",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
{
Name: "init",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "1s",
},
Lifecycle: &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: false,
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
},
},
},
},
},
Meta: map[string]string{
"owner": "armon",
},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}
func LifecycleAlloc() *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: "12345678-abcd-efab-cdef-123456789abc",
Namespace: structs.DefaultNamespace,
TaskGroup: "web",
// TODO Remove once clientv2 gets merged
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
TaskResources: map[string]*structs.Resources{
"web": {
CPU: 1000,
MemoryMB: 256,
},
"init": {
CPU: 1000,
MemoryMB: 256,
},
"side": {
CPU: 1000,
MemoryMB: 256,
},
},
AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
"init": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
"side": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1000,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 256,
},
},
},
},
Job: LifecycleJob(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
alloc.JobID = alloc.Job.ID
return alloc
}
func MaxParallelJob() *structs.Job {
update := *structs.DefaultUpdateStrategy
update.MaxParallel = 0


@ -25,8 +25,8 @@ import (
"github.com/gorhill/cronexpr"
hcodec "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper"
@ -2937,7 +2937,8 @@ func (n *NodeReservedNetworkResources) ParseReservedHostPorts() ([]uint64, error
// AllocatedResources is the set of resources to be used by an allocation.
type AllocatedResources struct {
// Tasks is a mapping of task name to the resources for the task.
Tasks map[string]*AllocatedTaskResources
TaskLifecycles map[string]*TaskLifecycleConfig
// Shared is the set of resource that are shared by all tasks in the group.
Shared AllocatedSharedResources
@ -2958,6 +2959,13 @@ func (a *AllocatedResources) Copy() *AllocatedResources {
out.Tasks[task] = resource.Copy()
}
}
if a.TaskLifecycles != nil {
out.TaskLifecycles = make(map[string]*TaskLifecycleConfig, len(a.TaskLifecycles))
for task, lifecycle := range a.TaskLifecycles {
out.TaskLifecycles[task] = lifecycle.Copy()
}
}
return &out
}
@ -2972,9 +2980,29 @@ func (a *AllocatedResources) Comparable() *ComparableResources {
c := &ComparableResources{
Shared: a.Shared,
}
prestartSidecarTasks := &AllocatedTaskResources{}
prestartEphemeralTasks := &AllocatedTaskResources{}
main := &AllocatedTaskResources{}
for taskName, r := range a.Tasks {
lc := a.TaskLifecycles[taskName]
if lc == nil {
main.Add(r)
} else if lc.Hook == TaskLifecycleHookPrestart {
if lc.Sidecar {
prestartSidecarTasks.Add(r)
} else {
prestartEphemeralTasks.Add(r)
}
}
}
// Ephemeral prestart tasks finish before the main tasks start, so the group
// only needs the larger of the two; sidecar prestart tasks run alongside the
// main tasks, so their resources are additive.
prestartEphemeralTasks.Max(main)
prestartSidecarTasks.Add(prestartEphemeralTasks)
c.Flattened.Add(prestartSidecarTasks)
// Add network resources that are at the task group level
for _, network := range a.Shared.Networks {
c.Flattened.Add(&AllocatedTaskResources{
@ -3063,6 +3091,35 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) {
}
}
func (a *AllocatedTaskResources) Max(other *AllocatedTaskResources) {
if other == nil {
return
}
a.Cpu.Max(&other.Cpu)
a.Memory.Max(&other.Memory)
for _, n := range other.Networks {
// Find the matching interface by IP or CIDR
idx := a.NetIndex(n)
if idx == -1 {
a.Networks = append(a.Networks, n.Copy())
} else {
a.Networks[idx].Add(n)
}
}
for _, d := range other.Devices {
// Find the matching device
idx := AllocatedDevices(a.Devices).Index(d)
if idx == -1 {
a.Devices = append(a.Devices, d.Copy())
} else {
a.Devices[idx].Add(d)
}
}
}
// Comparable turns AllocatedTaskResources into ComparableResources
// as a helper step in preemption
func (a *AllocatedTaskResources) Comparable() *ComparableResources {
@ -3157,6 +3214,16 @@ func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) {
a.CpuShares -= delta.CpuShares
}
func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) {
if other == nil {
return
}
if other.CpuShares > a.CpuShares {
a.CpuShares = other.CpuShares
}
}
// AllocatedMemoryResources captures the allocated memory resources.
type AllocatedMemoryResources struct {
MemoryMB int64
@ -3178,6 +3245,16 @@ func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) {
a.MemoryMB -= delta.MemoryMB
}
func (a *AllocatedMemoryResources) Max(other *AllocatedMemoryResources) {
if other == nil {
return
}
if other.MemoryMB > a.MemoryMB {
a.MemoryMB = other.MemoryMB
}
}
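A small worked sketch (numbers made up, not from this diff) of how the Comparable flattening above combines lifecycle-aware resources: ephemeral prestart usage is folded in with Max because it never overlaps the main tasks, while sidecar usage is added on top:

package structs

// With main tasks needing 1000 CPU shares, an ephemeral prestart task 700
// and a prestart sidecar 200, the group needs max(1000, 700) + 200 = 1200.
func exampleFlattenedCPU() int64 {
    main := &AllocatedTaskResources{Cpu: AllocatedCpuResources{CpuShares: 1000}}
    ephemeral := &AllocatedTaskResources{Cpu: AllocatedCpuResources{CpuShares: 700}}
    sidecar := &AllocatedTaskResources{Cpu: AllocatedCpuResources{CpuShares: 200}}

    ephemeral.Max(main)    // 1000: ephemeral prestart tasks finish before main tasks start
    sidecar.Add(ephemeral) // 1200: sidecars run alongside whichever phase is larger
    return sidecar.Cpu.CpuShares
}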
type AllocatedDevices []*AllocatedDeviceResource
// Index finds the matching index using the passed device. If not found, -1 is
@ -4385,6 +4462,40 @@ func (d *DispatchPayloadConfig) Validate() error {
return nil
}
const (
TaskLifecycleHookPrestart = "prestart"
)
type TaskLifecycleConfig struct {
Hook string
Sidecar bool
}
func (d *TaskLifecycleConfig) Copy() *TaskLifecycleConfig {
if d == nil {
return nil
}
nd := new(TaskLifecycleConfig)
*nd = *d
return nd
}
func (d *TaskLifecycleConfig) Validate() error {
if d == nil {
return nil
}
switch d.Hook {
case TaskLifecycleHookPrestart:
case "":
return fmt.Errorf("no lifecycle hook provided")
default:
return fmt.Errorf("invalid hook: %v", d.Hook)
}
return nil
}
var (
// These default restart policies need to be in sync with
// Canonicalize in api/tasks.go
@ -5407,6 +5518,8 @@ type Task struct {
// DispatchPayload configures how the task retrieves its input from a dispatch
DispatchPayload *DispatchPayloadConfig
Lifecycle *TaskLifecycleConfig
// Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad.
Meta map[string]string
@ -5486,6 +5599,7 @@ func (t *Task) Copy() *Task {
nt.LogConfig = nt.LogConfig.Copy()
nt.Meta = helper.CopyMapStringString(nt.Meta)
nt.DispatchPayload = nt.DispatchPayload.Copy()
nt.Lifecycle = nt.Lifecycle.Copy()
if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))
@ -5665,6 +5779,14 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk, jobType string, tgServices
}
}
// Validate the Lifecycle block if present
if t.Lifecycle != nil {
if err := t.Lifecycle.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Lifecycle validation failed: %v", err))
}
}
// Validation for TaskKind field which is used for Consul Connect integration
if t.Kind.IsConnectProxy() {
// This task is a Connect proxy so it should not have service stanzas


@ -2867,6 +2867,51 @@ func TestPeriodicConfig_DST(t *testing.T) {
require.Equal(e2, n2.UTC())
}
func TestTaskLifecycleConfig_Validate(t *testing.T) {
testCases := []struct {
name string
tlc *TaskLifecycleConfig
err error
}{
{
name: "prestart completed",
tlc: &TaskLifecycleConfig{
Hook: "prestart",
Sidecar: false,
},
err: nil,
},
{
name: "prestart running",
tlc: &TaskLifecycleConfig{
Hook: "prestart",
Sidecar: true,
},
err: nil,
},
{
name: "no hook",
tlc: &TaskLifecycleConfig{
Sidecar: true,
},
err: fmt.Errorf("no lifecycle hook provided"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.tlc.Validate()
if tc.err != nil {
require.Error(t, err)
require.Contains(t, err.Error(), tc.err.Error())
} else {
require.Nil(t, err)
}
})
}
}
func TestRestartPolicy_Validate(t *testing.T) {
// Policy with acceptable restart options passes
p := &RestartPolicy{


@ -484,7 +484,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(tg.EphemeralDisk.SizeMB),
},


@ -4360,6 +4360,124 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestGenericSched_AllocFit(t *testing.T) {
testCases := []struct {
Name string
NodeCpu int64
TaskResources structs.Resources
MainTaskCount int
InitTaskCount int
SideTaskCount int
ShouldPlaceAlloc bool
}{
{
Name: "simple init + sidecar",
NodeCpu: 1200,
TaskResources: structs.Resources{
CPU: 500,
MemoryMB: 256,
},
MainTaskCount: 1,
InitTaskCount: 1,
SideTaskCount: 1,
ShouldPlaceAlloc: true,
},
{
Name: "too big init + sidecar",
NodeCpu: 1200,
TaskResources: structs.Resources{
CPU: 700,
MemoryMB: 256,
},
MainTaskCount: 1,
InitTaskCount: 1,
SideTaskCount: 1,
ShouldPlaceAlloc: false,
},
{
Name: "many init + sidecar",
NodeCpu: 1200,
TaskResources: structs.Resources{
CPU: 100,
MemoryMB: 100,
},
MainTaskCount: 3,
InitTaskCount: 5,
SideTaskCount: 5,
ShouldPlaceAlloc: true,
},
{
Name: "too many init + sidecar",
NodeCpu: 1200,
TaskResources: structs.Resources{
CPU: 100,
MemoryMB: 100,
},
MainTaskCount: 10,
InitTaskCount: 10,
SideTaskCount: 10,
ShouldPlaceAlloc: false,
},
{
Name: "too many too big",
NodeCpu: 1200,
TaskResources: structs.Resources{
CPU: 1000,
MemoryMB: 100,
},
MainTaskCount: 10,
InitTaskCount: 10,
SideTaskCount: 10,
ShouldPlaceAlloc: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
h := NewHarness(t)
node := mock.Node()
node.NodeResources.Cpu.CpuShares = testCase.NodeCpu
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))
// Create a job with sidecar & init tasks
job := mock.VariableLifecycleJob(testCase.TaskResources, testCase.MainTaskCount, testCase.InitTaskCount, testCase.SideTaskCount)
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
allocs := 0
if testCase.ShouldPlaceAlloc {
allocs = 1
}
// Ensure the expected number of plans were submitted
require.Len(t, h.Plans, allocs)
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure the expected number of allocations were placed
require.Len(t, out, allocs)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
})
}
}
func TestGenericSched_ChainedAlloc(t *testing.T) {
h := NewHarness(t)


@ -21,6 +21,7 @@ type RankedNode struct {
FinalScore float64
Scores []float64
TaskResources map[string]*structs.AllocatedTaskResources
TaskLifecycles map[string]*structs.TaskLifecycleConfig
AllocResources *structs.AllocatedSharedResources
// Allocs is used to cache the proposed allocations on the
@ -53,8 +54,10 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
resource *structs.AllocatedTaskResources) {
if r.TaskResources == nil {
r.TaskResources = make(map[string]*structs.AllocatedTaskResources)
r.TaskLifecycles = make(map[string]*structs.TaskLifecycleConfig)
}
r.TaskResources[task.Name] = resource
r.TaskLifecycles[task.Name] = task.Lifecycle
}
// RankFeasibleIterator is used to iteratively yield nodes along
@ -206,6 +209,8 @@ OUTER:
total := &structs.AllocatedResources{
Tasks: make(map[string]*structs.AllocatedTaskResources,
len(iter.taskGroup.Tasks)),
TaskLifecycles: make(map[string]*structs.TaskLifecycleConfig,
len(iter.taskGroup.Tasks)),
Shared: structs.AllocatedSharedResources{
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
},
@ -391,6 +396,7 @@ OUTER:
// Accumulate the total resource requirement
total.Tasks[task.Name] = taskResources
total.TaskLifecycles[task.Name] = task.Lifecycle
}
// Store current set of running allocs before adding resources for the task group


@ -340,7 +340,8 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// Set fields based on if we found an allocation option
resources := &structs.AllocatedResources{
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(missing.TaskGroup.EphemeralDisk.SizeMB),
},


@ -637,7 +637,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.AllocatedResources = &structs.AllocatedResources{
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(update.TaskGroup.EphemeralDisk.SizeMB),
},
@ -914,7 +915,8 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.AllocatedResources = &structs.AllocatedResources{
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(newTG.EphemeralDisk.SizeMB),
},