eval broker: use write lock when reaping cancelable evals (#16112)
The eval broker's `Cancelable` method used by the cancelable eval reaper mutates the slice of cancelable evals by removing a batch at a time from the slice. But this method unsafely uses a read lock despite this mutation. Under normal workloads this is likely to be safe but when the eval broker is under the heavy load this feature is intended to fix, we're likely to have a race condition. Switch this to a write lock, like the other locks that mutate the eval broker state. This changeset also adjusts the timeout to allow poorly-sized Actions runners more time to schedule the appropriate goroutines. The test has also been updated to use `shoenig/test/wait` so we can have sensible reporting of the results rather than just a timeout error when things go wrong.
This commit is contained in:
parent
c2bd829fe2
commit
65c7e149d3
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
eval broker: Fixed a bug where the cancelable eval reaper used an incorrect lock when getting the set of cancelable evals from the broker
|
||||
```
|
|
@ -868,8 +868,8 @@ func (b *EvalBroker) Stats() *BrokerStats {
|
|||
// stale and ready to mark for canceling. The eval RPC will call this with a
|
||||
// batch size set to avoid sending overly large raft messages.
|
||||
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
if batchSize > len(b.cancelable) {
|
||||
batchSize = len(b.cancelable)
|
||||
|
|
|
@ -9,14 +9,16 @@ import (
|
|||
"time"
|
||||
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/shoenig/test"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/shoenig/test/wait"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/shoenig/test"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -1475,6 +1477,31 @@ func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) {
|
|||
must.Eq(t, 100, eval.ModifyIndex)
|
||||
}
|
||||
|
||||
func TestEvalBroker_Cancelable(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
b := testBroker(t, time.Minute)
|
||||
|
||||
evals := []*structs.Evaluation{}
|
||||
for i := 0; i < 20; i++ {
|
||||
eval := mock.Eval()
|
||||
evals = append(evals, eval)
|
||||
}
|
||||
b.cancelable = evals
|
||||
b.stats.TotalCancelable = len(b.cancelable)
|
||||
|
||||
must.Len(t, 20, b.cancelable)
|
||||
cancelable := b.Cancelable(10)
|
||||
must.Len(t, 10, cancelable)
|
||||
must.Len(t, 10, b.cancelable)
|
||||
must.Eq(t, 10, b.stats.TotalCancelable)
|
||||
|
||||
cancelable = b.Cancelable(20)
|
||||
must.Len(t, 10, cancelable)
|
||||
must.Len(t, 0, b.cancelable)
|
||||
must.Eq(t, 0, b.stats.TotalCancelable)
|
||||
}
|
||||
|
||||
// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
|
||||
// workflows
|
||||
func TestEvalBroker_IntegrationTest(t *testing.T) {
|
||||
|
@ -1567,16 +1594,26 @@ func TestEvalBroker_IntegrationTest(t *testing.T) {
|
|||
|
||||
config := DefaultConfig()
|
||||
config.NumSchedulers = 4
|
||||
config.EvalReapCancelableInterval = time.Minute * 10
|
||||
require.NoError(t, srv.Reload(config))
|
||||
must.NoError(t, srv.Reload(config))
|
||||
|
||||
// assert that all but 2 evals were canceled and that the eval broker state
|
||||
// has been cleared
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
got := getEvalStatuses()
|
||||
return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
|
||||
}, 2*time.Second, time.Millisecond*100)
|
||||
var got map[string]int
|
||||
|
||||
must.Wait(t, wait.InitialSuccess(
|
||||
wait.Timeout(5*time.Second),
|
||||
wait.Gap(100*time.Millisecond),
|
||||
wait.BoolFunc(func() bool {
|
||||
got = getEvalStatuses()
|
||||
return got[structs.EvalStatusComplete] == 2 &&
|
||||
got[structs.EvalStatusCancelled] == 9
|
||||
}),
|
||||
),
|
||||
must.Func(func() string {
|
||||
return fmt.Sprintf("expected map[complete:2 canceled:9] within timeout, got: %v with broker status=%#v", got, getStats())
|
||||
}),
|
||||
)
|
||||
|
||||
must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
|
||||
TotalPending: 0, TotalCancelable: 0}, getStats())
|
||||
|
|
|
@ -1035,6 +1035,8 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
|
|||
return wakeCh
|
||||
}
|
||||
|
||||
const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10
|
||||
|
||||
// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
|
||||
// broker and updates their status to canceled.
|
||||
func cancelCancelableEvals(srv *Server) error {
|
||||
|
@ -1044,7 +1046,7 @@ func cancelCancelableEvals(srv *Server) error {
|
|||
// We *can* send larger raft logs but rough benchmarks show that a smaller
|
||||
// page size strikes a balance between throughput and time we block the FSM
|
||||
// apply for other operations
|
||||
cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
|
||||
cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize)
|
||||
if len(cancelable) > 0 {
|
||||
for i, eval := range cancelable {
|
||||
eval = eval.Copy()
|
||||
|
|
Loading…
Reference in New Issue