package nomad

import (
	"testing"
	"time"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

var (
	// defaultSched lists the scheduler types the tests dequeue from
	// unless a test names a specific queue.
	defaultSched = []string{
		structs.JobTypeService,
		structs.JobTypeBatch,
	}
)

// testBroker builds an EvalBroker for tests with a delivery limit of 3.
// A zero timeout is replaced by a 5 second default. The test is failed
// immediately if the broker cannot be constructed.
func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
	if timeout == 0 {
		timeout = 5 * time.Second
	}
	b, err := NewEvalBroker(timeout, 3)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	return b
}

// TestEvalBroker_Enqueue_Dequeue_Nack_Ack walks one evaluation through
// enqueue (disabled and enabled), dequeue, nack, a second dequeue and a
// final ack, checking the broker stats and outstanding token at each step.
func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
	b := testBroker(t, 0)

	// Enqueue, but broker is disabled!
	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Verify nothing was done
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	if b.Enabled() {
		t.Fatalf("should not be enabled")
	}

	// Enable the broker, and enqueue
	b.SetEnabled(true)
	err = b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Double enqueue is a no-op
	err = b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if !b.Enabled() {
		t.Fatalf("should be enabled")
	}

	// Verify enqueue is done
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	tokenOut, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut != token {
		t.Fatalf("Bad: %#v %#v", token, tokenOut)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Nack with wrong token should fail
	err = b.Nack(eval.ID, "foobarbaz")
	if err == nil {
		t.Fatalf("should fail to nack")
	}

	// Nack back into the queue
	err = b.Nack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work again
	out2, token2, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out2 != eval {
		t.Fatalf("bad : %#v", out2)
	}
	if token2 == token {
		t.Fatalf("should get a new token")
	}

	tokenOut2, ok := b.Outstanding(out.ID)
	if !ok {
		t.Fatalf("should be outstanding")
	}
	if tokenOut2 != token2 {
		t.Fatalf("Bad: %#v %#v", token2, tokenOut2)
	}

	// Ack with wrong token
	err = b.Ack(eval.ID, "zip")
	if err == nil {
		t.Fatalf("should fail to ack")
	}

	// Ack finally
	err = b.Ack(eval.ID, token2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[eval.Type].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}

// TestEvalBroker_Serialize_DuplicateJobID enqueues three evaluations that
// share a JobID and checks that only one is ready at a time: the others are
// counted as blocked and become ready one by one as the previous one is
// acked.
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)

	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	eval2 := mock.Eval()
	eval2.JobID = eval.JobID
	eval2.CreateIndex = eval.CreateIndex + 1
	err = b.Enqueue(eval2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	eval3 := mock.Eval()
	eval3.JobID = eval.JobID
	eval3.CreateIndex = eval.CreateIndex + 2
	err = b.Enqueue(eval3)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	stats := b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 2 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, token, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 2 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack out
	err = b.Ack(eval.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, token, err = b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval2 {
		t.Fatalf("bad : %#v", out)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack out
	err = b.Ack(eval2.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue should work
	out, token, err = b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval3 {
		t.Fatalf("bad : %#v", out)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack out
	err = b.Ack(eval3.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalBlocked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
}

// TestEvalBroker_Enqueue_Disable checks that disabling the broker after an
// enqueue leaves no ready or unacked evaluations and no per-scheduler stats
// entry for the evaluation's type.
func TestEvalBroker_Enqueue_Disable(t *testing.T) {
	b := testBroker(t, 0)

	// Enqueue
	eval := mock.Eval()
	b.SetEnabled(true)
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Flush via SetEnabled
	b.SetEnabled(false)

	// Check the stats
	stats := b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if _, ok := stats.ByScheduler[eval.Type]; ok {
		t.Fatalf("bad: %#v", stats)
	}
}

// TestEvalBroker_Dequeue_Timeout checks that a dequeue on an empty broker
// returns a nil evaluation and no error, and does so no sooner than the
// requested timeout.
func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)

	start := time.Now()
	out, _, err := b.Dequeue(defaultSched, 5*time.Millisecond)
	end := time.Now()

	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != nil {
		t.Fatalf("unexpected: %#v", out)
	}

	if diff := end.Sub(start); diff < 5*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}

// Ensure higher priority dequeued first
func TestEvalBroker_Dequeue_Priority(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)

	eval1 := mock.Eval()
	eval1.Priority = 10

	eval2 := mock.Eval()
	eval2.Priority = 30

	eval3 := mock.Eval()
	eval3.Priority = 20

	// Enqueue in an order that differs from the priority order.
	for _, e := range []*structs.Evaluation{eval1, eval2, eval3} {
		if err := b.Enqueue(e); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Expect the evaluations back in descending priority: 30, 20, 10.
	for _, want := range []*structs.Evaluation{eval2, eval3, eval1} {
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != want {
			t.Fatalf("bad: %#v", out)
		}
	}
}

// Ensure FIFO at fixed priority
func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)
	const numEvals = 100

	for i := 0; i < numEvals; i++ {
		eval := mock.Eval()
		eval.CreateIndex = uint64(i)
		eval.ModifyIndex = uint64(i)
		if err := b.Enqueue(eval); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	for i := 0; i < numEvals; i++ {
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		// A nil result (dequeue timed out) must fail the test rather
		// than panic on the field access below.
		if out == nil {
			t.Fatalf("dequeue %d returned no evaluation", i)
		}
		if out.CreateIndex != uint64(i) {
			t.Fatalf("bad: %d %#v", i, out)
		}
	}
}

// Ensure fairness between schedulers
func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)
	const numEvals = 100

	// First half service evaluations, second half batch evaluations.
	for i := 0; i < numEvals; i++ {
		eval := mock.Eval()
		if i < (numEvals / 2) {
			eval.Type = structs.JobTypeService
		} else {
			eval.Type = structs.JobTypeBatch
		}
		if err := b.Enqueue(eval); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// counter tracks the current run length of same-type dequeues:
	// positive for service, negative for batch. It resets whenever the
	// type flips.
	counter := 0
	for i := 0; i < numEvals; i++ {
		out, _, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		// Guard against a timed-out dequeue before reading out.Type.
		if out == nil {
			t.Fatalf("dequeue %d returned no evaluation", i)
		}

		switch out.Type {
		case structs.JobTypeService:
			if counter < 0 {
				counter = 0
			}
			counter += 1
		case structs.JobTypeBatch:
			if counter > 0 {
				counter = 0
			}
			counter -= 1
		}

		// This will fail randomly at times. It is very hard to
		// test deterministically that its acting randomly.
		if counter >= 25 || counter <= -25 {
			t.Fatalf("unlikely sequence: %d", counter)
		}
	}
}

// Ensure we get unblocked
func TestEvalBroker_Dequeue_Blocked(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)

	// dequeueResult carries everything the background dequeue observed
	// back to the test goroutine. t.Fatalf must only be called from the
	// goroutine running the test, so all assertions happen there.
	type dequeueResult struct {
		eval    *structs.Evaluation
		err     error
		elapsed time.Duration
	}

	// Start with a blocked dequeue. The channel is buffered so the
	// goroutine can always deliver its result and exit, even if the test
	// has already given up waiting.
	resCh := make(chan dequeueResult, 1)
	go func() {
		start := time.Now()
		out, _, err := b.Dequeue(defaultSched, time.Second)
		resCh <- dequeueResult{eval: out, err: err, elapsed: time.Since(start)}
	}()

	// Wait for a bit
	time.Sleep(5 * time.Millisecond)

	// Enqueue
	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure dequeue
	select {
	case res := <-resCh:
		if res.err != nil {
			t.Fatalf("err: %v", res.err)
		}
		if res.eval != eval {
			t.Fatalf("bad: %v", res.eval)
		}
		if res.elapsed < 5*time.Millisecond {
			t.Fatalf("bad: %v", res.elapsed)
		}
	case <-time.After(time.Second):
		t.Fatalf("timeout")
	}
}

// Ensure we nack in a timely manner
func TestEvalBroker_Nack_Timeout(t *testing.T) {
	b := testBroker(t, 5*time.Millisecond)
	b.SetEnabled(true)

	// Enqueue
	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Start the clock before the first Dequeue. The nack timer is
	// presumably armed inside Dequeue (TODO confirm against EvalBroker),
	// so sampling the start time only after it returns could under-count
	// the wait and spuriously trip the lower-bound check below.
	start := time.Now()

	// Dequeue
	out, _, err := b.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}

	// Dequeue, should block on Nack timer
	out, _, err = b.Dequeue(defaultSched, time.Second)
	end := time.Now()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad: %v", out)
	}

	// Check the nack timer
	if diff := end.Sub(start); diff < 5*time.Millisecond {
		t.Fatalf("bad: %#v", diff)
	}
}

// TestEvalBroker_DeliveryLimit nacks the same evaluation three times (the
// delivery limit testBroker configures) and checks that it then shows up in
// the failedQueue, from which it can be dequeued and acked.
func TestEvalBroker_DeliveryLimit(t *testing.T) {
	b := testBroker(t, 0)
	b.SetEnabled(true)

	eval := mock.Eval()
	err := b.Enqueue(eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// 3 matches the delivery limit passed to NewEvalBroker in testBroker.
	for i := 0; i < 3; i++ {
		// Dequeue should work
		out, token, err := b.Dequeue(defaultSched, time.Second)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if out != eval {
			t.Fatalf("bad : %#v", out)
		}

		// Nack with the correct token should succeed
		err = b.Nack(eval.ID, token)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Check the stats
	stats := b.Stats()
	if stats.TotalReady != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}

	// Dequeue from failed queue
	out, token, err := b.Dequeue([]string{failedQueue}, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != eval {
		t.Fatalf("bad : %#v", out)
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Unacked != 1 {
		t.Fatalf("bad: %#v", stats)
	}

	// Ack finally
	err = b.Ack(out.ID, token)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if _, ok := b.Outstanding(out.ID); ok {
		t.Fatalf("should not be outstanding")
	}

	// Check the stats
	stats = b.Stats()
	if stats.TotalReady != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.TotalUnacked != 0 {
		t.Fatalf("bad: %#v", stats)
	}
	if stats.ByScheduler[failedQueue].Ready != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
	if stats.ByScheduler[failedQueue].Unacked != 0 {
		t.Fatalf("bad: %#v", stats.ByScheduler[failedQueue])
	}
}