Merge pull request #3051 from hashicorp/b-batcher
Fix race creating EvalFuture
This commit is contained in:
commit
3f5b7204dc
|
@ -2,7 +2,6 @@ package deploymentwatcher
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -16,16 +15,11 @@ type EvalBatcher struct {
|
|||
// raft is used to actually commit the evaluations
|
||||
raft DeploymentRaftEndpoints
|
||||
|
||||
// future to be returned to callers
|
||||
f *EvalFuture
|
||||
|
||||
// inCh is used to pass evaluations to the daemon process
|
||||
inCh chan *structs.Evaluation
|
||||
// workCh is used to pass evaluations to the daemon process
|
||||
workCh chan *evalWrapper
|
||||
|
||||
// ctx is used to exit the daemon batcher
|
||||
ctx context.Context
|
||||
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
|
||||
|
@ -33,10 +27,10 @@ type EvalBatcher struct {
|
|||
// closed.
|
||||
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
|
||||
b := &EvalBatcher{
|
||||
batch: batchDuration,
|
||||
raft: raft,
|
||||
ctx: ctx,
|
||||
inCh: make(chan *structs.Evaluation, 10),
|
||||
batch: batchDuration,
|
||||
raft: raft,
|
||||
ctx: ctx,
|
||||
workCh: make(chan *evalWrapper, 10),
|
||||
}
|
||||
|
||||
go b.batcher()
|
||||
|
@ -46,36 +40,41 @@ func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, c
|
|||
// CreateEval batches the creation of the evaluation and returns a future that
|
||||
// tracks the evaluations creation.
|
||||
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
|
||||
b.l.Lock()
|
||||
if b.f == nil {
|
||||
b.f = NewEvalFuture()
|
||||
wrapper := &evalWrapper{
|
||||
e: e,
|
||||
f: make(chan *EvalFuture, 1),
|
||||
}
|
||||
b.l.Unlock()
|
||||
|
||||
b.inCh <- e
|
||||
return b.f
|
||||
b.workCh <- wrapper
|
||||
return <-wrapper.f
|
||||
}
|
||||
|
||||
type evalWrapper struct {
|
||||
e *structs.Evaluation
|
||||
f chan *EvalFuture
|
||||
}
|
||||
|
||||
// batcher is the long lived batcher goroutine
|
||||
func (b *EvalBatcher) batcher() {
|
||||
var timerCh <-chan time.Time
|
||||
evals := make(map[string]*structs.Evaluation)
|
||||
future := NewEvalFuture()
|
||||
for {
|
||||
select {
|
||||
case <-b.ctx.Done():
|
||||
return
|
||||
case e := <-b.inCh:
|
||||
case w := <-b.workCh:
|
||||
if timerCh == nil {
|
||||
timerCh = time.After(b.batch)
|
||||
}
|
||||
|
||||
evals[e.DeploymentID] = e
|
||||
// Store the eval and attach the future
|
||||
evals[w.e.DeploymentID] = w.e
|
||||
w.f <- future
|
||||
case <-timerCh:
|
||||
// Capture the future
|
||||
b.l.Lock()
|
||||
f := b.f
|
||||
b.f = nil
|
||||
b.l.Unlock()
|
||||
// Capture the future and create a new one
|
||||
f := future
|
||||
future = NewEvalFuture()
|
||||
|
||||
// Shouldn't be possible
|
||||
if f == nil {
|
||||
|
@ -94,7 +93,6 @@ func (b *EvalBatcher) batcher() {
|
|||
// Reset the evals list and timer
|
||||
evals = make(map[string]*structs.Evaluation)
|
||||
timerCh = nil
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue