From ac08fc751b02f26c4728320203d7e9680f83a730 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 21 Jun 2023 20:31:50 -0400 Subject: [PATCH] node pools: apply node pool scheduler configuration (#17598) --- helper/iterator/iterator.go | 23 + nomad/job_endpoint_hooks.go | 7 +- nomad/job_endpoint_test.go | 107 +++++ nomad/structs/node_pool.go | 21 + nomad/structs/node_pool_test.go | 67 +++ nomad/structs/operator.go | 20 + nomad/structs/operator_test.go | 108 +++++ scheduler/generic_sched.go | 74 +++- scheduler/generic_sched_test.go | 409 +++++++++++++----- scheduler/preemption_test.go | 3 +- scheduler/rank.go | 39 +- scheduler/rank_test.go | 45 +- scheduler/scheduler.go | 5 +- scheduler/scheduler_system.go | 22 +- scheduler/scheduler_test.go | 224 ++++++++++ scheduler/stack.go | 24 +- .../content/api-docs/operator/scheduler.mdx | 28 +- .../docs/job-specification/resources.mdx | 18 +- 18 files changed, 1069 insertions(+), 175 deletions(-) create mode 100644 helper/iterator/iterator.go create mode 100644 nomad/structs/operator_test.go create mode 100644 scheduler/scheduler_test.go diff --git a/helper/iterator/iterator.go b/helper/iterator/iterator.go new file mode 100644 index 000000000..8f87c4b7d --- /dev/null +++ b/helper/iterator/iterator.go @@ -0,0 +1,23 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package iterator + +// Iterator represents an object that can iterate over a set of values one at a +// time. +type Iterator interface { + // Next returns the next element or nil if there are none left. + Next() any +} + +// Len consumes the iterator and returns the number of elements found. +// +// IMPORTANT: this method consumes the iterator, so it should not be used after +// Len() returns. +func Len(iter Iterator) int { + count := 0 + for raw := iter.Next(); raw != nil; raw = iter.Next() { + count++ + } + return count +} diff --git a/nomad/job_endpoint_hooks.go b/nomad/job_endpoint_hooks.go index 631d280cd..ee28e19db 100644 --- a/nomad/job_endpoint_hooks.go +++ b/nomad/job_endpoint_hooks.go @@ -322,7 +322,12 @@ func (v *memoryOversubscriptionValidate) Validate(job *structs.Job) (warnings [] return nil, err } - if c != nil && c.MemoryOversubscriptionEnabled { + pool, err := v.srv.State().NodePoolByName(nil, job.NodePool) + if err != nil { + return nil, err + } + + if pool.MemoryOversubscriptionEnabled(c) { return nil, nil } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e16b5f5e1..cab6dec9e 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2164,6 +2164,113 @@ func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) { require.Empty(t, resp.Warnings) } +func TestJobEndpoint_Register_ValidateMemoryMax_NodePool(t *testing.T) { + ci.Parallel(t) + + s, cleanupS := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Store default scheduler configuration to reset between test cases. + _, defaultSchedConfig, err := s.State().SchedulerConfig() + must.NoError(t, err) + + // Create test node pools. 
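+	// The three pools below cover the merge behavior under test: no
+	// pool-level scheduler configuration (the global value applies), a
+	// pool that enables memory oversubscription, and a pool that disables
+	// it regardless of the global setting.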
+ noSchedConfig := mock.NodePool() + noSchedConfig.SchedulerConfiguration = nil + + withMemOversub := mock.NodePool() + withMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(true), + } + + noMemOversub := mock.NodePool() + noMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(false), + } + + s.State().UpsertNodePools(structs.MsgTypeTestSetup, 100, []*structs.NodePool{ + noSchedConfig, + withMemOversub, + noMemOversub, + }) + + testCases := []struct { + name string + pool string + globalConfig *structs.SchedulerConfiguration + expectedWarning string + }{ + { + name: "no scheduler config uses global config", + pool: noSchedConfig.Name, + globalConfig: &structs.SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expectedWarning: "", + }, + { + name: "enabled via node pool", + pool: withMemOversub.Name, + globalConfig: &structs.SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + }, + expectedWarning: "", + }, + { + name: "disabled via node pool", + pool: noMemOversub.Name, + globalConfig: &structs.SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expectedWarning: "Memory oversubscription is not enabled", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Set global scheduler config if provided. + if tc.globalConfig != nil { + idx, err := s.State().LatestIndex() + must.NoError(t, err) + + err = s.State().SchedulerSetConfig(idx, tc.globalConfig) + must.NoError(t, err) + } + + // Create job with node_pool and memory_max. + job := mock.Job() + job.TaskGroups[0].Tasks[0].Resources.MemoryMaxMB = 2000 + job.NodePool = tc.pool + + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + // Validate respose. + must.NoError(t, err) + if tc.expectedWarning != "" { + must.StrContains(t, resp.Warnings, tc.expectedWarning) + } else { + must.Eq(t, "", resp.Warnings) + } + + // Reset to default global scheduler config. + err = s.State().SchedulerSetConfig(resp.Index+1, defaultSchedConfig) + must.NoError(t, err) + }) + } +} + // evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation { var store raft.LogStore = s.raftInmem diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go index e706b4ff2..babc3e1ed 100644 --- a/nomad/structs/node_pool.go +++ b/nomad/structs/node_pool.go @@ -117,6 +117,24 @@ func (n *NodePool) IsBuiltIn() bool { } } +// MemoryOversubscriptionEnabled returns true if memory oversubscription is +// enabled in the node pool or in the global cluster configuration. +func (n *NodePool) MemoryOversubscriptionEnabled(global *SchedulerConfiguration) bool { + + // Default to the global scheduler config. + memOversubEnabled := global != nil && global.MemoryOversubscriptionEnabled + + // But overwrite it if the node pool also has it configured. 
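+	// The nil checks below keep this method safe to call on a nil pool or
+	// a pool without a scheduler configuration; both cases fall back to
+	// the global value computed above.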
+ poolHasMemOversub := n != nil && + n.SchedulerConfiguration != nil && + n.SchedulerConfiguration.MemoryOversubscriptionEnabled != nil + if poolHasMemOversub { + memOversubEnabled = *n.SchedulerConfiguration.MemoryOversubscriptionEnabled + } + + return memOversubEnabled +} + // SetHash is used to compute and set the hash of node pool func (n *NodePool) SetHash() []byte { // Initialize a 256bit Blake2 hash (32 bytes) @@ -163,6 +181,9 @@ func (n *NodePool) SetHash() []byte { // NodePoolSchedulerConfiguration is the scheduler confinguration applied to a // node pool. +// +// When adding new values that should override global scheduler configuration, +// verify the scheduler handles the node pool configuration as well. type NodePoolSchedulerConfiguration struct { // SchedulerAlgorithm is the scheduling algorithm to use for the pool. diff --git a/nomad/structs/node_pool_test.go b/nomad/structs/node_pool_test.go index f261c49c6..dd04df74c 100644 --- a/nomad/structs/node_pool_test.go +++ b/nomad/structs/node_pool_test.go @@ -127,3 +127,70 @@ func TestNodePool_IsBuiltIn(t *testing.T) { }) } } + +func TestNodePool_MemoryOversubscriptionEnabled(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + pool *NodePool + global *SchedulerConfiguration + expected bool + }{ + { + name: "global used if pool is nil", + pool: nil, + global: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expected: true, + }, + { + name: "global used if pool doesn't have scheduler config", + pool: &NodePool{}, + global: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expected: true, + }, + { + name: "global used if pool doesn't specify memory oversub", + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{}, + }, + global: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expected: true, + }, + { + name: "pool overrides global if it defines memory oversub", + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(false), + }, + }, + global: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + expected: false, + }, + { + name: "pool used if global is nil", + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(true), + }, + }, + global: nil, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := tc.pool.MemoryOversubscriptionEnabled(tc.global) + must.Eq(t, got, tc.expected) + }) + } +} diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index a14e4ccaf..91184024e 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -196,6 +196,26 @@ func (s *SchedulerConfiguration) EffectiveSchedulerAlgorithm() SchedulerAlgorith return s.SchedulerAlgorithm } +// WithNodePool returns a new SchedulerConfiguration with the node pool +// scheduler configuration applied. 
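+//
+// A minimal sketch of the merge behavior (values are illustrative only): a
+// pool that sets just MemoryOversubscriptionEnabled overrides that field and
+// keeps the scheduler algorithm from the global configuration.
+//
+//	global := &SchedulerConfiguration{SchedulerAlgorithm: SchedulerAlgorithmSpread}
+//	pool := &NodePool{
+//		SchedulerConfiguration: &NodePoolSchedulerConfiguration{
+//			MemoryOversubscriptionEnabled: pointer.Of(true),
+//		},
+//	}
+//	merged := global.WithNodePool(pool)
+//	// merged.SchedulerAlgorithm == SchedulerAlgorithmSpread
+//	// merged.MemoryOversubscriptionEnabled == true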
+func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfiguration { + schedConfig := s.Copy() + + if pool == nil || pool.SchedulerConfiguration == nil { + return schedConfig + } + + poolConfig := pool.SchedulerConfiguration + if poolConfig.SchedulerAlgorithm != "" { + schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm + } + if poolConfig.MemoryOversubscriptionEnabled != nil { + schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled + } + + return schedConfig +} + func (s *SchedulerConfiguration) Canonicalize() { if s != nil && s.SchedulerAlgorithm == "" { s.SchedulerAlgorithm = SchedulerAlgorithmBinpack diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go new file mode 100644 index 000000000..229fdb050 --- /dev/null +++ b/nomad/structs/operator_test.go @@ -0,0 +1,108 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package structs + +import ( + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/shoenig/test/must" +) + +func TestSchedulerConfiguration_WithNodePool(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + schedConfig *SchedulerConfiguration + pool *NodePool + expected *SchedulerConfiguration + }{ + { + name: "nil pool returns same config", + schedConfig: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + pool: nil, + expected: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + }, + { + name: "nil pool scheduler config returns same config", + schedConfig: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + pool: &NodePool{}, + expected: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + }, + { + name: "pool with memory oversubscription overwrites config", + schedConfig: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + }, + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(true), + }, + }, + expected: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: true, + }, + }, + { + name: "pool with scheduler algorithm overwrites config", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmBinpack, + }, + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + }, + expected: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + }, + { + name: "pool without memory oversubscription does not modify config", + schedConfig: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + }, + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{}, + }, + expected: &SchedulerConfiguration{ + MemoryOversubscriptionEnabled: false, + }, + }, + { + name: "pool without scheduler algorithm does not modify config", + schedConfig: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + pool: &NodePool{ + SchedulerConfiguration: &NodePoolSchedulerConfiguration{}, + }, + expected: &SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := 
tc.schedConfig.WithNodePool(tc.pool) + must.Eq(t, tc.expected, got) + must.NotEqOp(t, tc.schedConfig, got) + }) + } +} diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 3d04d9c5b..e5af34b1d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -281,7 +282,7 @@ func (s *GenericScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) if !s.job.Stopped() { - s.stack.SetJob(s.job) + s.setJob(s.job) } // Compute the target job allocations @@ -509,7 +510,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, s.job.Datacenters, s.job.NodePool) + nodes, byDC, err := s.setNodes(s.job) if err != nil { return err } @@ -562,10 +563,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul continue } - // Use downgraded job in scheduling stack to honor - // old job resources and constraints + // Use downgraded job in scheduling stack to honor old job + // resources, constraints, and node pool scheduler configuration. if downgradedJob != nil { - s.stack.SetJob(downgradedJob) + s.setJob(downgradedJob) + + if needsToSetNodes(downgradedJob, s.job) { + nodes, byDC, err = s.setNodes(downgradedJob) + if err != nil { + return err + } + } } // Find the preferred node @@ -596,9 +604,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Compute top K scoring node metadata s.ctx.Metrics().PopulateScoreMetaData() - // Restore stack job now that placement is done, to use plan job version + // Restore stack job and nodes now that placement is done, to use + // plan job version if downgradedJob != nil { - s.stack.SetJob(s.job) + s.setJob(s.job) + + if needsToSetNodes(downgradedJob, s.job) { + nodes, byDC, err = s.setNodes(s.job) + if err != nil { + return err + } + } } // Set fields based on if we found an allocation option @@ -690,6 +706,45 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul return nil } +// setJob updates the stack with the given job and job's node pool scheduler +// configuration. +func (s *GenericScheduler) setJob(job *structs.Job) error { + // Fetch node pool and global scheduler configuration to determine how to + // configure the scheduler. + pool, err := s.state.NodePoolByName(nil, job.NodePool) + if err != nil { + return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err) + } + + _, schedConfig, err := s.state.SchedulerConfig() + if err != nil { + return fmt.Errorf("failed to get scheduler configuration: %v", err) + } + + s.stack.SetJob(job) + s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool)) + return nil +} + +// setnodes updates the stack with the nodes that are ready for placement for +// the given job. 
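+// It returns the ready nodes and the count of ready nodes per datacenter so
+// callers can reuse both without querying state again.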
+func (s *GenericScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) { + nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, job.Datacenters, job.NodePool) + if err != nil { + return nil, nil, err + } + + s.stack.SetNodes(nodes) + return nodes, byDC, nil +} + +// needsToSetNodes returns true if jobs a and b changed in a way that requires +// the nodes to be reset. +func needsToSetNodes(a, b *structs.Job) bool { + return !helper.SliceSetEq(a.Datacenters, b.Datacenters) || + a.NodePool != b.NodePool +} + // propagateTaskState copies task handles from previous allocations to // replacement allocations when the previous allocation is being drained or was // lost. Remote task drivers rely on this to reconnect to remote tasks when the @@ -818,6 +873,11 @@ func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions _, schedConfig, _ := s.ctx.State().SchedulerConfig() // Check if preemption is enabled, defaults to true + // + // The scheduler configuration is read directly from state but only + // values that can't be specified per node pool should be used. Other + // values must be merged by calling schedConfig.WithNodePool() and set in + // the stack by calling SetSchedulerConfiguration(). enablePreemption := true if schedConfig != nil { if s.job.Type == structs.JobTypeBatch { diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 20bbbb462..3c9a833c1 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -124,119 +124,6 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestServiceSched_JobRegister_MemoryMaxHonored(t *testing.T) { - ci.Parallel(t) - - cases := []struct { - name string - cpu int - memory int - memoryMax int - memoryOversubscriptionEnabled bool - - expectedTaskMemoryMax int - // expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all tasks - expectedTotalMemoryMax int - }{ - { - name: "plain no max", - cpu: 100, - memory: 200, - memoryMax: 0, - memoryOversubscriptionEnabled: true, - - expectedTaskMemoryMax: 0, - expectedTotalMemoryMax: 200, - }, - { - name: "with max", - cpu: 100, - memory: 200, - memoryMax: 300, - memoryOversubscriptionEnabled: true, - - expectedTaskMemoryMax: 300, - expectedTotalMemoryMax: 300, - }, - { - name: "with max but disabled", - cpu: 100, - memory: 200, - memoryMax: 300, - - memoryOversubscriptionEnabled: false, - expectedTaskMemoryMax: 0, - expectedTotalMemoryMax: 200, // same as no max - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - job := mock.Job() - job.TaskGroups[0].Count = 1 - - task := job.TaskGroups[0].Tasks[0].Name - res := job.TaskGroups[0].Tasks[0].Resources - res.CPU = c.cpu - res.MemoryMB = c.memory - res.MemoryMaxMB = c.memoryMax - - h := NewHarness(t) - h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ - MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled, - }) - - // Create some nodes - for i := 0; i < 10; i++ { - node := mock.Node() - require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) - } - require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) - - // Create a mock evaluation to register the job - eval := &structs.Evaluation{ - Namespace: structs.DefaultNamespace, - ID: uuid.Generate(), - Priority: job.Priority, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: job.ID, - Status: structs.EvalStatusPending, - } - - 
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) - - // Process the evaluation - err := h.Process(NewServiceScheduler, eval) - require.NoError(t, err) - - require.Len(t, h.Plans, 1) - - out, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false) - require.NoError(t, err) - - // Ensure all allocations placed - require.Len(t, out, 1) - alloc := out[0] - - // checking new resources field deprecated Resources fields - require.Equal(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares) - require.Equal(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB) - require.Equal(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB) - - // checking old deprecated Resources fields - require.Equal(t, c.cpu, alloc.TaskResources[task].CPU) - require.Equal(t, c.memory, alloc.TaskResources[task].MemoryMB) - require.Equal(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB) - - // check total resource fields - alloc.Resources deprecated field, no modern equivalent - require.Equal(t, c.cpu, alloc.Resources.CPU) - require.Equal(t, c.memory, alloc.Resources.MemoryMB) - require.Equal(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB) - - }) - } -} - func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) { ci.Parallel(t) @@ -854,6 +741,140 @@ func TestServiceSched_Spread(t *testing.T) { } } +// TestServiceSched_JobRegister_NodePool_Downgrade tests the case where an +// allocation fails during a deployment with canaries, where the job changes +// node pool. The failed alloc should be placed in the node pool of the +// original job. +func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) { + ci.Parallel(t) + + h := NewHarness(t) + + // Set global scheduler configuration. + h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + }) + + // Create test node pools with different scheduler algorithms. + poolBinpack := mock.NodePool() + poolBinpack.Name = "pool-binpack" + poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + } + + poolSpread := mock.NodePool() + poolSpread.Name = "pool-spread" + poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmSpread, + } + + nodePools := []*structs.NodePool{ + poolBinpack, + poolSpread, + } + h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools) + + // Create 5 nodes in each node pool. + // Use two loops so nodes are separated by node pool. + nodes := []*structs.Node{} + for i := 0; i < 5; i++ { + node := mock.Node() + node.Name = fmt.Sprintf("node-binpack-%d", i) + node.NodePool = poolBinpack.Name + nodes = append(nodes, node) + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + for i := 0; i < 5; i++ { + node := mock.Node() + node.Name = fmt.Sprintf("node-spread-%d", i) + node.NodePool = poolSpread.Name + nodes = append(nodes, node) + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + // Create first version of the test job running in the binpack node pool. 
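+	// job1 is the original job version; replacements for its failed
+	// allocation must keep using the binpack pool even after the job is
+	// updated to the spread pool below.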
+ job1 := mock.Job() + job1.Version = 1 + job1.NodePool = poolBinpack.Name + job1.Status = structs.JobStatusRunning + job1.TaskGroups[0].Count = 3 + job1.TaskGroups[0].Update = &structs.UpdateStrategy{ + Stagger: time.Duration(30 * time.Second), + MaxParallel: 1, + HealthCheck: "checks", + MinHealthyTime: time.Duration(30 * time.Second), + HealthyDeadline: time.Duration(9 * time.Minute), + ProgressDeadline: time.Duration(10 * time.Minute), + AutoRevert: true, + Canary: 1, + } + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1)) + + // Create allocs for this job version with one being a canary and another + // marked as failed. + allocs := []*structs.Allocation{} + for i := 0; i < 3; i++ { + alloc := mock.Alloc() + alloc.Job = job1 + alloc.JobID = job1.ID + alloc.NodeID = nodes[i].ID + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: time.Now(), + Canary: false, + ModifyIndex: h.NextIndex(), + } + if i == 0 { + alloc.DeploymentStatus.Canary = true + } + if i == 1 { + alloc.ClientStatus = structs.AllocClientStatusFailed + } + allocs = append(allocs, alloc) + } + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + // Update job to place it in the spread node pool. + job2 := job1.Copy() + job2.Version = 2 + job2.NodePool = poolSpread.Name + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2)) + + eval := &structs.Evaluation{ + Namespace: job2.Namespace, + ID: uuid.Generate(), + Priority: job2.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job2.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + processErr := h.Process(NewServiceScheduler, eval) + require.NoError(t, processErr, "failed to process eval") + require.Len(t, h.Plans, 1) + + // Verify the plan places the new allocation in the spread node pool and + // the replacement failure from the previous version in the binpack pool. + for nodeID, allocs := range h.Plans[0].NodeAllocation { + var node *structs.Node + for _, n := range nodes { + if n.ID == nodeID { + node = n + break + } + } + + must.Len(t, 1, allocs) + alloc := allocs[0] + must.Eq(t, alloc.Job.NodePool, node.NodePool, must.Sprintf( + "alloc for job in node pool %q placed in node in node pool %q", + alloc.Job.NodePool, + node.NodePool, + )) + } +} + // Test job registration with even spread across dc func TestServiceSched_EvenSpread(t *testing.T) { ci.Parallel(t) @@ -1336,6 +1357,168 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_SchedulerAlgorithm(t *testing.T) { + ci.Parallel(t) + + // Test node pools. 
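+	// The pools exercise the precedence rules: no pool-level scheduler
+	// configuration (the global algorithm applies) and pools that pin
+	// binpack or spread regardless of the global setting.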
+ poolNoSchedConfig := mock.NodePool() + poolNoSchedConfig.SchedulerConfiguration = nil + + poolBinpack := mock.NodePool() + poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + } + + poolSpread := mock.NodePool() + poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + SchedulerAlgorithm: structs.SchedulerAlgorithmSpread, + } + + testCases := []struct { + name string + nodePool string + schedulerAlgorithm structs.SchedulerAlgorithm + expectedAlgorithm structs.SchedulerAlgorithm + }{ + { + name: "global binpack", + nodePool: poolNoSchedConfig.Name, + schedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + expectedAlgorithm: structs.SchedulerAlgorithmBinpack, + }, + { + name: "global spread", + nodePool: poolNoSchedConfig.Name, + schedulerAlgorithm: structs.SchedulerAlgorithmSpread, + expectedAlgorithm: structs.SchedulerAlgorithmSpread, + }, + { + name: "node pool binpack overrides global config", + nodePool: poolBinpack.Name, + schedulerAlgorithm: structs.SchedulerAlgorithmSpread, + expectedAlgorithm: structs.SchedulerAlgorithmBinpack, + }, + { + name: "node pool spread overrides global config", + nodePool: poolSpread.Name, + schedulerAlgorithm: structs.SchedulerAlgorithmBinpack, + expectedAlgorithm: structs.SchedulerAlgorithmSpread, + }, + } + + jobTypes := []string{ + "batch", + "service", + } + + for _, jobType := range jobTypes { + for _, tc := range testCases { + t.Run(fmt.Sprintf("%s/%s", jobType, tc.name), func(t *testing.T) { + h := NewHarness(t) + + // Create node pools. + nodePools := []*structs.NodePool{ + poolNoSchedConfig, + poolBinpack, + poolSpread, + } + h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools) + + // Create two test nodes. Use two to prevent flakiness due to + // the scheduler shuffling nodes. + for i := 0; i < 2; i++ { + node := mock.Node() + node.NodePool = tc.nodePool + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + // Set global scheduler configuration. + h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ + SchedulerAlgorithm: tc.schedulerAlgorithm, + }) + + // Create test job. + var job *structs.Job + switch jobType { + case "batch": + job = mock.BatchJob() + case "service": + job = mock.Job() + } + job.TaskGroups[0].Count = 1 + job.NodePool = tc.nodePool + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Register an existing job. + existingJob := mock.Job() + existingJob.TaskGroups[0].Count = 1 + existingJob.NodePool = tc.nodePool + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, existingJob)) + + // Process eval for existing job to place an existing alloc. 
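+				// The existing allocation gives the two algorithms different
+				// outcomes: binpack should stack the new alloc on the node
+				// that already has one, while spread should pick the empty
+				// node.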
+ eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: existingJob.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: existingJob.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + var scheduler Factory + switch jobType { + case "batch": + scheduler = NewBatchScheduler + case "service": + scheduler = NewServiceScheduler + } + err := h.Process(scheduler, eval) + must.NoError(t, err) + + must.Len(t, 1, h.Plans) + allocs, err := h.State.AllocsByJob(nil, existingJob.Namespace, existingJob.ID, false) + must.NoError(t, err) + must.Len(t, 1, allocs) + + // Process eval for test job. + eval = &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + err = h.Process(scheduler, eval) + must.NoError(t, err) + + must.Len(t, 2, h.Plans) + allocs, err = h.State.AllocsByJob(nil, job.Namespace, job.ID, false) + must.NoError(t, err) + must.Len(t, 1, allocs) + + // Expect new alloc to be either in the empty node or in the + // node with the existing alloc depending on the expected + // scheduler algorithm. + var expectedAllocCount int + switch tc.expectedAlgorithm { + case structs.SchedulerAlgorithmSpread: + expectedAllocCount = 1 + case structs.SchedulerAlgorithmBinpack: + expectedAllocCount = 2 + } + + alloc := allocs[0] + nodeAllocs, err := h.State.AllocsByNode(nil, alloc.NodeID) + must.NoError(t, err) + must.Len(t, expectedAllocCount, nodeAllocs) + }) + } + } +} + // This test just ensures the scheduler handles the eval type to avoid // regressions. func TestServiceSched_EvaluateMaxPlanEval(t *testing.T) { diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index cba1da068..ee529595f 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -1356,10 +1356,11 @@ func TestPreemption(t *testing.T) { ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions } static := NewStaticRankIterator(ctx, nodes) - binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority, testSchedulerConfig) + binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority) job := mock.Job() job.Priority = tc.jobPriority binPackIter.SetJob(job) + binPackIter.SetSchedulerConfiguration(testSchedulerConfig) taskGroup := &structs.TaskGroup{ EphemeralDisk: &structs.EphemeralDisk{}, diff --git a/scheduler/rank.go b/scheduler/rank.go index 767e16a32..0b960c39b 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -164,24 +164,18 @@ type BinPackIterator struct { // NewBinPackIterator returns a BinPackIterator which tries to fit tasks // potentially evicting other tasks based on a given priority. 
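+//
+// The iterator starts with binpack scoring and memory oversubscription
+// disabled; callers are expected to call SetSchedulerConfiguration with the
+// scheduler configuration merged for the job's node pool before ranking
+// nodes.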
-func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int, schedConfig *structs.SchedulerConfiguration) *BinPackIterator { +func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator { + return &BinPackIterator{ + ctx: ctx, + source: source, + evict: evict, + priority: priority, - algorithm := schedConfig.EffectiveSchedulerAlgorithm() - scoreFn := structs.ScoreFitBinPack - if algorithm == structs.SchedulerAlgorithmSpread { - scoreFn = structs.ScoreFitSpread + // These are default values that may be overwritten by + // SetSchedulerConfiguration. + memoryOversubscription: false, + scoreFit: structs.ScoreFitBinPack, } - - iter := &BinPackIterator{ - ctx: ctx, - source: source, - evict: evict, - priority: priority, - memoryOversubscription: schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled, - scoreFit: scoreFn, - } - iter.ctx.Logger().Named("binpack").Trace("NewBinPackIterator created", "algorithm", algorithm) - return iter } func (iter *BinPackIterator) SetJob(job *structs.Job) { @@ -193,6 +187,19 @@ func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) { iter.taskGroup = taskGroup } +func (iter *BinPackIterator) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) { + // Set scoring function. + algorithm := schedConfig.EffectiveSchedulerAlgorithm() + scoreFn := structs.ScoreFitBinPack + if algorithm == structs.SchedulerAlgorithmSpread { + scoreFn = structs.ScoreFitSpread + } + iter.scoreFit = scoreFn + + // Set memory oversubscription. + iter.memoryOversubscription = schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled +} + func (iter *BinPackIterator) Next() *RankedNode { OUTER: for { diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 00005cc75..dce4ced9c 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -115,8 +115,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -228,8 +229,9 @@ func TestBinPackIterator_NoExistingAlloc_MixedReserve(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -349,8 +351,9 @@ func TestBinPackIterator_Network_Success(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -481,8 +484,9 @@ func TestBinPackIterator_Network_Failure(t *testing.T) { }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -575,8 +579,9 @@ func TestBinPackIterator_Network_NoCollision_Node(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + 
binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) out := collectRanked(scoreNorm) @@ -674,8 +679,9 @@ func TestBinPackIterator_Network_NodeError(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) out := collectRanked(scoreNorm) @@ -801,8 +807,9 @@ func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) out := collectRanked(scoreNorm) @@ -944,8 +951,9 @@ func TestBinPackIterator_Network_Interpolation_Success(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1055,8 +1063,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Absent_Value(t *testing.T) { }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1156,8 +1165,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Interface_Not_Exists(t *test }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1250,8 +1260,9 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1372,8 +1383,9 @@ func TestBinPackIterator_ReservedCores(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1482,8 +1494,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { }, }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1602,8 +1615,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { }, } - binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) scoreNorm := NewScoreNormalizationIterator(ctx, binp) @@ -1907,8 +1921,9 @@ func TestBinPackIterator_Devices(t *testing.T) { } static := NewStaticRankIterator(ctx, []*RankedNode{{Node: c.Node}}) - binp 
:= NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(c.TaskGroup) + binp.SetSchedulerConfiguration(testSchedulerConfig) out := binp.Next() if out == nil && !c.NoPlace { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 6a8d2bda9..326bf4780 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -78,6 +78,9 @@ type State interface { // NodesByNodePool returns an iterator over all nodes in the node pool NodesByNodePool(ws memdb.WatchSet, poolName string) (memdb.ResultIterator, error) + // NodePoolByName is used to lookup a node by ID. + NodePoolByName(ws memdb.WatchSet, poolName string) (*structs.NodePool, error) + // AllocsByJob returns the allocations by JobID AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) @@ -90,7 +93,7 @@ type State interface { // AllocsByNodeTerminal returns all the allocations by node filtering by terminal status AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) - // GetNodeByID is used to lookup a node by ID + // NodeByID is used to lookup a node by ID NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) // GetJobByID is used to lookup a job by ID diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 1471701eb..5ff636563 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -156,7 +156,7 @@ func (s *SystemScheduler) process() (bool, error) { // Construct the placement stack s.stack = NewSystemStack(s.sysbatch, s.ctx) if !s.job.Stopped() { - s.stack.SetJob(s.job) + s.setJob(s.job) } // Compute the target job allocations @@ -211,6 +211,26 @@ func (s *SystemScheduler) process() (bool, error) { return true, nil } +// setJob updates the stack with the given job and job's node pool scheduler +// configuration. +func (s *SystemScheduler) setJob(job *structs.Job) error { + // Fetch node pool and global scheduler configuration to determine how to + // configure the scheduler. + pool, err := s.state.NodePoolByName(nil, job.NodePool) + if err != nil { + return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err) + } + + _, schedConfig, err := s.state.SchedulerConfig() + if err != nil { + return fmt.Errorf("failed to get scheduler configuration: %v", err) + } + + s.stack.SetJob(job) + s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool)) + return nil +} + // computeJobAllocs is used to reconcile differences between the job, // existing allocations and node status to update the allocations. func (s *SystemScheduler) computeJobAllocs() error { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go new file mode 100644 index 000000000..003a1571d --- /dev/null +++ b/scheduler/scheduler_test.go @@ -0,0 +1,224 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package scheduler + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/iterator" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" +) + +func TestScheduler_JobRegister_MemoryMaxHonored(t *testing.T) { + ci.Parallel(t) + + // Test node pools. 
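+	// Node pool fixtures: no scheduler configuration (the global
+	// MemoryOversubscriptionEnabled value applies), memory oversubscription
+	// enabled at the pool level, and memory oversubscription disabled at
+	// the pool level.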
+ poolNoSchedConfig := mock.NodePool() + poolNoSchedConfig.SchedulerConfiguration = nil + + poolWithMemOversub := mock.NodePool() + poolWithMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(true), + } + + poolNoMemOversub := mock.NodePool() + poolNoMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{ + MemoryOversubscriptionEnabled: pointer.Of(false), + } + + cases := []struct { + name string + nodePool string + cpu int + memory int + memoryMax int + memoryOversubscriptionEnabled bool + + expectedTaskMemoryMax int + // expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all + // tasks if memory oversubscription is enabled and SUM(memory) if it's + // disabled. + expectedTotalMemoryMax int + }{ + { + name: "plain no max", + nodePool: poolNoSchedConfig.Name, + cpu: 100, + memory: 200, + memoryMax: 0, + memoryOversubscriptionEnabled: true, + + expectedTaskMemoryMax: 0, + expectedTotalMemoryMax: 200, + }, + { + name: "with max", + nodePool: poolNoSchedConfig.Name, + cpu: 100, + memory: 200, + memoryMax: 300, + memoryOversubscriptionEnabled: true, + + expectedTaskMemoryMax: 300, + expectedTotalMemoryMax: 300, + }, + { + name: "with max but disabled", + nodePool: poolNoSchedConfig.Name, + cpu: 100, + memory: 200, + memoryMax: 300, + + memoryOversubscriptionEnabled: false, + expectedTaskMemoryMax: 0, + expectedTotalMemoryMax: 200, // same as no max + }, + { + name: "with max and enabled by node pool", + nodePool: poolWithMemOversub.Name, + cpu: 100, + memory: 200, + memoryMax: 300, + memoryOversubscriptionEnabled: false, + + expectedTaskMemoryMax: 300, + expectedTotalMemoryMax: 300, + }, + { + name: "with max but disabled by node pool", + nodePool: poolNoMemOversub.Name, + cpu: 100, + memory: 200, + memoryMax: 300, + memoryOversubscriptionEnabled: true, + + expectedTaskMemoryMax: 0, + expectedTotalMemoryMax: 200, // same as no max + }, + } + + jobTypes := []string{ + "batch", + "service", + "sysbatch", + "system", + } + + for _, jobType := range jobTypes { + for _, c := range cases { + t.Run(fmt.Sprintf("%s/%s", jobType, c.name), func(t *testing.T) { + h := NewHarness(t) + + // Create node pools. + nodePools := []*structs.NodePool{ + poolNoSchedConfig, + poolWithMemOversub, + poolNoMemOversub, + } + h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools) + + // Create some nodes. + for i := 0; i < 3; i++ { + node := mock.Node() + node.NodePool = c.nodePool + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + + // Set global scheduler configuration. + h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ + MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled, + }) + + // Create test job. 
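+				// Build a job of the scheduler type under test, pin it to
+				// the case's node pool, and size its single task with the
+				// case's cpu, memory, and memory_max values.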
+ var job *structs.Job + switch jobType { + case "batch": + job = mock.BatchJob() + case "service": + job = mock.Job() + case "sysbatch": + job = mock.SystemBatchJob() + case "system": + job = mock.SystemJob() + } + job.TaskGroups[0].Count = 1 + job.NodePool = c.nodePool + + task := job.TaskGroups[0].Tasks[0].Name + res := job.TaskGroups[0].Tasks[0].Resources + res.CPU = c.cpu + res.MemoryMB = c.memory + res.MemoryMaxMB = c.memoryMax + + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + var scheduler Factory + switch jobType { + case "batch": + scheduler = NewBatchScheduler + case "service": + scheduler = NewServiceScheduler + case "sysbatch": + scheduler = NewSysBatchScheduler + case "system": + scheduler = NewSystemScheduler + } + err := h.Process(scheduler, eval) + must.NoError(t, err) + + must.Len(t, 1, h.Plans) + + allocs, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false) + must.NoError(t, err) + + // Ensure all allocations placed + var expectedAllocCount int + switch jobType { + case "batch", "service": + expectedAllocCount = 1 + case "system", "sysbatch": + nodes, err := h.State.NodesByNodePool(nil, job.NodePool) + must.NoError(t, err) + expectedAllocCount = iterator.Len(nodes) + } + must.Len(t, expectedAllocCount, allocs) + alloc := allocs[0] + + // checking new resources field deprecated Resources fields + must.Eq(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares) + must.Eq(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB) + must.Eq(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB) + + // checking old deprecated Resources fields + must.Eq(t, c.cpu, alloc.TaskResources[task].CPU) + must.Eq(t, c.memory, alloc.TaskResources[task].MemoryMB) + must.Eq(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB) + + // check total resource fields - alloc.Resources deprecated field, no modern equivalent + must.Eq(t, c.cpu, alloc.Resources.CPU) + must.Eq(t, c.memory, alloc.Resources.MemoryMB) + must.Eq(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB) + }) + } + } +} diff --git a/scheduler/stack.go b/scheduler/stack.go index 3691c04dd..dffc9bf56 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -118,6 +118,13 @@ func (s *GenericStack) SetJob(job *structs.Job) { } } +// SetSchedulerConfiguration applies the given scheduler configuration to +// process nodes. Scheduler configuration values may change per job depending +// on the node pool being used. +func (s *GenericStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) { + s.binPack.SetSchedulerConfiguration(schedConfig) +} + func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode { // This block handles trying to select from preferred nodes if options specify them @@ -275,6 +282,11 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack { // Apply the bin packing, this depends on the resources needed // by a particular task group. Enable eviction as system jobs are high // priority. 
+ // + // The scheduler configuration is read directly from state but only + // values that can't be specified per node pool should be used. Other + // values must be merged by calling schedConfig.WithNodePool() and set in + // the stack by calling SetSchedulerConfiguration(). _, schedConfig, _ := s.ctx.State().SchedulerConfig() enablePreemption := true if schedConfig != nil { @@ -286,7 +298,7 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack { } // Create binpack iterator - s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig) + s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0) // Apply score normalization s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack) @@ -311,6 +323,13 @@ func (s *SystemStack) SetJob(job *structs.Job) { } } +// SetSchedulerConfiguration applies the given scheduler configuration to +// process nodes. Scheduler configuration values may change per job depending +// on the node pool being used. +func (s *SystemStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) { + s.binPack.SetSchedulerConfiguration(schedConfig) +} + func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode { // Reset the binpack selector and context s.scoreNorm.Reset() @@ -412,8 +431,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Apply the bin packing, this depends on the resources needed // by a particular task group. - _, schedConfig, _ := ctx.State().SchedulerConfig() - s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig) + s.binPack = NewBinPackIterator(ctx, rankSource, false, 0) // Apply the job anti-affinity iterator. This is to avoid placing // multiple allocations on the same node for this job. diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index fdad37cda..be1b1328c 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -67,12 +67,16 @@ $ curl \ settings mentioned below. - `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler - binpacks or spreads allocations on available nodes. + binpacks or spreads allocations on available nodes. Node pools may set + their own [`SchedulerAlgorithm`][np_sched_algo] value that takes precedence + over this global value. - - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When - `true`, tasks may exceed their reserved memory limit, if the client has excess - memory capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max) - to take advantage of memory oversubscription. + - `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may + exceed their reserved memory limit, if the client has excess memory + capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max) + to take advantage of memory oversubscription. Node pools may set their own + [`MemoryOversubscriptionEnabled`][np_mem_oversubs] value that takes + precedence over this global value. - `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, @@ -148,12 +152,14 @@ server state is authoritative. - `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler binpacks or spreads allocations on available nodes. Possible values are - `"binpack"` and `"spread"` + `"binpack"` and `"spread"`. 
This value may also be set per [node + pool][np_sched_algo]. -- `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When - `true`, tasks may exceed their reserved memory limit, if the client has excess - memory capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max) - to take advantage of memory oversubscription. +- `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may + exceed their reserved memory limit, if the client has excess memory capacity. + Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max) + to take advantage of memory oversubscription. This value may also be set per + [node pool][np_mem_oversubs]. - `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, @@ -200,3 +206,5 @@ server state is authoritative. - `Index` - Current Raft index when the request was received. [`default_scheduler_config`]: /nomad/docs/configuration/server#default_scheduler_config +[np_mem_oversubs]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled +[np_sched_algo]: /nomad/docs/other-specifications/node-pool#scheduler_algorithm diff --git a/website/content/docs/job-specification/resources.mdx b/website/content/docs/job-specification/resources.mdx index 3e2be15ac..c01ed1122 100644 --- a/website/content/docs/job-specification/resources.mdx +++ b/website/content/docs/job-specification/resources.mdx @@ -97,7 +97,7 @@ the task may exceed the limit and be interrupted; if the task memory is too high, the cluster is left underutilized. To help maximize cluster memory utilization while allowing a safety margin for -unexpected load spikes, Nomad 1.1. lets job authors set two separate memory +unexpected load spikes, Nomad allows job authors to set two separate memory limits: * `memory`: the reserve limit to represent the task’s typical memory usage — @@ -112,14 +112,15 @@ may kill oversubscribed tasks and reschedule them to other clients. The exact mechanism for memory pressure is specific to the task driver, operating system, and application runtime. -The new max limit attribute is currently supported by the official `docker`, -`exec`, and `java` task drivers. Consult the documentation of +The `memory_max` limit attribute is currently supported by the official +`docker`, `exec`, and `java` task drivers. Consult the documentation of community-supported task drivers for their memory oversubscription support. -Memory oversubscription is opt-in. Nomad operators can enable [Memory Oversubscription in the scheduler -configuration](/nomad/api-docs/operator/scheduler#update-scheduler-configuration). Enterprise customers can use [Resource -Quotas](/nomad/tutorials/governance-and-policy/quotas) to limit the memory -oversubscription. +Memory oversubscription is opt-in. Nomad operators can enable [Memory +Oversubscription in the scheduler configuration][api_sched_config]. Enterprise +customers can use [Resource Quotas][tutorial_quota] to limit the memory +oversubscription and enable or disable memory oversubscription per [node +pool][np_sched_config]. To avoid degrading the cluster experience, we recommend examining and monitoring resource utilization and considering the following suggestions: @@ -136,6 +137,9 @@ resource utilization and considering the following suggestions: 1GB in aggregate before the memory becomes contended and allocations get killed. 
+[api_sched_config]: /nomad/api-docs/operator/scheduler#update-scheduler-configuration
 [device]: /nomad/docs/job-specification/device 'Nomad device Job Specification'
 [docker_cpu]: /nomad/docs/drivers/docker#cpu
 [exec_cpu]: /nomad/docs/drivers/exec#cpu
+[np_sched_config]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled
+[tutorial_quota]: /nomad/tutorials/governance-and-policy/quotas