scheduler: config option to reject job registration (#11610)

During incident response, operators may find that automated processes
elsewhere in the organization can be generating new workloads on Nomad
clusters that are unable to handle the workload. This changeset adds a
field to the `SchedulerConfiguration` API that causes all job
registration calls to be rejected unless the request has a management
ACL token.
This commit is contained in:
Tim Gross 2021-12-06 15:20:34 -05:00 committed by GitHub
parent a16f383d82
commit 03e697a69d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 337 additions and 2 deletions

3
.changelog/11610.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token
```

View File

@ -129,6 +129,10 @@ type SchedulerConfiguration struct {
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool
// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64

View File

@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) {
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}
@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
errMsg = structs.ErrTokenNotFound.Error()
code = 403
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
errMsg = structs.ErrJobRegistrationDisabled.Error()
code = 403
}
}

View File

@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
args.Config = structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,

View File

@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)
// Check job submission permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
@ -175,6 +176,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}
if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job registration is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}
// Lookup the job
snap, err := j.srv.State().Snapshot()
if err != nil {
@ -1022,6 +1028,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
}
}
if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job scaling is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}
// Validate args
err = args.Validate()
if err != nil {
@ -1304,6 +1315,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string)
return r, nil
}
// registrationsAreAllowed checks that the scheduler is not in
// RejectJobRegistration mode for load-shedding.
func registrationsAreAllowed(aclObj *acl.ACL, state *state.StateStore) (bool, error) {
_, cfg, err := state.SchedulerConfig()
if err != nil {
return false, err
}
if cfg != nil && !cfg.RejectJobRegistration {
return true, nil
}
if aclObj != nil && aclObj.IsManagement() {
return true, nil
}
return false, nil
}
// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
@ -1852,12 +1879,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now())
// Check for submit-job permissions
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
var aclObj *acl.ACL
var err error
if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) {
return structs.ErrPermissionDenied
}
if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil {
j.logger.Warn("job dispatch is currently disabled for non-management ACL")
return structs.ErrJobRegistrationDisabled
}
// Lookup the parameterized job
if args.JobID == "" {
return fmt.Errorf("missing parameterized job ID")

View File

@ -2248,6 +2248,100 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) {
assert.NotNil(out, "expected job")
}
func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) {
t.Parallel()
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write",
mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)).
SecretID
cases := []struct {
name string
token string
rejectEnabled bool
errExpected string
}{
{
name: "reject disabled, with a submit token",
token: submitJobToken,
rejectEnabled: false,
},
{
name: "reject enabled, with a submit token",
token: submitJobToken,
rejectEnabled: true,
errExpected: structs.ErrJobRegistrationDisabled.Error(),
},
{
name: "reject enabled, without a token",
token: "",
rejectEnabled: true,
errExpected: structs.ErrPermissionDenied.Error(),
},
{
name: "reject enabled, with a management token",
token: root.SecretID,
rejectEnabled: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
req.AuthToken = tc.token
cfgReq := &structs.SchedulerSetConfigRequest{
Config: structs.SchedulerConfiguration{
RejectJobRegistration: tc.rejectEnabled,
},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
cfgReq.AuthToken = root.SecretID
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration",
cfgReq, &structs.SchedulerSetConfigurationResponse{},
)
require.NoError(t, err)
var resp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if tc.errExpected != "" {
require.Error(t, err, "expected error")
require.EqualError(t, err, tc.errExpected)
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
require.NoError(t, err)
require.Nil(t, out)
} else {
require.NoError(t, err, "unexpected error")
require.NotEqual(t, 0, resp.Index)
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.Namespace, job.ID)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, job.TaskGroups, out.TaskGroups)
}
})
}
}
func TestJobEndpoint_Revert(t *testing.T) {
t.Parallel()
@ -6646,6 +6740,95 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) {
require.Equal(t, structs.JobStatusDead, dispatchedStatus())
}
func TestJobEndpoint_Dispatch_ACL_RejectedBySchedulerConfig(t *testing.T) {
t.Parallel()
s1, root, cleanupS1 := TestACLServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
job := mock.BatchJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
require.NoError(t, err)
dispatch := &structs.JobDispatchRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write",
mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)).
SecretID
cases := []struct {
name string
token string
rejectEnabled bool
errExpected string
}{
{
name: "reject disabled, with a submit token",
token: submitJobToken,
rejectEnabled: false,
},
{
name: "reject enabled, with a submit token",
token: submitJobToken,
rejectEnabled: true,
errExpected: structs.ErrJobRegistrationDisabled.Error(),
},
{
name: "reject enabled, without a token",
token: "",
rejectEnabled: true,
errExpected: structs.ErrPermissionDenied.Error(),
},
{
name: "reject enabled, with a management token",
token: root.SecretID,
rejectEnabled: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfgReq := &structs.SchedulerSetConfigRequest{
Config: structs.SchedulerConfiguration{
RejectJobRegistration: tc.rejectEnabled,
},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
cfgReq.AuthToken = root.SecretID
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration",
cfgReq, &structs.SchedulerSetConfigurationResponse{},
)
require.NoError(t, err)
dispatch.AuthToken = tc.token
var resp structs.JobDispatchResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatch, &resp)
if tc.errExpected != "" {
require.Error(t, err, "expected error")
require.EqualError(t, err, tc.errExpected)
} else {
require.NoError(t, err, "unexpected error")
require.NotEqual(t, 0, resp.Index)
}
})
}
}
func TestJobEndpoint_Scale(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -6934,6 +7117,97 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) {
}
func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) {
t.Parallel()
s1, root, cleanupS1 := TestACLServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
job := mock.Job()
err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
require.NoError(t, err)
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
},
Message: "because of the load",
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write",
mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)).
SecretID
cases := []struct {
name string
token string
rejectEnabled bool
errExpected string
}{
{
name: "reject disabled, with a submit token",
token: submitJobToken,
rejectEnabled: false,
},
{
name: "reject enabled, with a submit token",
token: submitJobToken,
rejectEnabled: true,
errExpected: structs.ErrJobRegistrationDisabled.Error(),
},
{
name: "reject enabled, without a token",
token: "",
rejectEnabled: true,
errExpected: structs.ErrPermissionDenied.Error(),
},
{
name: "reject enabled, with a management token",
token: root.SecretID,
rejectEnabled: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfgReq := &structs.SchedulerSetConfigRequest{
Config: structs.SchedulerConfiguration{
RejectJobRegistration: tc.rejectEnabled,
},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
cfgReq.AuthToken = root.SecretID
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration",
cfgReq, &structs.SchedulerSetConfigurationResponse{},
)
require.NoError(t, err)
var resp structs.JobRegisterResponse
scale.AuthToken = tc.token
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
if tc.errExpected != "" {
require.Error(t, err, "expected error")
require.EqualError(t, err, tc.errExpected)
} else {
require.NoError(t, err, "unexpected error")
require.NotEqual(t, 0, resp.Index)
}
})
}
}
func TestJobEndpoint_Scale_Invalid(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@ -13,6 +13,7 @@ const (
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errJobRegistrationDisabled = "Job registration, dispatch, and scale are disabled by the scheduler configuration"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
@ -46,6 +47,7 @@ var (
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrJobRegistrationDisabled = errors.New(errJobRegistrationDisabled)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)

View File

@ -149,8 +149,13 @@ type SchedulerConfiguration struct {
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig `hcl:"preemption_config"`
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"`
// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool `hcl:"reject_job_registration"`
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64

View File

@ -45,6 +45,7 @@ $ curl \
"ModifyIndex": 5,
"SchedulerAlgorithm": "spread",
"MemoryOversubscriptionEnabled": true,
"RejectJobRegistration": false,
"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"BatchSchedulerEnabled": false,
@ -114,6 +115,7 @@ server state is authoritative.
{
"SchedulerAlgorithm": "spread",
"MemoryOversubscriptionEnabled": false,
"RejectJobRegistration": false,
"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"BatchSchedulerEnabled": false,
@ -128,6 +130,8 @@ server state is authoritative.
- `MemoryOversubscriptionEnabled` `(bool: false)` <sup>1.1 Beta</sup> - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription.
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated proceses during incident response.
- `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for
various schedulers.

View File

@ -312,6 +312,8 @@ server {
memory_oversubscription_enabled = true
reject_job_registration = false
preemption_config {
batch_scheduler_enabled = true
system_scheduler_enabled = true