core: allow pausing and un-pausing of leader broker routine (#13045)

* core: allow pause/un-pause of eval broker on region leader.

* agent: add ability to pause eval broker via scheduler config.

* cli: add operator scheduler commands to interact with config.

* api: add ability to pause eval broker via scheduler config

* e2e: add operator scheduler test for eval broker pause.

* docs: include new opertor scheduler CLI and pause eval API info.
This commit is contained in:
James Rasell 2022-07-06 16:13:48 +02:00 committed by GitHub
parent f227855de1
commit 181b247384
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1388 additions and 75 deletions

7
.changelog/13045.txt Normal file
View file

@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `scheduler get-config` and `scheduler set-config` commands to the operator CLI
```
```release-note:improvement
core: Added the ability to pause and un-pause the eval broker and blocked eval broker
```

View file

@ -134,6 +134,10 @@ type SchedulerConfiguration struct {
// management ACL token // management ACL token
RejectJobRegistration bool RejectJobRegistration bool
// PauseEvalBroker stops the leader evaluation broker process from running
// until the configuration is updated and written to the Nomad servers.
PauseEvalBroker bool
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration. // CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64 CreateIndex uint64
ModifyIndex uint64 ModifyIndex uint64

View file

@ -5,6 +5,7 @@ import (
"testing" "testing"
"github.com/hashicorp/nomad/api/internal/testutil" "github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/require"
) )
func TestOperator_RaftGetConfiguration(t *testing.T) { func TestOperator_RaftGetConfiguration(t *testing.T) {
@ -53,3 +54,46 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.NotEmpty(t, schedulerConfig)
}
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
newSchedulerConfig := SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
PreemptionConfig: PreemptionConfig{
SystemSchedulerEnabled: true,
SysBatchSchedulerEnabled: true,
BatchSchedulerEnabled: true,
ServiceSchedulerEnabled: true,
},
MemoryOversubscriptionEnabled: true,
RejectJobRegistration: true,
PauseEvalBroker: true,
}
schedulerConfigUpdateResp, _, err := c.Operator().SchedulerSetConfiguration(&newSchedulerConfig, nil)
require.Nil(t, err)
require.True(t, schedulerConfigUpdateResp.Updated)
// We can't exactly predict the query meta responses, so we test fields
// individually.
schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.Equal(t, schedulerConfig.SchedulerConfig.SchedulerAlgorithm, SchedulerAlgorithmSpread)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
require.True(t, schedulerConfig.SchedulerConfig.RejectJobRegistration)
require.True(t, schedulerConfig.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Equal(t, newSchedulerConfig.PreemptionConfig, schedulerConfig.SchedulerConfig.PreemptionConfig)
}

View file

@ -262,6 +262,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
RejectJobRegistration: conf.RejectJobRegistration, RejectJobRegistration: conf.RejectJobRegistration,
PauseEvalBroker: conf.PauseEvalBroker,
PreemptionConfig: structs.PreemptionConfig{ PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,

View file

@ -272,32 +272,32 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
func TestOperator_SchedulerGetConfiguration(t *testing.T) { func TestOperator_SchedulerGetConfiguration(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil) body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body) req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err) require.Nil(t, err)
require.Equal(200, resp.Code) require.Equal(t, 200, resp.Code)
out, ok := obj.(structs.SchedulerConfigurationResponse) out, ok := obj.(structs.SchedulerConfigurationResponse)
require.True(ok) require.True(t, ok)
// Only system jobs can preempt other jobs by default. // Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) require.True(t, out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) require.False(t, out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) require.False(t, out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) require.False(t, out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled) require.False(t, out.SchedulerConfig.MemoryOversubscriptionEnabled)
require.False(t, out.SchedulerConfig.PauseEvalBroker)
}) })
} }
func TestOperator_SchedulerSetConfiguration(t *testing.T) { func TestOperator_SchedulerSetConfiguration(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer([]byte(` body := bytes.NewBuffer([]byte(`
{ {
"MemoryOversubscriptionEnabled": true, "MemoryOversubscriptionEnabled": true,
"PauseEvalBroker": true,
"PreemptionConfig": { "PreemptionConfig": {
"SystemSchedulerEnabled": true, "SystemSchedulerEnabled": true,
"ServiceSchedulerEnabled": true "ServiceSchedulerEnabled": true
@ -306,11 +306,11 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body) req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req) setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err) require.Nil(t, err)
require.Equal(200, resp.Code) require.Equal(t, 200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse) schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok) require.True(t, ok)
require.NotZero(schedSetResp.Index) require.NotZero(t, schedSetResp.Index)
args := structs.GenericRequest{ args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
@ -320,12 +320,13 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
var reply structs.SchedulerConfigurationResponse var reply structs.SchedulerConfigurationResponse
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply) err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err) require.Nil(t, err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) require.True(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) require.False(t, reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) require.False(t, reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) require.True(t, reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled) require.True(t, reply.SchedulerConfig.MemoryOversubscriptionEnabled)
require.True(t, reply.SchedulerConfig.PauseEvalBroker)
}) })
} }

View file

@ -571,7 +571,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta, Meta: meta,
}, nil }, nil
}, },
"operator scheduler": func() (cli.Command, error) {
return &OperatorSchedulerCommand{
Meta: meta,
}, nil
},
"operator scheduler get-config": func() (cli.Command, error) {
return &OperatorSchedulerGetConfig{
Meta: meta,
}, nil
},
"operator scheduler set-config": func() (cli.Command, error) {
return &OperatorSchedulerSetConfig{
Meta: meta,
}, nil
},
"operator snapshot": func() (cli.Command, error) { "operator snapshot": func() (cli.Command, error) {
return &OperatorSnapshotCommand{ return &OperatorSnapshotCommand{
Meta: meta, Meta: meta,

View file

@ -186,6 +186,9 @@ func (m *monitor) monitor(evalID string) int {
// Add the initial pending state // Add the initial pending state
m.update(newEvalState()) m.update(newEvalState())
m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(evalID, m.length)))
for { for {
// Query the evaluation // Query the evaluation
eval, _, err := m.client.Evaluations().Info(evalID, nil) eval, _, err := m.client.Evaluations().Info(evalID, nil)
@ -194,9 +197,6 @@ func (m *monitor) monitor(evalID string) int {
return 1 return 1
} }
m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q",
formatTime(time.Now()), limit(eval.ID, m.length)))
// Create the new eval state. // Create the new eval state.
state := newEvalState() state := newEvalState()
state.status = eval.Status state.status = eval.Status

View file

@ -0,0 +1,42 @@
package command
import (
"strings"
"github.com/mitchellh/cli"
)
// Ensure OperatorSchedulerCommand satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerCommand{}
type OperatorSchedulerCommand struct {
Meta
}
func (o *OperatorSchedulerCommand) Help() string {
helpText := `
Usage: nomad operator scheduler <subcommand> [options]
This command groups subcommands for interacting with Nomad's scheduler
subsystem.
Get the scheduler configuration:
$ nomad operator scheduler get-config
Set the scheduler to use the spread algorithm:
$ nomad operator scheduler set-config -scheduler-algorithm=spread
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
}
func (o *OperatorSchedulerCommand) Synopsis() string {
return "Provides access to the scheduler configuration"
}
func (o *OperatorSchedulerCommand) Name() string { return "operator scheduler" }
func (o *OperatorSchedulerCommand) Run(_ []string) int { return cli.RunResultHelp }

View file

@ -0,0 +1,118 @@
package command
import (
"fmt"
"strings"
"github.com/mitchellh/cli"
"github.com/posener/complete"
)
// Ensure OperatorSchedulerGetConfig satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerGetConfig{}
type OperatorSchedulerGetConfig struct {
Meta
json bool
tmpl string
}
func (o *OperatorSchedulerGetConfig) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
},
)
}
func (o *OperatorSchedulerGetConfig) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (o *OperatorSchedulerGetConfig) Name() string { return "operator scheduler get-config" }
func (o *OperatorSchedulerGetConfig) Run(args []string) int {
flags := o.Meta.FlagSet("get-config", FlagSetClient)
flags.BoolVar(&o.json, "json", false, "")
flags.StringVar(&o.tmpl, "t", "", "")
flags.Usage = func() { o.Ui.Output(o.Help()) }
if err := flags.Parse(args); err != nil {
return 1
}
// Set up a client.
client, err := o.Meta.Client()
if err != nil {
o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Fetch the current configuration.
resp, _, err := client.Operator().SchedulerGetConfiguration(nil)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err))
return 1
}
// If the user has specified to output the scheduler config as JSON or
// using a template, perform this action for the entire object and exit the
// command.
if o.json || len(o.tmpl) > 0 {
out, err := Format(o.json, o.tmpl, resp)
if err != nil {
o.Ui.Error(err.Error())
return 1
}
o.Ui.Output(out)
return 0
}
schedConfig := resp.SchedulerConfig
// Output the information.
o.Ui.Output(formatKV([]string{
fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm),
fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled),
fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration),
fmt.Sprintf("Pause Eval Broker|%v", schedConfig.PauseEvalBroker),
fmt.Sprintf("Preemption System Scheduler|%v", schedConfig.PreemptionConfig.SystemSchedulerEnabled),
fmt.Sprintf("Preemption Service Scheduler|%v", schedConfig.PreemptionConfig.ServiceSchedulerEnabled),
fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled),
fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled),
fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex),
}))
return 0
}
func (o *OperatorSchedulerGetConfig) Synopsis() string {
return "Display the current scheduler configuration"
}
func (o *OperatorSchedulerGetConfig) Help() string {
helpText := `
Usage: nomad operator scheduler get-config [options]
Displays the current scheduler configuration.
If ACLs are enabled, this command requires a token with the 'operator:read'
capability.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Scheduler Get Config Options:
-json
Output the scheduler config in its JSON format.
-t
Format and display the scheduler config using a Go template.
`
return strings.TrimSpace(helpText)
}

View file

@ -0,0 +1,47 @@
package command
import (
"encoding/json"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestOperatorSchedulerGetConfig_Run(t *testing.T) {
ci.Parallel(t)
srv, _, addr := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
c := &OperatorSchedulerGetConfig{Meta: Meta{Ui: ui}}
// Run the command, so we get the default output and test this.
require.EqualValues(t, 0, c.Run([]string{"-address=" + addr}))
s := ui.OutputWriter.String()
require.Contains(t, s, "Scheduler Algorithm = binpack")
require.Contains(t, s, "Preemption SysBatch Scheduler = false")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Request JSON output and test.
require.EqualValues(t, 0, c.Run([]string{"-address=" + addr, "-json"}))
s = ui.OutputWriter.String()
var js api.SchedulerConfiguration
require.NoError(t, json.Unmarshal([]byte(s), &js))
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Request a template output and test.
require.EqualValues(t, 0, c.Run([]string{"-address=" + addr, "-t='{{printf \"%s!!!\" .SchedulerConfig.SchedulerAlgorithm}}'"}))
require.Contains(t, ui.OutputWriter.String(), "binpack!!!")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Test an unsupported flag.
require.EqualValues(t, 1, c.Run([]string{"-address=" + addr, "-yaml"}))
require.Contains(t, ui.OutputWriter.String(), "Usage: nomad operator scheduler get-config")
}

View file

@ -0,0 +1,210 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api"
flagHelper "github.com/hashicorp/nomad/helper/flags"
"github.com/mitchellh/cli"
"github.com/posener/complete"
)
// Ensure OperatorSchedulerSetConfig satisfies the cli.Command interface.
var _ cli.Command = &OperatorSchedulerSetConfig{}
type OperatorSchedulerSetConfig struct {
Meta
// The scheduler configuration flags allow us to tell whether the user set
// a value or not. This means we can safely merge the current configuration
// with user supplied, selective updates.
checkIndex string
schedulerAlgorithm string
memoryOversubscription flagHelper.BoolValue
rejectJobRegistration flagHelper.BoolValue
pauseEvalBroker flagHelper.BoolValue
preemptBatchScheduler flagHelper.BoolValue
preemptServiceScheduler flagHelper.BoolValue
preemptSysBatchScheduler flagHelper.BoolValue
preemptSystemScheduler flagHelper.BoolValue
}
func (o *OperatorSchedulerSetConfig) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-check-index": complete.PredictAnything,
"-scheduler-algorithm": complete.PredictSet(
string(api.SchedulerAlgorithmBinpack),
string(api.SchedulerAlgorithmSpread),
),
"-memory-oversubscription": complete.PredictSet("true", "false"),
"-reject-job-registration": complete.PredictSet("true", "false"),
"-pause-eval-broker": complete.PredictSet("true", "false"),
"-preempt-batch-scheduler": complete.PredictSet("true", "false"),
"-preempt-service-scheduler": complete.PredictSet("true", "false"),
"-preempt-sysbatch-scheduler": complete.PredictSet("true", "false"),
"-preempt-system-scheduler": complete.PredictSet("true", "false"),
},
)
}
func (o *OperatorSchedulerSetConfig) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (o *OperatorSchedulerSetConfig) Name() string { return "operator scheduler set-config" }
func (o *OperatorSchedulerSetConfig) Run(args []string) int {
flags := o.Meta.FlagSet("set-config", FlagSetClient)
flags.Usage = func() { o.Ui.Output(o.Help()) }
flags.StringVar(&o.checkIndex, "check-index", "", "")
flags.StringVar(&o.schedulerAlgorithm, "scheduler-algorithm", "", "")
flags.Var(&o.memoryOversubscription, "memory-oversubscription", "")
flags.Var(&o.rejectJobRegistration, "reject-job-registration", "")
flags.Var(&o.pauseEvalBroker, "pause-eval-broker", "")
flags.Var(&o.preemptBatchScheduler, "preempt-batch-scheduler", "")
flags.Var(&o.preemptServiceScheduler, "preempt-service-scheduler", "")
flags.Var(&o.preemptSysBatchScheduler, "preempt-sysbatch-scheduler", "")
flags.Var(&o.preemptSystemScheduler, "preempt-system-scheduler", "")
if err := flags.Parse(args); err != nil {
return 1
}
// Set up a client.
client, err := o.Meta.Client()
if err != nil {
o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Check that we got no arguments.
args = flags.Args()
if l := len(args); l != 0 {
o.Ui.Error("This command takes no arguments")
o.Ui.Error(commandErrorText(o))
return 1
}
// Convert the check index string and handle any errors before adding this
// to our request. This parsing handles empty values correctly.
checkIndex, _, err := parseCheckIndex(o.checkIndex)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", o.checkIndex, err))
return 1
}
// Fetch the current configuration. This will be used as a base to merge
// user configuration onto.
resp, _, err := client.Operator().SchedulerGetConfiguration(nil)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error querying for scheduler configuration: %s", err))
return 1
}
if checkIndex > 0 && resp.SchedulerConfig.ModifyIndex != checkIndex {
errMsg := fmt.Sprintf("check-index %v does not match does not match current state value %v",
checkIndex, resp.SchedulerConfig.ModifyIndex)
o.Ui.Error(fmt.Sprintf("Error performing check index set: %s", errMsg))
return 1
}
schedulerConfig := resp.SchedulerConfig
// Overwrite the modification index if the user supplied one, otherwise we
// use what was included within the read response.
if checkIndex > 0 {
schedulerConfig.ModifyIndex = checkIndex
}
// Merge the current configuration with any values set by the operator.
if o.schedulerAlgorithm != "" {
schedulerConfig.SchedulerAlgorithm = api.SchedulerAlgorithm(o.schedulerAlgorithm)
}
o.memoryOversubscription.Merge(&schedulerConfig.MemoryOversubscriptionEnabled)
o.rejectJobRegistration.Merge(&schedulerConfig.RejectJobRegistration)
o.pauseEvalBroker.Merge(&schedulerConfig.PauseEvalBroker)
o.preemptBatchScheduler.Merge(&schedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
o.preemptServiceScheduler.Merge(&schedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
o.preemptSysBatchScheduler.Merge(&schedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
o.preemptSystemScheduler.Merge(&schedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
// Check-and-set the new configuration.
result, _, err := client.Operator().SchedulerCASConfiguration(schedulerConfig, nil)
if err != nil {
o.Ui.Error(fmt.Sprintf("Error setting scheduler configuration: %s", err))
return 1
}
if result.Updated {
o.Ui.Output("Scheduler configuration updated!")
return 0
}
o.Ui.Output("Scheduler configuration could not be atomically updated, please try again")
return 1
}
func (o *OperatorSchedulerSetConfig) Synopsis() string {
return "Modify the current scheduler configuration"
}
func (o *OperatorSchedulerSetConfig) Help() string {
helpText := `
Usage: nomad operator scheduler set-config [options]
Modifies the current scheduler configuration.
If ACLs are enabled, this command requires a token with the 'operator:write'
capability.
General Options:
` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + `
Scheduler Set Config Options:
-check-index
If set, the scheduler config is only updated if the passed modify index
matches the current server side version. If a non-zero value is passed, it
ensures that the scheduler config is being updated from a known state.
-scheduler-algorithm=["binpack"|"spread"]
Specifies whether scheduler binpacks or spreads allocations on available
nodes.
-memory-oversubscription=[true|false]
When true, tasks may exceed their reserved memory limit, if the client has
excess memory capacity. Tasks must specify memory_max to take advantage of
memory oversubscription.
-reject-job-registration=[true|false]
When true, the server will return permission denied errors for job registration,
job dispatch, and job scale APIs, unless the ACL token for the request is a
management token. If ACLs are disabled, no user will be able to register jobs.
This allows operators to shed load from automated processes during incident
response.
-pause-eval-broker=[true|false]
When set to true, the eval broker which usually runs on the leader will be
disabled. This will prevent the scheduler workers from receiving new work.
-preempt-batch-scheduler=[true|false]
Specifies whether preemption for batch jobs is enabled. Note that if this
is set to true, then batch jobs can preempt any other jobs.
-preempt-service-scheduler=[true|false]
Specifies whether preemption for service jobs is enabled. Note that if this
is set to true, then service jobs can preempt any other jobs.
-preempt-sysbatch-scheduler=[true|false]
Specifies whether preemption for system batch jobs is enabled. Note that if
this is set to true, then system batch jobs can preempt any other jobs.
-preempt-system-scheduler=[true|false]
Specifies whether preemption for system jobs is enabled. Note that if this
is set to true, then system jobs can preempt any other jobs.
`
return strings.TrimSpace(helpText)
}

View file

@ -0,0 +1,108 @@
package command
import (
"strconv"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestOperatorSchedulerSetConfig_Run(t *testing.T) {
ci.Parallel(t)
srv, _, addr := testServer(t, false, nil)
defer srv.Shutdown()
ui := cli.NewMockUi()
c := &OperatorSchedulerSetConfig{Meta: Meta{Ui: ui}}
bootstrappedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
require.NotEmpty(t, bootstrappedConfig.SchedulerConfig)
// Run the command with zero value and ensure the configuration does not
// change.
require.EqualValues(t, 0, c.Run([]string{"-address=" + addr}))
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Read the configuration again and test that nothing has changed which
// ensures our empty flags are working correctly.
nonModifiedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
schedulerConfigEquals(t, bootstrappedConfig.SchedulerConfig, nonModifiedConfig.SchedulerConfig)
// Modify every configuration parameter using the flags. This ensures the
// merging is working correctly and that operators can control the entire
// object via the CLI.
modifyingArgs := []string{
"-address=" + addr,
"-scheduler-algorithm=spread",
"-pause-eval-broker=true",
"-memory-oversubscription=true",
"-reject-job-registration=true",
"-preempt-batch-scheduler=true",
"-preempt-service-scheduler=true",
"-preempt-sysbatch-scheduler=true",
"-preempt-system-scheduler=false",
}
require.EqualValues(t, 0, c.Run(modifyingArgs))
s := ui.OutputWriter.String()
require.Contains(t, s, "Scheduler configuration updated!")
modifiedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
schedulerConfigEquals(t, &api.SchedulerConfiguration{
SchedulerAlgorithm: "spread",
PreemptionConfig: api.PreemptionConfig{
SystemSchedulerEnabled: false,
SysBatchSchedulerEnabled: true,
BatchSchedulerEnabled: true,
ServiceSchedulerEnabled: true,
},
MemoryOversubscriptionEnabled: true,
RejectJobRegistration: true,
PauseEvalBroker: true,
}, modifiedConfig.SchedulerConfig)
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Make a Freudian slip with one of the flags to ensure the usage is
// returned.
require.EqualValues(t, 1, c.Run([]string{"-address=" + addr, "-pause-evil-broker=true"}))
require.Contains(t, ui.OutputWriter.String(), "Usage: nomad operator scheduler set-config")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Try updating the config using an incorrect check-index value.
require.EqualValues(t, 1, c.Run([]string{
"-address=" + addr,
"-pause-eval-broker=false",
"-check-index=1000000",
}))
require.Contains(t, ui.ErrorWriter.String(), "check-index 1000000 does not match does not match current state")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Try updating the config using a correct check-index value.
require.EqualValues(t, 0, c.Run([]string{
"-address=" + addr,
"-pause-eval-broker=false",
"-check-index=" + strconv.FormatUint(modifiedConfig.SchedulerConfig.ModifyIndex, 10),
}))
require.Contains(t, ui.OutputWriter.String(), "Scheduler configuration updated!")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
}
func schedulerConfigEquals(t *testing.T, expected, actual *api.SchedulerConfiguration) {
require.Equal(t, expected.SchedulerAlgorithm, actual.SchedulerAlgorithm)
require.Equal(t, expected.RejectJobRegistration, actual.RejectJobRegistration)
require.Equal(t, expected.MemoryOversubscriptionEnabled, actual.MemoryOversubscriptionEnabled)
require.Equal(t, expected.PauseEvalBroker, actual.PauseEvalBroker)
require.Equal(t, expected.PreemptionConfig, actual.PreemptionConfig)
}

View file

@ -0,0 +1,6 @@
// Package operator_scheduler provides end-to-end tests for the Nomad operator
// scheduler functionality and configuration options.
//
// In order to run this test suite only, from the e2e directory you can trigger
// go test -v -run '^TestOperatorScheduler' ./operator_scheduler
package operator_scheduler

View file

@ -0,0 +1,21 @@
job "operator_scheduler" {
datacenters = ["dc1"]
type = "batch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "operator_scheduler" {
task "test" {
driver = "raw_exec"
config {
command = "bash"
args = ["-c", "sleep 1"]
}
}
}
}

View file

@ -0,0 +1,117 @@
package operator_scheduler
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const jobBasic = "./input/basic.nomad"
// TestOperatorScheduler runs the Nomad Operator Scheduler suit of tests which
// focus on the behaviour of the /v1/operator/scheduler API.
func TestOperatorScheduler(t *testing.T) {
// Wait until we have a usable cluster before running the tests.
nomadClient := e2eutil.NomadClient(t)
e2eutil.WaitForLeader(t, nomadClient)
e2eutil.WaitForNodesReady(t, nomadClient, 1)
// Run our test cases.
t.Run("TestOperatorScheduler_ConfigPauseEvalBroker", testConfigPauseEvalBroker)
}
// testConfig tests pausing and un-pausing the eval broker and ensures the
// correct behaviour is observed at each stage.
func testConfigPauseEvalBroker(t *testing.T) {
nomadClient := e2eutil.NomadClient(t)
// Generate our job ID which will be used for the entire test.
jobID := "operator-scheduler-config-pause-eval-broker-" + uuid.Generate()[:8]
jobIDs := []string{jobID}
// Defer a cleanup function to remove the job. This will trigger if the
// test fails, unless the cancel function is called.
ctx, cancel := context.WithCancel(context.Background())
defer e2eutil.CleanupJobsAndGCWithContext(t, ctx, &jobIDs)
// Register the job and ensure the alloc reaches the running state before
// moving safely on.
allocStubs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, jobBasic, jobID, "")
require.Len(t, allocStubs, 1)
e2eutil.WaitForAllocRunning(t, nomadClient, allocStubs[0].ID)
// Get the current scheduler config object.
schedulerConfig, _, err := nomadClient.Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
require.NotNil(t, schedulerConfig.SchedulerConfig)
// Set the eval broker to be paused.
schedulerConfig.SchedulerConfig.PauseEvalBroker = true
// Write the config back to Nomad.
schedulerConfigUpdate, _, err := nomadClient.Operator().SchedulerSetConfiguration(
schedulerConfig.SchedulerConfig, nil)
require.NoError(t, err)
require.True(t, schedulerConfigUpdate.Updated)
// Perform a deregister call. The call will succeed and create an
// evaluation. Do not use purge, so we can check the job status when
// dereigster happens.
evalID, _, err := nomadClient.Jobs().Deregister(jobID, false, nil)
require.NoError(t, err)
require.NotEmpty(t, evalID)
// Evaluation status is set to pending initially, so there isn't a great
// way to ensure it doesn't transition to another status other than polling
// for a long enough time to assume it won't change.
timedFn := func() error {
// 5 seconds should be more than enough time for an eval to change
// status unless the broker is disabled.
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
return nil
default:
evalInfo, _, err := nomadClient.Evaluations().Info(evalID, nil)
if err != nil {
return err
}
if !assert.Equal(t, "pending", evalInfo.Status) {
return fmt.Errorf(`expected eval status "pending", got %q`, evalInfo.Status)
}
time.Sleep(1 * time.Second)
}
}
}
require.NoError(t, timedFn())
// Set the eval broker to be un-paused.
schedulerConfig.SchedulerConfig.PauseEvalBroker = false
// Write the config back to Nomad.
schedulerConfigUpdate, _, err = nomadClient.Operator().SchedulerSetConfiguration(
schedulerConfig.SchedulerConfig, nil)
require.NoError(t, err)
require.True(t, schedulerConfigUpdate.Updated)
// Ensure the job is stopped, then run the garbage collection to clear out
// all resources.
e2eutil.WaitForJobStopped(t, nomadClient, jobID)
_, err = e2eutil.Command("nomad", "system", "gc")
require.NoError(t, err)
// If we have reached this far, we do not need to run the cleanup function.
cancel()
}

106
helper/broker/notify.go Normal file
View file

@ -0,0 +1,106 @@
package broker
import (
"time"
"github.com/hashicorp/nomad/helper"
)
// GenericNotifier allows a process to send updates to many subscribers in an
// easy manner.
type GenericNotifier struct {
// publishCh is the channel used to receive the update which will be sent
// to all subscribers.
publishCh chan interface{}
// subscribeCh and unsubscribeCh are the channels used to modify the
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}
}
// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
func NewGenericNotifier() *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
}
}
// Notify allows the implementer to notify all subscribers with a specific
// update. There is no guarantee the order in which subscribers receive the
// message which is sent linearly.
func (g *GenericNotifier) Notify(msg interface{}) {
select {
case g.publishCh <- msg:
default:
}
}
// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The passed stopCh is used to coordinate
// shutdown.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage without
// using a lock.
subscribers := map[chan interface{}]struct{}{}
for {
select {
case <-stopCh:
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
case msgCh := <-g.unsubscribeCh:
delete(subscribers, msgCh)
case update := <-g.publishCh:
for subscriberCh := range subscribers {
// The subscribers channels are buffered, but ensure we don't
// block the whole process on this.
select {
case subscriberCh <- update:
default:
}
}
}
}
}
// WaitForChange allows a subscriber to wait until there is a notification
// change, or the timeout is reached. The function will block until one
// condition is met.
func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
g.subscribeCh <- updateCh
// Create a timeout timer and use the helper to ensure this routine doesn't
// panic and making the stop call clear.
timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout)
// Defer a function which performs all the required cleanup of the
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
g.unsubscribeCh <- updateCh
close(updateCh)
timeoutStop()
}()
// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
select {
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:
return update
}
}

View file

@ -0,0 +1,55 @@
package broker
import (
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/ci"
"github.com/stretchr/testify/require"
)
func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)
// Create the new notifier.
stopChan := make(chan struct{})
defer close(stopChan)
notifier := NewGenericNotifier()
go notifier.Run(stopChan)
// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))
require.Equal(t, 1, cap(notifier.subscribeCh))
require.Equal(t, 1, cap(notifier.unsubscribeCh))
// Test that the timeout works.
var timeoutWG sync.WaitGroup
for i := 0; i < 6; i++ {
go func(wg *sync.WaitGroup) {
wg.Add(1)
msg := notifier.WaitForChange(100 * time.Millisecond)
require.Equal(t, "wait timed out after 100ms", msg)
wg.Done()
}(&timeoutWG)
}
timeoutWG.Wait()
// Test that all subscribers recieve an update when a single notification
// is sent.
var notifiedWG sync.WaitGroup
for i := 0; i < 6; i++ {
go func(wg *sync.WaitGroup) {
wg.Add(1)
msg := notifier.WaitForChange(3 * time.Second)
require.Equal(t, "we got an update and not a timeout", msg)
wg.Done()
}(&notifiedWG)
}
notifier.Notify("we got an update and not a timeout")
notifiedWG.Wait()
}

View file

@ -6,11 +6,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"strconv"
"sync" "sync"
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/broker"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -49,6 +51,8 @@ type EvalBroker struct {
deliveryLimit int deliveryLimit int
enabled bool enabled bool
enabledNotifier *broker.GenericNotifier
stats *BrokerStats stats *BrokerStats
// evals tracks queued evaluations by ID to de-duplicate enqueue. // evals tracks queued evaluations by ID to de-duplicate enqueue.
@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
nackTimeout: timeout, nackTimeout: timeout,
deliveryLimit: deliveryLimit, deliveryLimit: deliveryLimit,
enabled: false, enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
stats: new(BrokerStats), stats: new(BrokerStats),
evals: make(map[string]int), evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string), jobEvals: make(map[structs.NamespacedID]string),
@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
if !enabled { if !enabled {
b.flush() b.flush()
} }
// Notify all subscribers to state changes of the broker enabled value.
b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
} }
// Enqueue is used to enqueue a new evaluation // Enqueue is used to enqueue a new evaluation

View file

@ -131,6 +131,23 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
args.Timeout = DefaultDequeueTimeout args.Timeout = DefaultDequeueTimeout
} }
// If the eval broker is paused, attempt to block and wait for a state
// change before returning. This avoids a tight loop and mimics the
// behaviour where there are no evals to process.
//
// The call can return because either the timeout is reached or the broker
// SetEnabled function was called to modify its state. It is possible this
// is because of leadership transition, therefore the RPC should exit to
// allow all safety checks and RPC forwarding to occur again.
//
// The log line is trace, because the default worker timeout is 500ms which
// produces a large amount of logging.
if !e.srv.evalBroker.Enabled() {
message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout)
e.logger.Trace("eval broker wait for un-pause", "message", message)
return nil
}
// Attempt the dequeue // Attempt the dequeue
eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
if err != nil { if err != nil {

View file

@ -454,6 +454,33 @@ func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
} }
} }
func TestEvalEndpoint_Dequeue_BrokerDisabled(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue.
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register a request.
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
// Disable the eval broker and try to dequeue.
s1.evalBroker.SetEnabled(false)
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp))
require.Empty(t, resp.Eval)
}
func TestEvalEndpoint_Ack(t *testing.T) { func TestEvalEndpoint_Ack(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)

View file

@ -289,8 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.getOrCreateAutopilotConfig() s.getOrCreateAutopilotConfig()
s.autopilot.Start() s.autopilot.Start()
// Initialize scheduler configuration // Initialize scheduler configuration.
s.getOrCreateSchedulerConfig() schedulerConfig := s.getOrCreateSchedulerConfig()
// Initialize the ClusterID // Initialize the ClusterID
_, _ = s.ClusterID() _, _ = s.ClusterID()
@ -302,12 +302,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Start the plan evaluator // Start the plan evaluator
go s.planApply() go s.planApply()
// Enable the eval broker, since we are now the leader // Start the eval broker and blocked eval broker if these are not paused by
s.evalBroker.SetEnabled(true) // the operator.
restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig)
// Enable the blocked eval tracker, since we are now the leader
s.blockedEvals.SetEnabled(true)
s.blockedEvals.SetTimetable(s.fsm.TimeTable())
// Enable the deployment watcher, since we are now the leader // Enable the deployment watcher, since we are now the leader
s.deploymentWatcher.SetEnabled(true, s.State()) s.deploymentWatcher.SetEnabled(true, s.State())
@ -318,10 +315,13 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Enable the volume watcher, since we are now the leader // Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())
// Restore the eval broker state // Restore the eval broker state and blocked eval state. If these are
// currently paused, we do not need to do this.
if restoreEvals {
if err := s.restoreEvals(); err != nil { if err := s.restoreEvals(); err != nil {
return err return err
} }
}
// Activate the vault client // Activate the vault client
s.vault.SetActive(true) s.vault.SetActive(true)
@ -1110,11 +1110,13 @@ func (s *Server) revokeLeadership() error {
// Disable the plan queue, since we are no longer leader // Disable the plan queue, since we are no longer leader
s.planQueue.SetEnabled(false) s.planQueue.SetEnabled(false)
// Disable the eval broker, since it is only useful as a leader // Disable the eval broker and blocked evals. We do not need to check the
// scheduler configuration paused eval broker value, as the brokers should
// always be paused on the non-leader.
s.brokerLock.Lock()
s.evalBroker.SetEnabled(false) s.evalBroker.SetEnabled(false)
// Disable the blocked eval tracker, since it is only useful as a leader
s.blockedEvals.SetEnabled(false) s.blockedEvals.SetEnabled(false)
s.brokerLock.Unlock()
// Disable the periodic dispatcher, since it is only useful as a leader // Disable the periodic dispatcher, since it is only useful as a leader
s.periodicDispatcher.SetEnabled(false) s.periodicDispatcher.SetEnabled(false)
@ -1693,3 +1695,70 @@ func (s *Server) generateClusterID() (string, error) {
s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime) s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
return newMeta.ClusterID, nil return newMeta.ClusterID, nil
} }
// handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals
// enabled status based on the passed scheduler configuration. The boolean
// response indicates whether the caller needs to call restoreEvals() due to
// the brokers being enabled. It is for use when the change must take the
// scheduler configuration into account. This is not needed when calling
// revokeLeadership, as the configuration doesn't matter, and we need to ensure
// the brokers are stopped.
//
// The function checks the server is the leader and uses a mutex to avoid any
// potential timings problems. Consider the following timings:
// - operator updates the configuration via the API
// - the RPC handler applies the change via Raft
// - leadership transitions with write barrier
// - the RPC handler call this function to enact the change
//
// The mutex also protects against a situation where leadership is revoked
// while this function is being called. Ensuring the correct series of actions
// occurs so that state stays consistent.
func (s *Server) handleEvalBrokerStateChange(schedConfig *structs.SchedulerConfiguration) bool {
// Grab the lock first. Once we have this we can be sure to run everything
// needed before any leader transition can attempt to modify the state.
s.brokerLock.Lock()
defer s.brokerLock.Unlock()
// If we are no longer the leader, exit early.
if !s.IsLeader() {
return false
}
// enableEvalBroker tracks whether the evalBroker and blockedEvals
// processes should be enabled or not. It allows us to answer this question
// whether using a persisted Raft configuration, or the default bootstrap
// config.
var enableBrokers, restoreEvals bool
// The scheduler config can only be persisted to Raft once quorum has been
// established. If this is a fresh cluster, we need to use the default
// scheduler config, otherwise we can use the persisted object.
switch schedConfig {
case nil:
enableBrokers = !s.config.DefaultSchedulerConfig.PauseEvalBroker
default:
enableBrokers = !schedConfig.PauseEvalBroker
}
// If the evalBroker status is changing, set the new state.
if enableBrokers != s.evalBroker.Enabled() {
s.logger.Info("eval broker status modified", "paused", !enableBrokers)
s.evalBroker.SetEnabled(enableBrokers)
restoreEvals = enableBrokers
}
// If the blockedEvals status is changing, set the new state.
if enableBrokers != s.blockedEvals.Enabled() {
s.logger.Info("blocked evals status modified", "paused", !enableBrokers)
s.blockedEvals.SetEnabled(enableBrokers)
restoreEvals = enableBrokers
if enableBrokers {
s.blockedEvals.SetTimetable(s.fsm.TimeTable())
}
}
return restoreEvals
}

View file

@ -1664,3 +1664,89 @@ func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
return leader return leader
} }
func TestServer_handleEvalBrokerStateChange(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
startValue bool
testServerCallBackConfig func(c *Config)
inputSchedulerConfiguration *structs.SchedulerConfiguration
expectedOutput bool
name string
}{
{
startValue: false,
testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = false },
inputSchedulerConfiguration: nil,
expectedOutput: true,
name: "bootstrap un-paused",
},
{
startValue: false,
testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = true },
inputSchedulerConfiguration: nil,
expectedOutput: false,
name: "bootstrap paused",
},
{
startValue: true,
testServerCallBackConfig: nil,
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
expectedOutput: false,
name: "state change to paused",
},
{
startValue: false,
testServerCallBackConfig: nil,
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
expectedOutput: false,
name: "no state change to paused",
},
{
startValue: false,
testServerCallBackConfig: nil,
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: false},
expectedOutput: true,
name: "state change to un-paused",
},
{
startValue: false,
testServerCallBackConfig: nil,
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
expectedOutput: false,
name: "no state change to un-paused",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create a new server and wait for leadership to be established.
testServer, cleanupFn := TestServer(t, nil)
_ = waitForStableLeadership(t, []*Server{testServer})
defer cleanupFn()
// If we set a callback config, we are just testing the eventual
// state of the brokers. Otherwise, we set our starting value and
// then perform our state modification change and check.
if tc.testServerCallBackConfig == nil {
testServer.evalBroker.SetEnabled(tc.startValue)
testServer.blockedEvals.SetEnabled(tc.startValue)
actualOutput := testServer.handleEvalBrokerStateChange(tc.inputSchedulerConfiguration)
require.Equal(t, tc.expectedOutput, actualOutput)
}
// Check the brokers are in the expected state.
var expectedEnabledVal bool
if tc.inputSchedulerConfiguration == nil {
expectedEnabledVal = !testServer.config.DefaultSchedulerConfig.PauseEvalBroker
} else {
expectedEnabledVal = !tc.inputSchedulerConfiguration.PauseEvalBroker
}
require.Equal(t, expectedEnabledVal, testServer.evalBroker.Enabled())
require.Equal(t, expectedEnabledVal, testServer.blockedEvals.Enabled())
})
}
}

View file

@ -334,7 +334,19 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
if respBool, ok := resp.(bool); ok { if respBool, ok := resp.(bool); ok {
reply.Updated = respBool reply.Updated = respBool
} }
reply.Index = index reply.Index = index
// If we updated the configuration, handle any required state changes within
// the eval broker and blocked evals processes. The state change and
// restore functions have protections around leadership transitions and
// restoring into non-running brokers.
if reply.Updated {
if op.srv.handleEvalBrokerStateChange(&args.Config) {
return op.srv.restoreEvals()
}
}
return nil return nil
} }

View file

@ -402,39 +402,42 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
c.Build = "0.9.0+unittest" c.Build = "0.9.0+unittest"
}) })
defer cleanupS1() defer cleanupS1()
codec := rpcClient(t, s1) rpcCodec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s1.RPC)
require := require.New(t) // Disable preemption and pause the eval broker.
// Disable preemption
arg := structs.SchedulerSetConfigRequest{ arg := structs.SchedulerSetConfigRequest{
Config: structs.SchedulerConfiguration{ Config: structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{ PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: false, SystemSchedulerEnabled: false,
}, },
PauseEvalBroker: true,
}, },
} }
arg.Region = s1.config.Region arg.Region = s1.config.Region
var setResponse structs.SchedulerSetConfigurationResponse var setResponse structs.SchedulerSetConfigurationResponse
err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &setResponse) err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerSetConfiguration", &arg, &setResponse)
require.Nil(err) require.Nil(t, err)
require.NotZero(setResponse.Index) require.NotZero(t, setResponse.Index)
// Read and verify that preemption is disabled // Read and verify that preemption is disabled and the eval and blocked
// evals systems are disabled.
readConfig := structs.GenericRequest{ readConfig := structs.GenericRequest{
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Region: s1.config.Region, Region: s1.config.Region,
}, },
} }
var reply structs.SchedulerConfigurationResponse var reply structs.SchedulerConfigurationResponse
if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &readConfig, &reply); err != nil { err = msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerGetConfiguration", &readConfig, &reply)
t.Fatalf("err: %v", err) require.NoError(t, err)
}
require.NotZero(reply.Index) require.NotZero(t, reply.Index)
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) require.False(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(t, reply.SchedulerConfig.PauseEvalBroker)
require.False(t, s1.evalBroker.Enabled())
require.False(t, s1.blockedEvals.Enabled())
} }
func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) { func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) {

View file

@ -190,6 +190,18 @@ type Server struct {
// capacity changes. // capacity changes.
blockedEvals *BlockedEvals blockedEvals *BlockedEvals
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker *EvalBroker
// brokerLock is used to synchronise the alteration of the blockedEvals and
// evalBroker enabled state. These two subsystems change state when
// leadership changes or when the user modifies the setting via the
// operator scheduler configuration. This lock allows these actions to be
// performed safely, without potential for user interactions and leadership
// transitions to collide and create inconsistent state.
brokerLock sync.Mutex
// deploymentWatcher is used to watch deployments and their allocations and // deploymentWatcher is used to watch deployments and their allocations and
// make the required calls to continue to transition the deployment. // make the required calls to continue to transition the deployment.
deploymentWatcher *deploymentwatcher.Watcher deploymentWatcher *deploymentwatcher.Watcher
@ -200,10 +212,6 @@ type Server struct {
// volumeWatcher is used to release volume claims // volumeWatcher is used to release volume claims
volumeWatcher *volumewatcher.Watcher volumeWatcher *volumewatcher.Watcher
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker *EvalBroker
// periodicDispatcher is used to track and create evaluations for periodic jobs. // periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher *PeriodicDispatch periodicDispatcher *PeriodicDispatch
@ -423,6 +431,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
return nil, fmt.Errorf("failed to create volume watcher: %v", err) return nil, fmt.Errorf("failed to create volume watcher: %v", err)
} }
// Start the eval broker notification system so any subscribers can get
// updates when the processes SetEnabled is triggered.
go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
// Setup the node drainer. // Setup the node drainer.
s.setupNodeDrainer() s.setupNodeDrainer()

View file

@ -156,6 +156,12 @@ type SchedulerConfiguration struct {
// management ACL token // management ACL token
RejectJobRegistration bool `hcl:"reject_job_registration"` RejectJobRegistration bool `hcl:"reject_job_registration"`
// PauseEvalBroker is a boolean to control whether the evaluation broker
// should be paused on the cluster leader. Only a single broker runs per
// region, and it must be persisted to state so the parameter is consistent
// during leadership transitions.
PauseEvalBroker bool `hcl:"pause_eval_broker"`
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration. // CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64 CreateIndex uint64
ModifyIndex uint64 ModifyIndex uint64

View file

@ -40,18 +40,20 @@ $ curl \
"Index": 5, "Index": 5,
"KnownLeader": true, "KnownLeader": true,
"LastContact": 0, "LastContact": 0,
"NextToken": "",
"SchedulerConfig": { "SchedulerConfig": {
"CreateIndex": 5, "CreateIndex": 5,
"MemoryOversubscriptionEnabled": false,
"ModifyIndex": 5, "ModifyIndex": 5,
"SchedulerAlgorithm": "spread", "PauseEvalBroker": false,
"MemoryOversubscriptionEnabled": true,
"RejectJobRegistration": false,
"PreemptionConfig": { "PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled": false,
"BatchSchedulerEnabled": false, "BatchSchedulerEnabled": false,
"ServiceSchedulerEnabled": false "ServiceSchedulerEnabled": false,
} "SysBatchSchedulerEnabled": false,
"SystemSchedulerEnabled": true
},
"RejectJobRegistration": false,
"SchedulerAlgorithm": "binpack"
} }
} }
``` ```
@ -64,9 +66,23 @@ $ curl \
- `SchedulerConfig` `(SchedulerConfig)` - The returned `SchedulerConfig` object has configuration - `SchedulerConfig` `(SchedulerConfig)` - The returned `SchedulerConfig` object has configuration
settings mentioned below. settings mentioned below.
- `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler binpacks or spreads allocations on available nodes. - `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler
binpacks or spreads allocations on available nodes.
- `MemoryOversubscriptionEnabled` `(bool: false)` <sup>1.1 Beta</sup> - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. - `MemoryOversubscriptionEnabled` `(bool: false)` <sup>1.1 Beta</sup> - When
`true`, tasks may exceed their reserved memory limit, if the client has excess
memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max)
to take advantage of memory oversubscription.
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
unless the ACL token for the request is a management token. If ACLs are disabled,
no user will be able to register jobs. This allows operators to shed load from
automated processes during incident response.
- `PauseEvalBroker` `(bool: false)` - When set to `true`, the eval broker which
usually runs on the leader will be disabled. This will prevent the scheduler
workers from receiving new work.
- `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers.
@ -120,6 +136,7 @@ server state is authoritative.
"SchedulerAlgorithm": "spread", "SchedulerAlgorithm": "spread",
"MemoryOversubscriptionEnabled": false, "MemoryOversubscriptionEnabled": false,
"RejectJobRegistration": false, "RejectJobRegistration": false,
"PauseEvalBroker": false,
"PreemptionConfig": { "PreemptionConfig": {
"SystemSchedulerEnabled": true, "SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled": false, "SysBatchSchedulerEnabled": false,
@ -133,9 +150,20 @@ server state is authoritative.
binpacks or spreads allocations on available nodes. Possible values are binpacks or spreads allocations on available nodes. Possible values are
`"binpack"` and `"spread"` `"binpack"` and `"spread"`
- `MemoryOversubscriptionEnabled` `(bool: false)` <sup>1.1 Beta</sup> - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. - `MemoryOversubscriptionEnabled` `(bool: false)` <sup>1.1 Beta</sup> - When
`true`, tasks may exceed their reserved memory limit, if the client has excess
memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max)
to take advantage of memory oversubscription.
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated proceses during incident response. - `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
unless the ACL token for the request is a management token. If ACLs are disabled,
no user will be able to register jobs. This allows operators to shed load from
automated processes during incident response.
- `PauseEvalBroker` `(bool: false)` - When set to `true`, the eval broker which
usually runs on the leader will be disabled. This will prevent the scheduler
workers from receiving new work.
- `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for
various schedulers. various schedulers.

View file

@ -42,6 +42,12 @@ The following subcommands are available:
- [`operator raft remove-peer`][remove] - Remove a Nomad server from the Raft - [`operator raft remove-peer`][remove] - Remove a Nomad server from the Raft
configuration configuration
- [`operator scheduler get-config`][scheduler-get-config] - Display the current
scheduler configuration
- [`operator scheduler set-config`][scheduler-set-config] - Modify the scheduler
configuration
- [`operator snapshot agent`][snapshot-agent] <EnterpriseAlert inline /> - Inspects a snapshot of the Nomad server state - [`operator snapshot agent`][snapshot-agent] <EnterpriseAlert inline /> - Inspects a snapshot of the Nomad server state
- [`operator snapshot save`][snapshot-save] - Saves a snapshot of the Nomad server state - [`operator snapshot save`][snapshot-save] - Saves a snapshot of the Nomad server state
@ -63,3 +69,5 @@ The following subcommands are available:
[snapshot-restore]: /docs/commands/operator/snapshot-restore 'Snapshot Restore command' [snapshot-restore]: /docs/commands/operator/snapshot-restore 'Snapshot Restore command'
[snapshot-inspect]: /docs/commands/operator/snapshot-inspect 'Snapshot Inspect command' [snapshot-inspect]: /docs/commands/operator/snapshot-inspect 'Snapshot Inspect command'
[snapshot-agent]: /docs/commands/operator/snapshot-agent 'Snapshot Agent command' [snapshot-agent]: /docs/commands/operator/snapshot-agent 'Snapshot Agent command'
[scheduler-get-config]: /docs/commands/operator/scheduler-get-config 'Scheduler Get Config command'
[scheduler-set-config]: /docs/commands/operator/scheduler-set-config 'Scheduler Set Config command'

View file

@ -0,0 +1,47 @@
---
layout: docs
page_title: 'Commands: operator scheduler get-config'
description: |
Display the current scheduler configuration.
---
# Command: operator scheduler get-config
The scheduler operator get-config command is used to view the current scheduler
configuration.
## Usage
```plaintext
nomad operator scheduler get-config [options]
```
If ACLs are enabled, this command requires a token with the `operator:read`
capability.
## General Options
@include 'general_options_no_namespace.mdx'
## Get Config Options
- `-json`: Output the scheduler config in its JSON format.
- `-t`: Format and display the scheduler config using a Go template.
## Examples
Display the current scheduler configuration:
```shell-session
$ nomad operator scheduler get-config
Scheduler Algorithm = binpack
Memory Oversubscription = false
Reject Job Registration = false
Pause Eval Broker = false
Preemption System Scheduler = true
Preemption Service Scheduler = false
Preemption Batch Scheduler = false
Preemption SysBatch Scheduler = false
Modify Index = 5
```

View file

@ -0,0 +1,82 @@
---
layout: docs
page_title: 'Commands: operator scheduler set-config'
description: |
Modify the scheduler configuration.
---
# Command: operator scheduler set-config
The scheduler operator set-config command is used to modify the scheduler
configuration.
## Usage
```plaintext
nomad operator scheduler set-config [options]
```
If ACLs are enabled, this command requires a token with the `operator:write`
capability.
## General Options
@include 'general_options_no_namespace.mdx'
## Set Config Options
- `-check-index` - If set, the scheduler config is only updated if the passed
modify index matches the current server side version. If a non-zero value is
passed, it ensures that the scheduler config is being updated from a known
state.
- `-scheduler-algorithm` - Specifies whether scheduler binpacks or spreads
allocations on available nodes. Must be one of `["binpack"|"spread"]`.
- `-memory-oversubscription` - When true, tasks may exceed their reserved memory
limit, if the client has excess memory capacity. Tasks must specify [`memory_max`]
to take advantage of memory oversubscription. Must be one of `[true|false]`.
- `-reject-job-registration` - When true, the server will return permission denied
errors for job registration, job dispatch, and job scale APIs, unless the ACL
token for the request is a management token. If ACLs are disabled, no user
will be able to register jobs. This allows operators to shed load from automated
processes during incident response. Must be one of `[true|false]`.
- `-pause-eval-broker` - When set to true, the eval broker which usually runs on
the leader will be disabled. This will prevent the scheduler workers from
receiving new work. Must be one of `[true|false]`.
- `-preempt-batch-scheduler` - Specifies whether preemption for batch jobs
is enabled. Note that if this is set to true, then batch jobs can preempt any
other jobs. Must be one of `[true|false]`.
- `-preempt-service-scheduler` - Specifies whether preemption for service jobs
is enabled. Note that if this is set to true, then service jobs can preempt any
other jobs. Must be one of `[true|false]`.
- `-preempt-sysbatch-scheduler` - Specifies whether preemption for system batch
jobs is enabled. Note that if this is set to true, then system batch jobs can
preempt any other jobs. Must be one of `[true|false]`.
- `-preempt-system-scheduler` - Specifies whether preemption for system jobs
is enabled. Note that if this is set to true, then system jobs can preempt any
other jobs. Must be one of `[true|false]`.
## Examples
Modify the scheduler algorithm to spread:
```shell-session
$ nomad operator scheduler set-config -scheduler-algorithm=spread
Scheduler configuration updated!
```
Modify the scheduler algorithm to spread using the check index flag:
```shell-session
$ nomad operator scheduler set-config -scheduler-algorithm=spread -check-index=5
Scheduler configuration updated!
```
[`memory_max`]: /docs/job-specification/resources#memory_max

View file

@ -312,10 +312,9 @@ job-type schedulers.
server { server {
default_scheduler_config { default_scheduler_config {
scheduler_algorithm = "spread" scheduler_algorithm = "spread"
memory_oversubscription_enabled = true memory_oversubscription_enabled = true
reject_job_registration = false reject_job_registration = false
pause_eval_broker = false # New in Nomad 1.3.2
preemption_config { preemption_config {
batch_scheduler_enabled = true batch_scheduler_enabled = true

View file

@ -585,6 +585,14 @@
"title": "raft state", "title": "raft state",
"path": "commands/operator/raft-state" "path": "commands/operator/raft-state"
}, },
{
"title": "scheduler get-config",
"path": "commands/operator/scheduler-get-config"
},
{
"title": "scheduler set-config",
"path": "commands/operator/scheduler-set-config"
},
{ {
"title": "snapshot agent", "title": "snapshot agent",
"path": "commands/operator/snapshot-agent" "path": "commands/operator/snapshot-agent"