Fix race creating EvalFuture
This PR fixes https://github.com/hashicorp/nomad/issues/3044
This commit is contained in:
parent
83900faf79
commit
782abf4098
|
@ -2,7 +2,6 @@ package deploymentwatcher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
@ -16,16 +15,11 @@ type EvalBatcher struct {
|
||||||
// raft is used to actually commit the evaluations
|
// raft is used to actually commit the evaluations
|
||||||
raft DeploymentRaftEndpoints
|
raft DeploymentRaftEndpoints
|
||||||
|
|
||||||
// future to be returned to callers
|
// workCh is used to pass evaluations to the daemon process
|
||||||
f *EvalFuture
|
workCh chan *evalWrapper
|
||||||
|
|
||||||
// inCh is used to pass evaluations to the daemon process
|
|
||||||
inCh chan *structs.Evaluation
|
|
||||||
|
|
||||||
// ctx is used to exit the daemon batcher
|
// ctx is used to exit the daemon batcher
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
l sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
|
// NewEvalBatcher returns an EvalBatcher that uses the passed raft endpoints to
|
||||||
|
@ -33,10 +27,10 @@ type EvalBatcher struct {
|
||||||
// closed.
|
// closed.
|
||||||
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
|
func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatcher {
|
||||||
b := &EvalBatcher{
|
b := &EvalBatcher{
|
||||||
batch: batchDuration,
|
batch: batchDuration,
|
||||||
raft: raft,
|
raft: raft,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
inCh: make(chan *structs.Evaluation, 10),
|
workCh: make(chan *evalWrapper, 10),
|
||||||
}
|
}
|
||||||
|
|
||||||
go b.batcher()
|
go b.batcher()
|
||||||
|
@ -46,36 +40,41 @@ func NewEvalBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, c
|
||||||
// CreateEval batches the creation of the evaluation and returns a future that
|
// CreateEval batches the creation of the evaluation and returns a future that
|
||||||
// tracks the evaluations creation.
|
// tracks the evaluations creation.
|
||||||
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
|
func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
|
||||||
b.l.Lock()
|
wrapper := &evalWrapper{
|
||||||
if b.f == nil {
|
e: e,
|
||||||
b.f = NewEvalFuture()
|
f: make(chan *EvalFuture, 1),
|
||||||
}
|
}
|
||||||
b.l.Unlock()
|
|
||||||
|
|
||||||
b.inCh <- e
|
b.workCh <- wrapper
|
||||||
return b.f
|
return <-wrapper.f
|
||||||
|
}
|
||||||
|
|
||||||
|
type evalWrapper struct {
|
||||||
|
e *structs.Evaluation
|
||||||
|
f chan *EvalFuture
|
||||||
}
|
}
|
||||||
|
|
||||||
// batcher is the long lived batcher goroutine
|
// batcher is the long lived batcher goroutine
|
||||||
func (b *EvalBatcher) batcher() {
|
func (b *EvalBatcher) batcher() {
|
||||||
var timerCh <-chan time.Time
|
var timerCh <-chan time.Time
|
||||||
evals := make(map[string]*structs.Evaluation)
|
evals := make(map[string]*structs.Evaluation)
|
||||||
|
future := NewEvalFuture()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-b.ctx.Done():
|
case <-b.ctx.Done():
|
||||||
return
|
return
|
||||||
case e := <-b.inCh:
|
case w := <-b.workCh:
|
||||||
if timerCh == nil {
|
if timerCh == nil {
|
||||||
timerCh = time.After(b.batch)
|
timerCh = time.After(b.batch)
|
||||||
}
|
}
|
||||||
|
|
||||||
evals[e.DeploymentID] = e
|
// Store the eval and attach the future
|
||||||
|
evals[w.e.DeploymentID] = w.e
|
||||||
|
w.f <- future
|
||||||
case <-timerCh:
|
case <-timerCh:
|
||||||
// Capture the future
|
// Capture the future and create a new one
|
||||||
b.l.Lock()
|
f := future
|
||||||
f := b.f
|
future = NewEvalFuture()
|
||||||
b.f = nil
|
|
||||||
b.l.Unlock()
|
|
||||||
|
|
||||||
// Shouldn't be possible
|
// Shouldn't be possible
|
||||||
if f == nil {
|
if f == nil {
|
||||||
|
@ -94,7 +93,6 @@ func (b *EvalBatcher) batcher() {
|
||||||
// Reset the evals list and timer
|
// Reset the evals list and timer
|
||||||
evals = make(map[string]*structs.Evaluation)
|
evals = make(map[string]*structs.Evaluation)
|
||||||
timerCh = nil
|
timerCh = nil
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue