From 045f7807e0de7de6ba92ddff579601cc95382b42 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 18 May 2016 11:35:15 -0700 Subject: [PATCH] eval_broker.Enqueue no longer returns an error --- nomad/eval_broker.go | 8 ++-- nomad/eval_broker_test.go | 75 ++++++++----------------------------- nomad/eval_endpoint_test.go | 7 +--- nomad/fsm.go | 5 +-- nomad/leader.go | 4 +- nomad/leader_test.go | 7 +--- nomad/plan_endpoint_test.go | 8 +--- nomad/worker_test.go | 52 +++++-------------------- 8 files changed, 34 insertions(+), 132 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 6e5a3ed59..5d1ed19f9 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -140,14 +140,13 @@ func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { } // Enqueue is used to enqueue an evaluation -// TODO: remove the error return value -func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { +func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { b.l.Lock() defer b.l.Unlock() // Check if already enqueued if _, ok := b.evals[eval.ID]; ok { - return nil + return } else if b.enabled { b.evals[eval.ID] = 0 } @@ -159,11 +158,10 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { }) b.timeWait[eval.ID] = timer b.stats.TotalWaiting += 1 - return nil + return } b.enqueueLocked(eval, eval.Type) - return nil } // enqueueWaiting is used to enqueue a waiting evaluation diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index b81fbd944..d2f083233 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -31,10 +31,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { // Enqueue, but broker is disabled! eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Verify nothing was done stats := b.Stats() @@ -48,16 +45,10 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { // Enable the broker, and enqueue b.SetEnabled(true) - err = b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Double enqueue is a no-op - err = b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) if !b.Enabled() { t.Fatalf("should be enabled") @@ -206,26 +197,17 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) eval2 := mock.Eval() eval2.JobID = eval.JobID eval2.CreateIndex = eval.CreateIndex + 1 - err = b.Enqueue(eval2) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval2) eval3 := mock.Eval() eval3.JobID = eval.JobID eval3.CreateIndex = eval.CreateIndex + 2 - err = b.Enqueue(eval3) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval3) stats := b.Stats() if stats.TotalReady != 1 { @@ -359,10 +341,7 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) { // Enqueue eval := mock.Eval() b.SetEnabled(true) - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Flush via SetEnabled b.SetEnabled(false) @@ -425,10 +404,7 @@ func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) { // Enqueue to unblock the dequeue. eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) select { case <-doneCh: @@ -558,10 +534,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Ensure dequeue select { @@ -581,10 +554,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, _, err := b.Dequeue(defaultSched, time.Second) @@ -619,10 +589,7 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, token, err := b.Dequeue(defaultSched, time.Second) @@ -662,10 +629,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { // Enqueue eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Dequeue out, token, err := b.Dequeue(defaultSched, time.Second) @@ -711,10 +675,7 @@ func TestEvalBroker_DeliveryLimit(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) for i := 0; i < 3; i++ { // Dequeue should work @@ -803,10 +764,7 @@ func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) { b.SetEnabled(true) eval := mock.Eval() - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) for i := 0; i < 3; i++ { // Dequeue should work @@ -850,10 +808,7 @@ func TestEvalBroker_Wait(t *testing.T) { // Create an eval that should wait eval := mock.Eval() eval.Wait = 10 * time.Millisecond - err := b.Enqueue(eval) - if err != nil { - t.Fatalf("err: %v", err) - } + b.Enqueue(eval) // Verify waiting stats := b.Stats() diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3d59f33ed..f037c8bd3 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -138,12 +138,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Dequeue the eval get := &structs.EvalDequeueRequest{ diff --git a/nomad/fsm.go b/nomad/fsm.go index a22fc08c1..6237e9760 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -331,10 +331,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { for _, eval := range req.Evals { if eval.ShouldEnqueue() { - if err := n.evalBroker.Enqueue(eval); err != nil { - n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err) - return err - } + n.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { n.blockedEvals.Block(eval) } diff --git a/nomad/leader.go b/nomad/leader.go index 3bf6ea4a6..04f5f1432 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -178,9 +178,7 @@ func (s *Server) restoreEvals() error { eval := raw.(*structs.Evaluation) if eval.ShouldEnqueue() { - if err := s.evalBroker.Enqueue(eval); err != nil { - return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err) - } + s.evalBroker.Enqueue(eval) } else if eval.ShouldBlock() { s.blockedEvals.Block(eval) } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 57b63aba7..b16f714a5 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -496,12 +496,7 @@ func TestLeader_ReapFailedEval(t *testing.T) { // Wait for a periodic dispatch eval := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval) // Dequeue and Nack out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index d18ca24c5..9e0054d09 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -20,12 +20,8 @@ func TestPlanEndpoint_Submit(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/worker_test.go b/nomad/worker_test.go index a56dfefe1..c1663592a 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -52,12 +52,7 @@ func TestWorker_dequeueEvaluation(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -87,12 +82,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -163,12 +153,7 @@ func TestWorker_sendAck(t *testing.T) { // Create the evaluation eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) // Create a worker w := &Worker{srv: s1, logger: s1.logger} @@ -266,12 +251,8 @@ func TestWorker_SubmitPlan(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -328,12 +309,8 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -395,12 +372,7 @@ func TestWorker_UpdateEval(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err) @@ -442,12 +414,8 @@ func TestWorker_CreateEval(t *testing.T) { // Create the register request eval1 := mock.Eval() - testutil.WaitForResult(func() (bool, error) { - err := s1.evalBroker.Enqueue(eval1) - return err == nil, err - }, func(err error) { - t.Fatalf("err: %v", err) - }) + s1.evalBroker.Enqueue(eval1) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) if err != nil { t.Fatalf("err: %v", err)