Simplify run function and add nextLaunch test

This commit is contained in:
Alex Dadgar 2015-12-16 17:07:50 -08:00
parent 8165c1fc22
commit a60783a4ca
2 changed files with 133 additions and 63 deletions

View File

@ -75,6 +75,7 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) {
if p.running {
close(p.stopCh)
<-p.waitCh
p.running = false
}
p.Flush()
}
@ -203,29 +204,90 @@ func (p *PeriodicDispatch) ForceRun(jobID string) error {
return p.createEval(job, time.Now())
}
// run is a long-lived function that waits til a job's periodic spec is met and
// shouldRun returns whether the long lived run function should run.
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled && p.running
}
// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
// Do nothing if not enabled.
p.l.RLock()
if !p.enabled {
p.l.RUnlock()
return
}
p.l.RUnlock()
var now time.Time
for p.shouldRun() {
job, launch, err := p.nextLaunch()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
} else if job == nil {
return
}
now = time.Now()
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launch.Sub(now))
select {
case <-p.stopCh:
return
case <-p.updateCh:
continue
case <-time.After(launch.Sub(now)):
// Get the current time so that we don't miss any jobs will we're creating evals.
now = time.Now()
p.dispatch(launch, now)
}
}
}
// dispatch scans the periodic jobs in order of launch time and creates
// evaluations for all jobs whose next launch time is equal to that of the
// passed launchTime. The now time is used to determine the next launch time for
// the dispatched jobs.
func (p *PeriodicDispatch) dispatch(launchTime time.Time, now time.Time) {
p.l.Lock()
defer p.l.Unlock()
// Create evals for all the jobs with the same launch time.
for {
if p.heap.Length() == 0 {
return
}
j, err := p.heap.Peek()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
}
if j.next != launchTime {
return
}
if err := p.heap.Update(j.job, j.job.Periodic.Next(now)); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime)
go p.createEval(j.job, launchTime)
}
}
// nextLaunch returns the next job to launch and when it should be launched. If
// the next job can't be determined, an error is returned. If the dispatcher is
// stopped, a nil job will be returned.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time, error) {
PICK:
// If there is nothing wait for an update.
p.l.RLock()
if p.heap.Length() == 0 {
p.l.RUnlock()
// Block until there is an update, or the dispatcher is stopped.
select {
case <-p.stopCh:
return
return nil, time.Time{}, nil
case <-p.updateCh:
}
p.l.RLock()
@ -236,70 +298,23 @@ PICK:
if err != nil {
select {
case <-p.stopCh:
return
return nil, time.Time{}, nil
default:
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
return nil, time.Time{}, err
}
}
launchTime := nextJob.next
// If there are only invalid times, wait for an update.
if launchTime.IsZero() {
p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID)
if nextJob.next.IsZero() {
select {
case <-p.stopCh:
return
return nil, time.Time{}, nil
case <-p.updateCh:
goto PICK
}
}
now = time.Now()
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s",
nextJob.job.ID, nextJob.next.Sub(now))
select {
case <-p.stopCh:
return
case <-p.updateCh:
goto PICK
case <-time.After(nextJob.next.Sub(now)):
// Get the current time so that we don't miss any jobs will we are
// creating evals.
nowUpdate := time.Now()
// Create evals for all the jobs with the same launch time.
p.l.Lock()
for {
if p.heap.Length() == 0 {
break
}
j, err := p.heap.Peek()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
break
}
if j.next != launchTime {
break
}
if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime)
go p.createEval(j.job, launchTime)
}
p.l.Unlock()
now = nowUpdate
}
goto PICK
return nextJob.job, nextJob.next, nil
}
// createEval instantiates a job based on the passed periodic job and submits an

View File

@ -516,6 +516,61 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
}
}
func TestPeriodicDispatch_NextLaunch(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Create two job that will be launched at the same time.
invalid := time.Unix(0, 0)
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job := testPeriodicJob(invalid)
job2 := testPeriodicJob(expected)
// Make sure the periodic dispatcher isn't running.
close(s1.periodicDispatcher.stopCh)
s1.periodicDispatcher.stopCh = make(chan struct{})
// Run nextLaunch.
timeout := make(chan struct{})
var j *structs.Job
var launch time.Time
var err error
go func() {
j, launch, err = s1.periodicDispatcher.nextLaunch()
close(timeout)
}()
// Add them.
if err := s1.periodicDispatcher.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Delay adding a valid job.
time.Sleep(200 * time.Millisecond)
if err := s1.periodicDispatcher.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
select {
case <-time.After(2 * time.Second):
t.Fatal("timeout")
case <-timeout:
if err != nil {
t.Fatalf("nextLaunch() failed: %v", err)
}
if j != job2 {
t.Fatalf("Incorrect job returned; got %v; want %v", j, job2)
}
if launch != expected {
t.Fatalf("Incorrect launch time; got %v; want %v", launch, expected)
}
}
}
func shuffle(jobs []*structs.Job) {
rand.Seed(time.Now().Unix())
for i := range jobs {