Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true (#16583)
* Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true Fixes #11052 When restoring periodic dispatcher, all periodic jobs are forced without checking for previous childre. * Multiple instances of a periodic job are run simultaneously, when prohibit_overlap is true Fixes #11052 When restoring periodic dispatcher, all periodic jobs are forced without checking for previous children. * style: refactor force run function * fix: remove defer and inline unlock for speed optimization * Update nomad/leader.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * style: refactor tests to use must * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * Update nomad/leader_test.go Co-authored-by: James Rasell <jrasell@users.noreply.github.com> * fix: move back from defer to calling unlock before returning. createEval cant be called with the lock on * style: refactor test to use must * added new entry to changelog and update comments --------- Co-authored-by: James Rasell <jrasell@hashicorp.com> Co-authored-by: James Rasell <jrasell@users.noreply.github.com>
This commit is contained in:
parent
21b675244e
commit
320884b8ee
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
server: Added verification of cron jobs already running before forcing new evals right after leader change
|
||||||
|
```
|
|
@ -774,16 +774,45 @@ func (s *Server) restorePeriodicDispatcher() error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We skip if the job doesn't allow overlap and there are already
|
||||||
|
// instances running
|
||||||
|
allowed, err := s.cronJobOverlapAllowed(job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get job status: %v", err)
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil {
|
if _, err := s.periodicDispatcher.ForceEval(job.Namespace, job.ID); err != nil {
|
||||||
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
|
logger.Error("force run of periodic job failed", "job", job.NamespacedID(), "error", err)
|
||||||
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
|
return fmt.Errorf("force run of periodic job %q failed: %v", job.NamespacedID(), err)
|
||||||
}
|
}
|
||||||
logger.Debug("periodic job force runned during leadership establishment", "job", job.NamespacedID())
|
|
||||||
|
logger.Debug("periodic job force run during leadership establishment", "job", job.NamespacedID())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cronJobOverlapAllowed checks if the job allows for overlap and if there are already
|
||||||
|
// instances of the job running in order to determine if a new evaluation needs to
|
||||||
|
// be created upon periodic dispatcher restore
|
||||||
|
func (s *Server) cronJobOverlapAllowed(job *structs.Job) (bool, error) {
|
||||||
|
if job.Periodic.ProhibitOverlap {
|
||||||
|
running, err := s.periodicDispatcher.dispatcher.RunningChildren(job)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to determine if periodic job has running children %q error %q", job.NamespacedID(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if running {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// schedulePeriodic is used to do periodic job dispatch while we are leader
|
// schedulePeriodic is used to do periodic job dispatch while we are leader
|
||||||
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
|
||||||
evalGC := time.NewTicker(s.config.EvalGCInterval)
|
evalGC := time.NewTicker(s.config.EvalGCInterval)
|
||||||
|
|
|
@ -411,7 +411,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// Sleep till after the job should have been launched.
|
// Sleep till after the job should have been launched.
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
// Restore the periodic dispatcher.
|
// Restore the periodic dispatcher.
|
||||||
s1.periodicDispatcher.SetEnabled(true)
|
s1.periodicDispatcher.SetEnabled(true)
|
||||||
|
@ -438,6 +438,27 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mockJobEvalDispatcher struct {
|
||||||
|
forceEvalCalled, children bool
|
||||||
|
evalToReturn *structs.Evaluation
|
||||||
|
JobEvalDispatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
|
||||||
|
mjed.forceEvalCalled = true
|
||||||
|
return mjed.evalToReturn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
|
||||||
|
return mjed.children, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
|
||||||
|
job := testPeriodicJob(times...)
|
||||||
|
job.Periodic.ProhibitOverlap = true
|
||||||
|
return job
|
||||||
|
}
|
||||||
|
|
||||||
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
||||||
ci.Parallel(t)
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
@ -445,6 +466,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
||||||
c.NumSchedulers = 0
|
c.NumSchedulers = 0
|
||||||
})
|
})
|
||||||
defer cleanupS1()
|
defer cleanupS1()
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC)
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
// Inject a periodic job that triggered once in the past, should trigger now
|
// Inject a periodic job that triggered once in the past, should trigger now
|
||||||
|
@ -465,7 +487,16 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an eval for the past launch.
|
// Create an eval for the past launch.
|
||||||
s1.periodicDispatcher.createEval(job, past)
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
md := &mockJobEvalDispatcher{
|
||||||
|
children: false,
|
||||||
|
evalToReturn: eval,
|
||||||
|
JobEvalDispatcher: s1,
|
||||||
|
}
|
||||||
|
|
||||||
|
s1.periodicDispatcher.dispatcher = md
|
||||||
|
|
||||||
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
||||||
s1.periodicDispatcher.SetEnabled(false)
|
s1.periodicDispatcher.SetEnabled(false)
|
||||||
|
@ -475,6 +506,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
||||||
|
|
||||||
// Restore the periodic dispatcher.
|
// Restore the periodic dispatcher.
|
||||||
s1.periodicDispatcher.SetEnabled(true)
|
s1.periodicDispatcher.SetEnabled(true)
|
||||||
|
|
||||||
s1.restorePeriodicDispatcher()
|
s1.restorePeriodicDispatcher()
|
||||||
|
|
||||||
// Ensure the job is tracked.
|
// Ensure the job is tracked.
|
||||||
|
@ -495,6 +527,128 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
||||||
if last.Launch == past {
|
if last.Launch == past {
|
||||||
t.Fatalf("restorePeriodicDispatcher did not force launch")
|
t.Fatalf("restorePeriodicDispatcher did not force launch")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||||
|
c.NumSchedulers = 0
|
||||||
|
})
|
||||||
|
defer cleanupS1()
|
||||||
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
|
// Inject a periodic job that triggered once in the past, should trigger now
|
||||||
|
// and once in the future.
|
||||||
|
now := time.Now()
|
||||||
|
past := now.Add(-1 * time.Second)
|
||||||
|
future := now.Add(10 * time.Second)
|
||||||
|
|
||||||
|
job := testPeriodicJob_OverlapEnabled(past, now, future)
|
||||||
|
req := structs.JobRegisterRequest{
|
||||||
|
Job: job,
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Namespace: job.Namespace,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
// Create an eval for the past launch.
|
||||||
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
md := &mockJobEvalDispatcher{
|
||||||
|
children: false,
|
||||||
|
evalToReturn: eval,
|
||||||
|
}
|
||||||
|
|
||||||
|
s1.periodicDispatcher.dispatcher = md
|
||||||
|
|
||||||
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
||||||
|
s1.periodicDispatcher.SetEnabled(false)
|
||||||
|
|
||||||
|
// Sleep till after the job should have been launched.
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
// Restore the periodic dispatcher.
|
||||||
|
s1.periodicDispatcher.SetEnabled(true)
|
||||||
|
must.NoError(t, s1.restorePeriodicDispatcher())
|
||||||
|
|
||||||
|
// Ensure the job is tracked.
|
||||||
|
tuple := structs.NamespacedID{
|
||||||
|
ID: job.ID,
|
||||||
|
Namespace: job.Namespace,
|
||||||
|
}
|
||||||
|
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))
|
||||||
|
|
||||||
|
// Check that an eval was made.
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
||||||
|
must.NoError(t, err)
|
||||||
|
must.NotNil(t, last)
|
||||||
|
|
||||||
|
must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))
|
||||||
|
|
||||||
|
must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
|
||||||
|
ci.Parallel(t)
|
||||||
|
|
||||||
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||||
|
c.NumSchedulers = 0
|
||||||
|
})
|
||||||
|
defer cleanupS1()
|
||||||
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
|
// Inject a periodic job that triggered once in the past, should trigger now
|
||||||
|
// and once in the future.
|
||||||
|
now := time.Now()
|
||||||
|
past := now.Add(-1 * time.Second)
|
||||||
|
future := now.Add(10 * time.Second)
|
||||||
|
|
||||||
|
job := testPeriodicJob_OverlapEnabled(past, now, future)
|
||||||
|
req := structs.JobRegisterRequest{
|
||||||
|
Job: job,
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Namespace: job.Namespace,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
// Create an eval for the past launch.
|
||||||
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
||||||
|
must.NoError(t, err)
|
||||||
|
|
||||||
|
md := &mockJobEvalDispatcher{
|
||||||
|
children: true,
|
||||||
|
evalToReturn: eval,
|
||||||
|
}
|
||||||
|
|
||||||
|
s1.periodicDispatcher.dispatcher = md
|
||||||
|
|
||||||
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
||||||
|
s1.periodicDispatcher.SetEnabled(false)
|
||||||
|
|
||||||
|
// Sleep till after the job should have been launched.
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
// Restore the periodic dispatcher.
|
||||||
|
s1.periodicDispatcher.SetEnabled(true)
|
||||||
|
must.NoError(t, s1.restorePeriodicDispatcher())
|
||||||
|
|
||||||
|
// Ensure the job is tracked.
|
||||||
|
tuple := structs.NamespacedID{
|
||||||
|
ID: job.ID,
|
||||||
|
Namespace: job.Namespace,
|
||||||
|
}
|
||||||
|
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))
|
||||||
|
|
||||||
|
must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLeader_PeriodicDispatch(t *testing.T) {
|
func TestLeader_PeriodicDispatch(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue