eval_broker.Enqueue no longer returns an error

This commit is contained in:
Alex Dadgar 2016-05-18 11:35:15 -07:00
parent bf27cc1261
commit 045f7807e0
8 changed files with 34 additions and 132 deletions

View File

@ -140,14 +140,13 @@ func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) {
}
// Enqueue is used to enqueue an evaluation
// TODO: remove the error return value
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()
// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
return nil
return
} else if b.enabled {
b.evals[eval.ID] = 0
}
@ -159,11 +158,10 @@ func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
return nil
return
}
b.enqueueLocked(eval, eval.Type)
return nil
}
// enqueueWaiting is used to enqueue a waiting evaluation

View File

@ -31,10 +31,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
// Enqueue, but broker is disabled!
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Verify nothing was done
stats := b.Stats()
@ -48,16 +45,10 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
// Enable the broker, and enqueue
b.SetEnabled(true)
err = b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Double enqueue is a no-op
err = b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
if !b.Enabled() {
t.Fatalf("should be enabled")
@ -206,26 +197,17 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
b.SetEnabled(true)
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = eval.CreateIndex + 1
err = b.Enqueue(eval2)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval2)
eval3 := mock.Eval()
eval3.JobID = eval.JobID
eval3.CreateIndex = eval.CreateIndex + 2
err = b.Enqueue(eval3)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval3)
stats := b.Stats()
if stats.TotalReady != 1 {
@ -359,10 +341,7 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) {
// Enqueue
eval := mock.Eval()
b.SetEnabled(true)
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Flush via SetEnabled
b.SetEnabled(false)
@ -425,10 +404,7 @@ func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) {
// Enqueue to unblock the dequeue.
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
select {
case <-doneCh:
@ -558,10 +534,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) {
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Ensure dequeue
select {
@ -581,10 +554,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Dequeue
out, _, err := b.Dequeue(defaultSched, time.Second)
@ -619,10 +589,7 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
@ -662,10 +629,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
@ -711,10 +675,7 @@ func TestEvalBroker_DeliveryLimit(t *testing.T) {
b.SetEnabled(true)
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
for i := 0; i < 3; i++ {
// Dequeue should work
@ -803,10 +764,7 @@ func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) {
b.SetEnabled(true)
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
for i := 0; i < 3; i++ {
// Dequeue should work
@ -850,10 +808,7 @@ func TestEvalBroker_Wait(t *testing.T) {
// Create an eval that should wait
eval := mock.Eval()
eval.Wait = 10 * time.Millisecond
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
b.Enqueue(eval)
// Verify waiting
stats := b.Stats()

View File

@ -138,12 +138,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
// Dequeue the eval
get := &structs.EvalDequeueRequest{

View File

@ -331,10 +331,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
for _, eval := range req.Evals {
if eval.ShouldEnqueue() {
if err := n.evalBroker.Enqueue(eval); err != nil {
n.logger.Printf("[ERR] nomad.fsm: failed to enqueue evaluation %s: %v", eval.ID, err)
return err
}
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
}

View File

@ -178,9 +178,7 @@ func (s *Server) restoreEvals() error {
eval := raw.(*structs.Evaluation)
if eval.ShouldEnqueue() {
if err := s.evalBroker.Enqueue(eval); err != nil {
return fmt.Errorf("failed to enqueue evaluation %s: %v", eval.ID, err)
}
s.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
s.blockedEvals.Block(eval)
}

View File

@ -496,12 +496,7 @@ func TestLeader_ReapFailedEval(t *testing.T) {
// Wait for a periodic dispatch
eval := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval)
// Dequeue and Nack
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)

View File

@ -20,12 +20,8 @@ func TestPlanEndpoint_Submit(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -52,12 +52,7 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
// Create the evaluation
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
@ -87,12 +82,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
// Create the evaluation
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
@ -163,12 +153,7 @@ func TestWorker_sendAck(t *testing.T) {
// Create the evaluation
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
// Create a worker
w := &Worker{srv: s1, logger: s1.logger}
@ -266,12 +251,8 @@ func TestWorker_SubmitPlan(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
@ -328,12 +309,8 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
@ -395,12 +372,7 @@ func TestWorker_UpdateEval(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
@ -442,12 +414,8 @@ func TestWorker_CreateEval(t *testing.T) {
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
s1.evalBroker.Enqueue(eval1)
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)