Merge pull request #325 from hashicorp/f-reset-nack

Reset evaluation Nack timer in response to scheduler operations
This commit is contained in:
Alex Dadgar 2015-10-23 17:16:55 -07:00
commit bdebcfd573
4 changed files with 96 additions and 23 deletions

View file

@ -2,6 +2,7 @@ package nomad
import (
"container/heap"
"errors"
"fmt"
"math/rand"
"sync"
@ -18,6 +19,17 @@ const (
failedQueue = "_failed"
)
var (
// ErrNotOutstanding is returned if an evaluation is not outstanding
ErrNotOutstanding = errors.New("evaluation is not outstanding")
// ErrTokenMismatch is the outstanding eval has a different token
ErrTokenMismatch = errors.New("evaluation token does not match")
// ErrNackTimeoutReached is returned if an expired evaluation is reset
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
)
// EvalBroker is used to manage brokering of evaluations. When an evaluation is
// created, due to a change in a job specification or a node, we put it into the
// broker. The broker sorts by evaluations by priority and scheduler type. This
@ -381,6 +393,24 @@ func (b *EvalBroker) Outstanding(evalID string) (string, bool) {
return unack.Token, true
}
// OutstandingReset resets the Nack timer for the EvalID if the
// token matches and the eval is outstanding
func (b *EvalBroker) OutstandingReset(evalID, token string) error {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
if !unack.NackTimer.Reset(b.nackTimeout) {
return ErrNackTimeoutReached
}
return nil
}
// Ack is used to positively acknowledge handling an evaluation
func (b *EvalBroker) Ack(evalID, token string) error {
b.l.Lock()

View file

@ -89,6 +89,20 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
t.Fatalf("Bad: %#v %#v", token, tokenOut)
}
// OutstandingReset should verify the token
err = b.OutstandingReset("nope", "foo")
if err != ErrNotOutstanding {
t.Fatalf("err: %v", err)
}
err = b.OutstandingReset(out.ID, "foo")
if err != ErrTokenMismatch {
t.Fatalf("err: %v", err)
}
err = b.OutstandingReset(out.ID, tokenOut)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
@ -560,6 +574,50 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
}
}
// Ensure we nack in a timely manner
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
b := testBroker(t, 5*time.Millisecond)
b.SetEnabled(true)
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
start := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}
// Reset in 2 milliseconds
time.Sleep(2 * time.Millisecond)
if err := b.OutstandingReset(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
// Dequeue, should block on Nack timer
out, _, err = b.Dequeue(defaultSched, time.Second)
end := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}
// Check the nack timer
if diff := end.Sub(start); diff < 7*time.Millisecond {
t.Fatalf("bad: %#v", diff)
}
}
func TestEvalBroker_DeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)

View file

@ -134,12 +134,8 @@ func (e *Eval) Update(args *structs.EvalUpdateRequest,
eval := args.Evals[0]
// Verify the evaluation is outstanding, and that the tokens match.
token, ok := e.srv.evalBroker.Outstanding(eval.ID)
if !ok {
return fmt.Errorf("evaluation is not outstanding")
}
if args.EvalToken != token {
return fmt.Errorf("evaluation token does not match")
if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil {
return err
}
// Update via Raft
@ -168,12 +164,8 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
eval := args.Evals[0]
// Verify the parent evaluation is outstanding, and that the tokens match.
token, ok := e.srv.evalBroker.Outstanding(eval.PreviousEval)
if !ok {
return fmt.Errorf("previous evaluation is not outstanding")
}
if args.EvalToken != token {
return fmt.Errorf("previous evaluation token does not match")
if err := e.srv.evalBroker.OutstandingReset(eval.PreviousEval, args.EvalToken); err != nil {
return err
}
// Look for the eval

View file

@ -50,17 +50,10 @@ func (s *Server) planApply() {
}
// Verify the evaluation is outstanding, and that the tokens match.
token, ok := s.evalBroker.Outstanding(pending.plan.EvalID)
if !ok {
s.logger.Printf("[ERR] nomad: plan received for non-outstanding evaluation %s",
pending.plan.EvalID)
pending.respond(nil, fmt.Errorf("evaluation is not outstanding"))
continue
}
if pending.plan.EvalToken != token {
s.logger.Printf("[ERR] nomad: plan received for evaluation %s with wrong token",
pending.plan.EvalID)
pending.respond(nil, fmt.Errorf("evaluation token does not match"))
if err := s.evalBroker.OutstandingReset(pending.plan.EvalID, pending.plan.EvalToken); err != nil {
s.logger.Printf("[ERR] nomad: plan rejected for evaluation %s: %v",
pending.plan.EvalID, err)
pending.respond(nil, err)
continue
}