node pools: apply node pool scheduler configuration (#17598)

Luiz Aoqui 2023-06-21 20:31:50 -04:00 committed by GitHub
parent 16886bf6bf
commit ac08fc751b
18 changed files with 1069 additions and 175 deletions

View File

@ -0,0 +1,23 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package iterator
// Iterator represents an object that can iterate over a set of values one at a
// time.
type Iterator interface {
// Next returns the next element or nil if there are none left.
Next() any
}
// Len consumes the iterator and returns the number of elements found.
//
// IMPORTANT: this method consumes the iterator, so it should not be used after
// Len() returns.
func Len(iter Iterator) int {
count := 0
for raw := iter.Next(); raw != nil; raw = iter.Next() {
count++
}
return count
}
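
Usage sketch (not part of this commit): the note about Len consuming the iterator can be seen with a throwaway implementation. sliceIterator below is hypothetical, and Len is reproduced locally so the snippet runs on its own.

```go
package main

import "fmt"

// Iterator matches the interface added above.
type Iterator interface {
	Next() any
}

// sliceIterator is a hypothetical Iterator backed by a slice.
type sliceIterator struct {
	items []string
	pos   int
}

// Next returns the next element or nil once the slice is exhausted.
func (it *sliceIterator) Next() any {
	if it.pos >= len(it.items) {
		return nil
	}
	v := it.items[it.pos]
	it.pos++
	return v
}

// Len consumes the iterator, mirroring helper/iterator.Len.
func Len(iter Iterator) int {
	count := 0
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		count++
	}
	return count
}

func main() {
	it := &sliceIterator{items: []string{"a", "b", "c"}}
	fmt.Println(Len(it))   // 3
	fmt.Println(it.Next()) // <nil>: the iterator is exhausted and must not be reused
}
```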

View File

@ -322,7 +322,12 @@ func (v *memoryOversubscriptionValidate) Validate(job *structs.Job) (warnings []
return nil, err
}
- if c != nil && c.MemoryOversubscriptionEnabled {
+ pool, err := v.srv.State().NodePoolByName(nil, job.NodePool)
+ if err != nil {
+ return nil, err
+ }
+ if pool.MemoryOversubscriptionEnabled(c) {
return nil, nil
}

View File

@ -2164,6 +2164,113 @@ func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) {
require.Empty(t, resp.Warnings)
}
func TestJobEndpoint_Register_ValidateMemoryMax_NodePool(t *testing.T) {
ci.Parallel(t)
s, cleanupS := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Store default scheduler configuration to reset between test cases.
_, defaultSchedConfig, err := s.State().SchedulerConfig()
must.NoError(t, err)
// Create test node pools.
noSchedConfig := mock.NodePool()
noSchedConfig.SchedulerConfiguration = nil
withMemOversub := mock.NodePool()
withMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
}
noMemOversub := mock.NodePool()
noMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(false),
}
s.State().UpsertNodePools(structs.MsgTypeTestSetup, 100, []*structs.NodePool{
noSchedConfig,
withMemOversub,
noMemOversub,
})
testCases := []struct {
name string
pool string
globalConfig *structs.SchedulerConfiguration
expectedWarning string
}{
{
name: "no scheduler config uses global config",
pool: noSchedConfig.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expectedWarning: "",
},
{
name: "enabled via node pool",
pool: withMemOversub.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
expectedWarning: "",
},
{
name: "disabled via node pool",
pool: noMemOversub.Name,
globalConfig: &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expectedWarning: "Memory oversubscription is not enabled",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set global scheduler config if provided.
if tc.globalConfig != nil {
idx, err := s.State().LatestIndex()
must.NoError(t, err)
err = s.State().SchedulerSetConfig(idx, tc.globalConfig)
must.NoError(t, err)
}
// Create job with node_pool and memory_max.
job := mock.Job()
job.TaskGroups[0].Tasks[0].Resources.MemoryMaxMB = 2000
job.NodePool = tc.pool
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
// Validate response.
must.NoError(t, err)
if tc.expectedWarning != "" {
must.StrContains(t, resp.Warnings, tc.expectedWarning)
} else {
must.Eq(t, "", resp.Warnings)
}
// Reset to default global scheduler config.
err = s.State().SchedulerSetConfig(resp.Index+1, defaultSchedConfig)
must.NoError(t, err)
})
}
}
// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval
func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation {
var store raft.LogStore = s.raftInmem

View File

@ -117,6 +117,24 @@ func (n *NodePool) IsBuiltIn() bool {
}
}
// MemoryOversubscriptionEnabled returns true if memory oversubscription is
// enabled in the node pool or in the global cluster configuration.
func (n *NodePool) MemoryOversubscriptionEnabled(global *SchedulerConfiguration) bool {
// Default to the global scheduler config.
memOversubEnabled := global != nil && global.MemoryOversubscriptionEnabled
// But overwrite it if the node pool also has it configured.
poolHasMemOversub := n != nil &&
n.SchedulerConfiguration != nil &&
n.SchedulerConfiguration.MemoryOversubscriptionEnabled != nil
if poolHasMemOversub {
memOversubEnabled = *n.SchedulerConfiguration.MemoryOversubscriptionEnabled
}
return memOversubEnabled
}
// SetHash is used to compute and set the hash of node pool
func (n *NodePool) SetHash() []byte {
// Initialize a 256bit Blake2 hash (32 bytes)
@ -163,6 +181,9 @@ func (n *NodePool) SetHash() []byte {
// NodePoolSchedulerConfiguration is the scheduler configuration applied to a
// node pool.
//
// When adding new values that should override global scheduler configuration,
// verify the scheduler handles the node pool configuration as well.
type NodePoolSchedulerConfiguration struct {
// SchedulerAlgorithm is the scheduling algorithm to use for the pool.
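
Illustration (not part of this commit): the precedence rule implemented by MemoryOversubscriptionEnabled, assuming the snippet is built against the Nomad module so structs and helper/pointer resolve; pool names are made up.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	global := &structs.SchedulerConfiguration{MemoryOversubscriptionEnabled: true}

	var nilPool *structs.NodePool // no pool: fall back to the global value
	noConfig := &structs.NodePool{Name: "no-sched-config"}
	disabled := &structs.NodePool{
		Name: "no-oversub",
		SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
			MemoryOversubscriptionEnabled: pointer.Of(false),
		},
	}

	fmt.Println(nilPool.MemoryOversubscriptionEnabled(global))  // true: global applies
	fmt.Println(noConfig.MemoryOversubscriptionEnabled(global)) // true: pool has no opinion
	fmt.Println(disabled.MemoryOversubscriptionEnabled(global)) // false: pool overrides global
}
```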

View File

@ -127,3 +127,70 @@ func TestNodePool_IsBuiltIn(t *testing.T) {
})
}
}
func TestNodePool_MemoryOversubscriptionEnabled(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
name string
pool *NodePool
global *SchedulerConfiguration
expected bool
}{
{
name: "global used if pool is nil",
pool: nil,
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "global used if pool doesn't have scheduler config",
pool: &NodePool{},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "global used if pool doesn't specify memory oversub",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: true,
},
{
name: "pool overrides global if it defines memory oversub",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(false),
},
},
global: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
expected: false,
},
{
name: "pool used if global is nil",
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
},
},
global: nil,
expected: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := tc.pool.MemoryOversubscriptionEnabled(tc.global)
must.Eq(t, got, tc.expected)
})
}
}

View File

@ -196,6 +196,26 @@ func (s *SchedulerConfiguration) EffectiveSchedulerAlgorithm() SchedulerAlgorith
return s.SchedulerAlgorithm
}
// WithNodePool returns a new SchedulerConfiguration with the node pool
// scheduler configuration applied.
func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfiguration {
schedConfig := s.Copy()
if pool == nil || pool.SchedulerConfiguration == nil {
return schedConfig
}
poolConfig := pool.SchedulerConfiguration
if poolConfig.SchedulerAlgorithm != "" {
schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm
}
if poolConfig.MemoryOversubscriptionEnabled != nil {
schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled
}
return schedConfig
}
func (s *SchedulerConfiguration) Canonicalize() {
if s != nil && s.SchedulerAlgorithm == "" {
s.SchedulerAlgorithm = SchedulerAlgorithmBinpack
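
Illustration (not part of this commit): WithNodePool layers a node pool's scheduler configuration over the global one and returns a copy, leaving the global struct untouched. The pool name is invented; the snippet assumes it builds against the Nomad module.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	global := &structs.SchedulerConfiguration{
		SchedulerAlgorithm:            structs.SchedulerAlgorithmBinpack,
		MemoryOversubscriptionEnabled: false,
	}
	pool := &structs.NodePool{
		Name: "example-pool",
		SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
			SchedulerAlgorithm:            structs.SchedulerAlgorithmSpread,
			MemoryOversubscriptionEnabled: pointer.Of(true),
		},
	}

	effective := global.WithNodePool(pool)
	fmt.Println(effective.SchedulerAlgorithm)            // spread
	fmt.Println(effective.MemoryOversubscriptionEnabled) // true
	fmt.Println(global.SchedulerAlgorithm)               // binpack: the receiver is copied, not mutated
}
```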

View File

@ -0,0 +1,108 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package structs
import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/shoenig/test/must"
)
func TestSchedulerConfiguration_WithNodePool(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
name string
schedConfig *SchedulerConfiguration
pool *NodePool
expected *SchedulerConfiguration
}{
{
name: "nil pool returns same config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: nil,
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "nil pool scheduler config returns same config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: &NodePool{},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "pool with memory oversubscription overwrites config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
},
},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: true,
},
},
{
name: "pool with scheduler algorithm overwrites config",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmBinpack,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
expected: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
{
name: "pool without memory oversubscription does not modify config",
schedConfig: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
expected: &SchedulerConfiguration{
MemoryOversubscriptionEnabled: false,
},
},
{
name: "pool without scheduler algorithm does not modify config",
schedConfig: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
pool: &NodePool{
SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
},
expected: &SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := tc.schedConfig.WithNodePool(tc.pool)
must.Eq(t, tc.expected, got)
must.NotEqOp(t, tc.schedConfig, got)
})
}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -281,7 +282,7 @@ func (s *GenericScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx)
if !s.job.Stopped() {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
}
// Compute the target job allocations
@ -509,7 +510,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
- nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, s.job.Datacenters, s.job.NodePool)
+ nodes, byDC, err := s.setNodes(s.job)
if err != nil {
return err
}
@ -562,10 +563,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
continue
}
- // Use downgraded job in scheduling stack to honor
- // old job resources and constraints
+ // Use downgraded job in scheduling stack to honor old job
+ // resources, constraints, and node pool scheduler configuration.
if downgradedJob != nil {
- s.stack.SetJob(downgradedJob)
+ s.setJob(downgradedJob)
+ if needsToSetNodes(downgradedJob, s.job) {
+ nodes, byDC, err = s.setNodes(downgradedJob)
+ if err != nil {
+ return err
+ }
+ }
}
// Find the preferred node
@ -596,9 +604,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
- // Restore stack job now that placement is done, to use plan job version
+ // Restore stack job and nodes now that placement is done, to use
+ // plan job version
if downgradedJob != nil {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
+ if needsToSetNodes(downgradedJob, s.job) {
+ nodes, byDC, err = s.setNodes(s.job)
+ if err != nil {
+ return err
+ }
+ }
}
// Set fields based on if we found an allocation option
@ -690,6 +706,45 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
return nil
}
// setJob updates the stack with the given job and job's node pool scheduler
// configuration.
func (s *GenericScheduler) setJob(job *structs.Job) error {
// Fetch node pool and global scheduler configuration to determine how to
// configure the scheduler.
pool, err := s.state.NodePoolByName(nil, job.NodePool)
if err != nil {
return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err)
}
_, schedConfig, err := s.state.SchedulerConfig()
if err != nil {
return fmt.Errorf("failed to get scheduler configuration: %v", err)
}
s.stack.SetJob(job)
s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
return nil
}
// setNodes updates the stack with the nodes that are ready for placement for
// the given job.
func (s *GenericScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) {
nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, job.Datacenters, job.NodePool)
if err != nil {
return nil, nil, err
}
s.stack.SetNodes(nodes)
return nodes, byDC, nil
}
// needsToSetNodes returns true if jobs a and b changed in a way that requires
// the nodes to be reset.
func needsToSetNodes(a, b *structs.Job) bool {
return !helper.SliceSetEq(a.Datacenters, b.Datacenters) ||
a.NodePool != b.NodePool
}
// propagateTaskState copies task handles from previous allocations to
// replacement allocations when the previous allocation is being drained or was
// lost. Remote task drivers rely on this to reconnect to remote tasks when the
@ -818,6 +873,11 @@ func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
// Check if preemption is enabled, defaults to true
//
// The scheduler configuration is read directly from state but only
// values that can't be specified per node pool should be used. Other
// values must be merged by calling schedConfig.WithNodePool() and set in
// the stack by calling SetSchedulerConfiguration().
enablePreemption := true
if schedConfig != nil {
if s.job.Type == structs.JobTypeBatch {
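
Illustration (not part of this commit): needsToSetNodes only triggers a node refresh when the downgraded job could see a different candidate set. The sketch below mirrors that check with a local set-equality helper standing in for helper.SliceSetEq.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// sliceSetEq is a simplified stand-in for helper.SliceSetEq: order-insensitive
// equality of two string slices.
func sliceSetEq(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	set := make(map[string]struct{}, len(a))
	for _, v := range a {
		set[v] = struct{}{}
	}
	for _, v := range b {
		if _, ok := set[v]; !ok {
			return false
		}
	}
	return true
}

// needsToSetNodes mirrors the scheduler helper above: nodes are re-fetched
// only when the datacenters or the node pool differ between job versions.
func needsToSetNodes(a, b *structs.Job) bool {
	return !sliceSetEq(a.Datacenters, b.Datacenters) || a.NodePool != b.NodePool
}

func main() {
	oldJob := &structs.Job{Datacenters: []string{"dc1", "dc2"}, NodePool: "pool-a"}
	sameSet := &structs.Job{Datacenters: []string{"dc2", "dc1"}, NodePool: "pool-a"}
	movedPool := &structs.Job{Datacenters: []string{"dc1", "dc2"}, NodePool: "pool-b"}

	fmt.Println(needsToSetNodes(oldJob, sameSet))   // false: same datacenters (order ignored) and pool
	fmt.Println(needsToSetNodes(oldJob, movedPool)) // true: node pool changed, nodes must be reset
}
```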

View File

@ -124,119 +124,6 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_MemoryMaxHonored(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
cpu int
memory int
memoryMax int
memoryOversubscriptionEnabled bool
expectedTaskMemoryMax int
// expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all tasks
expectedTotalMemoryMax int
}{
{
name: "plain no max",
cpu: 100,
memory: 200,
memoryMax: 0,
memoryOversubscriptionEnabled: true,
expectedTaskMemoryMax: 0,
expectedTotalMemoryMax: 200,
},
{
name: "with max",
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: true,
expectedTaskMemoryMax: 300,
expectedTotalMemoryMax: 300,
},
{
name: "with max but disabled",
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: false,
expectedTaskMemoryMax: 0,
expectedTotalMemoryMax: 200, // same as no max
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = 1
task := job.TaskGroups[0].Tasks[0].Name
res := job.TaskGroups[0].Tasks[0].Resources
res.CPU = c.cpu
res.MemoryMB = c.memory
res.MemoryMaxMB = c.memoryMax
h := NewHarness(t)
h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled,
})
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.Len(t, h.Plans, 1)
out, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Len(t, out, 1)
alloc := out[0]
// checking new resources field deprecated Resources fields
require.Equal(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares)
require.Equal(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB)
require.Equal(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB)
// checking old deprecated Resources fields
require.Equal(t, c.cpu, alloc.TaskResources[task].CPU)
require.Equal(t, c.memory, alloc.TaskResources[task].MemoryMB)
require.Equal(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB)
// check total resource fields - alloc.Resources deprecated field, no modern equivalent
require.Equal(t, c.cpu, alloc.Resources.CPU)
require.Equal(t, c.memory, alloc.Resources.MemoryMB)
require.Equal(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB)
})
}
}
func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
ci.Parallel(t)
@ -854,6 +741,140 @@ func TestServiceSched_Spread(t *testing.T) {
}
}
// TestServiceSched_JobRegister_NodePool_Downgrade tests the case where an
// allocation fails during a deployment with canaries, where the job changes
// node pool. The failed alloc should be placed in the node pool of the
// original job.
func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Set global scheduler configuration.
h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
})
// Create test node pools with different scheduler algorithms.
poolBinpack := mock.NodePool()
poolBinpack.Name = "pool-binpack"
poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
}
poolSpread := mock.NodePool()
poolSpread.Name = "pool-spread"
poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
}
nodePools := []*structs.NodePool{
poolBinpack,
poolSpread,
}
h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
// Create 5 nodes in each node pool.
// Use two loops so nodes are separated by node pool.
nodes := []*structs.Node{}
for i := 0; i < 5; i++ {
node := mock.Node()
node.Name = fmt.Sprintf("node-binpack-%d", i)
node.NodePool = poolBinpack.Name
nodes = append(nodes, node)
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
for i := 0; i < 5; i++ {
node := mock.Node()
node.Name = fmt.Sprintf("node-spread-%d", i)
node.NodePool = poolSpread.Name
nodes = append(nodes, node)
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Create first version of the test job running in the binpack node pool.
job1 := mock.Job()
job1.Version = 1
job1.NodePool = poolBinpack.Name
job1.Status = structs.JobStatusRunning
job1.TaskGroups[0].Count = 3
job1.TaskGroups[0].Update = &structs.UpdateStrategy{
Stagger: time.Duration(30 * time.Second),
MaxParallel: 1,
HealthCheck: "checks",
MinHealthyTime: time.Duration(30 * time.Second),
HealthyDeadline: time.Duration(9 * time.Minute),
ProgressDeadline: time.Duration(10 * time.Minute),
AutoRevert: true,
Canary: 1,
}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1))
// Create allocs for this job version with one being a canary and another
// marked as failed.
allocs := []*structs.Allocation{}
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job1
alloc.JobID = job1.ID
alloc.NodeID = nodes[i].ID
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(true),
Timestamp: time.Now(),
Canary: false,
ModifyIndex: h.NextIndex(),
}
if i == 0 {
alloc.DeploymentStatus.Canary = true
}
if i == 1 {
alloc.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, alloc)
}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Update job to place it in the spread node pool.
job2 := job1.Copy()
job2.Version = 2
job2.NodePool = poolSpread.Name
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
eval := &structs.Evaluation{
Namespace: job2.Namespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
processErr := h.Process(NewServiceScheduler, eval)
require.NoError(t, processErr, "failed to process eval")
require.Len(t, h.Plans, 1)
// Verify the plan places the new allocation in the spread node pool and
// the replacement failure from the previous version in the binpack pool.
for nodeID, allocs := range h.Plans[0].NodeAllocation {
var node *structs.Node
for _, n := range nodes {
if n.ID == nodeID {
node = n
break
}
}
must.Len(t, 1, allocs)
alloc := allocs[0]
must.Eq(t, alloc.Job.NodePool, node.NodePool, must.Sprintf(
"alloc for job in node pool %q placed in node in node pool %q",
alloc.Job.NodePool,
node.NodePool,
))
}
}
// Test job registration with even spread across dc
func TestServiceSched_EvenSpread(t *testing.T) {
ci.Parallel(t)
@ -1336,6 +1357,168 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_SchedulerAlgorithm(t *testing.T) {
ci.Parallel(t)
// Test node pools.
poolNoSchedConfig := mock.NodePool()
poolNoSchedConfig.SchedulerConfiguration = nil
poolBinpack := mock.NodePool()
poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
}
poolSpread := mock.NodePool()
poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
}
testCases := []struct {
name string
nodePool string
schedulerAlgorithm structs.SchedulerAlgorithm
expectedAlgorithm structs.SchedulerAlgorithm
}{
{
name: "global binpack",
nodePool: poolNoSchedConfig.Name,
schedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
expectedAlgorithm: structs.SchedulerAlgorithmBinpack,
},
{
name: "global spread",
nodePool: poolNoSchedConfig.Name,
schedulerAlgorithm: structs.SchedulerAlgorithmSpread,
expectedAlgorithm: structs.SchedulerAlgorithmSpread,
},
{
name: "node pool binpack overrides global config",
nodePool: poolBinpack.Name,
schedulerAlgorithm: structs.SchedulerAlgorithmSpread,
expectedAlgorithm: structs.SchedulerAlgorithmBinpack,
},
{
name: "node pool spread overrides global config",
nodePool: poolSpread.Name,
schedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
expectedAlgorithm: structs.SchedulerAlgorithmSpread,
},
}
jobTypes := []string{
"batch",
"service",
}
for _, jobType := range jobTypes {
for _, tc := range testCases {
t.Run(fmt.Sprintf("%s/%s", jobType, tc.name), func(t *testing.T) {
h := NewHarness(t)
// Create node pools.
nodePools := []*structs.NodePool{
poolNoSchedConfig,
poolBinpack,
poolSpread,
}
h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
// Create two test nodes. Use two to prevent flakiness due to
// the scheduler shuffling nodes.
for i := 0; i < 2; i++ {
node := mock.Node()
node.NodePool = tc.nodePool
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Set global scheduler configuration.
h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
SchedulerAlgorithm: tc.schedulerAlgorithm,
})
// Create test job.
var job *structs.Job
switch jobType {
case "batch":
job = mock.BatchJob()
case "service":
job = mock.Job()
}
job.TaskGroups[0].Count = 1
job.NodePool = tc.nodePool
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Register an existing job.
existingJob := mock.Job()
existingJob.TaskGroups[0].Count = 1
existingJob.NodePool = tc.nodePool
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, existingJob))
// Process eval for existing job to place an existing alloc.
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: existingJob.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: existingJob.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
var scheduler Factory
switch jobType {
case "batch":
scheduler = NewBatchScheduler
case "service":
scheduler = NewServiceScheduler
}
err := h.Process(scheduler, eval)
must.NoError(t, err)
must.Len(t, 1, h.Plans)
allocs, err := h.State.AllocsByJob(nil, existingJob.Namespace, existingJob.ID, false)
must.NoError(t, err)
must.Len(t, 1, allocs)
// Process eval for test job.
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
err = h.Process(scheduler, eval)
must.NoError(t, err)
must.Len(t, 2, h.Plans)
allocs, err = h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)
must.Len(t, 1, allocs)
// Expect new alloc to be either in the empty node or in the
// node with the existing alloc depending on the expected
// scheduler algorithm.
var expectedAllocCount int
switch tc.expectedAlgorithm {
case structs.SchedulerAlgorithmSpread:
expectedAllocCount = 1
case structs.SchedulerAlgorithmBinpack:
expectedAllocCount = 2
}
alloc := allocs[0]
nodeAllocs, err := h.State.AllocsByNode(nil, alloc.NodeID)
must.NoError(t, err)
must.Len(t, expectedAllocCount, nodeAllocs)
})
}
}
}
// This test just ensures the scheduler handles the eval type to avoid // This test just ensures the scheduler handles the eval type to avoid
// regressions. // regressions.
func TestServiceSched_EvaluateMaxPlanEval(t *testing.T) { func TestServiceSched_EvaluateMaxPlanEval(t *testing.T) {

View File

@ -1356,10 +1356,11 @@ func TestPreemption(t *testing.T) {
ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions
}
static := NewStaticRankIterator(ctx, nodes)
- binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority, testSchedulerConfig)
+ binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority)
job := mock.Job()
job.Priority = tc.jobPriority
binPackIter.SetJob(job)
+ binPackIter.SetSchedulerConfiguration(testSchedulerConfig)
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},

View File

@ -164,24 +164,18 @@ type BinPackIterator struct {
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks
// potentially evicting other tasks based on a given priority.
- func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int, schedConfig *structs.SchedulerConfiguration) *BinPackIterator {
- algorithm := schedConfig.EffectiveSchedulerAlgorithm()
- scoreFn := structs.ScoreFitBinPack
- if algorithm == structs.SchedulerAlgorithmSpread {
- scoreFn = structs.ScoreFitSpread
- }
- iter := &BinPackIterator{
- ctx: ctx,
- source: source,
- evict: evict,
- priority: priority,
- memoryOversubscription: schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled,
- scoreFit: scoreFn,
- }
- iter.ctx.Logger().Named("binpack").Trace("NewBinPackIterator created", "algorithm", algorithm)
- return iter
+ func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
+ return &BinPackIterator{
+ ctx: ctx,
+ source: source,
+ evict: evict,
+ priority: priority,
+ // These are default values that may be overwritten by
+ // SetSchedulerConfiguration.
+ memoryOversubscription: false,
+ scoreFit: structs.ScoreFitBinPack,
+ }
}
func (iter *BinPackIterator) SetJob(job *structs.Job) {
@ -193,6 +187,19 @@ func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
iter.taskGroup = taskGroup
}
func (iter *BinPackIterator) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
// Set scoring function.
algorithm := schedConfig.EffectiveSchedulerAlgorithm()
scoreFn := structs.ScoreFitBinPack
if algorithm == structs.SchedulerAlgorithmSpread {
scoreFn = structs.ScoreFitSpread
}
iter.scoreFit = scoreFn
// Set memory oversubscription.
iter.memoryOversubscription = schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled
}
func (iter *BinPackIterator) Next() *RankedNode {
OUTER:
for {
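
Note (not part of this commit): the constructor's new defaults (binpack scoring, oversubscription off) line up with EffectiveSchedulerAlgorithm, which is assumed here to fall back to binpack when no algorithm is set, as the Canonicalize context above suggests.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// No algorithm set: the effective algorithm falls back to binpack, which
	// is why the iterator's default scoreFit is structs.ScoreFitBinPack.
	empty := &structs.SchedulerConfiguration{}
	fmt.Println(empty.EffectiveSchedulerAlgorithm()) // binpack

	// An explicit spread setting is returned as-is; SetSchedulerConfiguration
	// switches to structs.ScoreFitSpread in that case.
	spread := &structs.SchedulerConfiguration{SchedulerAlgorithm: structs.SchedulerAlgorithmSpread}
	fmt.Println(spread.EffectiveSchedulerAlgorithm()) // spread
}
```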

View File

@ -115,8 +115,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -228,8 +229,9 @@ func TestBinPackIterator_NoExistingAlloc_MixedReserve(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -349,8 +351,9 @@ func TestBinPackIterator_Network_Success(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -481,8 +484,9 @@ func TestBinPackIterator_Network_Failure(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -575,8 +579,9 @@ func TestBinPackIterator_Network_NoCollision_Node(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@ -674,8 +679,9 @@ func TestBinPackIterator_Network_NodeError(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@ -801,8 +807,9 @@ func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@ -944,8 +951,9 @@ func TestBinPackIterator_Network_Interpolation_Success(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1055,8 +1063,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Absent_Value(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1156,8 +1165,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Interface_Not_Exists(t *test
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1250,8 +1260,9 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1372,8 +1383,9 @@ func TestBinPackIterator_ReservedCores(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1482,8 +1494,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1602,8 +1615,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@ -1907,8 +1921,9 @@ func TestBinPackIterator_Devices(t *testing.T) {
}
static := NewStaticRankIterator(ctx, []*RankedNode{{Node: c.Node}})
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(c.TaskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
out := binp.Next()
if out == nil && !c.NoPlace {

View File

@ -78,6 +78,9 @@ type State interface {
// NodesByNodePool returns an iterator over all nodes in the node pool
NodesByNodePool(ws memdb.WatchSet, poolName string) (memdb.ResultIterator, error)
// NodePoolByName is used to lookup a node pool by name.
NodePoolByName(ws memdb.WatchSet, poolName string) (*structs.NodePool, error)
// AllocsByJob returns the allocations by JobID
AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error)
@ -90,7 +93,7 @@ type State interface {
// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)
- // GetNodeByID is used to lookup a node by ID
+ // NodeByID is used to lookup a node by ID
NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error)
// GetJobByID is used to lookup a job by ID

View File

@ -156,7 +156,7 @@ func (s *SystemScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewSystemStack(s.sysbatch, s.ctx)
if !s.job.Stopped() {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
}
// Compute the target job allocations
@ -211,6 +211,26 @@ func (s *SystemScheduler) process() (bool, error) {
return true, nil
}
// setJob updates the stack with the given job and job's node pool scheduler
// configuration.
func (s *SystemScheduler) setJob(job *structs.Job) error {
// Fetch node pool and global scheduler configuration to determine how to
// configure the scheduler.
pool, err := s.state.NodePoolByName(nil, job.NodePool)
if err != nil {
return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err)
}
_, schedConfig, err := s.state.SchedulerConfig()
if err != nil {
return fmt.Errorf("failed to get scheduler configuration: %v", err)
}
s.stack.SetJob(job)
s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
return nil
}
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *SystemScheduler) computeJobAllocs() error {

scheduler/scheduler_test.go (new file, 224 lines)
View File

@ -0,0 +1,224 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package scheduler
import (
"fmt"
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/iterator"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)
func TestScheduler_JobRegister_MemoryMaxHonored(t *testing.T) {
ci.Parallel(t)
// Test node pools.
poolNoSchedConfig := mock.NodePool()
poolNoSchedConfig.SchedulerConfiguration = nil
poolWithMemOversub := mock.NodePool()
poolWithMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(true),
}
poolNoMemOversub := mock.NodePool()
poolNoMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
MemoryOversubscriptionEnabled: pointer.Of(false),
}
cases := []struct {
name string
nodePool string
cpu int
memory int
memoryMax int
memoryOversubscriptionEnabled bool
expectedTaskMemoryMax int
// expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all
// tasks if memory oversubscription is enabled and SUM(memory) if it's
// disabled.
expectedTotalMemoryMax int
}{
{
name: "plain no max",
nodePool: poolNoSchedConfig.Name,
cpu: 100,
memory: 200,
memoryMax: 0,
memoryOversubscriptionEnabled: true,
expectedTaskMemoryMax: 0,
expectedTotalMemoryMax: 200,
},
{
name: "with max",
nodePool: poolNoSchedConfig.Name,
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: true,
expectedTaskMemoryMax: 300,
expectedTotalMemoryMax: 300,
},
{
name: "with max but disabled",
nodePool: poolNoSchedConfig.Name,
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: false,
expectedTaskMemoryMax: 0,
expectedTotalMemoryMax: 200, // same as no max
},
{
name: "with max and enabled by node pool",
nodePool: poolWithMemOversub.Name,
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: false,
expectedTaskMemoryMax: 300,
expectedTotalMemoryMax: 300,
},
{
name: "with max but disabled by node pool",
nodePool: poolNoMemOversub.Name,
cpu: 100,
memory: 200,
memoryMax: 300,
memoryOversubscriptionEnabled: true,
expectedTaskMemoryMax: 0,
expectedTotalMemoryMax: 200, // same as no max
},
}
jobTypes := []string{
"batch",
"service",
"sysbatch",
"system",
}
for _, jobType := range jobTypes {
for _, c := range cases {
t.Run(fmt.Sprintf("%s/%s", jobType, c.name), func(t *testing.T) {
h := NewHarness(t)
// Create node pools.
nodePools := []*structs.NodePool{
poolNoSchedConfig,
poolWithMemOversub,
poolNoMemOversub,
}
h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
// Create some nodes.
for i := 0; i < 3; i++ {
node := mock.Node()
node.NodePool = c.nodePool
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Set global scheduler configuration.
h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled,
})
// Create test job.
var job *structs.Job
switch jobType {
case "batch":
job = mock.BatchJob()
case "service":
job = mock.Job()
case "sysbatch":
job = mock.SystemBatchJob()
case "system":
job = mock.SystemJob()
}
job.TaskGroups[0].Count = 1
job.NodePool = c.nodePool
task := job.TaskGroups[0].Tasks[0].Name
res := job.TaskGroups[0].Tasks[0].Resources
res.CPU = c.cpu
res.MemoryMB = c.memory
res.MemoryMaxMB = c.memoryMax
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
var scheduler Factory
switch jobType {
case "batch":
scheduler = NewBatchScheduler
case "service":
scheduler = NewServiceScheduler
case "sysbatch":
scheduler = NewSysBatchScheduler
case "system":
scheduler = NewSystemScheduler
}
err := h.Process(scheduler, eval)
must.NoError(t, err)
must.Len(t, 1, h.Plans)
allocs, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
must.NoError(t, err)
// Ensure all allocations placed
var expectedAllocCount int
switch jobType {
case "batch", "service":
expectedAllocCount = 1
case "system", "sysbatch":
nodes, err := h.State.NodesByNodePool(nil, job.NodePool)
must.NoError(t, err)
expectedAllocCount = iterator.Len(nodes)
}
must.Len(t, expectedAllocCount, allocs)
alloc := allocs[0]
// checking new resources field deprecated Resources fields
must.Eq(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares)
must.Eq(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB)
must.Eq(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB)
// checking old deprecated Resources fields
must.Eq(t, c.cpu, alloc.TaskResources[task].CPU)
must.Eq(t, c.memory, alloc.TaskResources[task].MemoryMB)
must.Eq(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB)
// check total resource fields - alloc.Resources deprecated field, no modern equivalent
must.Eq(t, c.cpu, alloc.Resources.CPU)
must.Eq(t, c.memory, alloc.Resources.MemoryMB)
must.Eq(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB)
})
}
}
}

View File

@ -118,6 +118,13 @@ func (s *GenericStack) SetJob(job *structs.Job) {
}
}
// SetSchedulerConfiguration applies the given scheduler configuration to
// process nodes. Scheduler configuration values may change per job depending
// on the node pool being used.
func (s *GenericStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
s.binPack.SetSchedulerConfiguration(schedConfig)
}
func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
// This block handles trying to select from preferred nodes if options specify them
@ -275,6 +282,11 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
// Apply the bin packing, this depends on the resources needed
// by a particular task group. Enable eviction as system jobs are high
// priority.
//
// The scheduler configuration is read directly from state but only
// values that can't be specified per node pool should be used. Other
// values must be merged by calling schedConfig.WithNodePool() and set in
// the stack by calling SetSchedulerConfiguration().
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
enablePreemption := true
if schedConfig != nil {
@ -286,7 +298,7 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
}
// Create binpack iterator
- s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig)
+ s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0)
// Apply score normalization
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)
@ -311,6 +323,13 @@ func (s *SystemStack) SetJob(job *structs.Job) {
}
}
// SetSchedulerConfiguration applies the given scheduler configuration to
// process nodes. Scheduler configuration values may change per job depending
// on the node pool being used.
func (s *SystemStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
s.binPack.SetSchedulerConfiguration(schedConfig)
}
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
// Reset the binpack selector and context
s.scoreNorm.Reset()
@ -412,8 +431,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
- _, schedConfig, _ := ctx.State().SchedulerConfig()
- s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig)
+ s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.

View File

@ -67,12 +67,16 @@ $ curl \
settings mentioned below.
- `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler
binpacks or spreads allocations on available nodes. Node pools may set
their own [`SchedulerAlgorithm`][np_sched_algo] value that takes precedence
over this global value.

- `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may
exceed their reserved memory limit, if the client has excess memory
capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
to take advantage of memory oversubscription. Node pools may set their own
[`MemoryOversubscriptionEnabled`][np_mem_oversubs] value that takes
precedence over this global value.
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
@ -148,12 +152,14 @@ server state is authoritative.
- `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler
binpacks or spreads allocations on available nodes. Possible values are
`"binpack"` and `"spread"`. This value may also be set per [node
pool][np_sched_algo].

- `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may
exceed their reserved memory limit, if the client has excess memory capacity.
Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
to take advantage of memory oversubscription. This value may also be set per
[node pool][np_mem_oversubs].
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
@ -200,3 +206,5 @@ server state is authoritative.
- `Index` - Current Raft index when the request was received.

[`default_scheduler_config`]: /nomad/docs/configuration/server#default_scheduler_config
[np_mem_oversubs]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled
[np_sched_algo]: /nomad/docs/other-specifications/node-pool#scheduler_algorithm

View File

@ -97,7 +97,7 @@ the task may exceed the limit and be interrupted; if the task memory is too
high, the cluster is left underutilized.

To help maximize cluster memory utilization while allowing a safety margin for
unexpected load spikes, Nomad allows job authors to set two separate memory
limits:

* `memory`: the reserve limit to represent the tasks typical memory usage —
@ -112,14 +112,15 @@ may kill oversubscribed tasks and reschedule them to other clients. The exact
mechanism for memory pressure is specific to the task driver, operating system,
and application runtime.

The `memory_max` limit attribute is currently supported by the official
`docker`, `exec`, and `java` task drivers. Consult the documentation of
community-supported task drivers for their memory oversubscription support.

Memory oversubscription is opt-in. Nomad operators can enable [Memory
Oversubscription in the scheduler configuration][api_sched_config]. Enterprise
customers can use [Resource Quotas][tutorial_quota] to limit the memory
oversubscription and enable or disable memory oversubscription per [node
pool][np_sched_config].

To avoid degrading the cluster experience, we recommend examining and monitoring
resource utilization and considering the following suggestions:
@ -136,6 +137,9 @@ resource utilization and considering the following suggestions:
1GB in aggregate before the memory becomes contended and allocations get
killed.
[api_sched_config]: /nomad/api-docs/operator/scheduler#update-scheduler-configuration
[device]: /nomad/docs/job-specification/device 'Nomad device Job Specification'
[docker_cpu]: /nomad/docs/drivers/docker#cpu
[exec_cpu]: /nomad/docs/drivers/exec#cpu
[np_sched_config]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled
[tutorial_quota]: /nomad/tutorials/governance-and-policy/quotas