package allocrunner

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"time"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
	"github.com/hashicorp/nomad/client/allocrunner/state"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
	"github.com/hashicorp/nomad/client/allocwatcher"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/consul"
	cinterfaces "github.com/hashicorp/nomad/client/interfaces"
	cstate "github.com/hashicorp/nomad/client/state"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/structs"
)

// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
	// id is the ID of the allocation. Can be accessed without a lock
	id string

	// logger is the logger for the alloc runner.
	logger log.Logger

	clientConfig *config.Config

	// stateUpdater is used to emit updated task state
	stateUpdater cinterfaces.AllocStateHandler

	// consulClient is the client used by the consul service hook for
	// registering services and checks
	consulClient consul.ConsulServiceAPI

	// vaultClient is the client used to manage Vault tokens
	vaultClient vaultclient.VaultClient

	// waitCh is closed when the Run() loop has exited
	waitCh chan struct{}

	// destroyed is true when the Run() loop has exited, postrun hooks have
	// run, and the alloc runner has been destroyed
	destroyed     bool
	destroyedLock sync.Mutex

	// alloc captures the allocation being run.
	alloc     *structs.Allocation
	allocLock sync.RWMutex

	// state is the alloc runner's state
	state     *state.State
	stateLock sync.RWMutex

	stateDB cstate.StateDB

	// allocDir is used to build the allocation's directory structure.
	allocDir *allocdir.AllocDir

	// runnerHooks are alloc runner lifecycle hooks that should be run on state
	// transitions.
	runnerHooks []interfaces.RunnerHook

	// tasks are the set of task runners
	tasks     map[string]*taskrunner.TaskRunner
	tasksLock sync.RWMutex

	// allocBroadcaster sends client allocation updates to all listeners
	allocBroadcaster *cstructs.AllocBroadcaster

	// prevAllocWatcher allows waiting for a previous allocation to exit
	// and if necessary migrate its alloc dir.
	prevAllocWatcher allocwatcher.PrevAllocWatcher
}

// NewAllocRunner returns a new allocation runner.
func NewAllocRunner(config *Config) (*allocRunner, error) {
	alloc := config.Alloc
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	if tg == nil {
		return nil, fmt.Errorf("failed to lookup task group %q", alloc.TaskGroup)
	}

	ar := &allocRunner{
		id:               alloc.ID,
		alloc:            alloc,
		clientConfig:     config.ClientConfig,
		consulClient:     config.Consul,
		vaultClient:      config.Vault,
		tasks:            make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
		waitCh:           make(chan struct{}),
		state:            &state.State{},
		stateDB:          config.StateDB,
		stateUpdater:     config.StateUpdater,
		allocBroadcaster: cstructs.NewAllocBroadcaster(),
		prevAllocWatcher: config.PrevAllocWatcher,
	}

	// Create the logger based on the allocation ID
	ar.logger = config.Logger.Named("alloc_runner").With("alloc_id", alloc.ID)

	// Create alloc dir
	ar.allocDir = allocdir.NewAllocDir(ar.logger, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))

	// Initialize the runner hooks.
	ar.initRunnerHooks()

	// Create the TaskRunners
	if err := ar.initTaskRunners(tg.Tasks); err != nil {
		return nil, err
	}

	return ar, nil
}

// initTaskRunners creates task runners but does *not* run them.
func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
	for _, task := range tasks {
		config := &taskrunner.Config{
			Alloc:        ar.alloc,
			ClientConfig: ar.clientConfig,
			Task:         task,
			TaskDir:      ar.allocDir.NewTaskDir(task.Name),
			Logger:       ar.logger,
			StateDB:      ar.stateDB,
			StateUpdater: ar,
			Consul:       ar.consulClient,
			VaultClient:  ar.vaultClient,
		}

		// Create, but do not Run, the task runner
		tr, err := taskrunner.NewTaskRunner(config)
		if err != nil {
			return fmt.Errorf("failed creating runner for task %q: %v", task.Name, err)
		}

		ar.tasks[task.Name] = tr
	}
	return nil
}

func (ar *allocRunner) WaitCh() <-chan struct{} {
	return ar.waitCh
}

// XXX How does alloc Restart work
// Run is the main goroutine that executes all the tasks.
func (ar *allocRunner) Run() {
	// Close the wait channel
	defer close(ar.waitCh)

	var taskWaitCh <-chan struct{}

	// Run the prestart hooks
	// XXX Equivalent to TR.Prestart hook
	if err := ar.prerun(); err != nil {
		ar.logger.Error("prerun failed", "error", err)
		goto POST
	}

	// Run the runners
	taskWaitCh = ar.runImpl()

MAIN:
	for {
		select {
		case <-taskWaitCh:
			// TaskRunners have all exited
			break MAIN
		}
	}

POST:
	// Run the postrun hooks
	// XXX Equivalent to TR.Poststop hook
	if err := ar.postrun(); err != nil {
		ar.logger.Error("postrun failed", "error", err)
	}
}

// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() <-chan struct{} {
	for _, task := range ar.tasks {
		go task.Run()
	}

	// Return a combined WaitCh that is closed when all task runners have
	// exited.
	waitCh := make(chan struct{})
	go func() {
		defer close(waitCh)
		for _, task := range ar.tasks {
			<-task.WaitCh()
		}
	}()

	return waitCh
}

// Alloc returns the current allocation being run by this runner as sent by the
// server. This view of the allocation does not have updated task states.
func (ar *allocRunner) Alloc() *structs.Allocation {
	ar.allocLock.RLock()
	defer ar.allocLock.RUnlock()
	return ar.alloc
}

func (ar *allocRunner) setAlloc(updated *structs.Allocation) {
	ar.allocLock.Lock()
	ar.alloc = updated
	ar.allocLock.Unlock()
}

// GetAllocDir returns the alloc dir which is safe for concurrent use.
func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir {
	return ar.allocDir
}

// Restore state from database. Must be called after NewAllocRunner but before
// Run.
func (ar *allocRunner) Restore() error {
	// Restore task runners
	for _, tr := range ar.tasks {
		if err := tr.Restore(); err != nil {
			return err
		}
	}
	return nil
}

// TaskStateUpdated is called by TaskRunner when a task's state has been
// updated. This hook is used to compute changes to the alloc's ClientStatus
// and to update the server with the new state.
func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) {
	// If a task is dead, we potentially want to kill other tasks in the group
	if state.State == structs.TaskStateDead {
		// Find all tasks that are not the one that is dead and check if the one
		// that is dead is a leader
		var otherTaskRunners []*taskrunner.TaskRunner
		var otherTaskNames []string
		leader := false
		for name, tr := range ar.tasks {
			if name != taskName {
				otherTaskRunners = append(otherTaskRunners, tr)
				otherTaskNames = append(otherTaskNames, name)
			} else if tr.Task().Leader {
				leader = true
			}
		}

		// If the task failed, we should kill all the other tasks in the task group.
		if state.Failed {
			if len(otherTaskRunners) > 0 {
				ar.logger.Debug("task failure, destroying all tasks", "failed_task", taskName, "destroying", otherTaskNames)
			}

			for _, tr := range otherTaskRunners {
				tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
			}
		} else if leader {
			if len(otherTaskRunners) > 0 {
				ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", taskName, "destroying", otherTaskNames)
			}

			// If the task was a leader task we should kill all the other tasks.
			for _, tr := range otherTaskRunners {
				tr.Kill(context.Background(), structs.NewTaskEvent(structs.TaskLeaderDead))
			}
		}
	}

	// Gather the state of all tasks, substituting the updated state for the
	// task that triggered this call.
	ar.tasksLock.RLock()
	states := make(map[string]*structs.TaskState, len(ar.tasks))
	for name, tr := range ar.tasks {
		if name == taskName {
			states[name] = state
		} else {
			states[name] = tr.TaskState()
		}
	}
	ar.tasksLock.RUnlock()

	// Get the client allocation
	calloc := ar.clientAlloc(states)

	// Update the server
	ar.stateUpdater.AllocStateUpdated(calloc)

	// Broadcast client alloc to listeners
	ar.allocBroadcaster.Send(calloc)
}

// clientAlloc takes in the task states and returns an Allocation populated
// with client-specific fields
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
	// clientAlloc mutates ar.state, so the write lock is required.
	ar.stateLock.Lock()
	defer ar.stateLock.Unlock()

	// store task states for AllocState to expose
	ar.state.TaskStates = taskStates

	a := &structs.Allocation{
		ID:         ar.id,
		TaskStates: taskStates,
	}

	if d := ar.state.DeploymentStatus; d != nil {
		a.DeploymentStatus = d.Copy()
	}

	// Compute the ClientStatus
	if ar.state.ClientStatus != "" {
		// The client status is being forced
		a.ClientStatus, a.ClientDescription = ar.state.ClientStatus, ar.state.ClientDescription
	} else {
		a.ClientStatus, a.ClientDescription = getClientStatus(taskStates)
	}

	// If the allocation is terminal, make sure all required fields are properly
	// set.
	if a.ClientTerminalStatus() {
		alloc := ar.Alloc()

		// If we are part of a deployment and the task has failed, mark the
		// alloc as unhealthy. This guards against the watcher not being started.
		if a.ClientStatus == structs.AllocClientStatusFailed &&
			alloc.DeploymentID != "" && !a.DeploymentStatus.IsUnhealthy() {
			a.DeploymentStatus = &structs.AllocDeploymentStatus{
				Healthy: helper.BoolToPtr(false),
			}
		}

		// Make sure FinishedAt is set for every task. This is used to
		// calculate the reschedule time for failed allocations.
		now := time.Now()
		for _, task := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks {
			ts, ok := a.TaskStates[task.Name]
			if !ok {
				ts = &structs.TaskState{}
				a.TaskStates[task.Name] = ts
			}
			if ts.FinishedAt.IsZero() {
				ts.FinishedAt = now
			}
		}
	}

	return a
}

// getClientStatus takes in the task states for a given allocation and computes
// the client status and description
func getClientStatus(taskStates map[string]*structs.TaskState) (status, description string) {
	var pending, running, dead, failed bool
	for _, state := range taskStates {
		switch state.State {
		case structs.TaskStateRunning:
			running = true
		case structs.TaskStatePending:
			pending = true
		case structs.TaskStateDead:
			if state.Failed {
				failed = true
			} else {
				dead = true
			}
		}
	}

	// Determine the alloc status
	if failed {
		return structs.AllocClientStatusFailed, "Failed tasks"
	} else if running {
		return structs.AllocClientStatusRunning, "Tasks are running"
	} else if pending {
		return structs.AllocClientStatusPending, "No tasks have started"
	} else if dead {
		return structs.AllocClientStatusComplete, "All tasks have completed"
	}

	return "", ""
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
	ar.stateLock.RLock()
	state := ar.state.Copy()
	ar.stateLock.RUnlock()

	// If TaskStateUpdated has not been called yet, ar.state.TaskStates
	// won't be set as it is not the canonical source of TaskStates.
	if len(state.TaskStates) == 0 {
		ar.tasksLock.RLock()
		state.TaskStates = make(map[string]*structs.TaskState, len(ar.tasks))
		for k, tr := range ar.tasks {
			state.TaskStates[k] = tr.TaskState()
		}
		ar.tasksLock.RUnlock()
	}

	return state
}

// Update the running allocation with a new version received from the server.
//
// This method updates the alloc runner's copy of the allocation, runs the
// update hooks, and propagates the new version to each task runner.
func (ar *allocRunner) Update(update *structs.Allocation) {
	// Update ar.alloc
	ar.setAlloc(update)

	// Run update hooks
	if err := ar.update(update); err != nil {
		ar.logger.Error("error running update hooks", "error", err)
	}

	// Update task runners
	for _, tr := range ar.tasks {
		tr.Update(update)
	}
}

func (ar *allocRunner) Listener() *cstructs.AllocListener {
	return ar.allocBroadcaster.Listen()
}

// Destroy the alloc runner by synchronously stopping it if it is still running
// and cleaning up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
	// Stop tasks
	ar.tasksLock.RLock()
	for name, tr := range ar.tasks {
		err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
		if err != nil {
			if err == taskrunner.ErrTaskNotRunning {
				ar.logger.Trace("task not running", "task_name", name)
			} else {
				ar.logger.Warn("failed to kill task", "error", err, "task_name", name)
			}
		}
	}
	ar.tasksLock.RUnlock()

	// Wait for tasks to exit and postrun hooks to finish
	<-ar.waitCh

	// Run destroy hooks
	if err := ar.destroy(); err != nil {
		ar.logger.Warn("error running destroy hooks", "error", err)
	}

	// Cleanup state db
	if err := ar.stateDB.DeleteAllocationBucket(ar.id); err != nil {
		ar.logger.Warn("failed to delete allocation state", "error", err)
	}

	// Mark alloc as destroyed
	ar.destroyedLock.Lock()
	ar.destroyed = true
	ar.destroyedLock.Unlock()
}

// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
// garbage collected).
//
// This method is safe for calling concurrently with Run(). Callers must
// receive on WaitCh() to block until the alloc runner has stopped and been
// destroyed.
func (ar *allocRunner) IsDestroyed() bool {
	ar.destroyedLock.Lock()
	defer ar.destroyedLock.Unlock()
	return ar.destroyed
}

// IsWaiting returns true if the alloc runner is waiting for its previous
// allocation to terminate.
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsWaiting() bool {
	return ar.prevAllocWatcher.IsWaiting()
}

// IsMigrating returns true if the alloc runner is migrating data from its
// previous allocation.
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsMigrating() bool {
	return ar.prevAllocWatcher.IsMigrating()
}

func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter {
	return ar
}

// LatestAllocStats returns the latest stats for an allocation. If taskFilter
// is set, only stats for that task -- if it exists -- are returned.
func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
	ar.tasksLock.RLock()
	defer ar.tasksLock.RUnlock()

	astat := &cstructs.AllocResourceUsage{
		Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)),
		ResourceUsage: &cstructs.ResourceUsage{
			MemoryStats: &cstructs.MemoryStats{},
			CpuStats:    &cstructs.CpuStats{},
		},
	}

	for name, tr := range ar.tasks {
		if taskFilter != "" && taskFilter != name {
			// Getting stats for a particular task and it's not this one.
			continue
		}

		if usage := tr.LatestResourceUsage(); usage != nil {
			astat.Tasks[name] = usage
			astat.ResourceUsage.Add(usage.ResourceUsage)
			if usage.Timestamp > astat.Timestamp {
				astat.Timestamp = usage.Timestamp
			}
		}
	}

	return astat, nil
}