Address some code review comments

This commit is contained in:
Preetha Appan 2018-03-05 21:46:54 -06:00
parent 2ba976dec8
commit 1ab8f2b57a
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
5 changed files with 37 additions and 28 deletions

View File

@ -9,21 +9,22 @@ import (
)
// DelayHeap wraps a heap and gives operations other than Push/Pop.
// The inner heap is sorted by the time in the WaitUntil field of DelayHeapNode
// The inner heap is sorted by the time in the WaitUntil field of delayHeapNode
type DelayHeap struct {
index map[structs.NamespacedID]*DelayHeapNode
index map[structs.NamespacedID]*delayHeapNode
heap delayedHeapImp
}
// HeapNode is an interface type implemented by objects stored in the DelayHeap
type HeapNode interface {
Data() interface{}
ID() string
Namespace() string
Data() interface{} // The data object
ID() string // ID of the object, used in conjunction with namespace for deduplication
Namespace() string // Namespace of the object, can be empty
}
// DelayHeapNode encapsulates the node stored in DelayHeap
// delayHeapNode encapsulates the node stored in DelayHeap
// WaitUntil is used as the sorting criteria
type DelayHeapNode struct {
type delayHeapNode struct {
// Node is the data object stored in the delay heap
Node HeapNode
// WaitUntil is the time delay associated with the node
@ -33,7 +34,7 @@ type DelayHeapNode struct {
index int
}
type delayedHeapImp []*DelayHeapNode
type delayedHeapImp []*delayHeapNode
func (h delayedHeapImp) Len() int {
return len(h)
@ -63,7 +64,7 @@ func (h delayedHeapImp) Swap(i, j int) {
}
func (h *delayedHeapImp) Push(x interface{}) {
node := x.(*DelayHeapNode)
node := x.(*delayHeapNode)
n := len(*h)
node.index = n
*h = append(*h, node)
@ -80,7 +81,7 @@ func (h *delayedHeapImp) Pop() interface{} {
func NewDelayHeap() *DelayHeap {
return &DelayHeap{
index: make(map[structs.NamespacedID]*DelayHeapNode),
index: make(map[structs.NamespacedID]*delayHeapNode),
heap: make(delayedHeapImp, 0),
}
}
@ -94,18 +95,18 @@ func (p *DelayHeap) Push(dataNode HeapNode, next time.Time) error {
return fmt.Errorf("node %q (%s) already exists", dataNode.ID(), dataNode.Namespace())
}
delayHeapNode := &DelayHeapNode{dataNode, next, 0}
delayHeapNode := &delayHeapNode{dataNode, next, 0}
p.index[tuple] = delayHeapNode
heap.Push(&p.heap, delayHeapNode)
return nil
}
func (p *DelayHeap) Pop() *DelayHeapNode {
func (p *DelayHeap) Pop() *delayHeapNode {
if len(p.heap) == 0 {
return nil
}
delayHeapNode := heap.Pop(&p.heap).(*DelayHeapNode)
delayHeapNode := heap.Pop(&p.heap).(*delayHeapNode)
tuple := structs.NamespacedID{
ID: delayHeapNode.Node.ID(),
Namespace: delayHeapNode.Node.Namespace(),
@ -114,7 +115,7 @@ func (p *DelayHeap) Pop() *DelayHeapNode {
return delayHeapNode
}
func (p *DelayHeap) Peek() *DelayHeapNode {
func (p *DelayHeap) Peek() *delayHeapNode {
if len(p.heap) == 0 {
return nil
}

View File

@ -105,8 +105,8 @@ func TestDelayHeap_Update(t *testing.T) {
}
func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*DelayHeapNode {
var entries []*DelayHeapNode
func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*delayHeapNode {
var entries []*delayHeapNode
for node := delayHeap.Pop(); node != nil; {
entries = append(entries, node)
node = delayHeap.Pop()

View File

@ -161,14 +161,17 @@ func (b *EvalBroker) Enabled() bool {
// should only be enabled on the active leader.
func (b *EvalBroker) SetEnabled(enabled bool) {
b.l.Lock()
prevEnabled := b.enabled
b.enabled = enabled
if !prevEnabled && enabled {
// start the go routine for delayed evals
ctx, cancel := context.WithCancel(context.Background())
b.delayedEvalCancelFunc = cancel
go b.runDelayedEvalsWatcher(ctx)
}
b.l.Unlock()
if !enabled {
b.Flush()
b.flush()
}
}
@ -230,6 +233,7 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
if !eval.WaitUntil.IsZero() {
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
b.stats.TotalWaiting += 1
// Signal an update.
select {
case b.delayedEvalsUpdateCh <- struct{}{}:
@ -675,7 +679,7 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
}
// Flush is used to clear the state of the broker
func (b *EvalBroker) Flush() {
func (b *EvalBroker) flush() {
b.l.Lock()
defer b.l.Unlock()
@ -733,11 +737,11 @@ func (d *evalWrapper) Namespace() string {
return d.eval.Namespace
}
// runDelayedEvalsWatcher is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for
// pending evaluations before enqueuing them
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) {
var timerChannel <-chan time.Time
for b.enabled {
for {
eval, waitUntil := b.nextDelayedEval()
if waitUntil.IsZero() {
timerChannel = nil
@ -752,7 +756,10 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) {
case <-timerChannel:
// remove from the heap since we can enqueue it now
b.delayHeap.Remove(&evalWrapper{eval})
b.l.Lock()
b.stats.TotalWaiting -= 1
b.enqueueLocked(eval, eval.Type)
b.l.Unlock()
case <-b.delayedEvalsUpdateCh:
continue
}

View File

@ -1141,7 +1141,7 @@ func TestEvalBroker_Wait(t *testing.T) {
}
}
// Test that delayed evaluations work as expected
// Ensure that delayed evaluations work as expected
func TestEvalBroker_WaitUntil(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -1166,7 +1166,7 @@ func TestEvalBroker_WaitUntil(t *testing.T) {
eval3.WaitUntil = now.Add(20 * time.Millisecond)
eval3.CreateIndex = 1
b.Enqueue(eval3)
require.Equal(3, b.stats.TotalWaiting)
// sleep enough for two evals to be ready
time.Sleep(200 * time.Millisecond)
@ -1184,6 +1184,7 @@ func TestEvalBroker_WaitUntil(t *testing.T) {
out, _, err = b.Dequeue(defaultSched, 2*time.Second)
require.Nil(err)
require.Equal(eval1, out)
require.Equal(0, b.stats.TotalWaiting)
}
// Ensure that priority is taken into account when enqueueing many evaluations.

View File

@ -2825,7 +2825,7 @@ func (r *ReschedulePolicy) Validate() error {
}
//Validate Interval and other delay parameters if attempts are limited
// Validate Interval and other delay parameters if attempts are limited
if !r.Unlimited {
if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, r.Interval))