diff --git a/nomad/deploymentwatcher/batcher.go b/nomad/deploymentwatcher/batcher.go index 4bd529598..0d730a2cd 100644 --- a/nomad/deploymentwatcher/batcher.go +++ b/nomad/deploymentwatcher/batcher.go @@ -2,7 +2,6 @@ package deploymentwatcher import ( "context" - "sync" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -16,16 +15,11 @@ type EvalBatcher struct { // raft is used to actually commit the evaluations raft DeploymentRaftEndpoints - // future to be returned to callers - f *EvalFuture - - // inCh is used to pass evaluations to the daemon process - inCh chan *structs.Evaluation + // workCh is used to pass evaluations to the daemon process + workCh chan *evalWrapper // ctx is used to exit the daemon batcher ctx context.Context - - l sync.Mutex } // NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to @@ -33,10 +27,10 @@ type EvalBatcher struct { // closed. func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher { b := &EvalBatcher{ - batch: batchDuration, - raft: raft, - ctx: ctx, - inCh: make(chan *structs.Evaluation, 10), + batch: batchDuration, + raft: raft, + ctx: ctx, + workCh: make(chan *evalWrapper, 10), } go b.batcher() @@ -46,36 +40,41 @@ func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, c // CreateEval batches the creation of the evaluation and returns a future that // tracks the evaluations creation. func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture { - b.l.Lock() - if b.f == nil { - b.f = NewEvalFuture() + wrapper := &evalWrapper{ + e: e, + f: make(chan *EvalFuture, 1), } - b.l.Unlock() - b.inCh <- e - return b.f + b.workCh <- wrapper + return <-wrapper.f +} + +type evalWrapper struct { + e *structs.Evaluation + f chan *EvalFuture } // batcher is the long lived batcher goroutine func (b *EvalBatcher) batcher() { var timerCh <-chan time.Time evals := make(map[string]*structs.Evaluation) + future := NewEvalFuture() for { select { case <-b.ctx.Done(): return - case e := <-b.inCh: + case w := <-b.workCh: if timerCh == nil { timerCh = time.After(b.batch) } - evals[e.DeploymentID] = e + // Store the eval and attach the future + evals[w.e.DeploymentID] = w.e + w.f <- future case <-timerCh: - // Capture the future - b.l.Lock() - f := b.f - b.f = nil - b.l.Unlock() + // Capture the future and create a new one + f := future + future = NewEvalFuture() // Shouldn't be possible if f == nil { @@ -94,7 +93,6 @@ func (b *EvalBatcher) batcher() { // Reset the evals list and timer evals = make(map[string]*structs.Evaluation) timerCh = nil - } } }