open-nomad/scheduler/testing.go
Lang Martin d3c4700cd3
server: stop after client disconnect (#7939)
* jobspec, api: add stop_after_client_disconnect

* nomad/state/state_store: error message typo

* structs: alloc methods to support stop_after_client_disconnect

1. A global AllocStates to track status changes with timestamps. We
   need this to track the time at which the alloc originally became
   lost.

2. ShouldClientStop() and WaitClientStop() to actually do the math
   (a rough sketch of this check follows the commit message below)

* scheduler/reconcile_util: delayByStopAfterClientDisconnect

* scheduler/reconcile: use delayByStopAfterClientDisconnect

* scheduler/util: updateNonTerminalAllocsToLost comments

This was set up to update allocs to lost only if the DesiredStatus had
already been set by the scheduler. It seems like the intention was to
update the status from any non-terminal state, and not all lost allocs
have been marked stop or evict by this point.

* scheduler/testing: AssertEvalStatus just use require

* scheduler/generic_sched: don't create a blocked eval if delayed

* scheduler/generic_sched_test: several scheduling cases
2020-05-13 16:39:04 -04:00
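
Roughly, the timing check those two helpers imply: find the timestamp at which the alloc was first marked lost (from its AllocStates), add the configured stop_after_client_disconnect delay, and stop the alloc once that time has passed. The sketch below illustrates this with simplified hypothetical types (allocState, alloc, waitClientStop, shouldClientStop are illustrative names); it is not the real nomad/structs implementation, which also accounts for task kill timeouts.

// stop_after_client_disconnect timing sketch (hypothetical, simplified types).
package main

import (
	"fmt"
	"time"
)

// allocState records a single client-status transition and when it happened.
type allocState struct {
	value string    // e.g. "lost"
	at    time.Time // when the transition was observed
}

// alloc carries only the fields the sketch needs.
type alloc struct {
	states                    []allocState   // status changes with timestamps
	stopAfterClientDisconnect *time.Duration // nil means the feature is off
}

// shouldClientStop reports whether the stop-after-disconnect behavior applies.
func (a *alloc) shouldClientStop() bool {
	return a.stopAfterClientDisconnect != nil && *a.stopAfterClientDisconnect > 0
}

// waitClientStop returns when a disconnected alloc should be stopped: the time
// it was first marked lost plus the configured delay. If it has not been
// marked lost yet, count from now.
func (a *alloc) waitClientStop(now time.Time) time.Time {
	lostAt := now
	for _, s := range a.states {
		if s.value == "lost" {
			lostAt = s.at
			break
		}
	}
	return lostAt.Add(*a.stopAfterClientDisconnect)
}

func main() {
	delay := 5 * time.Minute
	a := &alloc{
		states:                    []allocState{{value: "lost", at: time.Now().Add(-2 * time.Minute)}},
		stopAfterClientDisconnect: &delay,
	}
	if a.shouldClientStop() {
		fmt.Println("stop the alloc at:", a.waitClientStop(time.Now()))
	}
}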


package scheduler

import (
	"fmt"
	"sync"
	"time"

	testing "github.com/mitchellh/go-testing-interface"
	"github.com/stretchr/testify/require"

	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// RejectPlan is used to always reject the entire plan and force a state refresh
type RejectPlan struct {
	Harness *Harness
}

func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
	result := new(structs.PlanResult)
	result.RefreshIndex = r.Harness.NextIndex()
	return result, r.Harness.State, nil
}

func (r *RejectPlan) UpdateEval(eval *structs.Evaluation) error {
	return nil
}

func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
	return nil
}

func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
	return nil
}

// Harness is a lightweight testing harness for schedulers. It manages a state
// store copy and provides the planner interface. It can be extended for various
// testing uses or for invoking the scheduler without side effects.
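//
// A minimal usage sketch (illustrative only; it assumes a job, a node, and a
// pending eval referencing that job have already been upserted into h.State):
//
//	h := NewHarness(t)
//	// ... populate h.State ...
//	require.NoError(t, h.Process(NewServiceScheduler, eval))
//	h.AssertEvalStatus(t, structs.EvalStatusComplete)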
type Harness struct {
	t     testing.T
	State *state.StateStore

	Planner  Planner
	planLock sync.Mutex

	Plans        []*structs.Plan
	Evals        []*structs.Evaluation
	CreateEvals  []*structs.Evaluation
	ReblockEvals []*structs.Evaluation

	nextIndex     uint64
	nextIndexLock sync.Mutex

	optimizePlan bool
}

// NewHarness is used to make a new testing harness
func NewHarness(t testing.T) *Harness {
	state := state.TestStateStore(t)
	h := &Harness{
		t:         t,
		State:     state,
		nextIndex: 1,
	}
	return h
}

// NewHarnessWithState creates a new harness with the given state for testing
// purposes.
func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness {
	return &Harness{
		t:         t,
		State:     state,
		nextIndex: 1,
	}
}

// SubmitPlan is used to handle plan submission
func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) {
	// Ensure sequential plan application
	h.planLock.Lock()
	defer h.planLock.Unlock()

	// Store the plan
	h.Plans = append(h.Plans, plan)

	// Check for custom planner
	if h.Planner != nil {
		return h.Planner.SubmitPlan(plan)
	}

	// Get the index
	index := h.NextIndex()

	// Prepare the result
	result := new(structs.PlanResult)
	result.NodeUpdate = plan.NodeUpdate
	result.NodeAllocation = plan.NodeAllocation
	result.NodePreemptions = plan.NodePreemptions
	result.AllocIndex = index

	// Flatten evicts and allocs
	now := time.Now().UTC().UnixNano()

	allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
	for _, allocList := range plan.NodeAllocation {
		allocsUpdated = append(allocsUpdated, allocList...)
	}
	updateCreateTimestamp(allocsUpdated, now)

	// Setup the update request
	req := structs.ApplyPlanResultsRequest{
		AllocUpdateRequest: structs.AllocUpdateRequest{
			Job: plan.Job,
		},
		Deployment:        plan.Deployment,
		DeploymentUpdates: plan.DeploymentUpdates,
		EvalID:            plan.EvalID,
	}

	if h.optimizePlan {
		stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
		for _, updateList := range plan.NodeUpdate {
			for _, stoppedAlloc := range updateList {
				stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff())
			}
		}
		req.AllocsStopped = stoppedAllocDiffs

		req.AllocsUpdated = allocsUpdated

		preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
		for _, preemptions := range plan.NodePreemptions {
			for _, preemptedAlloc := range preemptions {
				allocDiff := preemptedAlloc.AllocationDiff()
				allocDiff.ModifyTime = now
				preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff)
			}
		}
		req.AllocsPreempted = preemptedAllocDiffs
	} else {
		// COMPAT 0.11: Handles unoptimized log format
		var allocs []*structs.Allocation

		allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
		for _, updateList := range plan.NodeUpdate {
			allocsStopped = append(allocsStopped, updateList...)
		}
		allocs = append(allocs, allocsStopped...)

		allocs = append(allocs, allocsUpdated...)
		updateCreateTimestamp(allocs, now)

		req.Alloc = allocs

		// Set modify time for preempted allocs and flatten them
		var preemptedAllocs []*structs.Allocation
		for _, preemptions := range result.NodePreemptions {
			for _, alloc := range preemptions {
				alloc.ModifyTime = now
				preemptedAllocs = append(preemptedAllocs, alloc)
			}
		}

		req.NodePreemptions = preemptedAllocs
	}

	// Apply the full plan
	err := h.State.UpsertPlanResults(index, &req)
	return result, nil, err
}

// OptimizePlan is a helper used only by the Harness to set the optimizePlan
// field, since the Harness doesn't have access to a Server object
func (h *Harness) OptimizePlan(optimize bool) {
	h.optimizePlan = optimize
}

func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
	// Set the time the alloc was applied for the first time. This can be used
	// to approximate the scheduling time.
	for _, alloc := range allocations {
		if alloc.CreateTime == 0 {
			alloc.CreateTime = now
		}
	}
}

// UpdateEval records an evaluation update, delegating to a custom planner if
// one is set
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
	// Ensure sequential plan application
	h.planLock.Lock()
	defer h.planLock.Unlock()

	// Store the eval
	h.Evals = append(h.Evals, eval)

	// Check for custom planner
	if h.Planner != nil {
		return h.Planner.UpdateEval(eval)
	}
	return nil
}

// CreateEval records a newly created evaluation, delegating to a custom
// planner if one is set
func (h *Harness) CreateEval(eval *structs.Evaluation) error {
	// Ensure sequential plan application
	h.planLock.Lock()
	defer h.planLock.Unlock()

	// Store the eval
	h.CreateEvals = append(h.CreateEvals, eval)

	// Check for custom planner
	if h.Planner != nil {
		return h.Planner.CreateEval(eval)
	}
	return nil
}

// ReblockEval records a reblocked evaluation after verifying that it is
// already blocked in the state store
func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
	// Ensure sequential plan application
	h.planLock.Lock()
	defer h.planLock.Unlock()

	// Check that the evaluation was already blocked.
	ws := memdb.NewWatchSet()
	old, err := h.State.EvalByID(ws, eval.ID)
	if err != nil {
		return err
	}

	if old == nil {
		return fmt.Errorf("evaluation does not exist to be reblocked")
	}
	if old.Status != structs.EvalStatusBlocked {
		return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID)
	}

	h.ReblockEvals = append(h.ReblockEvals, eval)
	return nil
}

// NextIndex returns the next index
func (h *Harness) NextIndex() uint64 {
	h.nextIndexLock.Lock()
	defer h.nextIndexLock.Unlock()
	idx := h.nextIndex
	h.nextIndex += 1
	return idx
}

// Snapshot is used to snapshot the current state
func (h *Harness) Snapshot() State {
	snap, _ := h.State.Snapshot()
	return snap
}

// Scheduler is used to return a new scheduler from
// a snapshot of current state using the harness for planning.
func (h *Harness) Scheduler(factory Factory) Scheduler {
	logger := testlog.HCLogger(h.t)
	return factory(logger, h.Snapshot(), h)
}

// Process is used to process an evaluation given a factory
// function to create the scheduler
func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
	sched := h.Scheduler(factory)
	return sched.Process(eval)
}

// AssertEvalStatus asserts that exactly one eval update was received and that
// it has the given status
func (h *Harness) AssertEvalStatus(t testing.T, state string) {
	require.Len(t, h.Evals, 1)
	update := h.Evals[0]

	require.Equal(t, state, update.Status)
}