open-nomad/nomad/periodic.go
Michael Schurter 80263861aa test: fix race around updateCh handling
PeriodicDispatch.SetEnabled sets updateCh in one goroutine, and
PeriodicDispatch.run accesses updateCh in another.

The race can be prevented by having SetEnabled pass updateCh to run.

Race detector output from `go test -race -run TestServer_RPC` in nomad/

```
==================
WARNING: DATA RACE
Write at 0x00c0001d3f48 by goroutine 75:
  github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:468
+0x256
  github.com/hashicorp/nomad/nomad.(*Server).revokeLeadership()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:724
+0x267
  github.com/hashicorp/nomad/nomad.(*Server).leaderLoop.func1()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:131
+0x3c
  github.com/hashicorp/nomad/nomad.(*Server).leaderLoop()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:163
+0x4dd
  github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72
+0x6c

Previous read at 0x00c0001d3f48 by goroutine 515:
  github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).run()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:338
+0x177

Goroutine 75 (running) created at:
  github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:70
+0x269

Goroutine 515 (running) created at:
  github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).SetEnabled()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:176
+0x1bc
  github.com/hashicorp/nomad/nomad.(*Server).establishLeadership()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:231
+0x582
  github.com/hashicorp/nomad/nomad.(*Server).leaderLoop()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:117
+0x82e
  github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1()
      /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72
+0x6c
==================
```
2018-12-19 15:48:02 -08:00

615 lines
16 KiB
Go

package nomad
import (
"container/heap"
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
tracked map[structs.NamespacedID]*structs.Job
heap *periodicHeap
updateCh chan struct{}
stopFn context.CancelFunc
logger log.Logger
l sync.RWMutex
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a job a new, untracked job and creates an evaluation
// for it and returns the eval.
DispatchJob(job *structs.Job) (*structs.Evaluation, error)
// RunningChildren returns whether the passed job has any running children.
RunningChildren(job *structs.Job) (bool, error)
}
// DispatchJob creates an evaluation for the passed job and commits both the
// evaluation and the job to the raft log. It returns the eval.
func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
// Commit this update via Raft
job.SetSubmitTime()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
fsmErr, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err, ok := fsmErr.(error); ok && err != nil {
return nil, err
}
if err != nil {
return nil, err
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerPeriodicJob,
JobID: job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return nil, err
}
// Update its indexes.
eval.CreateIndex = evalIndex
eval.ModifyIndex = evalIndex
return eval, nil
}
// RunningChildren checks whether the passed job has any running children.
func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
state, err := s.fsm.State().Snapshot()
if err != nil {
return false, err
}
ws := memdb.NewWatchSet()
prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
iter, err := state.JobsByIDPrefix(ws, job.Namespace, prefix)
if err != nil {
return false, err
}
var child *structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
child = i.(*structs.Job)
// Ensure the job is actually a child.
if child.ParentID != job.ID {
continue
}
// Get the childs evaluations.
evals, err := state.EvalsByJob(ws, child.Namespace, child.ID)
if err != nil {
return false, err
}
// Check if any of the evals are active or have running allocations.
for _, eval := range evals {
if !eval.TerminalStatus() {
return true, nil
}
allocs, err := state.AllocsByEval(ws, eval.ID)
if err != nil {
return false, err
}
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
return true, nil
}
}
}
}
// There are no evals or allocations that aren't terminal.
return false, nil
}
// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
// launch periodic jobs.
func NewPeriodicDispatch(logger log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
return &PeriodicDispatch{
dispatcher: dispatcher,
tracked: make(map[structs.NamespacedID]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
logger: logger.Named("periodic"),
}
}
// SetEnabled is used to control if the periodic dispatcher is enabled. It
// should only be enabled on the active leader. Disabling an active dispatcher
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.l.Lock()
defer p.l.Unlock()
wasRunning := p.enabled
p.enabled = enabled
// If we are transitioning from enabled to disabled, stop the daemon and
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
go p.run(ctx, p.updateCh)
}
}
// Tracked returns the set of tracked job IDs.
func (p *PeriodicDispatch) Tracked() []*structs.Job {
p.l.RLock()
defer p.l.RUnlock()
tracked := make([]*structs.Job, len(p.tracked))
i := 0
for _, job := range p.tracked {
tracked[i] = job
i++
}
return tracked
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return nil
}
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodicActive()
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, tracked := p.tracked[tuple]
if disabled {
if tracked {
p.removeLocked(tuple)
}
// If the job is disabled and we aren't tracking it, do nothing.
return nil
}
// Add or update the job.
p.tracked[tuple] = job
next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if err != nil {
return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
}
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Debug("updated periodic job", "job", job.NamespacedID())
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Debug("registered periodic job", "job", job.NamespacedID())
}
// Signal an update.
select {
case p.updateCh <- struct{}{}:
default:
}
return nil
}
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op.
func (p *PeriodicDispatch) Remove(namespace, jobID string) error {
p.l.Lock()
defer p.l.Unlock()
return p.removeLocked(structs.NamespacedID{
ID: jobID,
Namespace: namespace,
})
}
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op. It assumes this is called while a lock is held.
func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
// Do nothing if not enabled
if !p.enabled {
return nil
}
job, tracked := p.tracked[jobID]
if !tracked {
return nil
}
delete(p.tracked, jobID)
if err := p.heap.Remove(job); err != nil {
return fmt.Errorf("failed to remove tracked job %q (%s): %v", jobID.ID, jobID.Namespace, err)
}
// Signal an update.
select {
case p.updateCh <- struct{}{}:
default:
}
p.logger.Debug("deregistered periodic job", "job", job.NamespacedID())
return nil
}
// ForceRun causes the periodic job to be evaluated immediately and returns the
// subsequent eval.
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
p.l.Lock()
// Do nothing if not enabled
if !p.enabled {
p.l.Unlock()
return nil, fmt.Errorf("periodic dispatch disabled")
}
tuple := structs.NamespacedID{
ID: jobID,
Namespace: namespace,
}
job, tracked := p.tracked[tuple]
if !tracked {
p.l.Unlock()
return nil, fmt.Errorf("can't force run non-tracked job %q (%s)", jobID, namespace)
}
p.l.Unlock()
return p.createEval(job, time.Now().In(job.Periodic.GetLocation()))
}
// shouldRun returns whether the long lived run function should run.
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled
}
// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
if launch.IsZero() {
launchCh = nil
} else {
launchDur := launch.Sub(time.Now().In(job.Periodic.GetLocation()))
launchCh = time.After(launchDur)
p.logger.Debug("scheduled periodic job launch", "launch_delay", launchDur, "job", job.NamespacedID())
}
select {
case <-ctx.Done():
return
case <-updateCh:
continue
case <-launchCh:
p.dispatch(job, launch)
}
}
}
// dispatch creates an evaluation for the job and updates its next launchtime
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
p.logger.Error("failed to parse next periodic launch", "job", job.NamespacedID(), "error", err)
} else if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Error("failed to update next launch of periodic job", "job", job.NamespacedID(), "error", err)
}
// If the job prohibits overlapping and there are running children, we skip
// the launch.
if job.Periodic.ProhibitOverlap {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
p.logger.Error("failed to determine if periodic job has running children", "job", "error", err)
p.l.Unlock()
return
}
if running {
p.logger.Debug("skipping launch of periodic job because job prohibits overlap", "job", job.NamespacedID())
p.l.Unlock()
return
}
}
p.logger.Debug(" launching job", "job", job.NamespacedID(), "launch_time", launchTime)
p.l.Unlock()
p.createEval(job, launchTime)
}
// nextLaunch returns the next job to launch and when it should be launched. If
// the next job can't be determined, an error is returned. If the dispatcher is
// stopped, a nil job will be returned.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
// If there is nothing wait for an update.
p.l.RLock()
defer p.l.RUnlock()
if p.heap.Length() == 0 {
return nil, time.Time{}
}
nextJob := p.heap.Peek()
if nextJob == nil {
return nil, time.Time{}
}
return nextJob.job, nextJob.next
}
// createEval instantiates a job based on the passed periodic job and submits an
// evaluation for it. This should not be called with the lock held.
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) {
derived, err := p.deriveJob(periodicJob, time)
if err != nil {
return nil, err
}
eval, err := p.dispatcher.DispatchJob(derived)
if err != nil {
p.logger.Error("failed to dispatch job", "job", periodicJob.NamespacedID(), "error", err)
return nil, err
}
return eval, nil
}
// deriveJob instantiates a new job based on the passed periodic job and the
// launch time.
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived *structs.Job, err error) {
// Have to recover in case the job copy panics.
defer func() {
if r := recover(); r != nil {
p.logger.Error("deriving child job from periodic job failed; deregistering from periodic runner",
"job", periodicJob.NamespacedID(), "error", r)
p.Remove(periodicJob.Namespace, periodicJob.ID)
derived = nil
err = fmt.Errorf("Failed to create a copy of the periodic job %q (%s): %v",
periodicJob.ID, periodicJob.Namespace, r)
}
}()
// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
return
}
// deriveJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
return fmt.Sprintf("%s%s%d", periodicJob.ID, structs.PeriodicLaunchSuffix, time.Unix())
}
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
index := strings.LastIndex(jobID, structs.PeriodicLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
launch, err := strconv.Atoi(jobID[index+len(structs.PeriodicLaunchSuffix):])
if err != nil {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
return time.Unix(int64(launch), 0), nil
}
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
p.updateCh = make(chan struct{}, 1)
p.tracked = make(map[structs.NamespacedID]*structs.Job)
p.heap = NewPeriodicHeap()
p.stopFn = nil
}
// periodicHeap wraps a heap and gives operations other than Push/Pop.
type periodicHeap struct {
index map[structs.NamespacedID]*periodicJob
heap periodicHeapImp
}
type periodicJob struct {
job *structs.Job
next time.Time
index int
}
func NewPeriodicHeap() *periodicHeap {
return &periodicHeap{
index: make(map[structs.NamespacedID]*periodicJob),
heap: make(periodicHeapImp, 0),
}
}
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := p.index[tuple]; ok {
return fmt.Errorf("job %q (%s) already exists", job.ID, job.Namespace)
}
pJob := &periodicJob{job, next, 0}
p.index[tuple] = pJob
heap.Push(&p.heap, pJob)
return nil
}
func (p *periodicHeap) Pop() *periodicJob {
if len(p.heap) == 0 {
return nil
}
pJob := heap.Pop(&p.heap).(*periodicJob)
tuple := structs.NamespacedID{
ID: pJob.job.ID,
Namespace: pJob.job.Namespace,
}
delete(p.index, tuple)
return pJob
}
func (p *periodicHeap) Peek() *periodicJob {
if len(p.heap) == 0 {
return nil
}
return p.heap[0]
}
func (p *periodicHeap) Contains(job *structs.Job) bool {
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, ok := p.index[tuple]
return ok
}
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
// Need to update the job as well because its spec can change.
pJob.job = job
pJob.next = next
heap.Fix(&p.heap, pJob.index)
return nil
}
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
}
func (p *periodicHeap) Remove(job *structs.Job) error {
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
heap.Remove(&p.heap, pJob.index)
delete(p.index, tuple)
return nil
}
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
}
func (p *periodicHeap) Length() int {
return len(p.heap)
}
type periodicHeapImp []*periodicJob
func (h periodicHeapImp) Len() int { return len(h) }
func (h periodicHeapImp) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
// Sort such that zero times are at the end of the list.
iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero()
if iZero && jZero {
return false
} else if iZero {
return false
} else if jZero {
return true
}
return h[i].next.Before(h[j].next)
}
func (h periodicHeapImp) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *periodicHeapImp) Push(x interface{}) {
n := len(*h)
job := x.(*periodicJob)
job.index = n
*h = append(*h, job)
}
func (h *periodicHeapImp) Pop() interface{} {
old := *h
n := len(old)
job := old[n-1]
job.index = -1 // for safety
*h = old[0 : n-1]
return job
}