Merge branch 'master' of github.com:hashicorp/nomad

This commit is contained in:
Alex Dadgar 2016-03-04 15:20:57 -08:00
commit 0ffa926fb5
4 changed files with 98 additions and 3 deletions

View File

@ -510,6 +510,40 @@ func (b *EvalBroker) Nack(evalID, token string) error {
return nil
}
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {
b.l.RLock()
defer b.l.RUnlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
if !unack.NackTimer.Stop() {
return ErrNackTimeoutReached
}
return nil
}
// ResumeNackTimeout is used to resume the Nack timeout for an eval that was
// paused. It should be resumed after leaving an unbounded operation.
func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
b.l.Lock()
defer b.l.Unlock()
unack, ok := b.unack[evalID]
if !ok {
return ErrNotOutstanding
}
if unack.Token != token {
return ErrTokenMismatch
}
unack.NackTimer.Reset(b.nackTimeout)
return nil
}
// Flush is used to clear the state of the broker
func (b *EvalBroker) Flush() {
b.l.Lock()

View File

@ -656,6 +656,56 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
}
}
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
b := testBroker(t, 5*time.Millisecond)
b.SetEnabled(true)
// Enqueue
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Dequeue
out, token, err := b.Dequeue(defaultSched, time.Second)
start := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}
// Pause in 2 milliseconds
time.Sleep(2 * time.Millisecond)
if err := b.PauseNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
go func() {
time.Sleep(2 * time.Millisecond)
if err := b.ResumeNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Dequeue, should block until the timer is resumed
out, _, err = b.Dequeue(defaultSched, time.Second)
end := time.Now()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad: %v", out)
}
// Check the nack timer
if diff := end.Sub(start); diff < 9*time.Millisecond {
t.Fatalf("bad: %#v", diff)
}
}
func TestEvalBroker_DeliveryLimit(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)

View File

@ -19,8 +19,19 @@ func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) er
}
defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now())
// Pause the Nack timer for the eval as it is making progress as long as it
// is in the plan queue. We resume immediately after we get a result to
// handle the case that the receiving worker dies.
plan := args.Plan
id := plan.EvalID
token := plan.EvalToken
if err := p.srv.evalBroker.PauseNackTimeout(id, token); err != nil {
return err
}
defer p.srv.evalBroker.ResumeNackTimeout(id, token)
// Submit the plan to the queue
future, err := p.srv.planQueue.Enqueue(args.Plan)
future, err := p.srv.planQueue.Enqueue(plan)
if err != nil {
return err
}

View File

@ -2,7 +2,7 @@
set -ex
DOCKER_VERSION="1.9.1"
DOCKER_VERSION="1.10.2"
sudo stop docker
sudo rm -rf /var/lib/docker
@ -10,6 +10,6 @@ sudo rm -f `which docker`
sudo apt-key adv --keyserver hkp://p80.pool.sks-keyservers.net:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D
echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" | sudo tee /etc/apt/sources.list.d/docker.list
sudo apt-get update
sudo apt-get install docker-engine=$DOCKER_VERSION-0~`lsb_release -cs` -y --force-yes
sudo apt-get install -y --force-yes -o Dpkg::Options::="--force-confnew" docker-engine=$DOCKER_VERSION-0~`lsb_release -cs`
docker version