open-nomad/client/allocrunner/task_hook_coordinator.go

164 lines
4 KiB
Go

package allocrunner
import (
"context"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"
)
// TaskHookCoordinator helps coordinate when main start tasks can launch
// namely after all Prestart Tasks have run, and after all BlockUntilCompleted have completed
type taskHookCoordinator struct {
logger hclog.Logger
closedCh chan struct{}
mainTaskCtx context.Context
mainTaskCtxCancel func()
poststartTaskCtx context.Context
poststartTaskCtxCancel func()
prestartSidecar map[string]struct{}
prestartEphemeral map[string]struct{}
mainTasksPending map[string]struct{}
}
func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
closedCh := make(chan struct{})
close(closedCh)
mainTaskCtx, mainCancelFn := context.WithCancel(context.Background())
poststartTaskCtx, poststartCancelFn := context.WithCancel(context.Background())
c := &taskHookCoordinator{
logger: logger,
closedCh: closedCh,
mainTaskCtx: mainTaskCtx,
mainTaskCtxCancel: mainCancelFn,
prestartSidecar: map[string]struct{}{},
prestartEphemeral: map[string]struct{}{},
mainTasksPending: map[string]struct{}{},
poststartTaskCtx: poststartTaskCtx,
poststartTaskCtxCancel: poststartCancelFn,
}
c.setTasks(tasks)
return c
}
func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
for _, task := range tasks {
if task.Lifecycle == nil {
c.mainTasksPending[task.Name] = struct{}{}
continue
}
switch task.Lifecycle.Hook {
case structs.TaskLifecycleHookPrestart:
if task.Lifecycle.Sidecar {
c.prestartSidecar[task.Name] = struct{}{}
} else {
c.prestartEphemeral[task.Name] = struct{}{}
}
case structs.TaskLifecycleHookPoststart:
// Poststart hooks don't need to be tracked.
default:
c.logger.Error("invalid lifecycle hook", "task", task.Name, "hook", task.Lifecycle.Hook)
}
}
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}
}
func (c *taskHookCoordinator) hasPrestartTasks() bool {
return len(c.prestartSidecar)+len(c.prestartEphemeral) > 0
}
func (c *taskHookCoordinator) hasPendingMainTasks() bool {
return len(c.mainTasksPending) > 0
}
func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} {
if task.Lifecycle == nil {
return c.mainTaskCtx.Done()
}
switch task.Lifecycle.Hook {
case structs.TaskLifecycleHookPrestart:
// Prestart tasks start without checking status of other tasks
return c.closedCh
case structs.TaskLifecycleHookPoststart:
return c.poststartTaskCtx.Done()
default:
return c.mainTaskCtx.Done()
}
}
// This is not thread safe! This must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
for task := range c.prestartSidecar {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
continue
}
delete(c.prestartSidecar, task)
}
for task := range c.prestartEphemeral {
st := states[task]
if st == nil || !st.Successful() {
continue
}
delete(c.prestartEphemeral, task)
}
for task := range c.mainTasksPending {
st := states[task]
if st == nil || st.StartedAt.IsZero() {
continue
}
delete(c.mainTasksPending, task)
}
// everything well
if !c.hasPrestartTasks() {
c.mainTaskCtxCancel()
}
if !c.hasPendingMainTasks() {
c.poststartTaskCtxCancel()
}
}
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
return true
}
}
return false
}
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
return true
}
}
return false
}