Fix panic when Ack occurs at delivery limit

This commit is contained in:
Alex Dadgar 2016-02-11 11:07:18 -08:00
parent 48ec4a6377
commit d2e88f0116
2 changed files with 45 additions and 1 deletions

View File

@ -448,7 +448,7 @@ func (b *EvalBroker) Ack(evalID, token string) error {
// Update the stats
b.stats.TotalUnacked -= 1
queue := unack.Eval.Type
if b.evals[evalID] >= b.deliveryLimit {
if b.evals[evalID] > b.deliveryLimit {
queue = failedQueue
}
bySched := b.stats.ByScheduler[queue]

View File

@ -748,6 +748,50 @@ func TestEvalBroker_DeliveryLimit(t *testing.T) {
}
}
func TestEvalBroker_AckAtDeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
for i := 0; i < 3; i++ {
// Dequeue should work
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad : %#v", out)
}
if i == 2 {
b.Ack(eval.ID, token)
} else {
// Nack with wrong token should fail
err = b.Nack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
}
}
// Check the stats
stats := b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if _, ok := stats.ByScheduler[failedQueue]; ok {
t.Fatalf("bad: %#v", stats)
}
}
// Ensure fairness between schedulers
func TestEvalBroker_Wait(t *testing.T) {
b := testBroker(t, 0)