Allow configurable range of Job priorities (#16084)
This commit is contained in:
parent
279929df38
commit
4e9ec24b22
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
agent: Allow configurable range of Job priorities
|
||||
```
|
|
@ -938,7 +938,7 @@ func (j *Job) Canonicalize() {
|
|||
j.Namespace = pointerOf(DefaultNamespace)
|
||||
}
|
||||
if j.Priority == nil {
|
||||
j.Priority = pointerOf(50)
|
||||
j.Priority = pointerOf(0)
|
||||
}
|
||||
if j.Stop == nil {
|
||||
j.Stop = pointerOf(false)
|
||||
|
|
|
@ -279,7 +279,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Namespace: pointerOf(DefaultNamespace),
|
||||
Type: pointerOf("service"),
|
||||
ParentID: pointerOf(""),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -374,7 +374,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Namespace: pointerOf(DefaultNamespace),
|
||||
Type: pointerOf("batch"),
|
||||
ParentID: pointerOf(""),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -452,7 +452,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
ParentID: pointerOf("lol"),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -621,7 +621,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
ID: pointerOf("example_template"),
|
||||
Name: pointerOf("example_template"),
|
||||
ParentID: pointerOf(""),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
AllAtOnce: pointerOf(false),
|
||||
|
@ -791,7 +791,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Name: pointerOf("bar"),
|
||||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -882,7 +882,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
ParentID: pointerOf("lol"),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -1058,7 +1058,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
ParentID: pointerOf("lol"),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
@ -1229,7 +1229,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Region: pointerOf("global"),
|
||||
Type: pointerOf("service"),
|
||||
ParentID: pointerOf("lol"),
|
||||
Priority: pointerOf(50),
|
||||
Priority: pointerOf(0),
|
||||
AllAtOnce: pointerOf(false),
|
||||
ConsulToken: pointerOf(""),
|
||||
ConsulNamespace: pointerOf(""),
|
||||
|
|
|
@ -322,6 +322,23 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
|
|||
}
|
||||
}
|
||||
|
||||
jobMaxPriority := structs.JobDefaultMaxPriority
|
||||
if agentConfig.Server.JobMaxPriority != nil && *agentConfig.Server.JobMaxPriority != 0 {
|
||||
jobMaxPriority = *agentConfig.Server.JobMaxPriority
|
||||
if jobMaxPriority < structs.JobDefaultMaxPriority || jobMaxPriority > structs.JobMaxPriority {
|
||||
return nil, fmt.Errorf("job_max_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobMaxPriority, structs.JobDefaultMaxPriority, structs.JobMaxPriority)
|
||||
}
|
||||
}
|
||||
jobDefaultPriority := structs.JobDefaultPriority
|
||||
if agentConfig.Server.JobDefaultPriority != nil && *agentConfig.Server.JobDefaultPriority != 0 {
|
||||
jobDefaultPriority = *agentConfig.Server.JobDefaultPriority
|
||||
if jobDefaultPriority < structs.JobDefaultPriority || jobDefaultPriority >= jobMaxPriority {
|
||||
return nil, fmt.Errorf("job_default_priority cannot be %d. Must be between %d and %d", *agentConfig.Server.JobDefaultPriority, structs.JobDefaultPriority, jobMaxPriority)
|
||||
}
|
||||
}
|
||||
conf.JobMaxPriority = jobMaxPriority
|
||||
conf.JobDefaultPriority = jobDefaultPriority
|
||||
|
||||
// Set up the bind addresses
|
||||
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)
|
||||
if err != nil {
|
||||
|
|
|
@ -3,6 +3,7 @@ package agent
|
|||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
@ -1569,3 +1570,144 @@ func TestAgent_ProxyRPC_Dev(t *testing.T) {
|
|||
})
|
||||
|
||||
}
|
||||
|
||||
func TestAgent_ServerConfig_JobMaxPriority_Ok(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
cases := []struct {
|
||||
maxPriority *int
|
||||
jobMaxPriority int
|
||||
}{
|
||||
{
|
||||
maxPriority: nil,
|
||||
jobMaxPriority: 100,
|
||||
},
|
||||
|
||||
{
|
||||
maxPriority: pointer.Of(0),
|
||||
jobMaxPriority: 100,
|
||||
},
|
||||
{
|
||||
maxPriority: pointer.Of(100),
|
||||
jobMaxPriority: 100,
|
||||
},
|
||||
{
|
||||
maxPriority: pointer.Of(200),
|
||||
jobMaxPriority: 200,
|
||||
},
|
||||
{
|
||||
maxPriority: pointer.Of(32766),
|
||||
jobMaxPriority: 32766,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
v := "default"
|
||||
if tc.maxPriority != nil {
|
||||
v = fmt.Sprintf("%v", *tc.maxPriority)
|
||||
}
|
||||
t.Run(v, func(t *testing.T) {
|
||||
conf := DevConfig(nil)
|
||||
must.NoError(t, conf.normalizeAddrs())
|
||||
|
||||
conf.Server.JobMaxPriority = tc.maxPriority
|
||||
|
||||
serverConf, err := convertServerConfig(conf)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, tc.jobMaxPriority, serverConf.JobMaxPriority)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_ServerConfig_JobMaxPriority_Bad(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
cases := []int{
|
||||
99,
|
||||
math.MaxInt16,
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
|
||||
conf := DevConfig(nil)
|
||||
must.NoError(t, conf.normalizeAddrs())
|
||||
|
||||
conf.Server.JobMaxPriority = &tc
|
||||
|
||||
_, err := convertServerConfig(conf)
|
||||
must.Error(t, err)
|
||||
must.ErrorContains(t, err, "job_max_priority cannot be")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_ServerConfig_JobDefaultPriority_Ok(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
cases := []struct {
|
||||
defaultPriority *int
|
||||
jobDefaultPriority int
|
||||
}{
|
||||
{
|
||||
defaultPriority: nil,
|
||||
jobDefaultPriority: 50,
|
||||
},
|
||||
|
||||
{
|
||||
defaultPriority: pointer.Of(0),
|
||||
jobDefaultPriority: 50,
|
||||
},
|
||||
{
|
||||
defaultPriority: pointer.Of(50),
|
||||
jobDefaultPriority: 50,
|
||||
},
|
||||
{
|
||||
defaultPriority: pointer.Of(60),
|
||||
jobDefaultPriority: 60,
|
||||
},
|
||||
{
|
||||
defaultPriority: pointer.Of(99),
|
||||
jobDefaultPriority: 99,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
v := "default"
|
||||
if tc.defaultPriority != nil {
|
||||
v = fmt.Sprintf("%v", *tc.defaultPriority)
|
||||
}
|
||||
t.Run(v, func(t *testing.T) {
|
||||
conf := DevConfig(nil)
|
||||
must.NoError(t, conf.normalizeAddrs())
|
||||
|
||||
conf.Server.JobDefaultPriority = tc.defaultPriority
|
||||
|
||||
serverConf, err := convertServerConfig(conf)
|
||||
must.NoError(t, err)
|
||||
|
||||
must.Eq(t, tc.jobDefaultPriority, serverConf.JobDefaultPriority)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
cases := []int{
|
||||
49,
|
||||
100,
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
|
||||
conf := DevConfig(nil)
|
||||
must.NoError(t, conf.normalizeAddrs())
|
||||
|
||||
conf.Server.JobDefaultPriority = &tc
|
||||
|
||||
_, err := convertServerConfig(conf)
|
||||
must.Error(t, err)
|
||||
must.ErrorContains(t, err, "job_default_priority cannot be")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -648,6 +648,12 @@ type ServerConfig struct {
|
|||
// forced to send an entire snapshot. The value passed here is the initial
|
||||
// setting used. This can be tuned during operation using a hot reload.
|
||||
RaftTrailingLogs *int `hcl:"raft_trailing_logs"`
|
||||
|
||||
// JobDefaultPriority is the default Job priority if not specified.
|
||||
JobDefaultPriority *int `hcl:"job_default_priority"`
|
||||
|
||||
// JobMaxPriority is an upper bound on the Job priority.
|
||||
JobMaxPriority *int `hcl:"job_max_priority"`
|
||||
}
|
||||
|
||||
func (s *ServerConfig) Copy() *ServerConfig {
|
||||
|
@ -673,6 +679,8 @@ func (s *ServerConfig) Copy() *ServerConfig {
|
|||
ns.RaftSnapshotInterval = pointer.Copy(s.RaftSnapshotInterval)
|
||||
ns.RaftSnapshotThreshold = pointer.Copy(s.RaftSnapshotThreshold)
|
||||
ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
|
||||
ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
|
||||
ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
|
||||
return &ns
|
||||
}
|
||||
|
||||
|
@ -1871,6 +1879,12 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
|
|||
if b.JobGCThreshold != "" {
|
||||
result.JobGCThreshold = b.JobGCThreshold
|
||||
}
|
||||
if b.JobDefaultPriority != nil {
|
||||
result.JobDefaultPriority = pointer.Of(*b.JobDefaultPriority)
|
||||
}
|
||||
if b.JobMaxPriority != nil {
|
||||
result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
|
||||
}
|
||||
if b.EvalGCThreshold != "" {
|
||||
result.EvalGCThreshold = b.EvalGCThreshold
|
||||
}
|
||||
|
|
|
@ -148,7 +148,9 @@ var basicConfig = &Config{
|
|||
ServiceSchedulerEnabled: true,
|
||||
},
|
||||
},
|
||||
LicensePath: "/tmp/nomad.hclic",
|
||||
LicensePath: "/tmp/nomad.hclic",
|
||||
JobDefaultPriority: pointer.Of(100),
|
||||
JobMaxPriority: pointer.Of(200),
|
||||
},
|
||||
ACL: &ACLConfig{
|
||||
Enabled: true,
|
||||
|
|
|
@ -358,6 +358,8 @@ func TestConfig_Merge(t *testing.T) {
|
|||
NodeThreshold: 100,
|
||||
NodeWindow: 11 * time.Minute,
|
||||
},
|
||||
JobMaxPriority: pointer.Of(200),
|
||||
JobDefaultPriority: pointer.Of(100),
|
||||
},
|
||||
ACL: &ACLConfig{
|
||||
Enabled: true,
|
||||
|
|
|
@ -195,7 +195,6 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ
|
|||
}
|
||||
|
||||
job := ApiJobToStructJob(validateRequest.Job)
|
||||
|
||||
args := structs.JobValidateRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -2066,11 +2067,11 @@ func TestJobs_ParsingWriteRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
sJob, sWriteReq := srv.apiJobAndRequestToStructs(job, req, apiReq)
|
||||
require.Equal(t, tc.expectedJobRegion, sJob.Region)
|
||||
require.Equal(t, tc.expectedNamespace, sJob.Namespace)
|
||||
require.Equal(t, tc.expectedNamespace, sWriteReq.Namespace)
|
||||
require.Equal(t, tc.expectedRequestRegion, sWriteReq.Region)
|
||||
require.Equal(t, tc.expectedToken, sWriteReq.AuthToken)
|
||||
must.Eq(t, tc.expectedJobRegion, sJob.Region)
|
||||
must.Eq(t, tc.expectedNamespace, sJob.Namespace)
|
||||
must.Eq(t, tc.expectedNamespace, sWriteReq.Namespace)
|
||||
must.Eq(t, tc.expectedRequestRegion, sWriteReq.Region)
|
||||
must.Eq(t, tc.expectedToken, sWriteReq.AuthToken)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -3546,16 +3547,17 @@ func TestHTTP_JobValidate_SystemMigrate(t *testing.T) {
|
|||
|
||||
// Make the HTTP request
|
||||
req, err := http.NewRequest("PUT", "/v1/validate/job", buf)
|
||||
require.NoError(t, err)
|
||||
must.NoError(t, err)
|
||||
respW := httptest.NewRecorder()
|
||||
|
||||
// Make the request
|
||||
obj, err := s.Server.ValidateJobRequest(respW, req)
|
||||
require.NoError(t, err)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Check the response
|
||||
resp := obj.(structs.JobValidateResponse)
|
||||
require.Contains(t, resp.Error, `Job type "system" does not allow migrate block`)
|
||||
must.StrContains(t, resp.Error, `Job type "system" does not allow migrate block`)
|
||||
must.Len(t, 1, resp.ValidationErrors)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -134,6 +134,8 @@ server {
|
|||
raft_multiplier = 4
|
||||
enable_event_broker = false
|
||||
event_buffer_size = 200
|
||||
job_default_priority = 100
|
||||
job_max_priority = 200
|
||||
|
||||
plan_rejection_tracker {
|
||||
enabled = true
|
||||
|
|
|
@ -321,7 +321,9 @@
|
|||
}]
|
||||
}],
|
||||
"upgrade_version": "0.8.0",
|
||||
"license_path": "/tmp/nomad.hclic"
|
||||
"license_path": "/tmp/nomad.hclic",
|
||||
"job_default_priority": 100,
|
||||
"job_max_priority": 200
|
||||
}
|
||||
],
|
||||
"syslog_facility": "LOCAL1",
|
||||
|
|
|
@ -404,6 +404,12 @@ type Config struct {
|
|||
// DeploymentQueryRateLimit is in queries per second and is used by the
|
||||
// DeploymentWatcher to throttle the amount of simultaneously deployments
|
||||
DeploymentQueryRateLimit float64
|
||||
|
||||
// JobDefaultPriority is the default Job priority if not specified.
|
||||
JobDefaultPriority int
|
||||
|
||||
// JobMaxPriority is an upper bound on the Job priority.
|
||||
JobMaxPriority int
|
||||
}
|
||||
|
||||
func (c *Config) Copy() *Config {
|
||||
|
@ -521,6 +527,8 @@ func DefaultConfig() *Config {
|
|||
},
|
||||
},
|
||||
DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
|
||||
JobDefaultPriority: structs.JobDefaultPriority,
|
||||
JobMaxPriority: structs.JobDefaultMaxPriority,
|
||||
}
|
||||
|
||||
// Enable all known schedulers by default
|
||||
|
|
|
@ -66,7 +66,7 @@ func NewJobEndpoints(s *Server, ctx *RPCContext) *Job {
|
|||
ctx: ctx,
|
||||
logger: s.logger.Named("job"),
|
||||
mutators: []jobMutator{
|
||||
jobCanonicalizer{},
|
||||
&jobCanonicalizer{srv: s},
|
||||
jobConnectHook{},
|
||||
jobExposeCheckHook{},
|
||||
jobImpliedConstraints{},
|
||||
|
@ -76,7 +76,7 @@ func NewJobEndpoints(s *Server, ctx *RPCContext) *Job {
|
|||
jobExposeCheckHook{},
|
||||
jobVaultHook{srv: s},
|
||||
jobNamespaceConstraintCheckHook{srv: s},
|
||||
jobValidate{},
|
||||
&jobValidate{srv: s},
|
||||
&memoryOversubscriptionValidate{srv: s},
|
||||
},
|
||||
}
|
||||
|
@ -990,7 +990,7 @@ func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *st
|
|||
continue
|
||||
}
|
||||
|
||||
priority := structs.JobDefaultPriority
|
||||
priority := j.srv.config.JobDefaultPriority
|
||||
jtype := structs.JobTypeService
|
||||
if job != nil {
|
||||
priority = job.Priority
|
||||
|
|
|
@ -126,14 +126,22 @@ func (j *Job) admissionValidators(origJob *structs.Job) ([]error, error) {
|
|||
|
||||
// jobCanonicalizer calls job.Canonicalize (sets defaults and initializes
|
||||
// fields) and returns any errors as warnings.
|
||||
type jobCanonicalizer struct{}
|
||||
type jobCanonicalizer struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
func (jobCanonicalizer) Name() string {
|
||||
func (c *jobCanonicalizer) Name() string {
|
||||
return "canonicalize"
|
||||
}
|
||||
|
||||
func (jobCanonicalizer) Mutate(job *structs.Job) (*structs.Job, []error, error) {
|
||||
func (c *jobCanonicalizer) Mutate(job *structs.Job) (*structs.Job, []error, error) {
|
||||
job.Canonicalize()
|
||||
|
||||
// If the job priority is not set, we fallback on the defaults specified in the server config
|
||||
if job.Priority == 0 {
|
||||
job.Priority = c.srv.GetConfig().JobDefaultPriority
|
||||
}
|
||||
|
||||
return job, nil, nil
|
||||
}
|
||||
|
||||
|
@ -253,13 +261,15 @@ func mutateConstraint(matcher constraintMatcher, taskGroup *structs.TaskGroup, c
|
|||
// jobValidate validates a Job and task drivers and returns an error if there is
|
||||
// a validation problem or if the Job is of a type a user is not allowed to
|
||||
// submit.
|
||||
type jobValidate struct{}
|
||||
type jobValidate struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
func (jobValidate) Name() string {
|
||||
func (*jobValidate) Name() string {
|
||||
return "validate"
|
||||
}
|
||||
|
||||
func (jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
|
||||
func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
|
||||
validationErrors := new(multierror.Error)
|
||||
if err := job.Validate(); err != nil {
|
||||
multierror.Append(validationErrors, err)
|
||||
|
@ -287,6 +297,10 @@ func (jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
|
|||
multierror.Append(validationErrors, fmt.Errorf("job can't be submitted with a payload, only dispatched"))
|
||||
}
|
||||
|
||||
if job.Priority < structs.JobMinPriority || job.Priority > v.srv.config.JobMaxPriority {
|
||||
multierror.Append(validationErrors, fmt.Errorf("job priority must be between [%d, %d]", structs.JobMinPriority, v.srv.config.JobMaxPriority))
|
||||
}
|
||||
|
||||
return warnings, validationErrors.ErrorOrNil()
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -717,3 +718,51 @@ func Test_jobImpliedConstraints_Mutate(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_jobCanonicalizer_Mutate(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
serverJobDefaultPriority := 100
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
inputJob *structs.Job
|
||||
expectedOutputJob *structs.Job
|
||||
}{
|
||||
{
|
||||
name: "no mutation",
|
||||
inputJob: &structs.Job{
|
||||
Namespace: "default",
|
||||
Datacenters: []string{"*"},
|
||||
Priority: 123,
|
||||
},
|
||||
expectedOutputJob: &structs.Job{
|
||||
Namespace: "default",
|
||||
Datacenters: []string{"*"},
|
||||
Priority: 123,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when priority is 0 mutate using the value present in the server config",
|
||||
inputJob: &structs.Job{
|
||||
Namespace: "default",
|
||||
Datacenters: []string{"*"},
|
||||
Priority: 0,
|
||||
},
|
||||
expectedOutputJob: &structs.Job{
|
||||
Namespace: "default",
|
||||
Datacenters: []string{"*"},
|
||||
Priority: serverJobDefaultPriority,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
impl := jobCanonicalizer{srv: &Server{config: &Config{JobDefaultPriority: serverJobDefaultPriority}}}
|
||||
actualJob, actualWarnings, actualError := impl.Mutate(tc.inputJob)
|
||||
must.Eq(t, tc.expectedOutputJob, actualJob)
|
||||
must.NoError(t, actualError)
|
||||
must.Nil(t, actualWarnings)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6651,6 +6651,61 @@ func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) {
|
|||
require.Equal("", validResp.Warnings)
|
||||
}
|
||||
|
||||
func TestJobEndpoint_ValidateJob_PriorityNotOk(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, nil)
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
validateJob := func(j *structs.Job) error {
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: j,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: j.Namespace,
|
||||
},
|
||||
}
|
||||
var resp structs.JobValidateResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.Validate", req, &resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.Error != "" {
|
||||
return errors.New(resp.Error)
|
||||
}
|
||||
|
||||
if len(resp.ValidationErrors) != 0 {
|
||||
return errors.New(strings.Join(resp.ValidationErrors, ","))
|
||||
}
|
||||
|
||||
if resp.Warnings != "" {
|
||||
return errors.New(resp.Warnings)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
t.Run("job with invalid min priority", func(t *testing.T) {
|
||||
j := mock.Job()
|
||||
j.Priority = -1
|
||||
|
||||
err := validateJob(j)
|
||||
must.Error(t, err)
|
||||
must.ErrorContains(t, err, "job priority must be between")
|
||||
})
|
||||
|
||||
t.Run("job with invalid max priority", func(t *testing.T) {
|
||||
j := mock.Job()
|
||||
j.Priority = 101
|
||||
|
||||
err := validateJob(j)
|
||||
must.Error(t, err)
|
||||
must.ErrorContains(t, err, "job priority must be between")
|
||||
})
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Dispatch_ACL(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
|
|
@ -15,7 +15,7 @@ func Job() *structs.Job {
|
|||
Name: "my-job",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
Type: structs.JobTypeService,
|
||||
Priority: 50,
|
||||
Priority: structs.JobDefaultPriority,
|
||||
AllAtOnce: false,
|
||||
Datacenters: []string{"dc1"},
|
||||
Constraints: []*structs.Constraint{
|
||||
|
@ -294,7 +294,7 @@ func BatchJob() *structs.Job {
|
|||
Name: "batch-job",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
Type: structs.JobTypeBatch,
|
||||
Priority: 50,
|
||||
Priority: structs.JobDefaultPriority,
|
||||
AllAtOnce: false,
|
||||
Datacenters: []string{"dc1"},
|
||||
TaskGroups: []*structs.TaskGroup{
|
||||
|
@ -360,7 +360,7 @@ func SystemJob() *structs.Job {
|
|||
ID: fmt.Sprintf("mock-system-%s", uuid.Generate()),
|
||||
Name: "my-job",
|
||||
Type: structs.JobTypeSystem,
|
||||
Priority: 100,
|
||||
Priority: structs.JobDefaultMaxPriority,
|
||||
AllAtOnce: false,
|
||||
Datacenters: []string{"dc1"},
|
||||
Constraints: []*structs.Constraint{
|
||||
|
@ -437,7 +437,7 @@ func MaxParallelJob() *structs.Job {
|
|||
Name: "my-job",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
Type: structs.JobTypeService,
|
||||
Priority: 50,
|
||||
Priority: structs.JobDefaultPriority,
|
||||
AllAtOnce: false,
|
||||
Datacenters: []string{"dc1"},
|
||||
Constraints: []*structs.Constraint{
|
||||
|
|
|
@ -4156,17 +4156,19 @@ const (
|
|||
// JobMinPriority is the minimum allowed priority
|
||||
JobMinPriority = 1
|
||||
|
||||
// JobDefaultPriority is the default priority if not
|
||||
// not specified.
|
||||
// JobDefaultPriority is the default priority if not specified.
|
||||
JobDefaultPriority = 50
|
||||
|
||||
// JobMaxPriority is the maximum allowed priority
|
||||
JobMaxPriority = 100
|
||||
// JobDefaultMaxPriority is the default maximum allowed priority
|
||||
JobDefaultMaxPriority = 100
|
||||
|
||||
// JobMaxPriority is the maximum allowed configuration value for maximum job priority
|
||||
JobMaxPriority = math.MaxInt16 - 1
|
||||
|
||||
// CoreJobPriority should be higher than any user
|
||||
// specified job so that it gets priority. This is important
|
||||
// for the system to remain healthy.
|
||||
CoreJobPriority = JobMaxPriority * 2
|
||||
CoreJobPriority = math.MaxInt16
|
||||
|
||||
// JobTrackedVersions is the number of historic job versions that are
|
||||
// kept.
|
||||
|
@ -4446,9 +4448,6 @@ func (j *Job) Validate() error {
|
|||
default:
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("Invalid job type: %q", j.Type))
|
||||
}
|
||||
if j.Priority < JobMinPriority || j.Priority > JobMaxPriority {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("Job priority must be between [%d, %d]", JobMinPriority, JobMaxPriority))
|
||||
}
|
||||
if len(j.Datacenters) == 0 && !j.IsMultiregion() {
|
||||
mErr.Errors = append(mErr.Errors, errors.New("Missing job datacenters"))
|
||||
} else {
|
||||
|
|
|
@ -30,7 +30,6 @@ func TestJob_Validate(t *testing.T) {
|
|||
"job region",
|
||||
"job type",
|
||||
"namespace",
|
||||
"priority",
|
||||
"task groups",
|
||||
)
|
||||
|
||||
|
@ -57,7 +56,7 @@ func TestJob_Validate(t *testing.T) {
|
|||
Namespace: "test",
|
||||
Name: "my-job",
|
||||
Type: JobTypeService,
|
||||
Priority: 50,
|
||||
Priority: JobDefaultPriority,
|
||||
Datacenters: []string{"*"},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
|
@ -370,7 +369,7 @@ func testJob() *Job {
|
|||
Namespace: "test",
|
||||
Name: "my-job",
|
||||
Type: JobTypeService,
|
||||
Priority: 50,
|
||||
Priority: JobDefaultPriority,
|
||||
AllAtOnce: false,
|
||||
Datacenters: []string{"*"},
|
||||
Constraints: []*Constraint{
|
||||
|
|
|
@ -253,6 +253,12 @@ server {
|
|||
- `search` <code>([search][search]: nil)</code> - Specifies configuration parameters
|
||||
for the Nomad search API.
|
||||
|
||||
- `job_max_priority` `(int: 100)` - Specifies the maximum priority that can be assigned to a job.
|
||||
A valid value must be between `100` and `32766`.
|
||||
|
||||
- `job_default_priority` `(int: 50)` - Specifies the default priority assigned to a job.
|
||||
A valid value must be between `50` and `job_max_priority`.
|
||||
|
||||
### Deprecated Parameters
|
||||
|
||||
- `retry_join` `(array<string>: [])` - Specifies a list of server addresses to
|
||||
|
|
|
@ -105,8 +105,10 @@ job "docs" {
|
|||
at fixed times, dates or intervals.
|
||||
|
||||
- `priority` `(int: 50)` - Specifies the job priority which is used to
|
||||
prioritize scheduling and access to resources. Must be between 1 and 100
|
||||
inclusively, with a larger value corresponding to a higher priority.
|
||||
prioritize scheduling and access to resources.
|
||||
Must be between 1 and [`job_max_priority`] inclusively,
|
||||
with a larger value corresponding to a higher priority.
|
||||
If value 0 is provided this will fallback to [`job_default_priority`].
|
||||
Priority only has an effect when job preemption is enabled.
|
||||
It does not have an effect on which of multiple pending jobs is run first.
|
||||
|
||||
|
@ -257,3 +259,5 @@ $ VAULT_TOKEN="..." nomad job run example.nomad.hcl
|
|||
[task]: /nomad/docs/job-specification/task 'Nomad task Job Specification'
|
||||
[update]: /nomad/docs/job-specification/update 'Nomad update Job Specification'
|
||||
[vault]: /nomad/docs/job-specification/vault 'Nomad vault Job Specification'
|
||||
[`job_max_priority`]: /nomad/docs/configuration/server#job_max_priority
|
||||
[`job_default_priority`]: /nomad/docs/configuration/server#job_default_priority
|
||||
|
|
Loading…
Reference in New Issue