diff --git a/nomad/periodic.go b/nomad/periodic.go index 08f45b91f..0d9e1547e 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -75,6 +75,7 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) { if p.running { close(p.stopCh) <-p.waitCh + p.running = false } p.Flush() } @@ -203,29 +204,90 @@ func (p *PeriodicDispatch) ForceRun(jobID string) error { return p.createEval(job, time.Now()) } -// run is a long-lived function that waits til a job's periodic spec is met and +// shouldRun returns whether the long lived run function should run. +func (p *PeriodicDispatch) shouldRun() bool { + p.l.RLock() + defer p.l.RUnlock() + return p.enabled && p.running +} + +// run is a long-lived function that waits till a job's periodic spec is met and // then creates an evaluation to run the job. func (p *PeriodicDispatch) run() { defer close(p.waitCh) - - // Do nothing if not enabled. - p.l.RLock() - if !p.enabled { - p.l.RUnlock() - return - } - p.l.RUnlock() - var now time.Time + for p.shouldRun() { + job, launch, err := p.nextLaunch() + if err != nil { + p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) + return + } else if job == nil { + return + } + now = time.Now() + p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launch.Sub(now)) + + select { + case <-p.stopCh: + return + case <-p.updateCh: + continue + case <-time.After(launch.Sub(now)): + // Get the current time so that we don't miss any jobs will we're creating evals. + now = time.Now() + p.dispatch(launch, now) + } + } +} + +// dispatch scans the periodic jobs in order of launch time and creates +// evaluations for all jobs whose next launch time is equal to that of the +// passed launchTime. The now time is used to determine the next launch time for +// the dispatched jobs. +func (p *PeriodicDispatch) dispatch(launchTime time.Time, now time.Time) { + p.l.Lock() + defer p.l.Unlock() + + // Create evals for all the jobs with the same launch time. + for { + if p.heap.Length() == 0 { + return + } + + j, err := p.heap.Peek() + if err != nil { + p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) + return + } + + if j.next != launchTime { + return + } + + if err := p.heap.Update(j.job, j.job.Periodic.Next(now)); err != nil { + p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err) + } + + p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime) + go p.createEval(j.job, launchTime) + } +} + +// nextLaunch returns the next job to launch and when it should be launched. If +// the next job can't be determined, an error is returned. If the dispatcher is +// stopped, a nil job will be returned. +func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time, error) { PICK: // If there is nothing wait for an update. p.l.RLock() if p.heap.Length() == 0 { p.l.RUnlock() + + // Block until there is an update, or the dispatcher is stopped. select { case <-p.stopCh: - return + return nil, time.Time{}, nil case <-p.updateCh: } p.l.RLock() @@ -236,70 +298,23 @@ PICK: if err != nil { select { case <-p.stopCh: - return + return nil, time.Time{}, nil default: - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) - return + return nil, time.Time{}, err } } - launchTime := nextJob.next - // If there are only invalid times, wait for an update. - if launchTime.IsZero() { - p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID) + if nextJob.next.IsZero() { select { case <-p.stopCh: - return + return nil, time.Time{}, nil case <-p.updateCh: goto PICK } } - now = time.Now() - p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", - nextJob.job.ID, nextJob.next.Sub(now)) - - select { - case <-p.stopCh: - return - case <-p.updateCh: - goto PICK - case <-time.After(nextJob.next.Sub(now)): - // Get the current time so that we don't miss any jobs will we are - // creating evals. - nowUpdate := time.Now() - - // Create evals for all the jobs with the same launch time. - p.l.Lock() - for { - if p.heap.Length() == 0 { - break - } - - j, err := p.heap.Peek() - if err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) - break - } - - if j.next != launchTime { - break - } - - if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err) - } - - p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime) - go p.createEval(j.job, launchTime) - } - - p.l.Unlock() - now = nowUpdate - } - - goto PICK + return nextJob.job, nextJob.next, nil } // createEval instantiates a job based on the passed periodic job and submits an diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index b9e397080..4cde0b3f9 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -516,6 +516,61 @@ func TestPeriodicDispatch_Complex(t *testing.T) { } } +func TestPeriodicDispatch_NextLaunch(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create two job that will be launched at the same time. + invalid := time.Unix(0, 0) + expected := time.Now().Round(1 * time.Second).Add(1 * time.Second) + job := testPeriodicJob(invalid) + job2 := testPeriodicJob(expected) + + // Make sure the periodic dispatcher isn't running. + close(s1.periodicDispatcher.stopCh) + s1.periodicDispatcher.stopCh = make(chan struct{}) + + // Run nextLaunch. + timeout := make(chan struct{}) + var j *structs.Job + var launch time.Time + var err error + go func() { + j, launch, err = s1.periodicDispatcher.nextLaunch() + close(timeout) + }() + + // Add them. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + // Delay adding a valid job. + time.Sleep(200 * time.Millisecond) + if err := s1.periodicDispatcher.Add(job2); err != nil { + t.Fatalf("Add failed %v", err) + } + + select { + case <-time.After(2 * time.Second): + t.Fatal("timeout") + case <-timeout: + if err != nil { + t.Fatalf("nextLaunch() failed: %v", err) + } + if j != job2 { + t.Fatalf("Incorrect job returned; got %v; want %v", j, job2) + } + if launch != expected { + t.Fatalf("Incorrect launch time; got %v; want %v", launch, expected) + } + } +} + func shuffle(jobs []*structs.Job) { rand.Seed(time.Now().Unix()) for i := range jobs {