open-nomad/scheduler/testing.go

292 lines
7.4 KiB
Go
Raw Normal View History

2015-08-13 22:57:49 +00:00
package scheduler
import (
"fmt"
2015-08-13 22:57:49 +00:00
"sync"
"testing"
2017-05-18 19:36:04 +00:00
"time"
2015-08-13 22:57:49 +00:00
"github.com/stretchr/testify/require"
2019-01-15 19:46:12 +00:00
2019-03-05 21:41:41 +00:00
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
2015-08-13 22:57:49 +00:00
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// RejectPlan is used to always reject the entire plan and force a state refresh
type RejectPlan struct {
Harness *Harness
}
func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
result := new(structs.PlanResult)
result.RefreshIndex = r.Harness.NextIndex()
return result, r.Harness.State, nil
}
func (r *RejectPlan) UpdateEval(eval *structs.Evaluation) error {
return nil
}
func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
return nil
}
func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
return nil
}
2016-05-16 19:49:18 +00:00
// Harness is a lightweight testing harness for schedulers. It manages a state
// store copy and provides the planner interface. It can be extended for various
// testing uses or for invoking the scheduler without side effects.
2015-08-13 22:57:49 +00:00
type Harness struct {
t testing.TB
2015-08-13 22:57:49 +00:00
State *state.StateStore
Planner Planner
planLock sync.Mutex
Plans []*structs.Plan
Evals []*structs.Evaluation
CreateEvals []*structs.Evaluation
ReblockEvals []*structs.Evaluation
2015-08-13 22:57:49 +00:00
nextIndex uint64
nextIndexLock sync.Mutex
2019-03-05 21:41:41 +00:00
optimizePlan bool
2015-08-13 22:57:49 +00:00
}
// NewHarness is used to make a new testing harness
func NewHarness(t testing.TB) *Harness {
2017-10-13 21:36:02 +00:00
state := state.TestStateStore(t)
2015-08-13 22:57:49 +00:00
h := &Harness{
t: t,
State: state,
nextIndex: 1,
2015-08-13 22:57:49 +00:00
}
return h
}
// NewHarnessWithState creates a new harness with the given state for testing
// purposes.
func NewHarnessWithState(t testing.TB, state *state.StateStore) *Harness {
return &Harness{
t: t,
State: state,
nextIndex: 1,
}
}
2015-08-13 22:57:49 +00:00
// SubmitPlan is used to handle plan submission
func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) {
// Ensure sequential plan application
h.planLock.Lock()
defer h.planLock.Unlock()
// Store the plan
h.Plans = append(h.Plans, plan)
// Check for custom planner
if h.Planner != nil {
return h.Planner.SubmitPlan(plan)
}
// Get the index
index := h.NextIndex()
// Prepare the result
result := new(structs.PlanResult)
2015-08-26 00:06:06 +00:00
result.NodeUpdate = plan.NodeUpdate
2015-08-13 22:57:49 +00:00
result.NodeAllocation = plan.NodeAllocation
result.NodePreemptions = plan.NodePreemptions
2015-08-13 22:57:49 +00:00
result.AllocIndex = index
// Flatten evicts and allocs
2019-03-05 21:41:41 +00:00
now := time.Now().UTC().UnixNano()
2015-08-13 22:57:49 +00:00
2019-03-05 21:41:41 +00:00
allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
for _, allocList := range plan.NodeAllocation {
allocsUpdated = append(allocsUpdated, allocList...)
2016-02-21 19:32:56 +00:00
}
2019-03-05 21:41:41 +00:00
updateCreateTimestamp(allocsUpdated, now)
2016-02-21 19:32:56 +00:00
2017-05-18 19:36:04 +00:00
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
2019-03-05 21:41:41 +00:00
Job: plan.Job,
2017-05-18 19:36:04 +00:00
},
Deployment: plan.Deployment,
2017-05-18 19:36:04 +00:00
DeploymentUpdates: plan.DeploymentUpdates,
EvalID: plan.EvalID,
2017-05-18 19:36:04 +00:00
}
if h.optimizePlan {
stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
for _, updateList := range plan.NodeUpdate {
for _, stoppedAlloc := range updateList {
stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff())
}
}
req.AllocsStopped = stoppedAllocDiffs
2019-03-05 21:41:41 +00:00
req.AllocsUpdated = allocsUpdated
preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
for _, preemptions := range plan.NodePreemptions {
for _, preemptedAlloc := range preemptions {
allocDiff := preemptedAlloc.AllocationDiff()
allocDiff.ModifyTime = now
preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff)
}
}
req.AllocsPreempted = preemptedAllocDiffs
2019-03-05 21:41:41 +00:00
} else {
2019-03-08 11:18:56 +00:00
// COMPAT 0.11: Handles unoptimized log format
2019-03-05 21:41:41 +00:00
var allocs []*structs.Allocation
allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
for _, updateList := range plan.NodeUpdate {
allocsStopped = append(allocsStopped, updateList...)
}
2019-03-05 21:41:41 +00:00
allocs = append(allocs, allocsStopped...)
2019-03-05 21:41:41 +00:00
allocs = append(allocs, allocsUpdated...)
updateCreateTimestamp(allocs, now)
2019-03-05 21:41:41 +00:00
req.Alloc = allocs
// Set modify time for preempted allocs and flatten them
var preemptedAllocs []*structs.Allocation
for _, preemptions := range result.NodePreemptions {
for _, alloc := range preemptions {
alloc.ModifyTime = now
preemptedAllocs = append(preemptedAllocs, alloc)
}
}
req.NodePreemptions = preemptedAllocs
2019-03-05 21:41:41 +00:00
}
2015-08-13 22:57:49 +00:00
// Apply the full plan
err := h.State.UpsertPlanResults(structs.MsgTypeTestSetup, index, &req)
2015-08-13 22:57:49 +00:00
return result, nil, err
}
// OptimizePlan is a function used only for Harness to help set the optimzePlan field,
// since Harness doesn't have access to a Server object
func (h *Harness) OptimizePlan(optimize bool) {
h.optimizePlan = optimize
}
2019-03-05 21:41:41 +00:00
func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
}
2015-08-15 21:25:00 +00:00
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
// Ensure sequential plan application
h.planLock.Lock()
defer h.planLock.Unlock()
// Store the eval
2015-08-15 21:25:00 +00:00
h.Evals = append(h.Evals, eval)
// Check for custom planner
if h.Planner != nil {
return h.Planner.UpdateEval(eval)
}
2015-08-15 21:25:00 +00:00
return nil
}
func (h *Harness) CreateEval(eval *structs.Evaluation) error {
// Ensure sequential plan application
h.planLock.Lock()
defer h.planLock.Unlock()
// Store the eval
h.CreateEvals = append(h.CreateEvals, eval)
// Check for custom planner
if h.Planner != nil {
return h.Planner.CreateEval(eval)
}
return nil
}
func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
// Ensure sequential plan application
h.planLock.Lock()
defer h.planLock.Unlock()
// Check that the evaluation was already blocked.
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
old, err := h.State.EvalByID(ws, eval.ID)
if err != nil {
return err
}
if old == nil {
return fmt.Errorf("evaluation does not exist to be reblocked")
}
if old.Status != structs.EvalStatusBlocked {
return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID)
}
h.ReblockEvals = append(h.ReblockEvals, eval)
return nil
}
2015-08-13 22:57:49 +00:00
// NextIndex returns the next index
func (h *Harness) NextIndex() uint64 {
h.nextIndexLock.Lock()
defer h.nextIndexLock.Unlock()
idx := h.nextIndex
h.nextIndex += 1
return idx
}
// Snapshot is used to snapshot the current state
func (h *Harness) Snapshot() State {
snap, _ := h.State.Snapshot()
return snap
}
// Scheduler is used to return a new scheduler from
// a snapshot of current state using the harness for planning.
func (h *Harness) Scheduler(factory Factory) Scheduler {
2018-09-15 23:23:13 +00:00
logger := testlog.HCLogger(h.t)
eventsCh := make(chan interface{})
// Listen for and log events from the scheduler.
go func() {
for e := range eventsCh {
switch event := e.(type) {
case *PortCollisionEvent:
h.t.Errorf("unexpected worker eval event: %v", event.Reason)
}
}
}()
return factory(logger, eventsCh, h.Snapshot(), h)
2015-08-13 22:57:49 +00:00
}
// Process is used to process an evaluation given a factory
// function to create the scheduler
func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
sched := h.Scheduler(factory)
2015-08-13 22:57:49 +00:00
return sched.Process(eval)
}
func (h *Harness) AssertEvalStatus(t testing.TB, state string) {
require.Len(t, h.Evals, 1)
update := h.Evals[0]
require.Equal(t, state, update.Status)
}