open-nomad/nomad/periodic.go

615 lines
16 KiB
Go

package nomad
import (
"container/heap"
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
// dispatcher submits derived jobs and reports whether a job has running
// children.
dispatcher JobEvalDispatcher
// enabled gates Add, Remove and ForceRun and keeps the run loop going.
enabled bool
// tracked holds the periodic jobs currently managed, keyed by namespace and
// job ID.
tracked map[structs.NamespacedID]*structs.Job
// heap orders the tracked jobs by their next launch time.
heap *periodicHeap
// updateCh (buffer of one) wakes the run loop after the heap has changed.
updateCh chan struct{}
// stopFn cancels the context handed to the run goroutine by SetEnabled.
stopFn context.CancelFunc
logger log.Logger
// l is held by the methods below while they read or modify enabled, tracked
// and heap.
l sync.RWMutex
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a new, untracked job, creates an evaluation for it and
// returns the eval.
DispatchJob(job *structs.Job) (*structs.Evaluation, error)
// RunningChildren returns whether the passed job has any running children.
RunningChildren(job *structs.Job) (bool, error)
}
// DispatchJob registers the passed job through Raft and then commits a new
// pending evaluation for it, also through Raft. On success the evaluation is
// returned with its create and modify indexes set to the index of the Raft
// entry that stored it.
func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
	job.SetSubmitTime()

	// Register the job first; the evaluation records the resulting index.
	registerReq := structs.JobRegisterRequest{
		Job:          job,
		WriteRequest: structs.WriteRequest{Namespace: job.Namespace},
	}
	applyResp, jobIndex, err := s.raftApply(structs.JobRegisterRequestType, registerReq)
	// An error carried in the apply response is reported ahead of err.
	if respErr, isErr := applyResp.(error); isErr && respErr != nil {
		return nil, respErr
	}
	if err != nil {
		return nil, err
	}

	eval := &structs.Evaluation{
		ID:             uuid.Generate(),
		Namespace:      job.Namespace,
		Priority:       job.Priority,
		Type:           job.Type,
		TriggeredBy:    structs.EvalTriggerPeriodicJob,
		JobID:          job.ID,
		JobModifyIndex: jobIndex,
		Status:         structs.EvalStatusPending,
	}

	// XXX: There is a risk of partial failure where the JobRegister succeeds
	// but the EvalUpdate does not.
	evalReq := &structs.EvalUpdateRequest{Evals: []*structs.Evaluation{eval}}
	_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, evalReq)
	if err != nil {
		return nil, err
	}

	eval.CreateIndex, eval.ModifyIndex = evalIndex, evalIndex
	return eval, nil
}
// RunningChildren reports whether any child of the passed job still has a
// non-terminal evaluation or a non-terminal allocation. Children are looked up
// in a state snapshot by the job's ID followed by the periodic launch suffix.
func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
	snap, err := s.fsm.State().Snapshot()
	if err != nil {
		return false, err
	}

	ws := memdb.NewWatchSet()
	childPrefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
	iter, err := snap.JobsByIDPrefix(ws, job.Namespace, childPrefix)
	if err != nil {
		return false, err
	}

	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		// The prefix match alone is not proof of parentage.
		candidate := raw.(*structs.Job)
		if candidate.ParentID != job.ID {
			continue
		}

		childEvals, err := snap.EvalsByJob(ws, candidate.Namespace, candidate.ID)
		if err != nil {
			return false, err
		}

		for _, childEval := range childEvals {
			// A live eval counts as running without looking at allocations.
			if !childEval.TerminalStatus() {
				return true, nil
			}

			childAllocs, err := snap.AllocsByEval(ws, childEval.ID)
			if err != nil {
				return false, err
			}
			for _, a := range childAllocs {
				if !a.TerminalStatus() {
					return true, nil
				}
			}
		}
	}

	// Every eval and allocation that was found is terminal.
	return false, nil
}
// NewPeriodicDispatch builds a periodic dispatcher that submits launches
// through the passed dispatcher. It starts out disabled with nothing tracked,
// and logs under the "periodic" name.
func NewPeriodicDispatch(logger log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
	pd := &PeriodicDispatch{
		dispatcher: dispatcher,
		logger:     logger.Named("periodic"),
	}
	pd.tracked = make(map[structs.NamespacedID]*structs.Job)
	pd.heap = NewPeriodicHeap()
	pd.updateCh = make(chan struct{}, 1)
	return pd
}
// SetEnabled switches the periodic dispatcher on or off. It should only be
// enabled on the active leader. Enabling a disabled dispatcher starts the run
// goroutine; disabling an enabled one cancels that goroutine's context and
// flushes all tracked state. Calls that do not change the state are no-ops.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
	p.l.Lock()
	defer p.l.Unlock()

	previously := p.enabled
	p.enabled = enabled

	switch {
	case previously && !enabled:
		// enabled -> disabled: stop the daemon and drop everything.
		p.stopFn()
		p.flush()
	case !previously && enabled:
		// disabled -> enabled: start the daemon.
		ctx, cancel := context.WithCancel(context.Background())
		p.stopFn = cancel
		go p.run(ctx)
	}
}
// Tracked returns the jobs that are currently tracked, in no particular order.
// The returned slice is freshly allocated, but the jobs it points at are the
// tracked instances themselves.
func (p *PeriodicDispatch) Tracked() []*structs.Job {
	p.l.RLock()
	defer p.l.RUnlock()

	jobs := make([]*structs.Job, 0, len(p.tracked))
	for _, job := range p.tracked {
		jobs = append(jobs, job)
	}
	return jobs
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the job's periodic spec. A job that is not periodic-active is
// removed if it was tracked and otherwise ignored. Add is a no-op while the
// dispatcher is disabled.
//
// It returns an error when the next launch time cannot be computed or the heap
// rejects the job; in both cases the tracked set is left as it was.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
	p.l.Lock()
	defer p.l.Unlock()

	// Do nothing if not enabled
	if !p.enabled {
		return nil
	}

	tuple := structs.NamespacedID{
		ID:        job.ID,
		Namespace: job.Namespace,
	}
	_, tracked := p.tracked[tuple]

	// If we were tracking a job and it is no longer periodic-active, remove
	// it. If we weren't tracking it there is nothing to do.
	if !job.IsPeriodicActive() {
		if tracked {
			// Keep Add's result unchanged for this path, but don't lose the
			// failure silently.
			if err := p.removeLocked(tuple); err != nil {
				p.logger.Error("failed to deregister inactive periodic job", "job", job.NamespacedID(), "error", err)
			}
		}
		return nil
	}

	// Compute the next launch before touching any state so that a bad spec
	// cannot leave a job in the tracked set without a heap entry.
	next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
	if err != nil {
		return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
	}

	// Add or update the job. The tracked set is only written once the heap
	// has accepted the change, keeping the two in agreement.
	if tracked {
		if err := p.heap.Update(job, next); err != nil {
			return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
		}
		p.tracked[tuple] = job
		p.logger.Debug("updated periodic job", "job", job.NamespacedID())
	} else {
		if err := p.heap.Push(job, next); err != nil {
			return fmt.Errorf("failed to add job %v: %v", job.ID, err)
		}
		p.tracked[tuple] = job
		p.logger.Debug("registered periodic job", "job", job.NamespacedID())
	}

	// Signal an update without blocking; one pending signal is enough.
	select {
	case p.updateCh <- struct{}{}:
	default:
	}

	return nil
}
// Remove stops tracking the job identified by namespace and jobID. It takes
// the dispatcher lock and delegates to removeLocked, so removing a job that is
// not tracked is a no-op.
func (p *PeriodicDispatch) Remove(namespace, jobID string) error {
	tuple := structs.NamespacedID{ID: jobID, Namespace: namespace}

	p.l.Lock()
	defer p.l.Unlock()
	return p.removeLocked(tuple)
}
// removeLocked stops tracking the passed job. It is a no-op when the
// dispatcher is disabled or the job is not tracked. The caller must hold the
// lock.
func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
	job, isTracked := p.tracked[jobID]
	if !p.enabled || !isTracked {
		return nil
	}

	// Drop the job from the tracked set first, then from the heap.
	delete(p.tracked, jobID)
	err := p.heap.Remove(job)
	if err != nil {
		return fmt.Errorf("failed to remove tracked job %q (%s): %v", jobID.ID, jobID.Namespace, err)
	}

	// Wake the run loop without blocking if a signal is already pending.
	select {
	case p.updateCh <- struct{}{}:
	default:
	}

	p.logger.Debug("deregistered periodic job", "job", job.NamespacedID())
	return nil
}
// ForceRun launches the identified periodic job immediately, using the current
// time in the job's location as the launch time, and returns the resulting
// eval. It returns an error if the dispatcher is disabled or the job is not
// tracked.
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
	// Look the job up under the lock, but release it before creating the
	// eval.
	job, err := func() (*structs.Job, error) {
		p.l.Lock()
		defer p.l.Unlock()

		if !p.enabled {
			return nil, fmt.Errorf("periodic dispatch disabled")
		}

		key := structs.NamespacedID{ID: jobID, Namespace: namespace}
		found, ok := p.tracked[key]
		if !ok {
			return nil, fmt.Errorf("can't force run non-tracked job %q (%s)", jobID, namespace)
		}
		return found, nil
	}()
	if err != nil {
		return nil, err
	}

	return p.createEval(job, time.Now().In(job.Periodic.GetLocation()))
}
// shouldRun reports whether the long-lived run loop should keep going, which
// is the case for as long as the dispatcher is enabled.
func (p *PeriodicDispatch) shouldRun() bool {
	p.l.RLock()
	enabled := p.enabled
	p.l.RUnlock()
	return enabled
}
// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job. Each iteration looks at the job at
// the top of the heap and waits for whichever comes first: cancellation of ctx
// (run returns), a signal on updateCh (the heap is re-examined), or the job's
// launch time (the job is dispatched).
func (p *PeriodicDispatch) run(ctx context.Context) {
	for p.shouldRun() {
		job, launch := p.nextLaunch()

		// With nothing to launch, launchCh stays nil and its select case never
		// fires, so the loop just waits for an update or cancellation.
		var (
			timer    *time.Timer
			launchCh <-chan time.Time
		)
		if !launch.IsZero() {
			launchDur := launch.Sub(time.Now().In(job.Periodic.GetLocation()))
			// Use an explicit timer rather than time.After so it can be
			// stopped when the wait is abandoned; launches may be far in the
			// future and updates frequent.
			timer = time.NewTimer(launchDur)
			launchCh = timer.C
			p.logger.Debug("scheduled periodic job launch", "launch_delay", launchDur, "job", job.NamespacedID())
		}

		select {
		case <-ctx.Done():
			if timer != nil {
				timer.Stop()
			}
			return
		case <-p.updateCh:
			if timer != nil {
				timer.Stop()
			}
			continue
		case <-launchCh:
			p.dispatch(job, launch)
		}
	}
}
// dispatch creates an evaluation for the job and updates its next launch time
// based on the passed launch time. A failure to compute or store the next
// launch is logged and does not stop the current launch. If the job prohibits
// overlap, the launch is skipped when a child is still running or when that
// cannot be determined.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
	p.l.Lock()

	nextLaunch, err := job.Periodic.Next(launchTime)
	if err != nil {
		p.logger.Error("failed to parse next periodic launch", "job", job.NamespacedID(), "error", err)
	} else if err := p.heap.Update(job, nextLaunch); err != nil {
		p.logger.Error("failed to update next launch of periodic job", "job", job.NamespacedID(), "error", err)
	}

	// If the job prohibits overlapping and there are running children, we skip
	// the launch.
	if job.Periodic.ProhibitOverlap {
		running, err := p.dispatcher.RunningChildren(job)
		if err != nil {
			// Log arguments are key/value pairs: the job ID belongs to "job"
			// and the error to "error".
			p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", err)
			p.l.Unlock()
			return
		}

		if running {
			p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
			p.l.Unlock()
			return
		}
	}

	p.logger.Debug(" launching job", "job", job.NamespacedID(), "launch_time", launchTime)

	// The lock is released before createEval because deriving the child job
	// may call Remove, which acquires it. createEval's result is not needed
	// here; its failure paths log the error themselves.
	p.l.Unlock()
	p.createEval(job, launchTime)
}
// nextLaunch returns the job at the top of the heap together with its next
// launch time. When the heap has no entry to offer, it returns a nil job and
// the zero time.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
	p.l.RLock()
	defer p.l.RUnlock()

	// Peek yields nil for an empty heap, so one check covers both cases.
	if head := p.heap.Peek(); head != nil {
		return head.job, head.next
	}
	return nil, time.Time{}
}
// createEval derives a child job from the passed periodic job for the given
// launch time and submits it through the dispatcher, returning the resulting
// evaluation. It should not be called with the lock held, because deriving the
// job may call Remove, which acquires the lock.
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) {
	child, err := p.deriveJob(periodicJob, time)
	if err != nil {
		return nil, err
	}

	eval, err := p.dispatcher.DispatchJob(child)
	if err == nil {
		return eval, nil
	}

	p.logger.Error("failed to dispatch job", "job", periodicJob.NamespacedID(), "error", err)
	return nil, err
}
// deriveJob builds the child job for one launch of the passed periodic job: a
// copy whose parent is the periodic job, whose ID and name are derived from
// the launch time, and which has no periodic spec of its own.
//
// If building the child panics, the panic is recovered, the periodic job is
// removed from the dispatcher, and a nil job and an error are returned.
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (derived *structs.Job, err error) {
	// The named results let the deferred handler replace the return values
	// after a panic.
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}

		p.logger.Error("deriving child job from periodic job failed; deregistering from periodic runner",
			"job", periodicJob.NamespacedID(), "error", cause)
		p.Remove(periodicJob.Namespace, periodicJob.ID)

		derived, err = nil, fmt.Errorf("Failed to create a copy of the periodic job %q (%s): %v",
			periodicJob.ID, periodicJob.Namespace, cause)
	}()

	child := periodicJob.Copy()
	child.ParentID = periodicJob.ID
	child.ID = p.derivedJobID(periodicJob, time)
	child.Name = child.ID
	child.Periodic = nil
	return child, nil
}
// derivedJobID returns the ID of the child job launched at the passed time:
// the parent's ID, the periodic launch suffix, and the launch time as Unix
// seconds.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
	launchSeconds := strconv.FormatInt(time.Unix(), 10)
	return periodicJob.ID + structs.PeriodicLaunchSuffix + launchSeconds
}
// LaunchTime returns the launch time encoded in the passed job ID. This is
// only valid for jobs created by PeriodicDispatch: the ID must contain the
// periodic launch suffix, and everything after its last occurrence must be a
// base-10 number of Unix seconds. Any other ID yields an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
	index := strings.LastIndex(jobID, structs.PeriodicLaunchSuffix)
	if index == -1 {
		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
	}

	// Parse as a 64-bit value explicitly; the platform int is only 32 bits
	// wide on some targets and cannot hold every Unix timestamp.
	launch, err := strconv.ParseInt(jobID[index+len(structs.PeriodicLaunchSuffix):], 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
	}

	return time.Unix(launch, 0), nil
}
// flush resets the dispatcher to an empty state: a fresh update channel, no
// tracked jobs, an empty heap and no stop function. It takes no lock itself.
func (p *PeriodicDispatch) flush() {
	p.stopFn = nil
	p.heap = NewPeriodicHeap()
	p.tracked = map[structs.NamespacedID]*structs.Job{}
	p.updateCh = make(chan struct{}, 1)
}
// periodicHeap wraps a heap and gives operations other than Push/Pop.
type periodicHeap struct {
// index maps a job's namespace and ID to its heap entry, so Update, Remove
// and Contains can find the entry without scanning the heap.
index map[structs.NamespacedID]*periodicJob
// heap holds the same entries in the order maintained by container/heap.
heap periodicHeapImp
}
// periodicJob is a heap entry: a tracked job together with its next launch
// time.
type periodicJob struct {
job *structs.Job
// next is the next launch time; a zero value sorts after every non-zero
// time.
next time.Time
// index is the entry's position in periodicHeapImp. It is maintained by that
// type's Push, Swap and Pop and handed to heap.Fix and heap.Remove.
index int
}
// NewPeriodicHeap returns an empty periodicHeap that is ready for use.
func NewPeriodicHeap() *periodicHeap {
	h := new(periodicHeap)
	h.index = map[structs.NamespacedID]*periodicJob{}
	h.heap = periodicHeapImp{}
	return h
}
// Push adds the job to the heap with the passed next launch time. It returns
// an error if a job with the same namespace and ID is already present.
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
	key := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
	if _, exists := p.index[key]; exists {
		return fmt.Errorf("job %q (%s) already exists", job.ID, job.Namespace)
	}

	// The entry's index field is assigned by the heap implementation.
	entry := &periodicJob{job: job, next: next}
	p.index[key] = entry
	heap.Push(&p.heap, entry)
	return nil
}
// Pop removes and returns the entry at the top of the heap, also dropping it
// from the index. It returns nil when the heap is empty.
func (p *periodicHeap) Pop() *periodicJob {
	if len(p.heap) == 0 {
		return nil
	}

	entry := heap.Pop(&p.heap).(*periodicJob)
	delete(p.index, structs.NamespacedID{
		ID:        entry.job.ID,
		Namespace: entry.job.Namespace,
	})
	return entry
}
// Peek returns the entry at the top of the heap without removing it, or nil
// when the heap is empty.
func (p *periodicHeap) Peek() *periodicJob {
	if len(p.heap) > 0 {
		return p.heap[0]
	}
	return nil
}
// Contains reports whether a job with the same namespace and ID as the passed
// job is present in the heap.
func (p *periodicHeap) Contains(job *structs.Job) bool {
	_, found := p.index[structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}]
	return found
}
// Update replaces the stored job and next launch time of an existing entry and
// restores heap order. It returns an error if the job is not in the heap.
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
	entry, ok := p.index[structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}]
	if !ok {
		return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
	}

	// The job itself is replaced too, because its spec can change.
	entry.job, entry.next = job, next
	heap.Fix(&p.heap, entry.index)
	return nil
}
// Remove deletes the job's entry from both the heap and the index. It returns
// an error if the job is not in the heap.
func (p *periodicHeap) Remove(job *structs.Job) error {
	key := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}

	entry, ok := p.index[key]
	if !ok {
		return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
	}

	heap.Remove(&p.heap, entry.index)
	delete(p.index, key)
	return nil
}
// Length returns the number of entries in the heap.
func (p *periodicHeap) Length() int {
	return p.heap.Len()
}
// periodicHeapImp is the slice of entries that container/heap operates on.
type periodicHeapImp []*periodicJob

// Len returns the number of entries in the slice.
func (h periodicHeapImp) Len() int {
	return len(h)
}
// Less orders entries by next launch time, earliest first. An entry with a
// zero launch time is never less than another entry, and every entry with a
// non-zero launch time is less than one with a zero launch time, so zero times
// sort to the end.
func (h periodicHeapImp) Less(i, j int) bool {
	left, right := h[i].next, h[j].next

	switch {
	case left.IsZero():
		// Covers both "only i is zero" and "both are zero".
		return false
	case right.IsZero():
		return true
	default:
		return left.Before(right)
	}
}
// Swap exchanges the entries at positions i and j and records each entry's new
// position in its index field.
func (h periodicHeapImp) Swap(i, j int) {
	atI, atJ := h[i], h[j]
	h[i], h[j] = atJ, atI
	atJ.index = i
	atI.index = j
}
// Push appends x, which must be a *periodicJob, to the slice and stores its
// position in the entry's index field.
func (h *periodicHeapImp) Push(x interface{}) {
	entry := x.(*periodicJob)
	entry.index = len(*h)
	*h = append(*h, entry)
}
// Pop removes the last entry of the slice and returns it. The returned entry's
// index is set to -1 to mark it as no longer part of the heap. The slice must
// not be empty.
func (h *periodicHeapImp) Pop() interface{} {
	old := *h
	n := len(old)
	job := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// entry, and the job it points at, reachable.
	old[n-1] = nil
	job.index = -1 // for safety
	*h = old[:n-1]
	return job
}