scheduler: seed random shuffle nodes with eval ID (#12008)
Processing an evaluation is nearly a pure function over the state snapshot, but we randomly shuffle the nodes. This means that developers can't take a given state snapshot and pass an evaluation through it and be guaranteed the same plan results. But the evaluation ID is already random, so if we use this as the seed for shuffling the nodes we can greatly reduce the sources of non-determinism. Unfortunately golang map iteration uses a global source of randomness and not a goroutine-local one, but arguably if the scheduler behavior is impacted by this, that's a bug in the iteration.
This commit is contained in:
parent
aece0ddda8
commit
d9d4da1e9f
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
scheduler: Seed node shuffling with the evaluation ID to make the order reproducible
|
||||||
|
```
|
|
@ -14,6 +14,7 @@ import (
|
||||||
func testContext(t testing.TB) (*state.StateStore, *EvalContext) {
|
func testContext(t testing.TB) (*state.StateStore, *EvalContext) {
|
||||||
state := state.TestStateStore(t)
|
state := state.TestStateStore(t)
|
||||||
plan := &structs.Plan{
|
plan := &structs.Plan{
|
||||||
|
EvalID: uuid.Generate(),
|
||||||
NodeUpdate: make(map[string][]*structs.Allocation),
|
NodeUpdate: make(map[string][]*structs.Allocation),
|
||||||
NodeAllocation: make(map[string][]*structs.Allocation),
|
NodeAllocation: make(map[string][]*structs.Allocation),
|
||||||
NodePreemptions: make(map[string][]*structs.Allocation),
|
NodePreemptions: make(map[string][]*structs.Allocation),
|
||||||
|
|
|
@ -122,7 +122,8 @@ func (iter *StaticIterator) SetNodes(nodes []*structs.Node) {
|
||||||
// is applied in-place
|
// is applied in-place
|
||||||
func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
|
func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
|
||||||
// shuffle with the Fisher-Yates algorithm
|
// shuffle with the Fisher-Yates algorithm
|
||||||
shuffleNodes(nodes)
|
idx, _ := ctx.State().LatestIndex()
|
||||||
|
shuffleNodes(ctx.Plan(), idx, nodes)
|
||||||
|
|
||||||
// Create a static iterator
|
// Create a static iterator
|
||||||
return NewStaticIterator(ctx, nodes)
|
return NewStaticIterator(ctx, nodes)
|
||||||
|
|
|
@ -107,6 +107,9 @@ type State interface {
|
||||||
|
|
||||||
// CSIVolumeByID fetch CSI volumes, containing controller jobs
|
// CSIVolumeByID fetch CSI volumes, containing controller jobs
|
||||||
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
|
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)
|
||||||
|
|
||||||
|
// LatestIndex returns the greatest index value for all indexes.
|
||||||
|
LatestIndex() (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Planner interface is used to submit a task allocation plan.
|
// Planner interface is used to submit a task allocation plan.
|
||||||
|
|
|
@ -70,7 +70,8 @@ type GenericStack struct {
|
||||||
|
|
||||||
func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
|
func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
|
||||||
// Shuffle base nodes
|
// Shuffle base nodes
|
||||||
shuffleNodes(baseNodes)
|
idx, _ := s.ctx.State().LatestIndex()
|
||||||
|
shuffleNodes(s.ctx.Plan(), idx, baseNodes)
|
||||||
|
|
||||||
// Update the set of base nodes
|
// Update the set of base nodes
|
||||||
s.source.SetNodes(baseNodes)
|
s.source.SetNodes(baseNodes)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -376,11 +377,25 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// shuffleNodes randomizes the slice order with the Fisher-Yates algorithm
|
// shuffleNodes randomizes the slice order with the Fisher-Yates
|
||||||
func shuffleNodes(nodes []*structs.Node) {
|
// algorithm. We seed the random source with the eval ID (which is
|
||||||
|
// random) to aid in postmortem debugging of specific evaluations and
|
||||||
|
// state snapshots.
|
||||||
|
func shuffleNodes(plan *structs.Plan, index uint64, nodes []*structs.Node) {
|
||||||
|
|
||||||
|
// use the last 4 bytes because those are the random bits
|
||||||
|
// if we have sortable IDs
|
||||||
|
buf := []byte(plan.EvalID)
|
||||||
|
seed := binary.BigEndian.Uint64(buf[len(buf)-8:])
|
||||||
|
|
||||||
|
// for retried plans the index is the plan result's RefreshIndex
|
||||||
|
// so that we don't retry with the exact same shuffle
|
||||||
|
seed ^= index
|
||||||
|
r := rand.New(rand.NewSource(int64(seed >> 2)))
|
||||||
|
|
||||||
n := len(nodes)
|
n := len(nodes)
|
||||||
for i := n - 1; i > 0; i-- {
|
for i := n - 1; i > 0; i-- {
|
||||||
j := rand.Intn(i + 1)
|
j := r.Intn(i + 1)
|
||||||
nodes[i], nodes[j] = nodes[j], nodes[i]
|
nodes[i], nodes[j] = nodes[j], nodes[i]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -507,8 +507,17 @@ func TestShuffleNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
orig := make([]*structs.Node, len(nodes))
|
orig := make([]*structs.Node, len(nodes))
|
||||||
copy(orig, nodes)
|
copy(orig, nodes)
|
||||||
shuffleNodes(nodes)
|
eval := mock.Eval() // will have random EvalID
|
||||||
|
plan := eval.MakePlan(mock.Job())
|
||||||
|
shuffleNodes(plan, 1000, nodes)
|
||||||
require.False(t, reflect.DeepEqual(nodes, orig))
|
require.False(t, reflect.DeepEqual(nodes, orig))
|
||||||
|
|
||||||
|
nodes2 := make([]*structs.Node, len(nodes))
|
||||||
|
copy(nodes2, orig)
|
||||||
|
shuffleNodes(plan, 1000, nodes2)
|
||||||
|
|
||||||
|
require.True(t, reflect.DeepEqual(nodes, nodes2))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskUpdatedAffinity(t *testing.T) {
|
func TestTaskUpdatedAffinity(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue