diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 97c76eafa..6e5a3ed59 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -510,6 +510,40 @@ func (b *EvalBroker) Nack(evalID, token string) error { return nil } +// PauseNackTimeout is used to pause the Nack timeout for an eval that is making +// progress but is in a potentially unbounded operation such as the plan queue. +func (b *EvalBroker) PauseNackTimeout(evalID, token string) error { + b.l.RLock() + defer b.l.RUnlock() + unack, ok := b.unack[evalID] + if !ok { + return ErrNotOutstanding + } + if unack.Token != token { + return ErrTokenMismatch + } + if !unack.NackTimer.Stop() { + return ErrNackTimeoutReached + } + return nil +} + +// ResumeNackTimeout is used to resume the Nack timeout for an eval that was +// paused. It should be resumed after leaving an unbounded operation. +func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error { + b.l.Lock() + defer b.l.Unlock() + unack, ok := b.unack[evalID] + if !ok { + return ErrNotOutstanding + } + if unack.Token != token { + return ErrTokenMismatch + } + unack.NackTimer.Reset(b.nackTimeout) + return nil +} + // Flush is used to clear the state of the broker func (b *EvalBroker) Flush() { b.l.Lock() diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index b918b4e72..b81fbd944 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -656,6 +656,56 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { } } +func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { + b := testBroker(t, 5*time.Millisecond) + b.SetEnabled(true) + + // Enqueue + eval := mock.Eval() + err := b.Enqueue(eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Dequeue + out, token, err := b.Dequeue(defaultSched, time.Second) + start := time.Now() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad: %v", out) + } + + // Pause in 2 milliseconds + time.Sleep(2 * time.Millisecond) + if err := b.PauseNackTimeout(out.ID, token); err != nil { + t.Fatalf("err: %v", err) + } + + go func() { + time.Sleep(2 * time.Millisecond) + if err := b.ResumeNackTimeout(out.ID, token); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Dequeue, should block until the timer is resumed + out, _, err = b.Dequeue(defaultSched, time.Second) + end := time.Now() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad: %v", out) + } + + // Check the nack timer + if diff := end.Sub(start); diff < 9*time.Millisecond { + t.Fatalf("bad: %#v", diff) + } +} + func TestEvalBroker_DeliveryLimit(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go index cb9b798e8..556c016df 100644 --- a/nomad/plan_endpoint.go +++ b/nomad/plan_endpoint.go @@ -19,8 +19,19 @@ func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) er } defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now()) + // Pause the Nack timer for the eval as it is making progress as long as it + // is in the plan queue. We resume immediately after we get a result to + // handle the case that the receiving worker dies. + plan := args.Plan + id := plan.EvalID + token := plan.EvalToken + if err := p.srv.evalBroker.PauseNackTimeout(id, token); err != nil { + return err + } + defer p.srv.evalBroker.ResumeNackTimeout(id, token) + // Submit the plan to the queue - future, err := p.srv.planQueue.Enqueue(args.Plan) + future, err := p.srv.planQueue.Enqueue(plan) if err != nil { return err } diff --git a/scripts/update_docker.sh b/scripts/update_docker.sh index e1cc66176..18b258ee4 100755 --- a/scripts/update_docker.sh +++ b/scripts/update_docker.sh @@ -2,7 +2,7 @@ set -ex -DOCKER_VERSION="1.9.1" +DOCKER_VERSION="1.10.2" sudo stop docker sudo rm -rf /var/lib/docker @@ -10,6 +10,6 @@ sudo rm -f `which docker` sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" | sudo tee /etc/apt/sources.list.d/docker.list sudo apt-get update -sudo apt-get install docker-engine=$DOCKER_VERSION-0~`lsb_release -cs` -y --force-yes +sudo apt-get install -y --force-yes -o Dpkg::Options::="--force-confnew" docker-engine=$DOCKER_VERSION-0~`lsb_release -cs` docker version