Merge pull request #5699 from hashicorp/dani/b-eval-broker-lifetime
Eval Broker: Prevent redundant enqueue's when a node is not a leader
This commit is contained in:
commit
d202582502
|
@ -161,6 +161,8 @@ func (b *EvalBroker) Enabled() bool {
|
|||
// should only be enabled on the active leader.
|
||||
func (b *EvalBroker) SetEnabled(enabled bool) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
prevEnabled := b.enabled
|
||||
b.enabled = enabled
|
||||
if !prevEnabled && enabled {
|
||||
|
@ -169,7 +171,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
|
|||
b.delayedEvalCancelFunc = cancel
|
||||
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
|
||||
}
|
||||
b.l.Unlock()
|
||||
|
||||
if !enabled {
|
||||
b.flush()
|
||||
}
|
||||
|
@ -208,6 +210,11 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
|
|||
// outstanding, the evaluation is blocked until an Ack/Nack is received.
|
||||
// processEnqueue must be called with the lock held.
|
||||
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
||||
// If we're not enabled, don't enable more queuing.
|
||||
if !b.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if already enqueued
|
||||
if _, ok := b.evals[eval.ID]; ok {
|
||||
if token == "" {
|
||||
|
@ -259,8 +266,10 @@ func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
|
|||
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
delete(b.timeWait, eval.ID)
|
||||
b.stats.TotalWaiting -= 1
|
||||
|
||||
b.enqueueLocked(eval, eval.Type)
|
||||
}
|
||||
|
||||
|
@ -678,11 +687,9 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Flush is used to clear the state of the broker
|
||||
// Flush is used to clear the state of the broker. It must be called from within
|
||||
// the lock.
|
||||
func (b *EvalBroker) flush() {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
// Unblock any waiters
|
||||
for _, waitCh := range b.waiting {
|
||||
close(waitCh)
|
||||
|
@ -778,13 +785,13 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan
|
|||
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
|
||||
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
// If there is nothing wait for an update.
|
||||
if b.delayHeap.Length() == 0 {
|
||||
b.l.RUnlock()
|
||||
return nil, time.Time{}
|
||||
}
|
||||
nextEval := b.delayHeap.Peek()
|
||||
b.l.RUnlock()
|
||||
if nextEval == nil {
|
||||
return nil, time.Time{}
|
||||
}
|
||||
|
|
|
@ -647,6 +647,64 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) {
|
||||
t.Parallel()
|
||||
b := testBroker(t, 0)
|
||||
baseEval := mock.Eval()
|
||||
b.SetEnabled(true)
|
||||
|
||||
{
|
||||
// Enqueue
|
||||
b.Enqueue(baseEval.Copy())
|
||||
|
||||
delayedEval := baseEval.Copy()
|
||||
delayedEval.Wait = 30
|
||||
b.Enqueue(delayedEval)
|
||||
|
||||
waitEval := baseEval.Copy()
|
||||
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
|
||||
b.Enqueue(waitEval)
|
||||
}
|
||||
|
||||
// Flush via SetEnabled
|
||||
b.SetEnabled(false)
|
||||
|
||||
{
|
||||
// Check the stats
|
||||
stats := b.Stats()
|
||||
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
|
||||
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
|
||||
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
|
||||
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
|
||||
_, ok := stats.ByScheduler[baseEval.Type]
|
||||
require.False(t, ok, "Expected scheduler to have no stats")
|
||||
}
|
||||
|
||||
{
|
||||
// Enqueue again now we're disabled
|
||||
b.Enqueue(baseEval.Copy())
|
||||
|
||||
delayedEval := baseEval.Copy()
|
||||
delayedEval.Wait = 30 * time.Second
|
||||
b.Enqueue(delayedEval)
|
||||
|
||||
waitEval := baseEval.Copy()
|
||||
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
|
||||
b.Enqueue(waitEval)
|
||||
}
|
||||
|
||||
{
|
||||
// Check the stats again
|
||||
stats := b.Stats()
|
||||
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
|
||||
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
|
||||
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
|
||||
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
|
||||
_, ok := stats.ByScheduler[baseEval.Type]
|
||||
require.False(t, ok, "Expected scheduler to have no stats")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
b := testBroker(t, 0)
|
||||
|
|
Loading…
Reference in a new issue