open-nomad/nomad/periodic.go

614 lines
16 KiB
Go
Raw Normal View History

2015-12-01 22:54:57 +00:00
package nomad
2015-12-18 20:26:28 +00:00
import (
"container/heap"
2017-08-03 20:40:34 +00:00
"context"
2015-12-18 20:26:28 +00:00
"fmt"
"log"
"strconv"
"strings"
2015-12-18 20:26:28 +00:00
"sync"
"time"
2015-12-01 22:54:57 +00:00
2017-02-08 04:31:23 +00:00
memdb "github.com/hashicorp/go-memdb"
2015-12-18 20:26:28 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
2015-12-18 20:26:28 +00:00
2017-09-07 23:56:15 +00:00
tracked map[structs.NamespacedID]*structs.Job
2015-12-18 20:26:28 +00:00
heap *periodicHeap
updateCh chan struct{}
2017-08-03 20:40:34 +00:00
stopFn context.CancelFunc
2015-12-18 20:26:28 +00:00
logger *log.Logger
l sync.RWMutex
2015-12-01 22:54:57 +00:00
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a job a new, untracked job and creates an evaluation
2016-01-13 18:19:53 +00:00
// for it and returns the eval.
DispatchJob(job *structs.Job) (*structs.Evaluation, error)
// RunningChildren returns whether the passed job has any running children.
RunningChildren(job *structs.Job) (bool, error)
}
// DispatchJob creates an evaluation for the passed job and commits both the
2016-01-13 18:19:53 +00:00
// evaluation and the job to the raft log. It returns the eval.
func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
// Commit this update via Raft
2017-06-30 02:08:25 +00:00
job.SetSubmitTime()
2017-09-07 23:56:15 +00:00
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
2017-09-19 14:47:10 +00:00
fsmErr, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err, ok := fsmErr.(error); ok && err != nil {
return nil, err
}
if err != nil {
2016-01-13 18:19:53 +00:00
return nil, err
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
2017-09-07 23:56:15 +00:00
Namespace: job.Namespace,
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerPeriodicJob,
JobID: job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
2016-01-13 18:19:53 +00:00
_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
2016-01-13 18:19:53 +00:00
return nil, err
}
2016-01-13 18:19:53 +00:00
// Update its indexes.
eval.CreateIndex = evalIndex
eval.ModifyIndex = evalIndex
return eval, nil
}
// RunningChildren checks whether the passed job has any running children.
func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
state, err := s.fsm.State().Snapshot()
if err != nil {
return false, err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
2017-09-07 23:56:15 +00:00
iter, err := state.JobsByIDPrefix(ws, job.Namespace, prefix)
if err != nil {
return false, err
}
var child *structs.Job
for i := iter.Next(); i != nil; i = iter.Next() {
child = i.(*structs.Job)
2016-01-07 20:54:41 +00:00
// Ensure the job is actually a child.
if child.ParentID != job.ID {
continue
}
// Get the childs evaluations.
2017-09-07 23:56:15 +00:00
evals, err := state.EvalsByJob(ws, child.Namespace, child.ID)
if err != nil {
return false, err
}
// Check if any of the evals are active or have running allocations.
for _, eval := range evals {
if !eval.TerminalStatus() {
return true, nil
}
2017-02-08 04:31:23 +00:00
allocs, err := state.AllocsByEval(ws, eval.ID)
if err != nil {
return false, err
}
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
return true, nil
}
}
}
}
// There are no evals or allocations that aren't terminal.
return false, nil
}
2015-12-18 20:26:28 +00:00
// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
// launch periodic jobs.
func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
2015-12-18 20:26:28 +00:00
return &PeriodicDispatch{
dispatcher: dispatcher,
2017-09-07 23:56:15 +00:00
tracked: make(map[structs.NamespacedID]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
logger: logger,
2015-12-18 20:26:28 +00:00
}
}
2015-12-01 22:54:57 +00:00
2015-12-18 20:26:28 +00:00
// SetEnabled is used to control if the periodic dispatcher is enabled. It
// should only be enabled on the active leader. Disabling an active dispatcher
// will stop any launched go routine and flush the dispatcher.
2015-12-01 22:54:57 +00:00
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
2015-12-18 20:26:28 +00:00
p.l.Lock()
2017-08-03 20:40:34 +00:00
defer p.l.Unlock()
wasRunning := p.enabled
2015-12-18 20:26:28 +00:00
p.enabled = enabled
2015-12-01 22:54:57 +00:00
2017-08-03 20:40:34 +00:00
// If we are transistioning from enabled to disabled, stop the daemon and
// flush.
if !enabled && wasRunning {
p.stopFn()
p.flush()
} else if enabled && !wasRunning {
// If we are transitioning from disabled to enabled, run the daemon.
ctx, cancel := context.WithCancel(context.Background())
p.stopFn = cancel
go p.run(ctx)
}
2015-12-18 20:26:28 +00:00
}
// Tracked returns the set of tracked job IDs.
2015-12-16 21:46:09 +00:00
func (p *PeriodicDispatch) Tracked() []*structs.Job {
2015-12-18 20:26:28 +00:00
p.l.RLock()
defer p.l.RUnlock()
2015-12-16 21:46:09 +00:00
tracked := make([]*structs.Job, len(p.tracked))
2015-12-18 20:26:28 +00:00
i := 0
for _, job := range p.tracked {
2015-12-16 21:46:09 +00:00
tracked[i] = job
2015-12-18 20:26:28 +00:00
i++
}
return tracked
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
2017-09-26 22:26:33 +00:00
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
2015-12-18 20:26:28 +00:00
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return false, nil
2015-12-18 20:26:28 +00:00
}
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, tracked := p.tracked[tuple]
2015-12-18 20:26:28 +00:00
if disabled {
2015-12-16 21:46:09 +00:00
if tracked {
2017-09-07 23:56:15 +00:00
p.removeLocked(tuple)
2015-12-16 21:46:09 +00:00
}
// If the job is disabled and we aren't tracking it, do nothing.
return false, nil
}
2015-12-18 20:26:28 +00:00
// Add or update the job.
2017-09-07 23:56:15 +00:00
p.tracked[tuple] = job
2017-02-15 22:37:06 +00:00
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
2015-12-18 20:26:28 +00:00
if tracked {
if err := p.heap.Update(job, next); err != nil {
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
2015-12-18 20:26:28 +00:00
}
2017-09-07 23:56:15 +00:00
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
} else {
if err := p.heap.Push(job, next); err != nil {
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
2015-12-18 20:26:28 +00:00
}
2017-09-07 23:56:15 +00:00
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
// Signal an update.
2017-08-03 20:40:34 +00:00
select {
case p.updateCh <- struct{}{}:
default:
2015-12-18 20:26:28 +00:00
}
return true, nil
2015-12-01 22:54:57 +00:00
}
2015-12-18 20:26:28 +00:00
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) Remove(namespace, jobID string) error {
2015-12-18 20:26:28 +00:00
p.l.Lock()
defer p.l.Unlock()
2017-09-07 23:56:15 +00:00
return p.removeLocked(structs.NamespacedID{
ID: jobID,
Namespace: namespace,
})
2015-12-10 00:46:06 +00:00
}
2015-12-18 20:26:28 +00:00
2015-12-10 00:46:06 +00:00
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op. It assumes this is called while a lock is held.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) removeLocked(jobID structs.NamespacedID) error {
2015-12-18 20:26:28 +00:00
// Do nothing if not enabled
if !p.enabled {
2015-12-19 01:51:30 +00:00
return nil
2015-12-18 20:26:28 +00:00
}
2015-12-24 01:47:37 +00:00
job, tracked := p.tracked[jobID]
if !tracked {
return nil
}
delete(p.tracked, jobID)
if err := p.heap.Remove(job); err != nil {
2017-09-07 23:56:15 +00:00
return fmt.Errorf("failed to remove tracked job %q (%s): %v", jobID.ID, jobID.Namespace, err)
2015-12-18 20:26:28 +00:00
}
// Signal an update.
2017-08-03 20:40:34 +00:00
select {
case p.updateCh <- struct{}{}:
default:
2015-12-18 20:26:28 +00:00
}
2017-09-07 23:56:15 +00:00
p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q (%s)", jobID.ID, jobID.Namespace)
2015-12-01 22:54:57 +00:00
return nil
}
2016-01-13 18:19:53 +00:00
// ForceRun causes the periodic job to be evaluated immediately and returns the
// subsequent eval.
2017-09-07 23:56:15 +00:00
func (p *PeriodicDispatch) ForceRun(namespace, jobID string) (*structs.Evaluation, error) {
2015-12-18 20:26:28 +00:00
p.l.Lock()
// Do nothing if not enabled
if !p.enabled {
p.l.Unlock()
2016-01-13 18:19:53 +00:00
return nil, fmt.Errorf("periodic dispatch disabled")
2015-12-18 20:26:28 +00:00
}
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: jobID,
Namespace: namespace,
}
job, tracked := p.tracked[tuple]
2015-12-18 20:26:28 +00:00
if !tracked {
p.l.Unlock()
2017-09-07 23:56:15 +00:00
return nil, fmt.Errorf("can't force run non-tracked job %q (%s)", jobID, namespace)
2015-12-18 20:26:28 +00:00
}
2015-12-21 23:02:27 +00:00
p.l.Unlock()
2017-02-15 22:37:06 +00:00
return p.createEval(job, time.Now().In(job.Periodic.GetLocation()))
2015-12-18 20:26:28 +00:00
}
// shouldRun returns whether the long lived run function should run.
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
2017-08-03 20:40:34 +00:00
return p.enabled
}
// run is a long-lived function that waits till a job's periodic spec is met and
2015-12-18 20:26:28 +00:00
// then creates an evaluation to run the job.
2017-08-03 20:40:34 +00:00
func (p *PeriodicDispatch) run(ctx context.Context) {
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
if launch.IsZero() {
launchCh = nil
} else {
2017-02-15 22:37:06 +00:00
launchDur := launch.Sub(time.Now().In(job.Periodic.GetLocation()))
launchCh = time.After(launchDur)
2017-09-07 23:56:15 +00:00
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q (%s) in %s", job.ID, job.Namespace, launchDur)
}
2015-12-05 00:53:36 +00:00
select {
2017-08-03 20:40:34 +00:00
case <-ctx.Done():
return
case <-p.updateCh:
continue
case <-launchCh:
p.dispatch(job, launch)
}
2015-12-18 20:26:28 +00:00
}
}
2015-12-18 20:26:28 +00:00
// dispatch creates an evaluation for the job and updates its next launchtime
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
2015-12-18 20:26:28 +00:00
nextLaunch := job.Periodic.Next(launchTime)
if err := p.heap.Update(job, nextLaunch); err != nil {
2017-09-07 23:56:15 +00:00
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q (%s): %v", job.ID, job.Namespace, err)
}
// If the job prohibits overlapping and there are running children, we skip
// the launch.
if job.Periodic.ProhibitOverlap {
running, err := p.dispatcher.RunningChildren(job)
if err != nil {
msg := fmt.Sprintf("[ERR] nomad.periodic: failed to determine if"+
2017-09-07 23:56:15 +00:00
" periodic job %q (%s) has running children: %v", job.ID, job.Namespace, err)
p.logger.Println(msg)
p.l.Unlock()
return
}
if running {
msg := fmt.Sprintf("[DEBUG] nomad.periodic: skipping launch of"+
2017-09-07 23:56:15 +00:00
" periodic job %q (%s) because job prohibits overlap", job.ID, job.Namespace)
p.logger.Println(msg)
p.l.Unlock()
return
}
}
2017-09-07 23:56:15 +00:00
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q (%v) at %v", job.ID, job.Namespace, launchTime)
p.l.Unlock()
p.createEval(job, launchTime)
}
// nextLaunch returns the next job to launch and when it should be launched. If
// the next job can't be determined, an error is returned. If the dispatcher is
// stopped, a nil job will be returned.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
2015-12-18 20:26:28 +00:00
// If there is nothing wait for an update.
p.l.RLock()
defer p.l.RUnlock()
2015-12-18 20:26:28 +00:00
if p.heap.Length() == 0 {
return nil, time.Time{}
2015-12-18 20:26:28 +00:00
}
nextJob := p.heap.Peek()
if nextJob == nil {
return nil, time.Time{}
2015-12-18 20:26:28 +00:00
}
return nextJob.job, nextJob.next
2015-12-18 20:26:28 +00:00
}
// createEval instantiates a job based on the passed periodic job and submits an
// evaluation for it. This should not be called with the lock held.
2016-01-13 18:19:53 +00:00
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) {
2015-12-18 20:26:28 +00:00
derived, err := p.deriveJob(periodicJob, time)
if err != nil {
2016-01-13 18:19:53 +00:00
return nil, err
2015-12-18 20:26:28 +00:00
}
2016-01-13 18:19:53 +00:00
eval, err := p.dispatcher.DispatchJob(derived)
if err != nil {
2017-09-07 23:56:15 +00:00
p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q (%s): %v",
periodicJob.ID, periodicJob.Namespace, err)
2016-01-13 18:19:53 +00:00
return nil, err
2015-12-18 20:26:28 +00:00
}
2016-01-13 18:19:53 +00:00
return eval, nil
2015-12-18 20:26:28 +00:00
}
// deriveJob instantiates a new job based on the passed periodic job and the
// launch time.
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived *structs.Job, err error) {
// Have to recover in case the job copy panics.
defer func() {
if r := recover(); r != nil {
p.logger.Printf("[ERR] nomad.periodic: deriving job from"+
2017-09-07 23:56:15 +00:00
" periodic job %q (%s) failed; deregistering from periodic runner: %v",
periodicJob.ID, periodicJob.Namespace, r)
p.Remove(periodicJob.Namespace, periodicJob.ID)
2015-12-18 20:26:28 +00:00
derived = nil
2017-09-07 23:56:15 +00:00
err = fmt.Errorf("Failed to create a copy of the periodic job %q (%s): %v",
periodicJob.ID, periodicJob.Namespace, r)
2015-12-18 20:26:28 +00:00
}
}()
// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
2015-12-18 20:26:28 +00:00
derived.Periodic = nil
return
}
// deriveJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
2016-01-07 22:24:25 +00:00
return fmt.Sprintf("%s%s%d", periodicJob.ID, structs.PeriodicLaunchSuffix, time.Unix())
2015-12-18 20:26:28 +00:00
}
2015-12-19 01:51:30 +00:00
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
2016-01-07 22:24:25 +00:00
index := strings.LastIndex(jobID, structs.PeriodicLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
2016-01-07 22:24:25 +00:00
launch, err := strconv.Atoi(jobID[index+len(structs.PeriodicLaunchSuffix):])
if err != nil {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
2015-12-21 21:55:26 +00:00
return time.Unix(int64(launch), 0), nil
}
2017-08-03 20:40:34 +00:00
// flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) flush() {
2015-12-18 20:26:28 +00:00
p.updateCh = make(chan struct{}, 1)
2017-09-07 23:56:15 +00:00
p.tracked = make(map[structs.NamespacedID]*structs.Job)
2015-12-18 20:26:28 +00:00
p.heap = NewPeriodicHeap()
2017-08-03 20:40:34 +00:00
p.stopFn = nil
2015-12-18 20:26:28 +00:00
}
2015-12-07 22:24:06 +00:00
// periodicHeap wraps a heap and gives operations other than Push/Pop.
2015-12-18 20:26:28 +00:00
type periodicHeap struct {
2017-09-07 23:56:15 +00:00
index map[structs.NamespacedID]*periodicJob
2015-12-18 20:26:28 +00:00
heap periodicHeapImp
}
// periodicJob is a single heap entry: a job together with its next launch
// time.
type periodicJob struct {
	// job is the periodic job this entry belongs to.
	job *structs.Job
	// next is the time of the job's next launch; entries with a zero time
	// sort after all others.
	next time.Time
	// index is the entry's current position in the heap slice.
	index int
}
func NewPeriodicHeap() *periodicHeap {
return &periodicHeap{
2017-09-07 23:56:15 +00:00
index: make(map[structs.NamespacedID]*periodicJob),
2015-12-18 20:26:28 +00:00
heap: make(periodicHeapImp, 0),
}
}
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := p.index[tuple]; ok {
return fmt.Errorf("job %q (%s) already exists", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
pJob := &periodicJob{job, next, 0}
2017-09-07 23:56:15 +00:00
p.index[tuple] = pJob
2015-12-18 20:26:28 +00:00
heap.Push(&p.heap, pJob)
2015-12-01 22:54:57 +00:00
return nil
}
2015-12-18 20:26:28 +00:00
func (p *periodicHeap) Pop() *periodicJob {
2015-12-18 20:26:28 +00:00
if len(p.heap) == 0 {
return nil
2015-12-18 20:26:28 +00:00
}
pJob := heap.Pop(&p.heap).(*periodicJob)
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: pJob.job.ID,
Namespace: pJob.job.Namespace,
}
delete(p.index, tuple)
return pJob
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Peek() *periodicJob {
2015-12-18 20:26:28 +00:00
if len(p.heap) == 0 {
return nil
2015-12-18 20:26:28 +00:00
}
return p.heap[0]
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Contains(job *structs.Job) bool {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
_, ok := p.index[tuple]
2015-12-18 20:26:28 +00:00
return ok
}
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
2015-12-05 00:53:36 +00:00
// Need to update the job as well because its spec can change.
pJob.job = job
pJob.next = next
heap.Fix(&p.heap, pJob.index)
2015-12-18 20:26:28 +00:00
return nil
}
2017-09-07 23:56:15 +00:00
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
func (p *periodicHeap) Remove(job *structs.Job) error {
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if pJob, ok := p.index[tuple]; ok {
2015-12-18 20:26:28 +00:00
heap.Remove(&p.heap, pJob.index)
2017-09-07 23:56:15 +00:00
delete(p.index, tuple)
2015-12-18 20:26:28 +00:00
return nil
}
2017-09-07 23:56:15 +00:00
return fmt.Errorf("heap doesn't contain job %q (%s)", job.ID, job.Namespace)
2015-12-18 20:26:28 +00:00
}
// Length returns the number of entries in the heap.
func (p *periodicHeap) Length() int {
	size := len(p.heap)
	return size
}
// periodicHeapImp is the slice of entries that is handed to the container/heap
// functions. Entries are ordered by next launch time, with zero times last.
type periodicHeapImp []*periodicJob
// Len returns the number of entries in the heap slice.
func (h periodicHeapImp) Len() int {
	return len(h)
}
// Less orders entries by next launch time. An entry with a zero time is never
// less than another entry, which sorts zero times to the end; two zero times
// compare as not less.
func (h periodicHeapImp) Less(i, j int) bool {
	left, right := h[i].next, h[j].next

	switch {
	case left.IsZero():
		// Covers both "only i is zero" and "both are zero".
		return false
	case right.IsZero():
		return true
	default:
		return left.Before(right)
	}
}
// Swap exchanges the entries at positions i and j and records each entry's
// new position in its index field.
func (h periodicHeapImp) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}
// Push appends x, which must be a *periodicJob, to the end of the slice and
// sets its index to that position. It is meant to be called by container/heap.
func (h *periodicHeapImp) Push(x interface{}) {
	entry := x.(*periodicJob)
	entry.index = len(*h)
	*h = append(*h, entry)
}
// Pop removes the last entry of the slice and returns it, marking its index
// as -1 for safety. It is meant to be called by container/heap.
func (h *periodicHeapImp) Pop() interface{} {
	last := len(*h) - 1
	entry := (*h)[last]
	entry.index = -1
	*h = (*h)[:last]
	return entry
}