open-nomad/nomad/eval_broker_test.go
Tim Gross a51149736d
Rename nomad.broker.total_blocked metric (#15835)
This changeset fixes a long-standing point of confusion in metrics emitted by
the eval broker. The eval broker has a queue of "blocked" evals that are waiting
for an in-flight ("unacked") eval of the same job to be completed. But this
"blocked" state is not the same as the `blocked` status that we write to raft
and expose in the Nomad API to end users. There's a second metric
`nomad.blocked_eval.total_blocked` that refers to evaluations in that
state. This has caused ongoing confusion in major customer incidents and even in
our own documentation! (Fixed in this PR.)

There's little functional change in this PR aside from the name of the metric
emitted, but there's a bit of refactoring to clean up the names in `eval_broker.go`
so that there aren't name collisions and multiple names for the same
state. Changes included are:
* Everything that was previously called "pending" referred to entities that were
  associated with the "ready" metric. These are all now called "ready" to match
  the metric.
* Everything named "blocked" in `eval_broker.go` is now named "pending", except
  for a couple of comments that actually refer to blocked RPCs.
* Added a note to the upgrade guide docs for 1.5.0.
* Fixed the scheduling performance metrics docs because the description for
  `nomad.broker.total_blocked` was actually the description for
  `nomad.blocked_eval.total_blocked`.
2023-01-20 14:23:56 -05:00

1584 lines
37 KiB
Go

package nomad
import (
"container/heap"
"encoding/json"
"errors"
"fmt"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
// defaultSched lists the scheduler (job) types most tests dequeue with.
var defaultSched = []string{
	structs.JobTypeService,
	structs.JobTypeBatch,
}
// testBrokerConfig returns a default server config with Nack timeout and
// re-enqueue delays shortened for fast tests.
func testBrokerConfig() *Config {
	c := DefaultConfig()

	// Tune the Nack timeout
	c.EvalNackTimeout = 5 * time.Second

	// Tune the Nack delay
	c.EvalNackInitialReenqueueDelay = 5 * time.Millisecond
	c.EvalNackSubsequentReenqueueDelay = 50 * time.Millisecond

	return c
}
// testBroker constructs an EvalBroker for tests; a non-zero timeout
// overrides the default Nack timeout from testBrokerConfig.
func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
	cfg := testBrokerConfig()
	if timeout != 0 {
		cfg.EvalNackTimeout = timeout
	}
	return testBrokerFromConfig(t, cfg)
}
// testBrokerFromConfig constructs an EvalBroker from the given config with a
// delivery limit of 3, failing the test immediately on error.
func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
	broker, err := NewEvalBroker(
		c.EvalNackTimeout,
		c.EvalNackInitialReenqueueDelay,
		c.EvalNackSubsequentReenqueueDelay,
		3,
	)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	return broker
}
// TestEvalBroker_Enqueue_Dequeue_Nack_Ack exercises the broker's core
// lifecycle end to end: enqueue while disabled (dropped), enqueue while
// enabled, dequeue with an outstanding token, OutstandingReset token
// validation, Nack back into the ready queue, and a final Ack that leaves
// the broker empty.
func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)

	// Enqueue, but broker is disabled!
	eval := mock.Eval()
	b.Enqueue(eval)

	// Verify nothing was done
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	if b.Enabled() {
		t.Fatalf("should not be enabled")
	}

	// Enable the broker, and enqueue
	b.SetEnabled(true)
	b.Enqueue(eval)

	// Double enqueue is a no-op
	b.Enqueue(eval)

	if !b.Enabled() {
		t.Fatalf("should be enabled")
	}

	// Verify enqueue is done
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// The dequeued eval must be tracked as outstanding under its token.
	tokenOut, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut != token {
		t.Fatalf("Bad: %#v %#v", token, tokenOut)
	}

	// OutstandingReset should verify the token
	err = b.OutstandingReset("nope", "foo")
	if err != ErrNotOutstanding {
		t.Fatalf("err: %v", err)
	}
	err = b.OutstandingReset(out.ID, "foo")
	if err != ErrTokenMismatch {
		t.Fatalf("err: %v", err)
	}
	err = b.OutstandingReset(out.ID, tokenOut)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the stats: the eval moved from ready to unacked.
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Nack with wrong token should fail
	err = b.Nack(eval.ID, "foobarbaz")
	if err == nil {
		t.Fatalf("should fail to nack")
	}

	// Nack back into the queue
	err = b.Nack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats; WaitForResult because the Nack re-enqueue happens
	// asynchronously after the initial re-enqueue delay.
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})

	// Dequeue should work again, with a fresh token.
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out2)
	}
	if token2 == token {
		t.Fatalf("should get a new token")
	}

	tokenOut2, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut2 != token2 {
		t.Fatalf("Bad: %#v %#v", token2, tokenOut2)
	}

	// Ack with wrong token
	err = b.Ack(eval.ID, "zip")
	if err == nil {
		t.Fatalf("should fail to ack")
	}

	// Ack finally
	err = b.Ack(eval.ID, token2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats: the Ack must leave the broker completely empty.
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Nack_Delay asserts that a Nacked eval is not immediately
// ready again: the first Nack applies the initial re-enqueue delay and a
// subsequent Nack applies the (longer) subsequent re-enqueue delay.
func TestEvalBroker_Nack_Delay(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)

	// Enable the broker and enqueue
	b.SetEnabled(true)
	eval := mock.Eval()
	b.Enqueue(eval)

	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Nack back into the queue
	err = b.Nack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats to ensure that it is waiting (delayed), not ready.
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalWaiting != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Now wait for it to be re-enqueued
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})

	// Dequeue should work again
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out2)
	}
	if token2 == token {
		t.Fatalf("should get a new token")
	}

	// Capture the time
	start := time.Now()

	// Nack back into the queue a second time; this should use the longer
	// subsequent re-enqueue delay.
	err = b.Nack(eval.ID, token2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Now wait for it to be re-enqueued
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalWaiting != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Ready != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.ByScheduler[eval.Type].Unacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})

	// time.Since is the idiomatic form of time.Now().Sub(start).
	delay := time.Since(start)
	if delay < b.subsequentNackDelay {
		t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay)
	}

	// Dequeue should work again with yet another fresh token.
	out3, token3, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out3 != eval {
		t.Fatalf("bad : %#v", out3)
	}
	if token3 == token || token3 == token2 {
		t.Fatalf("should get a new token")
	}

	// Ack finally
	err = b.Ack(eval.ID, token3)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats: the Ack must leave the broker empty.
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Serialize_DuplicateJobID asserts that evals for the same
// (namespace, job) pair are serialized: while one is unacked, later evals for
// that pair park in the pending set, and Acking promotes only the newest
// pending eval while marking superseded ones cancelable.
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	ns1 := "namespace-one"
	ns2 := "namespace-two"
	jobID := "example"

	// newEval enqueues an eval for jobID in the given namespace with
	// matching Create/ModifyIndex so ordering is deterministic.
	newEval := func(idx uint64, ns string) *structs.Evaluation {
		eval := mock.Eval()
		eval.ID = fmt.Sprintf("eval:%d", idx)
		eval.JobID = jobID
		eval.Namespace = ns
		eval.CreateIndex = idx
		eval.ModifyIndex = idx
		b.Enqueue(eval)
		return eval
	}

	// first job
	eval1 := newEval(1, ns1)
	newEval(2, ns1)
	newEval(3, ns1)
	eval4 := newEval(4, ns1)

	// second job
	eval5 := newEval(5, ns2)
	newEval(6, ns2)
	eval7 := newEval(7, ns2)

	// retrieve the stats from the broker, less some stats that aren't
	// interesting for this test and make the test much more verbose
	// to include
	getStats := func() BrokerStats {
		t.Helper()
		stats := b.Stats()
		stats.DelayedEvals = nil
		stats.ByScheduler = nil
		return *stats
	}

	// Only the first eval of each (ns, job) pair is ready; the rest pend.
	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
		TotalPending: 5, TotalCancelable: 0}, getStats())

	// Dequeue should get 1st eval
	out, token, err := b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval1, must.Sprint("expected 1st eval"))

	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
		TotalPending: 5, TotalCancelable: 0}, getStats())

	// Current wait index should be 4 but Ack to exercise behavior
	// when worker's Eval.getWaitIndex gets a stale index
	err = b.Ack(eval1.ID, token)
	must.NoError(t, err)

	// Ack promotes the newest pending eval (eval4) and cancels the
	// superseded ones (evals 2 and 3).
	must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0,
		TotalPending: 2, TotalCancelable: 2}, getStats())

	// eval4 and eval5 are ready
	// eval6 and eval7 are pending

	// Dequeue should get 4th eval
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval4, must.Sprint("expected 4th eval"))

	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1,
		TotalPending: 2, TotalCancelable: 2}, getStats())

	// Ack should clear the rest of namespace-one pending but leave
	// namespace-two untouched
	err = b.Ack(eval4.ID, token)
	must.NoError(t, err)

	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalPending: 2, TotalCancelable: 2}, getStats())

	// Dequeue should get 5th eval
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval5, must.Sprint("expected 5th eval"))

	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
		TotalPending: 2, TotalCancelable: 2}, getStats())

	// Ack should clear remaining namespace-two pending evals
	err = b.Ack(eval5.ID, token)
	must.NoError(t, err)

	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalPending: 0, TotalCancelable: 3}, getStats())

	// Dequeue should get 7th eval because that's all that's left
	out, token, err = b.Dequeue(defaultSched, time.Second)
	must.NoError(t, err)
	must.Eq(t, out, eval7, must.Sprint("expected 7th eval"))

	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1,
		TotalPending: 0, TotalCancelable: 3}, getStats())

	// Last ack should leave the broker empty except for cancels
	err = b.Ack(eval7.ID, token)
	must.NoError(t, err)

	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
		TotalPending: 0, TotalCancelable: 3}, getStats())
}
// TestEvalBroker_Enqueue_Disable asserts that disabling the broker flushes
// any enqueued evaluations and clears the per-scheduler stats.
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)

	// Enqueue
	eval := mock.Eval()
	b.SetEnabled(true)
	b.Enqueue(eval)

	// Flush via SetEnabled
	b.SetEnabled(false)

	// Check the stats: everything should be flushed.
	stats := b.Stats()
	switch {
	case stats.TotalReady != 0:
		t.Fatalf("bad: %#v", stats)
	case stats.TotalUnacked != 0:
		t.Fatalf("bad: %#v", stats)
	}
	if _, ok := stats.ByScheduler[eval.Type]; ok {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Enqueue_Disable_Delay asserts that disabling the broker
// flushes evals from every internal queue (ready, waiting, delayed) and that
// enqueues performed while the broker is disabled are dropped.
func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	baseEval := mock.Eval()
	b.SetEnabled(true)

	{
		// Enqueue a ready eval, a delayed eval, and a wait-until eval.
		b.Enqueue(baseEval.Copy())

		delayedEval := baseEval.Copy()
		// Was `30` (30 nanoseconds); use 30 seconds to match the second
		// stanza below and to actually keep the eval in the waiting queue.
		delayedEval.Wait = 30 * time.Second
		b.Enqueue(delayedEval)

		waitEval := baseEval.Copy()
		waitEval.WaitUntil = time.Now().Add(30 * time.Second)
		b.Enqueue(waitEval)
	}

	// Flush via SetEnabled
	b.SetEnabled(false)

	{
		// Check the stats
		stats := b.Stats()
		require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
		require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
		require.Equal(t, 0, stats.TotalPending, "Expected pending to be flushed")
		require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
		_, ok := stats.ByScheduler[baseEval.Type]
		require.False(t, ok, "Expected scheduler to have no stats")
	}

	{
		// Enqueue again now we're disabled
		b.Enqueue(baseEval.Copy())

		delayedEval := baseEval.Copy()
		delayedEval.Wait = 30 * time.Second
		b.Enqueue(delayedEval)

		waitEval := baseEval.Copy()
		waitEval.WaitUntil = time.Now().Add(30 * time.Second)
		b.Enqueue(waitEval)
	}

	{
		// Check the stats again: disabled broker drops all enqueues.
		stats := b.Stats()
		require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
		require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
		require.Equal(t, 0, stats.TotalPending, "Expected pending to be flushed")
		require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
		_, ok := stats.ByScheduler[baseEval.Type]
		require.False(t, ok, "Expected scheduler to have no stats")
	}
}
// TestEvalBroker_Dequeue_Timeout asserts that Dequeue on an empty broker
// blocks for at least the requested timeout and then returns nil, nil.
func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	start := time.Now()
	out, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
	elapsed := time.Since(start)

	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != nil {
		t.Fatalf("unexpected: %#v", out)
	}
	if elapsed < 5*time.Millisecond {
		t.Fatalf("bad: %#v", elapsed)
	}
}
// TestEvalBroker_Dequeue_Empty_Timeout asserts that Dequeue with a zero
// timeout blocks indefinitely until an eval is enqueued, rather than
// returning immediately.
func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Run the blocking Dequeue in a goroutine; errCh is closed when it
	// finishes so the selects below can observe both error and completion.
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		out, _, err := b.Dequeue(defaultSched, 0)
		if err != nil {
			errCh <- err
			return
		}
		if out == nil {
			errCh <- errors.New("expected a non-nil value")
			return
		}
	}()

	// Sleep for a little bit; the goroutine must still be blocked.
	select {
	case <-time.After(5 * time.Millisecond):
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from dequeue goroutine: %s", err)
		}
		t.Fatalf("Dequeue(0) should block, not finish")
	}

	// Enqueue to unblock the dequeue.
	eval := mock.Eval()
	b.Enqueue(eval)

	// The goroutine should now complete promptly.
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from dequeue goroutine: %s", err)
		}
	case <-time.After(5 * time.Millisecond):
		t.Fatal("timeout: Dequeue(0) should return after enqueue")
	}
}
// Ensure higher priority dequeued first
func TestEvalBroker_Dequeue_Priority(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Enqueue three evals with priorities deliberately out of order.
	evalLow := mock.Eval()
	evalLow.Priority = 10
	b.Enqueue(evalLow)

	evalHigh := mock.Eval()
	evalHigh.Priority = 30
	b.Enqueue(evalHigh)

	evalMid := mock.Eval()
	evalMid.Priority = 20
	b.Enqueue(evalMid)

	// Dequeue must hand them back highest-priority first.
	for _, want := range []*structs.Evaluation{evalHigh, evalMid, evalLow} {
		got, _, _ := b.Dequeue(defaultSched, time.Second)
		if got != want {
			t.Fatalf("bad: %#v", got)
		}
	}
}
// Ensure FIFO at fixed priority
func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	NUM := 100

	// Enqueue in reverse CreateIndex order (NUM down to 1); FIFO means the
	// broker must still hand them back in ascending CreateIndex order.
	for i := NUM; i > 0; i-- {
		eval1 := mock.Eval()
		eval1.CreateIndex = uint64(i)
		eval1.ModifyIndex = uint64(i)
		b.Enqueue(eval1)
	}

	// Dequeue and verify all NUM evals. The previous loop condition
	// (i < NUM) was off by one and never verified the last eval.
	for i := 1; i <= NUM; i++ {
		out1, _, _ := b.Dequeue(defaultSched, time.Second)
		must.Eq(t, uint64(i), out1.CreateIndex,
			must.Sprintf("eval was not FIFO by CreateIndex"),
		)
	}
}
// Ensure fairness between schedulers
func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	NUM := 1000

	// Enqueue half service evals and half batch evals.
	for i := 0; i < NUM; i++ {
		eval1 := mock.Eval()
		if i < (NUM / 2) {
			eval1.Type = structs.JobTypeService
		} else {
			eval1.Type = structs.JobTypeBatch
		}
		b.Enqueue(eval1)
	}

	// counter tracks the current run length of a single scheduler type:
	// positive for consecutive service evals, negative for consecutive
	// batch evals. Fair (randomized) selection keeps runs short.
	counter := 0
	for i := 0; i < NUM; i++ {
		out1, _, _ := b.Dequeue(defaultSched, time.Second)

		switch out1.Type {
		case structs.JobTypeService:
			if counter < 0 {
				counter = 0
			}
			counter += 1
		case structs.JobTypeBatch:
			if counter > 0 {
				counter = 0
			}
			counter -= 1
		}

		// This will fail randomly at times. It is very hard to
		// test deterministically that its acting randomly.
		if counter >= 250 || counter <= -250 {
			t.Fatalf("unlikely sequence: %d", counter)
		}
	}
}
// Ensure we get unblocked
func TestEvalBroker_Dequeue_Blocked(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Start with a blocked dequeue in a goroutine; it reports either the
	// dequeued eval on outCh or a failure on errCh.
	outCh := make(chan *structs.Evaluation)
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		defer close(outCh)
		start := time.Now()
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			errCh <- err
			return
		}
		end := time.Now()
		// The dequeue must have actually blocked until the enqueue below,
		// so it cannot have returned faster than the 5ms pre-enqueue wait.
		if d := end.Sub(start); d < 5*time.Millisecond {
			errCh <- fmt.Errorf("test broker dequeue duration too fast: %v", d)
			return
		}
		outCh <- out
	}()

	// Wait for a bit, or t.Fatal if an error has already happened in
	// the goroutine
	select {
	case <-time.After(5 * time.Millisecond):
		// no errors yet, soldier on
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from anonymous goroutine before enqueue: %v", err)
		}
	}

	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)

	// Ensure dequeue returns exactly the enqueued eval.
	select {
	case out := <-outCh:
		if out != eval {
			prettyExp, _ := json.MarshalIndent(eval, "", "\t")
			prettyGot, _ := json.MarshalIndent(out, "", "\t")
			t.Fatalf("dequeue result expected:\n%s\ngot:\n%s",
				string(prettyExp), string(prettyGot))
		}
	case err := <-errCh:
		t.Fatalf("error from anonymous goroutine after enqueue: %v", err)
	case <-time.After(time.Second):
		t.Fatalf("timeout waiting for dequeue result")
	}
}
// Ensure we nack in a timely manner
func TestEvalBroker_Nack_Timeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 5*time.Millisecond)
	b.SetEnabled(true)

	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)

	// Dequeue
	first, _, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if first != eval {
		t.Fatalf("bad: %v", first)
	}

	// Dequeue, should block on Nack timer
	second, _, err := b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if second != eval {
		t.Fatalf("bad: %v", second)
	}

	// Check the nack timer
	if diff := end.Sub(start); diff < 5*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}
// Ensure we nack in a timely manner
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 50*time.Millisecond)
	b.SetEnabled(true)

	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)

	// Dequeue
	first, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if first != eval {
		t.Fatalf("bad: %v", first)
	}

	// Reset in 20 milliseconds
	time.Sleep(20 * time.Millisecond)
	if err := b.OutstandingReset(first.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Dequeue, should block on Nack timer
	second, _, err := b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if second != eval {
		t.Fatalf("bad: %v", second)
	}

	// Check the nack timer: the reset pushed the deadline out, so the
	// total wait must exceed the original 50ms timeout plus part of the
	// 20ms already elapsed.
	if diff := end.Sub(start); diff < 75*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}
// TestEvalBroker_PauseResumeNackTimeout asserts that pausing an eval's Nack
// timer suspends the redelivery countdown and resuming it restarts the
// countdown, so the redelivery happens later than it otherwise would.
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 50*time.Millisecond)
	b.SetEnabled(true)

	// Enqueue
	eval := mock.Eval()
	b.Enqueue(eval)

	// Dequeue
	out, token, err := b.Dequeue(defaultSched, time.Second)
	start := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}

	// Pause in 20 milliseconds
	time.Sleep(20 * time.Millisecond)
	if err := b.PauseNackTimeout(out.ID, token); err != nil {
		t.Fatalf("pause nack timeout error: %v", err)
	}

	// Resume the timer from a goroutine after another 20ms; errCh carries
	// any resume failure back to the main test goroutine.
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		time.Sleep(20 * time.Millisecond)
		if err := b.ResumeNackTimeout(out.ID, token); err != nil {
			errCh <- err
			return
		}
	}()

	// Dequeue, should block until the timer is resumed
	out, _, err = b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("dequeue error: %v", err)
	}
	if out != eval {
		prettyExp, _ := json.MarshalIndent(eval, "", "\t")
		prettyGot, _ := json.MarshalIndent(out, "", "\t")
		t.Fatalf("dequeue result expected:\n%s\ngot:\n%s",
			string(prettyExp), string(prettyGot))
	}

	// Check the nack timer: ~20ms before the pause, ~20ms paused, then a
	// full 50ms timeout after resume, allowing some scheduling slack.
	if diff := end.Sub(start); diff < 95*time.Millisecond {
		t.Fatalf("deqeue happened too fast: %#v", diff)
	}

	// check the result of ResumeNackTimeout
	err = <-errCh
	if err != nil {
		t.Fatalf("resume nack timeout error:%s", err)
	}
}
// TestEvalBroker_DeliveryLimit asserts that after an eval is Nacked as many
// times as the delivery limit (3, per testBrokerFromConfig), it lands in the
// special failed queue, from which it can still be dequeued and Acked.
func TestEvalBroker_DeliveryLimit(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	eval := mock.Eval()
	b.Enqueue(eval)

	// Exhaust the delivery limit with valid-token Nacks.
	for i := 0; i < 3; i++ {
		// Dequeue should work
		out, token, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != eval {
			t.Fatalf("bad : %#v", out)
		}

		// Nack with the valid token to consume one delivery attempt.
		err = b.Nack(eval.ID, token)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Check the stats: the eval should be ready again, but only in the
	// failed queue now.
	stats := b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue from failed queue
	out, token, err := b.Dequeue([]string{failedQueue}, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Check the stats: now unacked in the failed queue.
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack finally
	err = b.Ack(out.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats: everything, including the failed queue, is empty.
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
}
// TestEvalBroker_AckAtDeliveryLimit asserts that Acking an eval on its final
// allowed delivery removes it entirely instead of routing it to the failed
// queue.
func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	eval := mock.Eval()
	b.Enqueue(eval)

	for i := 0; i < 3; i++ {
		// Dequeue should work
		out, token, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != eval {
			t.Fatalf("bad : %#v", out)
		}

		if i == 2 {
			// Ack on the final delivery; previously the error was
			// silently discarded.
			if err := b.Ack(eval.ID, token); err != nil {
				t.Fatalf("err: %v", err)
			}
		} else {
			// Nack with the valid token to consume one delivery attempt.
			if err := b.Nack(eval.ID, token); err != nil {
				t.Fatalf("err: %v", err)
			}
		}
	}

	// Check the stats: nothing ready or unacked, and no failed queue entry.
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if _, ok := stats.ByScheduler[failedQueue]; ok {
		t.Fatalf("bad: %#v", stats)
	}
}
// TestEvalBroker_Wait asserts delayed evaluations cannot be dequeued until
// their wait duration has elapsed.
func TestEvalBroker_Wait(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Create an eval that should wait
	eval := mock.Eval()
	eval.Wait = 100 * time.Millisecond
	b.Enqueue(eval)

	// It must park in the waiting set, not the ready queue.
	stats := b.Stats()
	switch {
	case stats.TotalReady != 0:
		t.Fatalf("bad: %#v", stats)
	case stats.TotalWaiting != 1:
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should not return the eval until wait has elapsed
	out, token, err := b.Dequeue(defaultSched, 1)
	require.Nil(t, out)
	require.Empty(t, token)
	require.NoError(t, err)

	// Let the wait elapse
	time.Sleep(200 * time.Millisecond)

	// It must now have moved to the ready queue.
	stats = b.Stats()
	switch {
	case stats.TotalReady != 1:
		t.Fatalf("bad: %#v", stats)
	case stats.TotalWaiting != 0:
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, _, err = b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}
}
// Ensure that delayed evaluations work as expected: evals with WaitUntil
// become ready in WaitUntil order, with CreateIndex as the tie breaker.
func TestEvalBroker_WaitUntil(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	now := time.Now()

	// Create a few evals with WaitUntil set
	eval1 := mock.Eval()
	eval1.WaitUntil = now.Add(1 * time.Second)
	eval1.CreateIndex = 1
	b.Enqueue(eval1)

	eval2 := mock.Eval()
	eval2.WaitUntil = now.Add(100 * time.Millisecond)
	// set CreateIndex to use as a tie breaker when eval2
	// and eval3 are both in the pending evals heap
	eval2.CreateIndex = 2
	b.Enqueue(eval2)

	eval3 := mock.Eval()
	eval3.WaitUntil = now.Add(20 * time.Millisecond)
	eval3.CreateIndex = 1
	b.Enqueue(eval3)
	require.Equal(3, b.stats.TotalWaiting)

	// sleep enough for two evals to be ready
	time.Sleep(200 * time.Millisecond)

	// first dequeue should return eval3 (earliest WaitUntil)
	out, _, err := b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval3, out)

	// second dequeue should return eval2
	out, _, err = b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval2, out)

	// third dequeue should return eval1 once its full 1s wait elapses
	out, _, err = b.Dequeue(defaultSched, 2*time.Second)
	require.Nil(err)
	require.Equal(eval1, out)
	require.Equal(0, b.stats.TotalWaiting)
}
// Ensure that priority is taken into account when enqueueing many evaluations.
func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Start with a blocked dequeue in a goroutine; it reports the dequeued
	// eval on outCh or a failure on errCh.
	outCh := make(chan *structs.Evaluation)
	errCh := make(chan error)
	go func() {
		defer close(errCh)
		defer close(outCh)
		start := time.Now()
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			errCh <- err
			return
		}
		end := time.Now()
		// The dequeue must have actually blocked until EnqueueAll below.
		if d := end.Sub(start); d < 5*time.Millisecond {
			errCh <- fmt.Errorf("test broker dequeue duration too fast: %v", d)
			return
		}
		outCh <- out
	}()

	// Wait for a bit, or t.Fatal if an error has already happened in
	// the goroutine
	select {
	case <-time.After(5 * time.Millisecond):
		// no errors yet, soldier on
	case err := <-errCh:
		if err != nil {
			t.Fatalf("error from anonymous goroutine before enqueue: %v", err)
		}
	}

	// Enqueue evals with priorities 10..90 in one batch.
	evals := make(map[*structs.Evaluation]string, 8)
	expectedPriority := 90
	for i := 10; i <= expectedPriority; i += 10 {
		eval := mock.Eval()
		eval.Priority = i
		evals[eval] = ""
	}
	b.EnqueueAll(evals)

	// Ensure dequeue returns the highest-priority eval.
	select {
	case out := <-outCh:
		if out.Priority != expectedPriority {
			pretty, _ := json.MarshalIndent(out, "", "\t")
			t.Logf("bad priority on *structs.Evaluation: %s", string(pretty))
			t.Fatalf("priority wanted:%d, priority got:%d", expectedPriority, out.Priority)
		}
	case err := <-errCh:
		t.Fatalf("error from anonymous goroutine after enqueue: %v", err)
	case <-time.After(time.Second):
		t.Fatalf("timeout waiting for dequeue result")
	}
}
// TestEvalBroker_EnqueueAll_Requeue_Ack asserts that re-enqueueing an
// outstanding eval via EnqueueAll with its token defers the requeue until the
// eval is Acked, at which point it becomes ready again under a new token.
func TestEvalBroker_EnqueueAll_Requeue_Ack(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Create the evaluation, enqueue and dequeue
	eval := mock.Eval()
	b.Enqueue(eval)

	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Requeue the same evaluation.
	b.EnqueueAll(map[*structs.Evaluation]string{eval: token})

	// The stats should show one unacked: the requeue is held until Ack.
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack the evaluation.
	if err := b.Ack(eval.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check stats again as this should cause the re-enqueued one to transition
	// into the ready state
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Another dequeue should be successful, with a fresh token.
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out)
	}
	if token == token2 {
		t.Fatalf("bad : %s and %s", token, token2)
	}
}
// TestEvalBroker_EnqueueAll_Requeue_Nack asserts that a deferred requeue
// (EnqueueAll with an outstanding token) is dropped when the eval is Nacked
// instead of Acked: only the Nack's own re-enqueue survives.
func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Create the evaluation, enqueue and dequeue
	eval := mock.Eval()
	b.Enqueue(eval)

	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Requeue the same evaluation.
	b.EnqueueAll(map[*structs.Evaluation]string{eval: token})

	// The stats should show one unacked: the requeue is held until Ack/Nack.
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Nack the evaluation.
	if err := b.Nack(eval.ID, token); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check stats again as this should cause the re-enqueued one to be dropped;
	// WaitForResult because the Nack re-enqueue is delayed.
	testutil.WaitForResult(func() (bool, error) {
		stats = b.Stats()
		if stats.TotalReady != 1 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if stats.TotalUnacked != 0 {
			return false, fmt.Errorf("bad: %#v", stats)
		}
		if len(b.requeue) != 0 {
			return false, fmt.Errorf("bad: %#v", b.requeue)
		}
		return true, nil
	}, func(e error) {
		t.Fatal(e)
	})
}
// TestEvalBroker_NamespacedJobs asserts that serialization of evals for the
// same job ID is scoped per namespace: identical job IDs in different
// namespaces do not block each other.
func TestEvalBroker_NamespacedJobs(t *testing.T) {
	ci.Parallel(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// Create evals with the same jobid and different namespace
	jobId := "test-jobID"

	eval1 := mock.Eval()
	eval1.JobID = jobId
	eval1.Namespace = "n1"
	b.Enqueue(eval1)

	// This eval should not block (different namespace, same job ID)
	eval2 := mock.Eval()
	eval2.JobID = jobId
	eval2.Namespace = "default"
	b.Enqueue(eval2)

	// This eval should block (same namespace and job ID as eval2)
	eval3 := mock.Eval()
	eval3.JobID = jobId
	eval3.Namespace = "default"
	b.Enqueue(eval3)

	require := require.New(t)
	out1, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
	require.Nil(err)
	require.Equal(eval1.ID, out1.ID)

	out2, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
	require.Nil(err)
	require.Equal(eval2.ID, out2.ID)

	// eval3 is held pending behind the unacked eval2.
	out3, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
	require.Nil(err)
	require.Nil(out3)
	require.Equal(1, len(b.pending))
}
// TestEvalBroker_ReadyEvals_Ordering asserts the ready heap pops by
// Priority (highest first) and then CreateIndex (oldest first).
func TestEvalBroker_ReadyEvals_Ordering(t *testing.T) {
	readyHeap := ReadyEvaluations{}

	mkEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation {
		e := mock.Eval()
		e.JobID = jobID
		e.ID = evalID
		e.Priority = priority
		e.CreateIndex = index
		return e
	}

	// note: we're intentionally pushing these out-of-order to assert we're
	// getting them back out in the intended order and not just as inserted
	heap.Push(&readyHeap, mkEval("example1", "eval01", 50, 1))
	heap.Push(&readyHeap, mkEval("example3", "eval03", 70, 3))
	heap.Push(&readyHeap, mkEval("example2", "eval02", 50, 2))

	next := heap.Pop(&readyHeap).(*structs.Evaluation)
	test.Eq(t, "eval03", next.ID,
		test.Sprint("expected highest Priority to be next ready"))

	next = heap.Pop(&readyHeap).(*structs.Evaluation)
	test.Eq(t, "eval01", next.ID,
		test.Sprint("expected oldest CreateIndex to be next ready"))

	heap.Push(&readyHeap, mkEval("example4", "eval04", 50, 4))
	next = heap.Pop(&readyHeap).(*structs.Evaluation)
	test.Eq(t, "eval02", next.ID,
		test.Sprint("expected oldest CreateIndex to be next ready"))
}
func TestEvalBroker_PendingEval_Ordering(t *testing.T) {
pending := PendingEvaluations{}
newEval := func(evalID string, priority int, index uint64) *structs.Evaluation {
eval := mock.Eval()
eval.ID = evalID
eval.Priority = priority
eval.ModifyIndex = uint64(index)
return eval
}
// note: we're intentionally pushing these out-of-order to assert we're
// getting them back out in the intended order and not just as inserted
heap.Push(&pending, newEval("eval03", 50, 3))
heap.Push(&pending, newEval("eval02", 100, 2))
heap.Push(&pending, newEval("eval01", 50, 1))
next := heap.Pop(&pending).(*structs.Evaluation)
test.Eq(t, "eval02", next.ID,
test.Sprint("expected eval with highest priority to be next"))
next = heap.Pop(&pending).(*structs.Evaluation)
test.Eq(t, "eval03", next.ID,
test.Sprint("expected eval with highest modify index to be next"))
heap.Push(&pending, newEval("eval04", 30, 4))
next = heap.Pop(&pending).(*structs.Evaluation)
test.Eq(t, "eval01", next.ID,
test.Sprint("expected eval with highest priority to be nexct"))
}
// TestEvalBroker_PendingEvals_MarkForCancel asserts that MarkForCancel
// removes and returns every pending eval for a job except the one with the
// highest ModifyIndex.
func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) {
	ci.Parallel(t)

	pending := PendingEvaluations{}

	// note: we're intentionally pushing these out-of-order to assert we're
	// getting them back out in the intended order and not just as inserted
	for idx := uint64(100); idx > 0; idx -= 10 {
		e := mock.Eval()
		e.JobID = "example"
		e.CreateIndex = idx
		e.ModifyIndex = idx
		heap.Push(&pending, e)
	}

	// all but the most recently modified eval should be marked canceled
	canceled := pending.MarkForCancel()
	must.Eq(t, 9, len(canceled))
	must.Eq(t, 1, pending.Len())

	raw := heap.Pop(&pending)
	must.NotNil(t, raw)
	survivor := raw.(*structs.Evaluation)
	must.Eq(t, 100, survivor.ModifyIndex)
}
// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
// workflows: a system job plus a series of node up/down events produce a
// stream of evaluations for one job, and starting schedulers should cause
// all but the newest evals to be canceled rather than processed.
func TestEvalBroker_IntegrationTest(t *testing.T) {
	ci.Parallel(t)

	srv, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent dequeue
		c.EvalReapCancelableInterval = time.Minute * 10 // Prevent sweep-up
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, srv.RPC)
	codec := rpcClient(t, srv)
	store := srv.fsm.State()

	// create a system job, a node for it to run on, and a set of node up/down
	// events that will result in evaluations queued.

	job := mock.SystemJob()
	jobReq := &structs.JobRegisterRequest{
		Job:          job,
		EvalPriority: 50,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var jobResp structs.JobRegisterResponse
	err := msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
	must.NoError(t, err)

	node := mock.Node()
	nodeReq := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var nodeResp structs.NodeUpdateResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReq, &nodeResp)
	must.NoError(t, err)

	// flap the node status 10 times; each transition enqueues an eval for
	// the same system job, so they pile up pending behind each other
	for i := 0; i < 10; i++ {
		status := structs.NodeStatusDown
		if i%2 == 0 {
			status = structs.NodeStatusReady
		}
		statusReq := &structs.NodeUpdateStatusRequest{
			NodeID:       node.ID,
			Status:       status,
			WriteRequest: structs.WriteRequest{Region: "global"},
		}
		var statusResp structs.NodeUpdateResponse
		err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", statusReq, &statusResp)
		must.NoError(t, err)
	}

	// ensure we have the expected number of evaluations and eval broker state

	// retrieve the stats from the broker, less some uninteresting ones
	getStats := func() BrokerStats {
		t.Helper()
		stats := srv.evalBroker.Stats()
		stats.DelayedEvals = nil
		stats.ByScheduler = nil
		return *stats
	}

	// count evals in the state store by status, asserting that any canceled
	// eval carries the expected status description
	getEvalStatuses := func() map[string]int {
		t.Helper()
		statuses := map[string]int{}
		iter, err := store.Evals(nil, state.SortDefault)
		must.NoError(t, err)
		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			eval := raw.(*structs.Evaluation)
			statuses[eval.Status] += 1
			if eval.Status == structs.EvalStatusCancelled {
				must.Eq(t, "canceled after more recent eval was processed", eval.StatusDescription)
			}
		}
		return statuses
	}

	// 1 register + 10 status updates = 11 evals; one is ready, the other 10
	// are pending behind it in the broker
	must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses())
	must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0,
		TotalPending: 10, TotalCancelable: 0}, getStats())

	// start schedulers: all the evals are for a single job so there should only
	// be one eval processed at a time no matter how many schedulers we run

	config := DefaultConfig()
	config.NumSchedulers = 4
	config.EvalReapCancelableInterval = time.Minute * 10
	require.NoError(t, srv.Reload(config))

	// assert that all but 2 evals were canceled and that the eval broker state
	// has been cleared
	require.Eventually(t, func() bool {
		got := getEvalStatuses()
		return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
	}, 2*time.Second, time.Millisecond*100)

	must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
		TotalPending: 0, TotalCancelable: 0}, getStats())
}