From 03e697a69d2ee0df46e12b1f1fbdcb09d4dd81cd Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 6 Dec 2021 15:20:34 -0500 Subject: [PATCH] scheduler: config option to reject job registration (#11610) During incident response, operators may find that automated processes elsewhere in the organization can be generating new workloads on Nomad clusters that are unable to handle the workload. This changeset adds a field to the `SchedulerConfiguration` API that causes all job registration calls to be rejected unless the request has a management ACL token. --- .changelog/11610.txt | 3 + api/operator.go | 4 + command/agent/http.go | 6 + command/agent/operator_endpoint.go | 1 + nomad/job_endpoint.go | 38 ++- nomad/job_endpoint_test.go | 274 ++++++++++++++++++ nomad/structs/errors.go | 2 + nomad/structs/operator.go | 5 + .../content/api-docs/operator/scheduler.mdx | 4 + website/content/docs/configuration/server.mdx | 2 + 10 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 .changelog/11610.txt diff --git a/.changelog/11610.txt b/.changelog/11610.txt new file mode 100644 index 000000000..1cf42c721 --- /dev/null +++ b/.changelog/11610.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enabled a setting to reject job register, dispatch, and scale requests without a management ACL token +``` diff --git a/api/operator.go b/api/operator.go index 7adf79f1c..439fabe05 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. 
CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/http.go b/command/agent/http.go index 6371873d0..f56215f72 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -451,6 +451,9 @@ func errCodeFromHandler(err error) (int, string) { } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } @@ -487,6 +490,9 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque } else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) { errMsg = structs.ErrTokenNotFound.Error() code = 403 + } else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) { + errMsg = structs.ErrJobRegistrationDisabled.Error() + code = 403 } } diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c..a0f7575f4 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106..8d9119800 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) 
// Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,6 +176,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { @@ -1022,6 +1028,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1315,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// registrationsAreAllowed checks that the scheduler is not in +// RejectJobRegistration mode for load-shedding. 
+func registrationsAreAllowed(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg != nil && !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1879,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := registrationsAreAllowed(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch is currently disabled for non-management ACL") + return structs.ErrJobRegistrationDisabled + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f..9fce33ca7 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,100 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + 
submitJobToken := mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := 
state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -6646,6 +6740,95 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { require.Equal(t, structs.JobStatusDead, dispatchedStatus()) } +func TestJobEndpoint_Dispatch_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.BatchJob() + job.ParameterizedJob = &structs.ParameterizedJobConfig{} + + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + dispatch := &structs.JobDispatchRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). 
+ SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + dispatch.AuthToken = tc.token + var resp structs.JobDispatchResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatch, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t) @@ -6934,6 +7117,97 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { } +func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + 
scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Message: "because of the load", + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected string + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: structs.ErrJobRegistrationDisabled.Error(), + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: structs.ErrPermissionDenied.Error(), + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + scale.AuthToken = tc.token + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + + if tc.errExpected != "" { + require.Error(t, err, "expected error") + require.EqualError(t, err, tc.errExpected) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale_Invalid(t *testing.T) { t.Parallel() require := require.New(t) diff --git 
a/nomad/structs/errors.go b/nomad/structs/errors.go index 7a71708f9..10f8fac1c 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -13,6 +13,7 @@ const ( errNoRegionPath = "No path to region" errTokenNotFound = "ACL token not found" errPermissionDenied = "Permission denied" + errJobRegistrationDisabled = "Job registration, dispatch, and scale are disabled by the scheduler configuration" errNoNodeConn = "No path to node" errUnknownMethod = "Unknown rpc method" errUnknownNomadVersion = "Unable to determine Nomad version" @@ -46,6 +47,7 @@ var ( ErrNoRegionPath = errors.New(errNoRegionPath) ErrTokenNotFound = errors.New(errTokenNotFound) ErrPermissionDenied = errors.New(errPermissionDenied) + ErrJobRegistrationDisabled = errors.New(errJobRegistrationDisabled) ErrNoNodeConn = errors.New(errNoNodeConn) ErrUnknownMethod = errors.New(errUnknownMethod) ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31e..633afa6c3 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. 
CreateIndex uint64 ModifyIndex uint64 diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index cb44b2d0f..899bcadc0 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -45,6 +45,7 @@ $ curl \ "ModifyIndex": 5, "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": true, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -114,6 +115,7 @@ server state is authoritative. { "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": false, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -128,6 +130,8 @@ server state is authoritative. - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. +- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated processes during incident response. + - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers.
diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 8e6eeaa43..dc7163b7c 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -312,6 +312,8 @@ server { memory_oversubscription_enabled = true + reject_job_registration = false + preemption_config { batch_scheduler_enabled = true system_scheduler_enabled = true