open-nomad/nomad/eval_broker_test.go
Tim Gross 6415fb4284
eval broker: shed all but one blocked eval per job after ack (#14621)
When an evaluation is acknowledged by a scheduler, the resulting plan is
guaranteed to cover up to the `waitIndex` set by the worker based on the most
recent evaluation for that job in the state store. At that point, we no longer
need to retain blocked evaluations in the broker that are older than that index.

Move all but the highest priority / highest `ModifyIndex` blocked eval into a
canceled set. When the `Eval.Ack` RPC returns from the eval broker it will
signal a reap of a batch of cancelable evals to write to raft. This paces the
cancelations limited by how frequently the schedulers are acknowledging evals;
this should reduce the risk of cancelations from overwhelming raft relative to
scheduler progress. In order to avoid straggling batches when the cluster is
quiet, we also include a periodic sweep through the cancelable list.
2022-11-16 16:10:11 -05:00

1584 lines
37 KiB
Go

package nomad
import (
"container/heap"
"encoding/json"
"errors"
"fmt"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
// defaultSched is the list of scheduler types the tests in this file pass to
// Dequeue.
var defaultSched = []string{
	structs.JobTypeService,
	structs.JobTypeBatch,
}
// testBrokerConfig returns the default server configuration with the eval
// Nack timeout and the Nack re-enqueue delays overridden for tests.
func testBrokerConfig() *Config {
	cfg := DefaultConfig()

	// Nack timeout
	cfg.EvalNackTimeout = 5 * time.Second

	// Nack re-enqueue delays (first Nack, then every later Nack)
	cfg.EvalNackInitialReenqueueDelay = 5 * time.Millisecond
	cfg.EvalNackSubsequentReenqueueDelay = 50 * time.Millisecond

	return cfg
}
// testBroker builds an EvalBroker from testBrokerConfig. A non-zero timeout
// replaces the configured Nack timeout; zero keeps the value set by
// testBrokerConfig. Construction failures fail the test.
func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
	// Attribute construction failures to the calling test, not this helper.
	t.Helper()

	config := testBrokerConfig()
	if timeout != 0 {
		config.EvalNackTimeout = timeout
	}
	return testBrokerFromConfig(t, config)
}
// testBrokerFromConfig builds an EvalBroker from the Nack timeout and the
// initial/subsequent Nack re-enqueue delays in c, failing the test if the
// broker cannot be created.
func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
	// Report a failure at the caller's line rather than inside this helper.
	t.Helper()

	// NOTE(review): the trailing 3 is presumably the delivery limit that
	// TestEvalBroker_DeliveryLimit relies on; confirm against NewEvalBroker.
	b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	return b
}
// TestEvalBroker_Enqueue_Dequeue_Nack_Ack walks a single evaluation through
// the broker: an enqueue while disabled (ignored), an enqueue while enabled,
// a dequeue, a Nack, a second dequeue and a final Ack. After each step it
// checks the broker stats and the outstanding token.
func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	// Enqueue, but broker is disabled!
	eval := mock.Eval()
	b.Enqueue(eval)
	// Verify nothing was done
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if b.Enabled() {
		t.Fatalf("should not be enabled")
	}
	// Enable the broker, and enqueue
	b.SetEnabled(true)
	b.Enqueue(eval)
	// Double enqueue is a no-op
	b.Enqueue(eval)
	if !b.Enabled() {
		t.Fatalf("should be enabled")
	}
	// Verify enqueue is done
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
	// The dequeued eval must be outstanding under the token Dequeue returned
	tokenOut, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut != token {
		t.Fatalf("Bad: %#v %#v", token, tokenOut)
	}
	// OutstandingReset should verify the token
	err = b.OutstandingReset("nope", "foo")
	if err != ErrNotOutstanding {
		t.Fatalf("err: %v", err)
	}
	err = b.OutstandingReset(out.ID, "foo")
	if err != ErrTokenMismatch {
		t.Fatalf("err: %v", err)
	}
	err = b.OutstandingReset(out.ID, tokenOut)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	// Nack with wrong token should fail
	err = b.Nack(eval.ID, "foobarbaz")
	if err == nil {
		t.Fatalf("should fail to nack")
	}
	// Nack back into the queue
	err = b.Nack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}
	// Check the stats; poll because the Nacked eval only becomes ready again
	// after the re-enqueue delay
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})
	// Dequeue should work again
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out2)
	}
	// Each delivery is expected to carry a fresh token
	if token2 == token {
		t.Fatalf("should get a new token")
	}
	tokenOut2, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut2 != token2 {
		t.Fatalf("Bad: %#v %#v", token2, tokenOut2)
	}
	// Ack with wrong token
	err = b.Ack(eval.ID, "zip")
	if err == nil {
		t.Fatalf("should fail to ack")
	}
	// Ack finally
	err = b.Ack(eval.ID, token2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}
	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Nack_Delay checks that a Nacked evaluation is held in the
// waiting state before it becomes ready again, and that the second Nack of
// the same evaluation is delayed by at least the broker's subsequent Nack
// delay.
func TestEvalBroker_Nack_Delay(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	// Enable the broker and enqueue
	b.SetEnabled(true)
	eval := mock.Eval()
	b.Enqueue(eval)
	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
	// Nack back into the queue
	err = b.Nack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}
	// Check the stats to ensure that it is waiting
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalWaiting != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	// Now wait for it to be re-enqueued
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})
	// Dequeue should work again
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out2)
	}
	if token2 == token {
		t.Fatalf("should get a new token")
	}
	// Capture the time so the second re-enqueue delay can be measured
	start := time.Now()
	// Nack back into the queue
	err = b.Nack(eval.ID, token2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	// Now wait for it to be re-enqueued
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})
	// The second Nack must have waited at least the subsequent Nack delay
	delay := time.Since(start)
	if delay < b.subsequentNackDelay {
		t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay)
	}
	// Dequeue should work again
	out3, token3, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out3 != eval {
		t.Fatalf("bad : %#v", out3)
	}
	if token3 == token || token3 == token2 {
		t.Fatalf("should get a new token")
	}
	// Ack finally
	err = b.Ack(eval.ID, token3)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}
	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Serialize_DuplicateJobID enqueues several evaluations for
// the same job ID in two namespaces and checks, via the broker stats after
// every Dequeue and Ack, how many evaluations are ready, unacked, blocked and
// cancelable at each step.
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	ns1 := "namespace-one"
	ns2 := "namespace-two"
	jobID := "example"
	// newEval builds an eval for jobID in namespace ns whose ID, CreateIndex
	// and ModifyIndex all derive from idx, and enqueues it immediately.
	newEval := func(idx uint64, ns string) *structs.Evaluation {
		eval := mock.Eval()
		eval.ID = fmt.Sprintf("eval:%d", idx)
		eval.JobID = jobID
		eval.Namespace = ns
		eval.CreateIndex = idx
		eval.ModifyIndex = idx
		b.Enqueue(eval)
		return eval
	}
	// first job
	eval1 := newEval(1, ns1)
	newEval(2, ns1)
	newEval(3, ns1)
	eval4 := newEval(4, ns1)
	// second job
	eval5 := newEval(5, ns2)
	newEval(6, ns2)
	eval7 := newEval(7, ns2)
	// retrieve the stats from the broker, less some stats that aren't
	// interesting for this test and make the test much more verbose
	// to include
	getStats := func() BrokerStats {
		t.Helper()
		stats := b.Stats()
		stats.DelayedEvals = nil
		stats.ByScheduler = nil
		return *stats
	}
	// one eval per job is ready; the other five are blocked behind them
	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
		TotalBlocked: 5, TotalCancelable: 0}, getStats())
	// Dequeue should get 1st eval
	out, token, err := b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval1, must.Sprint("expected 1st eval"))
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
		TotalBlocked: 5, TotalCancelable: 0}, getStats())
	// Current wait index should be 4 but Ack to exercise behavior
	// when worker's Eval.getWaitIndex gets a stale index
	err = b.Ack(eval1.ID, token)
	must.NoError(t, err)
	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
		TotalBlocked: 2, TotalCancelable: 2}, getStats())
	// eval4 and eval5 are ready
	// eval6 and eval7 are blocked
	// Dequeue should get 4th eval
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval4, must.Sprint("expected 4th eval"))
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
		TotalBlocked: 2, TotalCancelable: 2}, getStats())
	// Ack should clear the rest of namespace-one blocked but leave
	// namespace-two untouched
	err = b.Ack(eval4.ID, token)
	must.NoError(t, err)
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalBlocked: 2, TotalCancelable: 2}, getStats())
	// Dequeue should get 5th eval
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval5, must.Sprint("expected 5th eval"))
	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
		TotalBlocked: 2, TotalCancelable: 2}, getStats())
	// Ack should clear remaining namespace-two blocked evals
	err = b.Ack(eval5.ID, token)
	must.NoError(t, err)
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalBlocked: 0, TotalCancelable: 3}, getStats())
	// Dequeue should get 7th eval because that's all that's left
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval7, must.Sprint("expected 7th eval"))
	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
		TotalBlocked: 0, TotalCancelable: 3}, getStats())
	// Last ack should leave the broker empty except for cancels
	err = b.Ack(eval7.ID, token)
	must.NoError(t, err)
	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
		TotalBlocked: 0, TotalCancelable: 3}, getStats())
}
// TestEvalBroker_Enqueue_Disable checks that disabling the broker flushes an
// evaluation that was enqueued while it was enabled.
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
	ci.Parallel(t)

	broker := testBroker(t, 0)
	ev := mock.Eval()

	// Enqueue while enabled, then flush by disabling
	broker.SetEnabled(true)
	broker.Enqueue(ev)
	broker.SetEnabled(false)

	// Nothing may remain in the stats
	stats := broker.Stats()
	if stats.TotalReady != 0 || stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if _, found := stats.ByScheduler[ev.Type]; found {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Enqueue_Disable_Delay checks that disabling the broker
// flushes ready, waiting, blocked and unacked evaluations, and that enqueues
// made while the broker is disabled leave the stats empty.
//
// NOTE(review): every eval enqueued here is a Copy of baseEval; confirm
// whether Copy preserves the ID and, if so, whether the broker treats the
// later enqueues as duplicates of the first.
func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	baseEval := mock.Eval()
	b.SetEnabled(true)
	{
		// Enqueue
		b.Enqueue(baseEval.Copy())
		// Wait is a time.Duration: use a real 30s delay (a bare 30 would be
		// 30ns) so the eval is still waiting when the broker is flushed.
		delayedEval := baseEval.Copy()
		delayedEval.Wait = 30 * time.Second
		b.Enqueue(delayedEval)
		waitEval := baseEval.Copy()
		waitEval.WaitUntil = time.Now().Add(30 * time.Second)
		b.Enqueue(waitEval)
	}
	// Flush via SetEnabled
	b.SetEnabled(false)
	{
		// Check the stats
		stats := b.Stats()
		require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
		require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
		require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
		require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
		_, ok := stats.ByScheduler[baseEval.Type]
		require.False(t, ok, "Expected scheduler to have no stats")
	}
	{
		// Enqueue again now we're disabled
		b.Enqueue(baseEval.Copy())
		delayedEval := baseEval.Copy()
		delayedEval.Wait = 30 * time.Second
		b.Enqueue(delayedEval)
		waitEval := baseEval.Copy()
		waitEval.WaitUntil = time.Now().Add(30 * time.Second)
		b.Enqueue(waitEval)
	}
	{
		// Check the stats again
		stats := b.Stats()
		require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
		require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
		require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
		require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
		_, ok := stats.ByScheduler[baseEval.Type]
		require.False(t, ok, "Expected scheduler to have no stats")
	}
}
// TestEvalBroker_Dequeue_Timeout checks that Dequeue on an empty broker
// returns no evaluation and no error, and does not return before the
// requested timeout has elapsed.
func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
	ci.Parallel(t)

	const wait = 5 * time.Millisecond

	broker := testBroker(t, 0)
	broker.SetEnabled(true)

	// Time the dequeue on the empty broker
	began := time.Now()
	got, _, err := broker.Dequeue(defaultSched, wait)
	finished := time.Now()

	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if got != nil {
		t.Fatalf("unexpected: %#v", got)
	}

	elapsed := finished.Sub(began)
	if elapsed < wait {
		t.Fatalf("bad: %#v", elapsed)
	}
}
// TestEvalBroker_Dequeue_Empty_Timeout checks that Dequeue with a zero
// timeout blocks on an empty broker and returns a non-nil evaluation once one
// is enqueued.
func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	// errCh carries a failure from the dequeue goroutine; it is closed
	// (yielding a nil error) when the goroutine finishes without one.
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		out, _, err := b.Dequeue(defaultSched, 0)
		if err != nil {
			errCh <- err
			return
		}
		if out == nil {
			errCh <- errors.New("expected a non-nil value")
			return
		}
	}()
	// Sleep for a little bit
	select {
	case <-time.After(5 * time.Millisecond):
	case err := <-errCh:
		// Any receive here means Dequeue returned before an enqueue
		if err != nil {
			t.Fatalf("error from dequeue goroutine: %s", err)
		}
		t.Fatalf("Dequeue(0) should block, not finish")
	}
	// Enqueue to unblock the dequeue.
	eval := mock.Eval()
	b.Enqueue(eval)
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from dequeue goroutine: %s", err)
		}
	case <-time.After(5 * time.Millisecond):
		t.Fatal("timeout: Dequeue(0) should return after enqueue")
	}
}
// TestEvalBroker_Dequeue_Priority ensures evaluations are dequeued from
// highest to lowest priority, regardless of the order they were enqueued in.
func TestEvalBroker_Dequeue_Priority(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	eval1 := mock.Eval()
	eval1.Priority = 10
	b.Enqueue(eval1)
	eval2 := mock.Eval()
	eval2.Priority = 30
	b.Enqueue(eval2)
	eval3 := mock.Eval()
	eval3.Priority = 20
	b.Enqueue(eval3)
	// Priority 30 first
	out1, _, err := b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	if out1 != eval2 {
		t.Fatalf("bad: %#v", out1)
	}
	// Then priority 20
	out2, _, err := b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	if out2 != eval3 {
		t.Fatalf("bad: %#v", out2)
	}
	// Priority 10 last
	out3, _, err := b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	if out3 != eval1 {
		t.Fatalf("bad: %#v", out3)
	}
}
// TestEvalBroker_Dequeue_FIFO ensures that evaluations of equal priority are
// dequeued in ascending CreateIndex order, even though they are enqueued in
// descending order.
func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	numEvals := 100
	// Enqueue with indexes numEvals..1 so insertion order is the reverse of
	// the expected dequeue order.
	for i := numEvals; i > 0; i-- {
		eval1 := mock.Eval()
		eval1.CreateIndex = uint64(i)
		eval1.ModifyIndex = uint64(i)
		b.Enqueue(eval1)
	}
	// Drain every enqueued eval, including the last one (index numEvals).
	for i := 1; i <= numEvals; i++ {
		out1, _, err := b.Dequeue(defaultSched, time.Second)
		must.NoError(t, err)
		// A timed-out Dequeue yields a nil eval; fail cleanly instead of
		// dereferencing it.
		must.NotNil(t, out1)
		must.Eq(t, uint64(i), out1.CreateIndex,
			must.Sprint("eval was not FIFO by CreateIndex"),
		)
	}
}
// TestEvalBroker_Dequeue_Fairness ensures dequeues alternate reasonably
// between scheduler types: with equal numbers of service and batch evals
// enqueued, no run of 250 consecutive dequeues may come from a single type.
func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	numEvals := 1000
	// First half service evals, second half batch evals
	for i := 0; i < numEvals; i++ {
		eval1 := mock.Eval()
		if i < (numEvals / 2) {
			eval1.Type = structs.JobTypeService
		} else {
			eval1.Type = structs.JobTypeBatch
		}
		b.Enqueue(eval1)
	}
	// counter tracks the current run length: positive for a run of service
	// evals, negative for a run of batch evals. It resets whenever the type
	// switches.
	counter := 0
	for i := 0; i < numEvals; i++ {
		out1, _, err := b.Dequeue(defaultSched, time.Second)
		must.NoError(t, err)
		// A timed-out Dequeue yields a nil eval; fail cleanly instead of
		// dereferencing it.
		must.NotNil(t, out1)
		switch out1.Type {
		case structs.JobTypeService:
			if counter < 0 {
				counter = 0
			}
			counter += 1
		case structs.JobTypeBatch:
			if counter > 0 {
				counter = 0
			}
			counter -= 1
		}
		// This will fail randomly at times. It is very hard to
		// test deterministically that its acting randomly.
		if counter >= 250 || counter <= -250 {
			t.Fatalf("unlikely sequence: %d", counter)
		}
	}
}
// TestEvalBroker_Dequeue_Blocked ensures a Dequeue that is already blocked on
// an empty broker is woken up by a later Enqueue and receives that eval.
func TestEvalBroker_Dequeue_Blocked(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	// Start with a blocked dequeue
	outCh := make(chan *structs.Evaluation)
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		defer close(outCh)
		start := time.Now()
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			errCh <- err
			return
		}
		end := time.Now()
		// The main goroutine waits 5ms before enqueueing, so a faster return
		// means the dequeue did not block.
		if d := end.Sub(start); d < 5*time.Millisecond {
			errCh <- fmt.Errorf("test broker dequeue duration too fast: %v", d)
			return
		}
		outCh <- out
	}()
	// Wait for a bit, or t.Fatal if an error has already happened in
	// the goroutine
	select {
	case <-time.After(5 * time.Millisecond):
		// no errors yet, soldier on
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from anonymous goroutine before enqueue: %v", err)
		}
	}
	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)
	// Ensure dequeue
	select {
	case out := <-outCh:
		if out != eval {
			// Pretty-print both evals to make the mismatch readable
			prettyExp, _ := json.MarshalIndent(eval, "", "\t")
			prettyGot, _ := json.MarshalIndent(out, "", "\t")
			t.Fatalf("dequeue result expected:\n%s\ngot:\n%s",
				string(prettyExp), string(prettyGot))
		}
	case err := <-errCh:
		t.Fatalf("error from anonymous goroutine after enqueue: %v", err)
	case <-time.After(time.Second):
		t.Fatalf("timeout waiting for dequeue result")
	}
}
// TestEvalBroker_Nack_Timeout ensures an evaluation that is dequeued but
// never acknowledged is handed out again by a later Dequeue, and not sooner
// than the configured Nack timeout.
func TestEvalBroker_Nack_Timeout(t *testing.T) {
	ci.Parallel(t)

	const nackTimeout = 5 * time.Millisecond

	broker := testBroker(t, nackTimeout)
	broker.SetEnabled(true)

	// Enqueue a single eval
	want := mock.Eval()
	broker.Enqueue(want)

	// First delivery; the clock starts once it has been handed out
	got, _, err := broker.Dequeue(defaultSched, time.Second)
	began := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if got != want {
		t.Fatalf("bad: %v", got)
	}

	// Second delivery has to wait for the Nack timer to fire
	got, _, err = broker.Dequeue(defaultSched, time.Second)
	finished := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if got != want {
		t.Fatalf("bad: %v", got)
	}

	// The redelivery must not beat the Nack timeout
	elapsed := finished.Sub(began)
	if elapsed < nackTimeout {
		t.Fatalf("bad: %#v", elapsed)
	}
}
// TestEvalBroker_Nack_TimeoutReset ensures that calling OutstandingReset on
// a dequeued eval pushes back its redelivery: with a 50ms Nack timeout and a
// reset after 20ms, the second Dequeue must not return before 75ms.
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 50*time.Millisecond)
	b.SetEnabled(true)
	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)
	// Dequeue
	out, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}
	// Reset in 20 milliseconds
	time.Sleep(20 * time.Millisecond)
	if err := b.OutstandingReset(out.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}
	// Dequeue, should block on Nack timer
	out, _, err = b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}
	// Check the nack timer
	// NOTE(review): 75ms is presumably 20ms sleep + 50ms Nack timeout + 5ms
	// initial re-enqueue delay from testBrokerConfig; confirm.
	if diff := end.Sub(start); diff < 75*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}
// TestEvalBroker_PauseResumeNackTimeout checks that pausing the Nack timeout
// of a dequeued eval delays its redelivery until the timeout is resumed: the
// second Dequeue must not return before 95ms have passed since the first.
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 50*time.Millisecond)
	b.SetEnabled(true)
	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)
	// Dequeue
	out, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}
	// Pause in 20 milliseconds
	time.Sleep(20 * time.Millisecond)
	if err := b.PauseNackTimeout(out.ID, token); err != nil {
		t.Fatalf("pause nack timeout error: %v", err)
	}
	// Resume the timeout from another goroutine after a further 20ms, while
	// the main goroutine is blocked in Dequeue below. errCh is closed when
	// the goroutine ends, so a nil receive means the resume succeeded.
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		time.Sleep(20 * time.Millisecond)
		if err := b.ResumeNackTimeout(out.ID, token); err != nil {
			errCh <- err
			return
		}
	}()
	// Dequeue, should block until the timer is resumed
	out, _, err = b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("dequeue error: %v", err)
	}
	if out != eval {
		prettyExp, _ := json.MarshalIndent(eval, "", "\t")
		prettyGot, _ := json.MarshalIndent(out, "", "\t")
		t.Fatalf("dequeue result expected:\n%s\ngot:\n%s",
			string(prettyExp), string(prettyGot))
	}
	// Check the nack timer
	// NOTE(review): 95ms is presumably 20ms before the pause + 20ms paused +
	// 50ms Nack timeout + 5ms initial re-enqueue delay; confirm.
	if diff := end.Sub(start); diff < 95*time.Millisecond {
		t.Fatalf("deqeue happened too fast: %#v", diff)
	}
	// check the result of ResumeNackTimeout
	err = <-errCh
	if err != nil {
		t.Fatalf("resume nack timeout error:%s", err)
	}
}
// TestEvalBroker_DeliveryLimit checks that an evaluation Nacked three times
// ends up ready in the failed queue, from where it can be dequeued and Acked,
// leaving the failed queue stats at zero.
func TestEvalBroker_DeliveryLimit(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	eval := mock.Eval()
	b.Enqueue(eval)
	for i := 0; i < 3; i++ {
		// Dequeue should work
		out, token, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != eval {
			t.Fatalf("bad : %#v", out)
		}
		// Nack with the correct token so the delivery counts as failed
		err = b.Nack(eval.ID, token)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	}
	// Check the stats: the eval should now be ready in the failed queue
	stats := b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	// Dequeue from failed queue
	out, token, err := b.Dequeue([]string{failedQueue}, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	// Ack finally
	err = b.Ack(out.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}
	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
}
// TestEvalBroker_AckAtDeliveryLimit checks that an evaluation Nacked twice
// and then Acked on its third delivery leaves the broker empty and never
// shows up in the failed queue stats.
func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	eval := mock.Eval()
	b.Enqueue(eval)
	for i := 0; i < 3; i++ {
		// Dequeue should work
		out, token, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != eval {
			t.Fatalf("bad : %#v", out)
		}
		if i == 2 {
			// Ack on the third delivery instead of Nacking again; a failed
			// Ack would otherwise surface only as confusing stats below.
			if err := b.Ack(eval.ID, token); err != nil {
				t.Fatalf("err: %v", err)
			}
		} else {
			// Nack back into the queue
			err = b.Nack(eval.ID, token)
			if err != nil {
				t.Fatalf("err: %v", err)
			}
		}
	}
	// Check the stats
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if _, ok := stats.ByScheduler[failedQueue]; ok {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Wait asserts delayed evaluations cannot be dequeued until
// their wait duration has elapsed.
func TestEvalBroker_Wait(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	// Create an eval that should wait
	eval := mock.Eval()
	eval.Wait = 100 * time.Millisecond
	b.Enqueue(eval)
	// Verify waiting
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalWaiting != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	// Dequeue should not return the eval until wait has elapsed
	// (the bare 1 is the Dequeue timeout, i.e. the shortest non-zero wait)
	out, token, err := b.Dequeue(defaultSched, 1)
	require.Nil(t, out)
	require.Empty(t, token)
	require.NoError(t, err)
	// Let the wait elapse
	time.Sleep(200 * time.Millisecond)
	// Verify ready
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalWaiting != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	// Dequeue should work
	out, _, err = b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
}
// TestEvalBroker_WaitUntil ensures that evaluations with WaitUntil set are
// counted as waiting and are handed out in order of their WaitUntil time.
func TestEvalBroker_WaitUntil(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	now := time.Now()
	// Create a few of evals with WaitUntil set
	eval1 := mock.Eval()
	eval1.WaitUntil = now.Add(1 * time.Second)
	eval1.CreateIndex = 1
	b.Enqueue(eval1)
	eval2 := mock.Eval()
	eval2.WaitUntil = now.Add(100 * time.Millisecond)
	// set CreateIndex to use as a tie breaker when eval2
	// and eval3 are both in the pending evals heap
	eval2.CreateIndex = 2
	b.Enqueue(eval2)
	eval3 := mock.Eval()
	eval3.WaitUntil = now.Add(20 * time.Millisecond)
	eval3.CreateIndex = 1
	b.Enqueue(eval3)
	// Read the counter through Stats(), as the other tests do, rather than
	// touching the unexported stats field while delayed evals are pending.
	require.Equal(3, b.Stats().TotalWaiting)
	// sleep enough for two evals to be ready
	time.Sleep(200 * time.Millisecond)
	// first dequeue should return eval3
	out, _, err := b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval3, out)
	// second dequeue should return eval2
	out, _, err = b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval2, out)
	// third dequeue should return eval1
	out, _, err = b.Dequeue(defaultSched, 2*time.Second)
	require.Nil(err)
	require.Equal(eval1, out)
	require.Equal(0, b.Stats().TotalWaiting)
}
// TestEvalBroker_EnqueueAll_Dequeue_Fair ensures that priority is taken into
// account when enqueueing many evaluations: a Dequeue that is already blocked
// when EnqueueAll is called must receive the highest-priority eval of the
// batch.
func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	// Start with a blocked dequeue
	outCh := make(chan *structs.Evaluation)
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		defer close(outCh)
		start := time.Now()
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			errCh <- err
			return
		}
		end := time.Now()
		// The main goroutine waits 5ms before enqueueing, so a faster return
		// means the dequeue did not block.
		if d := end.Sub(start); d < 5*time.Millisecond {
			errCh <- fmt.Errorf("test broker dequeue duration too fast: %v", d)
			return
		}
		outCh <- out
	}()
	// Wait for a bit, or t.Fatal if an error has already happened in
	// the goroutine
	select {
	case <-time.After(5 * time.Millisecond):
		// no errors yet, soldier on
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from anonymous goroutine before enqueue: %v", err)
		}
	}
	// Enqueue one eval per priority 10, 20, ..., expectedPriority, each with
	// an empty token
	evals := make(map[*structs.Evaluation]string, 8)
	expectedPriority := 90
	for i := 10; i <= expectedPriority; i += 10 {
		eval := mock.Eval()
		eval.Priority = i
		evals[eval] = ""
	}
	b.EnqueueAll(evals)
	// Ensure dequeue
	select {
	case out := <-outCh:
		if out.Priority != expectedPriority {
			pretty, _ := json.MarshalIndent(out, "", "\t")
			t.Logf("bad priority on *structs.Evaluation: %s", string(pretty))
			t.Fatalf("priority wanted:%d, priority got:%d", expectedPriority, out.Priority)
		}
	case err := <-errCh:
		t.Fatalf("error from anonymous goroutine after enqueue: %v", err)
	case <-time.After(time.Second):
		t.Fatalf("timeout waiting for dequeue result")
	}
}
// TestEvalBroker_EnqueueAll_Requeue_Ack checks that re-enqueueing an
// outstanding evaluation with its token keeps it unacked until the Ack, after
// which the re-enqueued evaluation becomes ready and can be dequeued again
// under a new token.
func TestEvalBroker_EnqueueAll_Requeue_Ack(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	// Create the evaluation, enqueue and dequeue
	eval := mock.Eval()
	b.Enqueue(eval)
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
	// Requeue the same evaluation.
	b.EnqueueAll(map[*structs.Evaluation]string{eval: token})
	// The stats should show one unacked
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	// Ack the evaluation.
	if err := b.Ack(eval.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}
	// Check stats again as this should cause the re-enqueued one to transition
	// into the ready state
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	// Another dequeue should be successful
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		// Report the eval that was actually returned by this dequeue
		t.Fatalf("bad : %#v", out2)
	}
	if token == token2 {
		t.Fatalf("bad : %s and %s", token, token2)
	}
}
// TestEvalBroker_EnqueueAll_Requeue_Nack checks that when an outstanding
// evaluation is re-enqueued with its token and then Nacked, the broker ends
// up with exactly one ready evaluation and an empty requeue set.
func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
	ci.Parallel(t)

	broker := testBroker(t, 0)
	broker.SetEnabled(true)

	// Enqueue a fresh eval and take it out again
	ev := mock.Eval()
	broker.Enqueue(ev)

	got, token, err := broker.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if got != ev {
		t.Fatalf("bad : %#v", got)
	}

	// Requeue the eval under the token it is outstanding with
	requeued := map[*structs.Evaluation]string{ev: token}
	broker.EnqueueAll(requeued)

	// Exactly one unacked eval, nothing ready
	stats := broker.Stats()
	if stats.TotalReady != 0 || stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Nack it
	if err := broker.Nack(ev.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The Nack should drop the re-enqueued copy; poll until the stats settle
	settled := func() (bool, error) {
		stats = broker.Stats()
		switch {
		case stats.TotalReady != 1, stats.TotalUnacked != 0:
			return false, fmt.Errorf("bad: %#v", stats)
		case len(broker.requeue) != 0:
			return false, fmt.Errorf("bad: %#v", broker.requeue)
		}
		return true, nil
	}
	testutil.WaitForResult(settled, func(e error) {
		t.Fatal(e)
	})
}
// TestEvalBroker_NamespacedJobs checks that evaluations sharing a job ID only
// block one another within the same namespace.
func TestEvalBroker_NamespacedJobs(t *testing.T) {
	ci.Parallel(t)

	broker := testBroker(t, 0)
	broker.SetEnabled(true)

	const sharedJobID = "test-jobID"

	// enqueueIn creates an eval for the shared job ID in the given namespace
	// and hands it to the broker.
	enqueueIn := func(namespace string) *structs.Evaluation {
		ev := mock.Eval()
		ev.JobID = sharedJobID
		ev.Namespace = namespace
		broker.Enqueue(ev)
		return ev
	}

	first := enqueueIn("n1")
	// Different namespace: should not block
	second := enqueueIn("default")
	// Same namespace as the previous eval: should block
	enqueueIn("default")

	assert := require.New(t)
	const wait = 5 * time.Millisecond

	got1, _, err := broker.Dequeue(defaultSched, wait)
	assert.Nil(err)
	assert.Equal(first.ID, got1.ID)

	got2, _, err := broker.Dequeue(defaultSched, wait)
	assert.Nil(err)
	assert.Equal(second.ID, got2.ID)

	// Nothing else is ready; the third eval stays blocked
	got3, _, err := broker.Dequeue(defaultSched, wait)
	assert.Nil(err)
	assert.Nil(got3)
	assert.Equal(1, len(broker.blocked))
}
// TestEvalBroker_PendingEvals_Ordering checks the PendingEvaluations heap:
// the highest priority pops first and, at equal priority, the lowest
// CreateIndex pops first.
func TestEvalBroker_PendingEvals_Ordering(t *testing.T) {
	ready := PendingEvaluations{}

	// push builds an eval with the given identity, priority and CreateIndex
	// and adds it to the heap.
	push := func(jobID, evalID string, priority int, index uint64) {
		ev := mock.Eval()
		ev.JobID = jobID
		ev.ID = evalID
		ev.Priority = priority
		ev.CreateIndex = index
		heap.Push(&ready, ev)
	}

	// pop removes the next eval from the heap and returns its ID.
	pop := func() string {
		return heap.Pop(&ready).(*structs.Evaluation).ID
	}

	// note: we're intentionally pushing these out-of-order to assert we're
	// getting them back out in the intended order and not just as inserted
	push("example1", "eval01", 50, 1)
	push("example3", "eval03", 70, 3)
	push("example2", "eval02", 50, 2)

	test.Eq(t, "eval03", pop(),
		test.Sprint("expected highest Priority to be next ready"))

	test.Eq(t, "eval01", pop(),
		test.Sprint("expected oldest CreateIndex to be next ready"))

	// A newer eval at the same priority must queue behind eval02
	push("example4", "eval04", 50, 4)

	test.Eq(t, "eval02", pop(),
		test.Sprint("expected oldest CreateIndex to be next ready"))
}
// TestEvalBroker_BlockedEval_Ordering checks the BlockedEvaluations heap: the
// highest priority pops first and, at equal priority, the highest ModifyIndex
// pops first.
func TestEvalBroker_BlockedEval_Ordering(t *testing.T) {
	blocked := BlockedEvaluations{}

	// push builds an eval with the given ID, priority and ModifyIndex and
	// adds it to the heap.
	push := func(evalID string, priority int, index uint64) {
		ev := mock.Eval()
		ev.ID = evalID
		ev.Priority = priority
		ev.ModifyIndex = index
		heap.Push(&blocked, ev)
	}

	// pop removes the next eval from the heap and returns its ID.
	pop := func() string {
		return heap.Pop(&blocked).(*structs.Evaluation).ID
	}

	// note: we're intentionally pushing these out-of-order to assert we're
	// getting them back out in the intended order and not just as inserted
	push("eval03", 50, 3)
	push("eval02", 100, 2)
	push("eval01", 50, 1)

	test.Eq(t, "eval02", pop(),
		test.Sprint("expected eval with highest priority to get unblocked"))

	test.Eq(t, "eval03", pop(),
		test.Sprint("expected eval with highest modify index to get unblocked"))

	// A lower-priority eval must not jump ahead of eval01
	push("eval04", 30, 4)

	test.Eq(t, "eval01", pop(),
		test.Sprint("expected eval with highest priority to get unblocked"))
}
// TestEvalBroker_BlockedEvals_MarkForCancel asserts that MarkForCancel hands
// back all but one of the blocked evals, and that the single eval left on the
// heap is the one with the highest ModifyIndex that was pushed.
func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) {
	ci.Parallel(t)

	blockedHeap := BlockedEvaluations{}

	// push ten evals for the same job, with create/modify indexes counting
	// down 100, 90, ..., 10
	for idx := uint64(100); idx > 0; idx -= 10 {
		e := mock.Eval()
		e.JobID = "example"
		e.CreateIndex = idx
		e.ModifyIndex = idx
		heap.Push(&blockedHeap, e)
	}

	dropped := blockedHeap.MarkForCancel()
	must.Eq(t, 9, len(dropped))
	must.Eq(t, 1, blockedHeap.Len())

	// whatever remains on the heap is the eval that was kept
	popped := heap.Pop(&blockedHeap)
	must.NotNil(t, popped)
	survivor := popped.(*structs.Evaluation)
	must.Eq(t, 100, survivor.ModifyIndex)
}
// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
// workflows: it queues up many evals for a single job while no schedulers are
// running, then starts schedulers and asserts on the resulting eval statuses
// and broker stats.
func TestEvalBroker_IntegrationTest(t *testing.T) {
	ci.Parallel(t)

	srv, cleanupS1 := TestServer(t, func(c *Config) {
		// Prevent dequeue
		c.NumSchedulers = 0
		// Prevent sweep-up
		c.EvalReapCancelableInterval = time.Minute * 10
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, srv.RPC)

	codec := rpcClient(t, srv)
	store := srv.fsm.State()

	// create a system job, a node for it to run on, and a set of node up/down
	// events that will result in evaluations queued.

	job := mock.SystemJob()
	jobReq := &structs.JobRegisterRequest{
		Job:          job,
		EvalPriority: 50,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var jobResp structs.JobRegisterResponse
	err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
	must.NoError(t, err)

	node := mock.Node()
	nodeReq := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var nodeResp structs.NodeUpdateResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp)
	must.NoError(t, err)

	// flip the node between ready (even i) and down (odd i)
	for i := 0; i < 10; i++ {
		status := structs.NodeStatusDown
		if i%2 == 0 {
			status = structs.NodeStatusReady
		}
		statusReq := &structs.NodeUpdateStatusRequest{
			NodeID:       node.ID,
			Status:       status,
			WriteRequest: structs.WriteRequest{Region: "global"},
		}
		var statusResp structs.NodeUpdateResponse
		err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp)
		must.NoError(t, err)
	}

	// ensure we have the expected number of evaluations and eval broker state

	// retrieve the stats from the broker, less some uninteresting ones
	getStats := func() BrokerStats {
		t.Helper()
		stats := srv.evalBroker.Stats()
		stats.DelayedEvals = nil
		stats.ByScheduler = nil
		return *stats
	}

	// getEvalStatuses counts the evals in the state store by status.
	//
	// This closure is also used as part of the require.Eventually condition
	// below, which does not run on the test goroutine. It must therefore only
	// report failures with non-fatal helpers (t.Errorf, test.*) and never with
	// must.*/require.*, because t.FailNow may only be called from the
	// goroutine running the test function.
	getEvalStatuses := func() map[string]int {
		t.Helper()
		statuses := map[string]int{}
		iter, err := store.Evals(nil, state.SortDefault)
		if err != nil {
			t.Errorf("unexpected error listing evals: %v", err)
			return statuses
		}
		for raw := iter.Next(); raw != nil; raw = iter.Next() {
			eval := raw.(*structs.Evaluation)
			statuses[eval.Status]++
			if eval.Status == structs.EvalStatusCancelled {
				test.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription)
			}
		}
		return statuses
	}

	must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses())
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalBlocked: 10, TotalCancelable: 0}, getStats())

	// start schedulers: all the evals are for a single job so there should only
	// be one eval processed at a time no matter how many schedulers we run

	config := DefaultConfig()
	config.NumSchedulers = 4
	config.EvalReapCancelableInterval = time.Minute * 10
	require.NoError(t, srv.Reload(config))

	// assert that all but 2 evals were canceled and that the eval broker state
	// has been cleared

	require.Eventually(t, func() bool {
		got := getEvalStatuses()
		return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
	}, 2*time.Second, time.Millisecond*100)

	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
		TotalBlocked: 0, TotalCancelable: 0}, getStats())
}