open-nomad/nomad/periodic.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

629 lines
16 KiB
Go
Raw Normal View History

2015-12-01 22:54:57 +00:00
package nomad
2015-12-18 20:26:28 +00:00
import (
"container/heap"
2017-08-03 20:40:34 +00:00
"context"
2015-12-18 20:26:28 +00:00
"fmt"
"strconv"
"strings"
2015-12-18 20:26:28 +00:00
"sync"
"time"
2015-12-01 22:54:57 +00:00
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
2017-02-08 04:31:23 +00:00
memdb "github.com/hashicorp/go-memdb"
2018-09-15 23:23:13 +00:00
"github.com/hashicorp/nomad/helper/uuid"
2015-12-18 20:26:28 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
2015-12-18 20:26:28 +00:00
2017-09-07 23:56:15 +00:00
tracked map[structs.NamespacedID]*structs.Job
2015-12-18 20:26:28 +00:00
heap *periodicHeap
updateCh chan struct{}
2017-08-03 20:40:34 +00:00
stopFn context.CancelFunc
2018-09-15 23:23:13 +00:00
logger log.Logger
2015-12-18 20:26:28 +00:00
l sync.RWMutex
2015-12-01 22:54:57 +00:00
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a job a new, untracked job and creates an evaluation
2016-01-13 18:19:53 +00:00
// for it and returns the eval.
DispatchJob(job *structs.Job) (*structs.Evaluation, error)
// RunningChildren returns whether the passed job has any running children.
RunningChildren(job *structs.Job) (bool, error)
}
// DispatchJob creates an evaluation for the passed job and commits both the
2016-01-13 18:19:53 +00:00
// evaluation and the job to the raft log. It returns the eval.
func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerPeriodicJob,
JobID: job.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
// Commit this update via Raft
2017-06-30 02:08:25 +00:00
job.SetSubmitTime()
2017-09-07 23:56:15 +00:00
req := structs.JobRegisterRequest{
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
Job: job,
Eval: eval,
2017-09-07 23:56:15 +00:00
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
2017-09-19 14:47:10 +00:00
fsmErr, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err, ok := fsmErr.(error); ok && err != nil {
return nil, err
}
if err != nil {
2016-01-13 18:19:53 +00:00
return nil, err
}
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
eval.CreateIndex = index
eval.ModifyIndex = index
Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters.
2020-07-10 17:31:55 +00:00
// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not, before Nomad 0.12.1
_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return nil, err
}
// Update its indexes.
eval.CreateIndex = evalIndex
eval.ModifyIndex = evalIndex
}
2016-01-13 18:19:53 +00:00
return eval, nil
}
// RunningChildren checks whether the passed job has any running children.
func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
state, err := s.fsm.State().Snapshot()
if err != nil {
return false, err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
2017-09-07 23:56:15 +00:00
iter, err := state.JobsByIDPrefix(ws, job.Namespace, prefix)
if err != nil {
return false, err
}
var child *structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
child = i.(*structs.Job)
2016-01-07 20:54:41 +00:00
// Ensure the job is actually a child.
if child.ParentID != job.ID {
continue
}
// Get the childs evaluations.
2017-09-07 23:56:15 +00:00
evals, err := state.EvalsByJob(ws, child.Namespace, child.ID)
if err != nil {
return false, err
}
// Check if any of the evals are active or have running allocations.
for _, eval := range evals {
if !eval.TerminalStatus() {
return true, nil
}
2017-02-08 04:31:23 +00:00
allocs, err := state.AllocsByEval(ws, eval.ID)
if err != nil {
return false, err
}
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
return true, nil
}
}
}
}
// There are no evals or allocations that aren't terminal.
return false, nil
}
2015-12-18 20:26:28 +00:00
// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
// launch periodic jobs.
2018-09-15 23:23:13 +00:00
func NewPeriodicDispatch(logger log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
2015-12-18 20:26:28 +00:00
return &PeriodicDispatch{
dispatcher: dispatcher,
2017-09-07 23:56:15 +00:00
tracked: make(map[structs.NamespacedID]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
2018-09-15 23:23:13 +00:00
logger: logger.Named("periodic"),
2015-12-18 20:26:28 +00:00
}
}
2015-12-01 22:54:57 +00:00
2015-12-18 20:26:28 +00:00
// SetEnabled is used to control if the periodic dispatcher is enabled. It
// should only be enabled on the active leader. Disabling an active dispatcher
// will stop any launched go routine and flush the dispatcher.
2015-12-01 22:54:57 +00:00
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
2015-12-18 20:26:28 +00:00
p.l.Lock()
2017-08-03 20:40:34 +00:00
defer p.l.Unlock()
wasRunning := p.enabled
2015-12-18 20:26:28 +00:00
p.enabled = enabled
2015-12-01 22:54:57 +00:00
2018-03-11 19:06:05 +00:00
// If we are transitioning from enabled to disabled, stop the daemon and
2017-08-03 20:40:34 +00:00
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
test: fix race around updateCh handling PeriodicDispatch.SetEnabled sets updateCh in one goroutine, and PeriodicDispatch.run accesses updateCh in another. The race can be prevented by having SetEnabled pass updateCh to run. Race detector output from `go test -race -run TestServer_RPC` in nomad/ ``` ================== WARNING: DATA RACE Write at 0x00c0001d3f48 by goroutine 75: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:468 +0x256 github.com/hashicorp/nomad/nomad.(*Server).revokeLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:724 +0x267 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:131 +0x3c github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:163 +0x4dd github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c Previous read at 0x00c0001d3f48 by goroutine 515: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).run() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:338 +0x177 Goroutine 75 (running) created at: github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:70 +0x269 Goroutine 515 (running) created at: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:176 +0x1bc github.com/hashicorp/nomad/nomad.(*Server).establishLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:231 +0x582 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:117 +0x82e github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() 
/home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c ================== ```
2018-12-19 20:09:05 +00:00
go p.run(ctx, p.updateCh)
2017-08-03 20:40:34 +00:00
}
2015-12-18 20:26:28 +00:00
}
// Tracked returns the set of tracked job IDs.
2015-12-16 21:46:09 +00:00
func (p *PeriodicDispatch) Tracked() []*structs.Job {
2015-12-18 20:26:28 +00:00
p.l.RLock()
defer p.l.RUnlock()
2015-12-16 21:46:09 +00:00
tracked := make([]*structs.Job, len(p.tracked))
2015-12-18 20:26:28 +00:00
i := 0
for _, job := range p.tracked {
2015-12-16 21:46:09 +00:00
tracked[i] = job
2015-12-18 20:26:28 +00:00
i++
}
return tracked
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
2017-09-26 22:26:33 +00:00
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
2015-12-18 20:26:28 +00:00
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return nil
2015-12-18 20:26:28 +00:00
}
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodicActive()
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, tracked := p.tracked[tuple]
2015-12-18 20:26:28 +00:00
if disabled {
2015-12-16 21:46:09 +00:00
if tracked {
2017-09-07 23:56:15 +00:00
p.removeLocked(tuple)
2015-12-16 21:46:09 +00:00
}
// If the job is disabled and we aren't tracking it, do nothing.
return nil
}
2015-12-18 20:26:28 +00:00
// Add or update the job.
2017-09-07 23:56:15 +00:00
p.tracked[tuple] = job
2018-04-26 20:57:45 +00:00
next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if err != nil {
2018-04-26 22:15:43 +00:00
return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
2018-04-26 20:57:45 +00:00
}
2015-12-18 20:26:28 +00:00
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
2015-12-18 20:26:28 +00:00
}
2018-09-15 23:23:13 +00:00
p.logger.Debug("updated periodic job", "job", job.NamespacedID())
2015-12-18 20:26:28 +00:00
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
2015-12-18 20:26:28 +00:00
}
2018-09-15 23:23:13 +00:00
p.logger.Debug("registered periodic job", "job", job.NamespacedID())
2015-12-18 20:26:28 +00:00
}
// Signal an update.
2017-08-03 20:40:34 +00:00
select {
case p.updateCh <- struct{}{}:
default:
2015-12-18 20:26:28 +00:00
}
return nil
2015-12-01 22:54:57 +00:00
}
2015-12-18 20:26:28 +00:00
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) Remove(namespace, jobID string) error {
2015-12-18 20:26:28 +00:00
p.l.Lock()
defer p.l.Unlock()
2017-09-07 23:56:15 +00:00
return p.removeLocked(structs.NamespacedID{
ID: jobID,
Namespace: namespace,
})
2015-12-10 00:46:06 +00:00
}
2015-12-18 20:26:28 +00:00
2015-12-10 00:46:06 +00:00
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op. It assumes this is called while a lock is held.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
2015-12-18 20:26:28 +00:00
// Do nothing if not enabled
if !p.enabled {
2015-12-19 01:51:30 +00:00
return nil
2015-12-18 20:26:28 +00:00
}
2015-12-24 01:47:37 +00:00
job, tracked := p.tracked[jobID]
if !tracked {
return nil
}
delete(p.tracked, jobID)
if err := p.heap.Remove(job); err != nil {
2017-09-07 23:56:15 +00:00
return fmt.Errorf("failed to remove tracked job %q (%s): %v", jobID.ID, jobID.Namespace, err)
2015-12-18 20:26:28 +00:00
}
// Signal an update.
2017-08-03 20:40:34 +00:00
select {
case p.updateCh <- struct{}{}:
default:
2015-12-18 20:26:28 +00:00
}
2018-09-15 23:23:13 +00:00
p.logger.Debug("deregistered periodic job", "job", job.NamespacedID())
2015-12-01 22:54:57 +00:00
return nil
}
2016-01-13 18:19:53 +00:00
// ForceRun causes the periodic job to be evaluated immediately and returns the
// subsequent eval.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
2015-12-18 20:26:28 +00:00
p.l.Lock()
// Do nothing if not enabled
if !p.enabled {
p.l.Unlock()
2016-01-13 18:19:53 +00:00
return nil, fmt.Errorf("periodic dispatch disabled")
2015-12-18 20:26:28 +00:00
}
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: jobID,
Namespace: namespace,
}
job, tracked := p.tracked[tuple]
2015-12-18 20:26:28 +00:00
if !tracked {
p.l.Unlock()
2017-09-07 23:56:15 +00:00
return nil, fmt.Errorf("can't force run non-tracked job %q (%s)", jobID, namespace)
2015-12-18 20:26:28 +00:00
}
2015-12-21 23:02:27 +00:00
p.l.Unlock()
2017-02-15 22:37:06 +00:00
return p.createEval(job, time.Now().In(job.Periodic.GetLocation()))
2015-12-18 20:26:28 +00:00
}
// shouldRun returns whether the long lived run function should run.
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
2017-08-03 20:40:34 +00:00
return p.enabled
}
// run is a long-lived function that waits till a job's periodic spec is met and
2015-12-18 20:26:28 +00:00
// then creates an evaluation to run the job.
test: fix race around updateCh handling PeriodicDispatch.SetEnabled sets updateCh in one goroutine, and PeriodicDispatch.run accesses updateCh in another. The race can be prevented by having SetEnabled pass updateCh to run. Race detector output from `go test -race -run TestServer_RPC` in nomad/ ``` ================== WARNING: DATA RACE Write at 0x00c0001d3f48 by goroutine 75: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:468 +0x256 github.com/hashicorp/nomad/nomad.(*Server).revokeLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:724 +0x267 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:131 +0x3c github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:163 +0x4dd github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c Previous read at 0x00c0001d3f48 by goroutine 515: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).run() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:338 +0x177 Goroutine 75 (running) created at: github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:70 +0x269 Goroutine 515 (running) created at: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:176 +0x1bc github.com/hashicorp/nomad/nomad.(*Server).establishLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:231 +0x582 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:117 +0x82e github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() 
/home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c ================== ```
2018-12-19 20:09:05 +00:00
func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
if launch.IsZero() {
launchCh = nil
} else {
2017-02-15 22:37:06 +00:00
launchDur := launch.Sub(time.Now().In(job.Periodic.GetLocation()))
launchCh = time.After(launchDur)
2018-09-15 23:23:13 +00:00
p.logger.Debug("scheduled periodic job launch", "launch_delay", launchDur, "job", job.NamespacedID())
}
2015-12-05 00:53:36 +00:00
select {
2017-08-03 20:40:34 +00:00
case <-ctx.Done():
return
test: fix race around updateCh handling PeriodicDispatch.SetEnabled sets updateCh in one goroutine, and PeriodicDispatch.run accesses updateCh in another. The race can be prevented by having SetEnabled pass updateCh to run. Race detector output from `go test -race -run TestServer_RPC` in nomad/ ``` ================== WARNING: DATA RACE Write at 0x00c0001d3f48 by goroutine 75: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:468 +0x256 github.com/hashicorp/nomad/nomad.(*Server).revokeLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:724 +0x267 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:131 +0x3c github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:163 +0x4dd github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c Previous read at 0x00c0001d3f48 by goroutine 515: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).run() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:338 +0x177 Goroutine 75 (running) created at: github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:70 +0x269 Goroutine 515 (running) created at: github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:176 +0x1bc github.com/hashicorp/nomad/nomad.(*Server).establishLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:231 +0x582 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:117 +0x82e github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() 
/home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c ================== ```
2018-12-19 20:09:05 +00:00
case <-updateCh:
continue
case <-launchCh:
p.dispatch(job, launch)
}
2015-12-18 20:26:28 +00:00
}
}
2015-12-18 20:26:28 +00:00
// dispatch creates an evaluation for the job and updates its next launchtime
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
2015-12-18 20:26:28 +00:00
2018-04-26 20:57:45 +00:00
nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
2018-09-15 23:23:13 +00:00
p.logger.Error("failed to parse next periodic launch", "job", job.NamespacedID(), "error", err)
2018-04-26 22:15:43 +00:00
} else if err := p.heap.Update(job, nextLaunch); err != nil {
2018-09-15 23:23:13 +00:00
p.logger.Error("failed to update next launch of periodic job", "job", job.NamespacedID(), "error", err)
}
// If the job prohibits overlapping and there are running children, we skip
// the launch.
if job.Periodic.ProhibitOverlap {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
2019-01-09 14:22:47 +00:00
p.logger.Error("failed to determine if periodic job has running children", "job", job.NamespacedID(), "error", err)
p.l.Unlock()
return
}
if running {
2018-09-15 23:23:13 +00:00
p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
p.l.Unlock()
return
}
}
2018-09-15 23:23:13 +00:00
p.logger.Debug(" launching job", "job", job.NamespacedID(), "launch_time", launchTime)
p.l.Unlock()
p.createEval(job, launchTime)
}
// nextLaunch returns the next job to launch and when it should be launched. If
// the next job can't be determined, an error is returned. If the dispatcher is
// stopped, a nil job will be returned.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
2015-12-18 20:26:28 +00:00
// If there is nothing wait for an update.
p.l.RLock()
defer p.l.RUnlock()
2015-12-18 20:26:28 +00:00
if p.heap.Length() == 0 {
return nil, time.Time{}
2015-12-18 20:26:28 +00:00
}
nextJob := p.heap.Peek()
if nextJob == nil {
return nil, time.Time{}
2015-12-18 20:26:28 +00:00
}
return nextJob.job, nextJob.next
2015-12-18 20:26:28 +00:00
}
// createEval instantiates a job based on the passed periodic job and submits an
// evaluation for it. This should not be called with the lock held.
2016-01-13 18:19:53 +00:00
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) {
2015-12-18 20:26:28 +00:00
derived, err := p.deriveJob(periodicJob, time)
if err != nil {
2016-01-13 18:19:53 +00:00
return nil, err
2015-12-18 20:26:28 +00:00
}
2016-01-13 18:19:53 +00:00
eval, err := p.dispatcher.DispatchJob(derived)
if err != nil {
2018-09-15 23:23:13 +00:00
p.logger.Error("failed to dispatch job", "job", periodicJob.NamespacedID(), "error", err)
2016-01-13 18:19:53 +00:00
return nil, err
2015-12-18 20:26:28 +00:00
}
2016-01-13 18:19:53 +00:00
return eval, nil
2015-12-18 20:26:28 +00:00
}
// deriveJob instantiates a new job based on the passed periodic job and the
// launch time.
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived *structs.Job, err error) {
// Have to recover in case the job copy panics.
defer func() {
if r := recover(); r != nil {
2018-09-15 23:23:13 +00:00
p.logger.Error("deriving child job from periodic job failed; deregistering from periodic runner",
"job", periodicJob.NamespacedID(), "error", r)
2017-09-07 23:56:15 +00:00
p.Remove(periodicJob.Namespace, periodicJob.ID)
2015-12-18 20:26:28 +00:00
derived = nil
2017-09-07 23:56:15 +00:00
err = fmt.Errorf("Failed to create a copy of the periodic job %q (%s): %v",
periodicJob.ID, periodicJob.Namespace, r)
2015-12-18 20:26:28 +00:00
}
}()
// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic in initial status
2015-12-18 20:26:28 +00:00
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
2015-12-18 20:26:28 +00:00
derived.Periodic = nil
derived.Status = ""
derived.StatusDescription = ""
2015-12-18 20:26:28 +00:00
return
}
// deriveJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
2016-01-07 22:24:25 +00:00
return fmt.Sprintf("%s%s%d", periodicJob.ID, structs.PeriodicLaunchSuffix, time.Unix())
2015-12-18 20:26:28 +00:00
}
2015-12-19 01:51:30 +00:00
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
2016-01-07 22:24:25 +00:00
index := strings.LastIndex(jobID, structs.PeriodicLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
2016-01-07 22:24:25 +00:00
launch, err := strconv.Atoi(jobID[index+len(structs.PeriodicLaunchSuffix):])
if err != nil {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
2015-12-21 21:55:26 +00:00
return time.Unix(int64(launch), 0), nil
}
2017-08-03 20:40:34 +00:00
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
2015-12-18 20:26:28 +00:00
p.updateCh = make(chan struct{}, 1)
2017-09-07 23:56:15 +00:00
p.tracked = make(map[structs.NamespacedID]*structs.Job)
2015-12-18 20:26:28 +00:00
p.heap = NewPeriodicHeap()
2017-08-03 20:40:34 +00:00
p.stopFn = nil
2015-12-18 20:26:28 +00:00
}
2015-12-07 22:24:06 +00:00
// periodicHeap wraps a heap and gives operations other than Push/Pop.
2015-12-18 20:26:28 +00:00
type periodicHeap struct {
2017-09-07 23:56:15 +00:00
index map[structs.NamespacedID]*periodicJob
2015-12-18 20:26:28 +00:00
heap periodicHeapImp
}
// periodicJob is a single entry of the periodic heap.
type periodicJob struct {
// job is the periodic job this entry belongs to.
job *structs.Job
// next is the job's next launch time; the heap is ordered by it.
next time.Time
// index is the entry's current position in the heap slice.
index int
}
func NewPeriodicHeap() *periodicHeap {
return &periodicHeap{
2017-09-07 23:56:15 +00:00
index: make(map[structs.NamespacedID]*periodicJob),
2015-12-18 20:26:28 +00:00
heap: make(periodicHeapImp, 0),
}
}
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := p.index[tuple]; ok {
return fmt.Errorf("job %q (%s) already exists", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
pJob := &periodicJob{job, next, 0}
2017-09-07 23:56:15 +00:00
p.index[tuple] = pJob
2015-12-18 20:26:28 +00:00
heap.Push(&p.heap, pJob)
2015-12-01 22:54:57 +00:00
return nil
}
2015-12-18 20:26:28 +00:00
func (p *periodicHeap) Pop() *periodicJob {
2015-12-18 20:26:28 +00:00
if len(p.heap) == 0 {
return nil
2015-12-18 20:26:28 +00:00
}
pJob := heap.Pop(&p.heap).(*periodicJob)
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: pJob.job.ID,
Namespace: pJob.job.Namespace,
}
delete(p.index, tuple)
return pJob
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Peek() *periodicJob {
2015-12-18 20:26:28 +00:00
if len(p.heap) == 0 {
return nil
2015-12-18 20:26:28 +00:00
}
return p.heap[0]
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Contains(job *structs.Job) bool {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, ok := p.index[tuple]
2015-12-18 20:26:28 +00:00
return ok
}
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
2015-12-05 00:53:36 +00:00
// Need to update the job as well because its spec can change.
pJob.job = job
pJob.next = next
heap.Fix(&p.heap, pJob.index)
2015-12-18 20:26:28 +00:00
return nil
}
2017-09-07 23:56:15 +00:00
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Remove(job *structs.Job) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
2015-12-18 20:26:28 +00:00
heap.Remove(&p.heap, pJob.index)
2017-09-07 23:56:15 +00:00
delete(p.index, tuple)
2015-12-18 20:26:28 +00:00
return nil
}
2017-09-07 23:56:15 +00:00
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
// Length returns the number of entries currently in the heap.
func (p *periodicHeap) Length() int {
	return p.heap.Len()
}
// periodicHeapImp is the slice type that implements heap.Interface for
// periodic jobs.
type periodicHeapImp []*periodicJob

// Len returns the number of entries in the heap.
func (h periodicHeapImp) Len() int {
	return len(h)
}
// Less orders entries by next launch time. A zero time is treated as later
// than any non-zero time so that such entries sink to the end; two zero times
// compare as not-less.
func (h periodicHeapImp) Less(i, j int) bool {
	left, right := h[i].next, h[j].next
	switch {
	case left.IsZero():
		// Covers both "only left is zero" and "both are zero".
		return false
	case right.IsZero():
		return true
	default:
		return left.Before(right)
	}
}
// Swap exchanges the entries at positions i and j and records each entry's
// new position in its index field.
func (h periodicHeapImp) Swap(i, j int) {
	first, second := h[i], h[j]
	h[i], h[j] = second, first
	second.index = i
	first.index = j
}
// Push appends x, which must be a *periodicJob, to the end of the slice and
// records that position in the entry's index field.
func (h *periodicHeapImp) Push(x interface{}) {
	entry := x.(*periodicJob)
	entry.index = len(*h)
	*h = append(*h, entry)
}
// Pop removes and returns the last element of the slice. The returned entry's
// index is set to -1 to mark it as no longer in the heap.
func (h *periodicHeapImp) Pop() interface{} {
	old := *h
	n := len(old)
	job := old[n-1]
	old[n-1] = nil // drop the reference so the backing array does not retain the popped entry
	job.index = -1 // for safety
	*h = old[0 : n-1]
	return job
}