Merge pull request #493 from hashicorp/f-eval-broker-blocking-dequeue

Make a zero timeout for eval_broker.Dequeue() block
This commit is contained in:
Alex Dadgar 2015-11-23 18:26:52 -08:00
commit 136a181c22
2 changed files with 30 additions and 10 deletions

View File

@ -214,6 +214,7 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, queue string) {
// Dequeue is used to perform a blocking dequeue
func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) {
var timeoutTimer *time.Timer
var timeoutCh <-chan time.Time
SCAN:
// Scan for work
eval, token, err := b.scanForSchedulers(schedulers)
@ -233,12 +234,13 @@ SCAN:
}
// Setup the timeout channel the first time around
if timeoutTimer == nil {
if timeoutTimer == nil && timeout != 0 {
timeoutTimer = time.NewTimer(timeout)
timeoutCh = timeoutTimer.C
}
// Block until we get work
scan := b.waitForSchedulers(schedulers, timeoutTimer.C)
scan := b.waitForSchedulers(schedulers, timeoutCh)
if scan {
goto SCAN
}

View File

@ -403,20 +403,38 @@ func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
func TestEvalBroker_Dequeue_Empty_Timeout(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
doneCh := make(chan struct{}, 1)
start := time.Now()
out, _, err := b.Dequeue(defaultSched, 0)
end := time.Now()
go func() {
out, _, err := b.Dequeue(defaultSched, 0)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatal("Expect an eval")
}
doneCh <- struct{}{}
}()
// Sleep for a little bit
select {
case <-time.After(5 * time.Millisecond):
case <-doneCh:
t.Fatalf("Dequeue(0) should block")
}
// Enqueue to unblock the dequeue.
eval := mock.Eval()
err := b.Enqueue(eval)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected: %#v", out)
}
if diff := end.Sub(start); diff > 1*time.Millisecond {
t.Fatalf("bad: %#v", diff)
select {
case <-doneCh:
return
case <-time.After(5 * time.Millisecond):
t.Fatal("timeout: Dequeue(0) should return after enqueue")
}
}