Make number of scheduler workers reloadable (#11593)

## Development Environment Changes
* Added stringer to build deps

## New HTTP APIs
* Added scheduler worker config API
* Added scheduler worker info API

## New Internals
* (Scheduler)Worker API refactor—Start(), Stop(), Pause(), Resume()
* Update shutdown to use context
* Add mutex for contended server data
    - `workerLock` for the `workers` slice
    - `workerConfigLock` for the `Server.Config.NumSchedulers` and
      `Server.Config.EnabledSchedulers` values

## Other
* Adding docs for scheduler worker api
* Add changelog message

Co-authored-by: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com>
This commit is contained in:
Charlie Voiselle 2022-01-06 10:56:13 -06:00 committed by GitHub
parent 1af8d47de2
commit 98a240cd99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 2215 additions and 105 deletions

3
.changelog/11593.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
server: Make num_schedulers and enabled_schedulers hot reloadable; add agent API endpoint to enable dynamic modifications of these values.
```

View File

@ -0,0 +1,57 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Hot Reload",
"steps": [
{
"file": "nomad/server.go",
"description": "## Server.Reload()\n\nServer configuration reloads start here.",
"line": 782,
"selection": {
"start": {
"line": 780,
"character": 4
},
"end": {
"line": 780,
"character": 10
}
}
},
{
"file": "nomad/server.go",
"description": "## Did NumSchedulers change?\nIf the number of schedulers has changed between the running configuration and the new one we need to adopt that change in realtime.",
"line": 812
},
{
"file": "nomad/server.go",
"description": "## Server.setupNewWorkers()\n\nsetupNewWorkers performs three tasks:\n\n- makes a copy of the existing worker pointers\n\n- creates a fresh array and loads a new set of workers into them\n\n- iterates through the \"old\" workers and shuts them down in individual\n goroutines for maximum parallelism",
"line": 1482,
"selection": {
"start": {
"line": 1480,
"character": 4
},
"end": {
"line": 1480,
"character": 12
}
}
},
{
"file": "nomad/server.go",
"description": "Once all of the work in setupNewWorkers is complete, we stop the old ones.",
"line": 1485
},
{
"file": "nomad/server.go",
"description": "The `stopOldWorkers` function iterates through the array of workers and calls their `Shutdown` method\nas a goroutine to prevent blocking.",
"line": 1505
},
{
"file": "nomad/worker.go",
"description": "The `Shutdown` method sets `w.stop` to true signaling that we intend for the `Worker` to stop the next time we consult it. We also manually unpause the `Worker` by setting w.paused to false and sending a `Broadcast()` via the cond.",
"line": 110
}
],
"ref": "f-reload-num-schedulers"
}

View File

@ -0,0 +1,66 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Pause",
"steps": [
{
"file": "nomad/leader.go",
"description": "## Server.establishLeadership()\n\nUpon becoming a leader, the server pauses a subset of the workers to allow for the additional burden of the leader's goroutines. The `handlePausableWorkers` function takes a boolean that states whether or not the current node is a leader or not. Because we are in `establishLeadership` we use `true` rather than calling `s.IsLeader()`",
"line": 233,
"selection": {
"start": {
"line": 233,
"character": 4
},
"end": {
"line": 233,
"character": 12
}
}
},
{
"file": "nomad/leader.go",
"description": "## Server.handlePausableWorkers()\n\nhandlePausableWorkers ranges over a slice of Workers and manipulates their paused state by calling their `SetPause` method.",
"line": 443,
"selection": {
"start": {
"line": 443,
"character": 18
},
"end": {
"line": 443,
"character": 26
}
}
},
{
"file": "nomad/leader.go",
"description": "## Server.pausableWorkers()\n\nThe pausableWorkers function provides a consistent slice of workers that the server can pause and unpause. Since the Worker array is never mutated, the same slice is returned by pausableWorkers on every invocation.\nThis comment is interesting/potentially confusing\n\n```golang\n // Disabling 3/4 of the workers frees CPU for raft and the\n\t// plan applier which uses 1/2 the cores.\n``` \n\nHowever, the key point is that it will return a slice containg 3/4th of the workers.",
"line": 1100,
"selection": {
"start": {
"line": 1104,
"character": 1
},
"end": {
"line": 1105,
"character": 43
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.SetPause()\n\nThe `SetPause` function is used to signal an intention to pause the worker. Because the worker's work is happening in the `run()` goroutine, pauses happen asynchronously.",
"line": 91
},
{
"file": "nomad/worker.go",
"description": "## Worker.dequeueEvaluation()\n\nCalls checkPaused, which will be the function we wait in if the scheduler is set to be paused. \n\n> **NOTE:** This is called here rather than in run() because this function loops in case of an error fetching a evaluation.",
"line": 206
},
{
"file": "nomad/worker.go",
"description": "## Worker.checkPaused()\n\nWhen `w.paused` is `true`, we call the `Wait()` function on the condition. Execution of this goroutine will stop here until it receives a `Broadcast()` or a `Signal()`. At this point, the `Worker` is paused.",
"line": 104
}
]
}

View File

@ -0,0 +1,51 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Unpause",
"steps": [
{
"file": "nomad/leader.go",
"description": "## revokeLeadership()\n\nAs a server transistions from leader to non-leader, the pausableWorkers are resumed since the other leader goroutines are stopped providing extra capacity.",
"line": 1040,
"selection": {
"start": {
"line": 1038,
"character": 10
},
"end": {
"line": 1038,
"character": 20
}
}
},
{
"file": "nomad/leader.go",
"description": "## handlePausableWorkers()\n\nThe handlePausableWorkers method is called with `false`. We fetch the pausableWorkers and call their SetPause method with `false`.\n",
"line": 443,
"selection": {
"start": {
"line": 443,
"character": 18
},
"end": {
"line": 443,
"character": 27
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.SetPause()\n\nDuring unpause, p is false. We update w.paused in the mutex, and then call Broadcast on the cond. This wakes the goroutine sitting in the Wait() inside of `checkPaused()`",
"line": 91
},
{
"file": "nomad/worker.go",
"description": "## Worker.checkPaused()\n\nOnce the goroutine receives the `Broadcast()` message from `SetPause()`, execution continues here. Now that `w.paused == false`, we exit the loop and return to the caller (the `dequeueEvaluation()` function).",
"line": 104
},
{
"file": "nomad/worker.go",
"description": "## Worker.dequeueEvaluation\n\nWe return back into dequeueEvaluation after the call to checkPaused. At this point the worker will either stop (if that signal boolean has been set) or continue looping after returning to run().",
"line": 207
}
]
}

View File

@ -0,0 +1,36 @@
{
"$schema": "https://aka.ms/codetour-schema",
"title": "Scheduler Worker - Start",
"steps": [
{
"file": "nomad/server.go",
"description": "## Server.NewServer()\n\nScheduler workers are started as the agent starts the `server` go routines.",
"line": 402
},
{
"file": "nomad/server.go",
"description": "## Server.setupWorkers()\n\nThe `setupWorkers()` function validates that there are enabled Schedulers by type and count. It then creates s.config.NumSchedulers by calling `NewWorker()`\n\nThe `_core` scheduler _**must**_ be enabled. **TODO: why?**\n",
"line": 1443,
"selection": {
"start": {
"line": 1442,
"character": 4
},
"end": {
"line": 1442,
"character": 12
}
}
},
{
"file": "nomad/worker.go",
"description": "## Worker.NewWorker\n\nNewWorker creates the Worker and starts `run()` in a goroutine.",
"line": 78
},
{
"file": "nomad/worker.go",
"description": "## Worker.run()\n\nThe `run()` function runs in a loop until it's paused, it's stopped, or the server indicates that it is shutting down. All of the work the `Worker` performs should be\nimplemented in or called from here.\n",
"line": 152
}
]
}

View File

@ -124,6 +124,7 @@ deps: ## Install build and development dependencies
go install github.com/hashicorp/go-msgpack/codec/codecgen@v1.1.5 go install github.com/hashicorp/go-msgpack/codec/codecgen@v1.1.5
go install github.com/bufbuild/buf/cmd/buf@v0.36.0 go install github.com/bufbuild/buf/cmd/buf@v0.36.0
go install github.com/hashicorp/go-changelog/cmd/changelog-build@latest go install github.com/hashicorp/go-changelog/cmd/changelog-build@latest
go install golang.org/x/tools/cmd/stringer@v0.1.8
.PHONY: lint-deps .PHONY: lint-deps
lint-deps: ## Install linter dependencies lint-deps: ## Install linter dependencies

View File

@ -494,3 +494,78 @@ type HostDataResponse struct {
AgentID string AgentID string
HostData *HostData `json:",omitempty"` HostData *HostData `json:",omitempty"`
} }
// GetSchedulerWorkerConfig returns the targeted agent's worker pool configuration
func (a *Agent) GetSchedulerWorkerConfig(q *QueryOptions) (*SchedulerWorkerPoolArgs, error) {
var resp AgentSchedulerWorkerConfigResponse
_, err := a.client.query("/v1/agent/schedulers/config", &resp, q)
if err != nil {
return nil, err
}
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
}
// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs, q *WriteOptions) (*SchedulerWorkerPoolArgs, error) {
req := AgentSchedulerWorkerConfigRequest(args)
var resp AgentSchedulerWorkerConfigResponse
_, err := a.client.write("/v1/agent/schedulers/config", &req, &resp, q)
if err != nil {
return nil, err
}
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
}
type SchedulerWorkerPoolArgs struct {
NumSchedulers int
EnabledSchedulers []string
}
// AgentSchedulerWorkerConfigRequest is used to provide new scheduler worker configuration
// to a specific Nomad server. EnabledSchedulers must contain at least the `_core` scheduler
// to be valid.
type AgentSchedulerWorkerConfigRequest struct {
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}
// AgentSchedulerWorkerConfigResponse contains the Nomad server's current running configuration
// as well as the server's id as a convenience. This can be used to provide starting values for
// creating an AgentSchedulerWorkerConfigRequest to make changes to the running configuration.
type AgentSchedulerWorkerConfigResponse struct {
ServerID string `json:"server_id"`
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}
// GetSchedulerWorkersInfo returns the current status of all of the scheduler workers on
// a Nomad server.
func (a *Agent) GetSchedulerWorkersInfo(q *QueryOptions) (*AgentSchedulerWorkersInfo, error) {
var out *AgentSchedulerWorkersInfo
_, err := a.client.query("/v1/agent/schedulers", &out, q)
if err != nil {
return nil, err
}
return out, nil
}
// AgentSchedulerWorkersInfo is the response from the scheduler information endpoint containing
// a detailed status of each scheduler worker running on the server.
type AgentSchedulerWorkersInfo struct {
ServerID string `json:"server_id"`
Schedulers []AgentSchedulerWorkerInfo `json:"schedulers"`
}
// AgentSchedulerWorkerInfo holds the detailed status information for a single scheduler worker.
type AgentSchedulerWorkerInfo struct {
ID string `json:"id"`
EnabledSchedulers []string `json:"enabled_schedulers"`
Started string `json:"started"`
Status string `json:"status"`
WorkloadStatus string `json:"workload_status"`
}

View File

@ -2,6 +2,7 @@ package api
import ( import (
"fmt" "fmt"
"net/http"
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
@ -456,3 +457,50 @@ func TestAgentProfile(t *testing.T) {
require.Nil(t, resp) require.Nil(t, resp)
} }
} }
func TestAgent_SchedulerWorkerConfig(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
a := c.Agent()
config, err := a.GetSchedulerWorkerConfig(nil)
require.NoError(t, err)
require.NotNil(t, config)
newConfig := SchedulerWorkerPoolArgs{NumSchedulers: 0, EnabledSchedulers: []string{"_core", "system"}}
resp, err := a.SetSchedulerWorkerConfig(newConfig, nil)
require.NoError(t, err)
assert.NotEqual(t, config, resp)
}
func TestAgent_SchedulerWorkerConfig_BadRequest(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
a := c.Agent()
config, err := a.GetSchedulerWorkerConfig(nil)
require.NoError(t, err)
require.NotNil(t, config)
newConfig := SchedulerWorkerPoolArgs{NumSchedulers: -1, EnabledSchedulers: []string{"_core", "system"}}
_, err = a.SetSchedulerWorkerConfig(newConfig, nil)
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("%v (%s)", http.StatusBadRequest, "Invalid request"))
}
func TestAgent_SchedulerWorkersInfo(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()
a := c.Agent()
info, err := a.GetSchedulerWorkersInfo(nil)
require.NoError(t, err)
require.NotNil(t, info)
defaultSchedulers := []string{"batch", "system", "sysbatch", "service", "_core"}
for _, worker := range info.Schedulers {
require.ElementsMatch(t, defaultSchedulers, worker.EnabledSchedulers)
}
}

View File

@ -11,14 +11,17 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/ioutils"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/command/agent/pprof" "github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure" "github.com/mitchellh/copystructure"
@ -364,7 +367,7 @@ func (s *HTTPServer) agentPprof(reqType pprof.ReqType, resp http.ResponseWriter,
// Parse query param int values // Parse query param int values
// Errors are dropped here and default to their zero values. // Errors are dropped here and default to their zero values.
// This is to mimick the functionality that net/pprof implements. // This is to mimic the functionality that net/pprof implements.
seconds, _ := strconv.Atoi(req.URL.Query().Get("seconds")) seconds, _ := strconv.Atoi(req.URL.Query().Get("seconds"))
debug, _ := strconv.Atoi(req.URL.Query().Get("debug")) debug, _ := strconv.Atoi(req.URL.Query().Get("debug"))
gc, _ := strconv.Atoi(req.URL.Query().Get("gc")) gc, _ := strconv.Atoi(req.URL.Query().Get("gc"))
@ -744,3 +747,129 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
return reply, rpcErr return reply, rpcErr
} }
// AgentSchedulerWorkerInfoRequest is used to query the running state of the
// agent's scheduler workers.
func (s *HTTPServer) AgentSchedulerWorkerInfoRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
srv := s.agent.Server()
if srv == nil {
return nil, CodedError(http.StatusBadRequest, ErrServerOnly)
}
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
var secret string
s.parseToken(req, &secret)
// Check agent read permissions
if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentRead() {
return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error())
}
schedulersInfo := srv.GetSchedulerWorkersInfo()
response := &api.AgentSchedulerWorkersInfo{
ServerID: srv.LocalMember().Name,
Schedulers: make([]api.AgentSchedulerWorkerInfo, len(schedulersInfo)),
}
for i, workerInfo := range schedulersInfo {
response.Schedulers[i] = api.AgentSchedulerWorkerInfo{
ID: workerInfo.ID,
EnabledSchedulers: make([]string, len(workerInfo.EnabledSchedulers)),
Started: workerInfo.Started.UTC().Format(time.RFC3339Nano),
Status: workerInfo.Status,
WorkloadStatus: workerInfo.WorkloadStatus,
}
copy(response.Schedulers[i].EnabledSchedulers, workerInfo.EnabledSchedulers)
}
return response, nil
}
// AgentSchedulerWorkerConfigRequest is used to query the count (and state eventually)
// of the scheduler workers running in a Nomad server agent.
// This endpoint can also be used to update the count of running workers for a
// given agent.
func (s *HTTPServer) AgentSchedulerWorkerConfigRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.Server() == nil {
return nil, CodedError(http.StatusBadRequest, ErrServerOnly)
}
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.updateScheduleWorkersConfig(resp, req)
case http.MethodGet:
return s.getScheduleWorkersConfig(resp, req)
default:
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
}
func (s *HTTPServer) getScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
srv := s.agent.Server()
if srv == nil {
return nil, CodedError(http.StatusBadRequest, ErrServerOnly)
}
var secret string
s.parseToken(req, &secret)
// Check agent read permissions
if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentRead() {
return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error())
}
config := srv.GetSchedulerWorkerConfig()
response := &api.AgentSchedulerWorkerConfigResponse{
ServerID: srv.LocalMember().Name,
NumSchedulers: config.NumSchedulers,
EnabledSchedulers: config.EnabledSchedulers,
}
return response, nil
}
func (s *HTTPServer) updateScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
srv := s.agent.Server()
if srv == nil {
return nil, CodedError(http.StatusBadRequest, ErrServerOnly)
}
var secret string
s.parseToken(req, &secret)
// Check agent write permissions
if aclObj, err := srv.ResolveToken(secret); err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error())
}
var args api.AgentSchedulerWorkerConfigRequest
if err := decodeBody(req, &args); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Invalid request: %s", err.Error()))
}
// the server_id provided in the payload is ignored to allow the
// response to be roundtripped right into a PUT.
newArgs := nomad.SchedulerWorkerPoolArgs{
NumSchedulers: args.NumSchedulers,
EnabledSchedulers: args.EnabledSchedulers,
}
if newArgs.IsInvalid() {
return nil, CodedError(http.StatusBadRequest, "Invalid request")
}
reply := srv.SetSchedulerWorkerConfig(newArgs)
response := &api.AgentSchedulerWorkerConfigResponse{
ServerID: srv.LocalMember().Name,
NumSchedulers: reply.NumSchedulers,
EnabledSchedulers: reply.EnabledSchedulers,
}
return response, nil
}

View File

@ -11,6 +11,7 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os" "os"
"reflect"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
@ -19,6 +20,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
@ -263,7 +265,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("invalid log_json parameter", func(t *testing.T) { t.Run("invalid log_json parameter", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
// Make the request // Make the request
@ -276,7 +278,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("unknown log_level", func(t *testing.T) { t.Run("unknown log_level", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
// Make the request // Make the request
@ -289,7 +291,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("check for specific log level", func(t *testing.T) { t.Run("check for specific log level", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
defer resp.Close() defer resp.Close()
@ -323,7 +325,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("plain output", func(t *testing.T) { t.Run("plain output", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=debug&plain=true", nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=debug&plain=true", nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
defer resp.Close() defer resp.Close()
@ -357,7 +359,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("logs for a specific node", func(t *testing.T) { t.Run("logs for a specific node", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn&node_id="+s.client.NodeID(), nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn&node_id="+s.client.NodeID(), nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
defer resp.Close() defer resp.Close()
@ -397,7 +399,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Run("logs for a local client with no server running on agent", func(t *testing.T) { t.Run("logs for a local client with no server running on agent", func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil) req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil)
require.Nil(t, err) require.NoError(t, err)
resp := newClosableRecorder() resp := newClosableRecorder()
defer resp.Close() defer resp.Close()
@ -595,7 +597,7 @@ func TestAgent_PprofRequest(t *testing.T) {
} }
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
require.Nil(t, err) require.NoError(t, err)
respW := httptest.NewRecorder() respW := httptest.NewRecorder()
resp, err := s.Server.AgentPprofRequest(respW, req) resp, err := s.Server.AgentPprofRequest(respW, req)
@ -913,7 +915,7 @@ func TestHTTP_AgentListKeys(t *testing.T) {
respW := httptest.NewRecorder() respW := httptest.NewRecorder()
out, err := s.Server.KeyringOperationRequest(respW, req) out, err := s.Server.KeyringOperationRequest(respW, req)
require.Nil(t, err) require.NoError(t, err)
kresp := out.(structs.KeyringResponse) kresp := out.(structs.KeyringResponse)
require.Len(t, kresp.Keys, 1) require.Len(t, kresp.Keys, 1)
}) })
@ -1463,3 +1465,586 @@ func TestHTTP_XSS_Monitor(t *testing.T) {
}) })
} }
} }
// ----------------------------
// SchedulerWorkerInfoAPI tests
// ----------------------------
type schedulerWorkerAPITest_testCase struct {
name string // test case name
request schedulerWorkerAPITest_testRequest
whenACLNotEnabled schedulerWorkerAPITest_testExpect
whenACLEnabled schedulerWorkerAPITest_testExpect
}
type schedulerWorkerAPITest_testRequest struct {
verb string
aclToken string
requestBody string
}
type schedulerWorkerAPITest_testExpect struct {
statusCode int
response interface{}
err error
isError bool
}
func (te schedulerWorkerAPITest_testExpect) Code() int {
return te.statusCode
}
func schedulerWorkerInfoTest_testCases() []schedulerWorkerAPITest_testCase {
forbidden := schedulerWorkerAPITest_testExpect{
statusCode: http.StatusForbidden,
response: structs.ErrPermissionDenied.Error(),
isError: true,
}
invalidMethod := schedulerWorkerAPITest_testExpect{
statusCode: http.StatusMethodNotAllowed,
response: ErrInvalidMethod,
isError: true,
}
success := schedulerWorkerAPITest_testExpect{
statusCode: http.StatusOK,
response: &api.AgentSchedulerWorkersInfo{
Schedulers: []api.AgentSchedulerWorkerInfo{
{
ID: "9b3713e0-6f74-0e1b-3b3e-d94f0c22dbf9",
EnabledSchedulers: []string{"_core", "batch"},
Started: "2021-12-10 22:13:12.595366 -0500 EST m=+0.039016232",
Status: "Pausing",
WorkloadStatus: "WaitingToDequeue",
},
{
ID: "ebda23e2-7f68-0c82-f0b2-f91d4581094d",
EnabledSchedulers: []string{"_core", "batch"},
Started: "2021-12-10 22:13:12.595478 -0500 EST m=+0.039127886",
Status: "Pausing",
WorkloadStatus: "WaitingToDequeue",
},
{
ID: "b3869c9b-64ff-686c-a003-e7d059d3a573",
EnabledSchedulers: []string{"_core", "batch"},
Started: "2021-12-10 22:13:12.595501 -0500 EST m=+0.039151276",
Status: "Pausing",
WorkloadStatus: "WaitingToDequeue",
},
{
ID: "cc5907c0-552e-bf36-0ca1-f150af7273c2",
EnabledSchedulers: []string{"_core", "batch"},
Started: "2021-12-10 22:13:12.595691 -0500 EST m=+0.039341541",
Status: "Starting",
WorkloadStatus: "WaitingToDequeue",
},
},
},
}
return []schedulerWorkerAPITest_testCase{
{
name: "bad verb",
request: schedulerWorkerAPITest_testRequest{
verb: "FOO",
aclToken: "",
requestBody: "",
},
whenACLNotEnabled: invalidMethod,
whenACLEnabled: invalidMethod,
},
{
name: "get without token",
request: schedulerWorkerAPITest_testRequest{
verb: "GET",
aclToken: "",
requestBody: "",
},
whenACLNotEnabled: success,
whenACLEnabled: forbidden,
},
{
name: "get with management token",
request: schedulerWorkerAPITest_testRequest{
verb: "GET",
aclToken: "management",
requestBody: "",
},
whenACLNotEnabled: success,
whenACLEnabled: success,
},
{
name: "get with read token",
request: schedulerWorkerAPITest_testRequest{
verb: "GET",
aclToken: "agent_read",
requestBody: "",
},
whenACLNotEnabled: success,
whenACLEnabled: success,
},
{
name: "get with invalid token",
request: schedulerWorkerAPITest_testRequest{
verb: "GET",
aclToken: "node_write",
requestBody: "",
},
whenACLNotEnabled: success,
whenACLEnabled: forbidden,
},
}
}
func TestHTTP_AgentSchedulerWorkerInfoRequest(t *testing.T) {
configFn := func(c *Config) {
var numSchedulers = 4
c.Server.NumSchedulers = &numSchedulers
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}
for _, runACL := range []string{"no_acl", "acl"} {
t.Run(runACL, func(t *testing.T) {
tests := func(s *TestAgent) {
testingACLS := s.Config.ACL.Enabled
var tokens map[string]*structs.ACLToken
if s.Config.ACL.Enabled {
state := s.Agent.server.State()
tokens = make(map[string]*structs.ACLToken)
tokens["management"] = s.RootToken
tokens["agent_read"] = mock.CreatePolicyAndToken(t, state, 1005, "agent_read", mock.AgentPolicy(acl.PolicyRead))
tokens["agent_write"] = mock.CreatePolicyAndToken(t, state, 1007, "agent_write", mock.AgentPolicy(acl.PolicyWrite))
tokens["node_write"] = mock.CreatePolicyAndToken(t, state, 1009, "node_write", mock.NodePolicy(acl.PolicyWrite))
}
for _, tc := range schedulerWorkerInfoTest_testCases() {
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers", bytes.NewReader([]byte(tc.request.requestBody)))
if testingACLS && tc.request.aclToken != "" {
setToken(req, tokens[tc.request.aclToken])
}
require.NoError(t, err)
respW := httptest.NewRecorder()
workerInfoResp, err := s.Server.AgentSchedulerWorkerInfoRequest(respW, req)
expected := tc.whenACLNotEnabled
if testingACLS {
expected = tc.whenACLEnabled
}
if expected.isError {
require.Error(t, err)
codedErr, ok := err.(HTTPCodedError)
require.True(t, ok, "expected a HTTPCodedError")
require.Equal(t, expected.Code(), codedErr.Code())
require.Equal(t, expected.response, codedErr.Error())
return
}
require.NoError(t, err)
workerInfo, ok := workerInfoResp.(*api.AgentSchedulerWorkersInfo)
require.True(t, ok, "expected an *AgentSchedulersWorkersInfo. received:%s", reflect.TypeOf(workerInfoResp))
expectWorkerInfo, ok := expected.response.(*api.AgentSchedulerWorkersInfo)
require.True(t, ok, "error casting test case to *AgentSchedulersWorkersInfo. received:%s", reflect.TypeOf(workerInfoResp))
var schedCount int = *s.Config.Server.NumSchedulers
require.Equal(t, schedCount, len(workerInfo.Schedulers), "must match num_schedulers")
require.Equal(t, len(expectWorkerInfo.Schedulers), len(workerInfo.Schedulers), "lengths must match")
for i, info := range expectWorkerInfo.Schedulers {
require.ElementsMatch(t, info.EnabledSchedulers, workerInfo.Schedulers[i].EnabledSchedulers)
}
})
}
}
if runACL == "acl" {
httpACLTest(t, configFn, tests)
} else {
httpTest(t, configFn, tests)
}
})
}
}
// ----------------------------
// SchedulerWorkerConfigAPI tests
// ----------------------------
type scheduleWorkerConfigTest_workerRequestTest struct {
name string // test case name
request schedulerWorkerConfigTest_testRequest
whenACLNotEnabled schedulerWorkerConfigTest_testExpect
whenACLEnabled schedulerWorkerConfigTest_testExpect
}
type schedulerWorkerConfigTest_testRequest struct {
verb string
aclToken string
requestBody string
}
type schedulerWorkerConfigTest_testExpect struct {
expectedResponseCode int
expectedResponse interface{}
}
// These test cases are run for both the ACL and Non-ACL enabled servers. When
// ACLS are not enabled, the request.aclTokens are ignored.
func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequestTest {
forbidden := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusForbidden,
expectedResponse: structs.ErrPermissionDenied.Error(),
}
invalidMethod := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusMethodNotAllowed,
expectedResponse: ErrInvalidMethod,
}
invalidRequest := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusBadRequest,
expectedResponse: "Invalid request",
}
success1 := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 8},
}
success2 := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 9},
}
return []scheduleWorkerConfigTest_workerRequestTest{
{
name: "bad verb",
request: schedulerWorkerConfigTest_testRequest{
verb: "FOO",
aclToken: "",
requestBody: "",
},
whenACLNotEnabled: invalidMethod,
whenACLEnabled: invalidMethod,
},
{
name: "get without token",
request: schedulerWorkerConfigTest_testRequest{
verb: "GET",
aclToken: "",
requestBody: "",
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
},
{
name: "get with management token",
request: schedulerWorkerConfigTest_testRequest{
verb: "GET",
aclToken: "management",
requestBody: "",
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
},
{
name: "get with read token",
request: schedulerWorkerConfigTest_testRequest{
verb: "GET",
aclToken: "agent_read",
requestBody: "",
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
},
{
name: "get with write token",
request: schedulerWorkerConfigTest_testRequest{
verb: "GET",
aclToken: "agent_write",
requestBody: "",
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
},
{
name: "post with no token",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
},
{
name: "put with no token",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
},
{
name: "post with invalid token",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "node_write",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
},
{
name: "put with invalid token",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "node_write",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
},
{
name: "post with valid token",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "agent_write",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success2,
whenACLEnabled: success2,
},
{
name: "put with valid token",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "agent_write",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
},
{
name: "post with good token and bad value",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "agent_write",
requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
{
name: "post with bad token and bad value",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "node_write",
requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: forbidden,
},
{
name: "put with good token and bad value",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "agent_write",
requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
{
name: "put with bad token and bad value",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "node_write",
requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: forbidden,
},
{
name: "post with bad json",
request: schedulerWorkerConfigTest_testRequest{
verb: "POST",
aclToken: "agent_write",
requestBody: `{num_schedulers:-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
{
name: "put with bad json",
request: schedulerWorkerConfigTest_testRequest{
verb: "PUT",
aclToken: "agent_write",
requestBody: `{num_schedulers:-1,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
}
}
func TestHTTP_AgentSchedulerWorkerConfigRequest_NoACL(t *testing.T) {
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}
testFn := func(s *TestAgent) {
for _, tc := range schedulerWorkerConfigTest_testCases() {
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers/config", bytes.NewReader([]byte(tc.request.requestBody)))
require.NoError(t, err)
respW := httptest.NewRecorder()
workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req)
switch tc.whenACLNotEnabled.expectedResponseCode {
case http.StatusBadRequest, http.StatusForbidden, http.StatusMethodNotAllowed:
schedulerWorkerTest_parseError(t, false, tc, workersI, err)
case http.StatusOK:
schedulerWorkerTest_parseSuccess(t, false, tc, workersI, err)
default:
require.Failf(t, "unexpected status code", "code: %v", tc.whenACLNotEnabled.expectedResponseCode)
}
})
}
}
httpTest(t, configFn, testFn)
}
func TestHTTP_AgentSchedulerWorkerConfigRequest_ACL(t *testing.T) {
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}
tests := func(s *TestAgent) {
state := s.Agent.server.State()
var tokens map[string]*structs.ACLToken = make(map[string]*structs.ACLToken)
tokens["management"] = s.RootToken
tokens["agent_read"] = mock.CreatePolicyAndToken(t, state, 1005, "agent_read", mock.AgentPolicy(acl.PolicyRead))
tokens["agent_write"] = mock.CreatePolicyAndToken(t, state, 1007, "agent_write", mock.AgentPolicy(acl.PolicyWrite))
tokens["node_write"] = mock.CreatePolicyAndToken(t, state, 1009, "node_write", mock.NodePolicy(acl.PolicyWrite))
for _, tc := range schedulerWorkerConfigTest_testCases() {
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers", bytes.NewReader([]byte(tc.request.requestBody)))
if tc.request.aclToken != "" {
setToken(req, tokens[tc.request.aclToken])
}
require.NoError(t, err)
respW := httptest.NewRecorder()
workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req)
switch tc.whenACLEnabled.expectedResponseCode {
case http.StatusOK:
schedulerWorkerTest_parseSuccess(t, true, tc, workersI, err)
case http.StatusBadRequest, http.StatusForbidden, http.StatusMethodNotAllowed:
schedulerWorkerTest_parseError(t, true, tc, workersI, err)
default:
require.Failf(t, "unexpected status code", "code: %v", tc.whenACLEnabled.expectedResponseCode)
}
})
}
}
httpACLTest(t, configFn, tests)
}
func schedulerWorkerTest_parseSuccess(t *testing.T, isACLEnabled bool, tc scheduleWorkerConfigTest_workerRequestTest, workersI interface{}, err error) {
require.NoError(t, err)
require.NotNil(t, workersI)
testExpect := tc.whenACLNotEnabled
if isACLEnabled {
testExpect = tc.whenACLNotEnabled
}
// test into the response when we expect an okay
tcConfig, ok := testExpect.expectedResponse.(*api.AgentSchedulerWorkerConfigResponse)
require.True(t, ok, "expected response malformed - this is an issue with a test case.")
workersConfig, ok := workersI.(*api.AgentSchedulerWorkerConfigResponse)
require.True(t, ok, "response can not cast to an agentSchedulerWorkerConfig")
require.NotNil(t, workersConfig)
require.Equal(t, tcConfig.NumSchedulers, workersConfig.NumSchedulers)
require.ElementsMatch(t, tcConfig.EnabledSchedulers, workersConfig.EnabledSchedulers)
}
// schedulerWorkerTest_parseError parses the error response given
// from the API call to make sure that it's a coded error and is the
// expected value from the test case
func schedulerWorkerTest_parseError(t *testing.T, isACLEnabled bool, tc scheduleWorkerConfigTest_workerRequestTest, workersI interface{}, err error) {
require.Error(t, err)
require.Nil(t, workersI)
codedError, ok := err.(HTTPCodedError)
require.True(t, ok, "expected an HTTPCodedError")
testExpect := tc.whenACLNotEnabled
if isACLEnabled {
testExpect = tc.whenACLEnabled
}
require.Equal(t, testExpect.expectedResponseCode, codedError.Code())
// this is a relaxed test to allow us to not have to create a case
// for concatenated error strings.
require.Contains(t, codedError.Error(), testExpect.expectedResponse)
}
func TestHTTP_AgentSchedulerWorkerInfoRequest_Client(t *testing.T) {
verbs := []string{"GET", "POST", "PUT"}
path := "schedulers"
for _, verb := range verbs {
t.Run(verb, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s.Agent.server = nil
req, err := http.NewRequest(verb, fmt.Sprintf("/v1/agent/%v", path), nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
_, err = s.Server.AgentSchedulerWorkerInfoRequest(respW, req)
require.Error(t, err)
codedErr, ok := err.(HTTPCodedError)
require.True(t, ok, "expected a HTTPCodedError")
require.Equal(t, http.StatusBadRequest, codedErr.Code())
require.Equal(t, ErrServerOnly, codedErr.Error())
})
})
}
}
func TestHTTP_AgentSchedulerWorkerConfigRequest_Client(t *testing.T) {
verbs := []string{"GET", "POST", "PUT"}
path := "schedulers/config"
for _, verb := range verbs {
t.Run(verb, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s.Agent.server = nil
req, err := http.NewRequest(verb, fmt.Sprintf("/v1/agent/%v", path), nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
_, err = s.Server.AgentSchedulerWorkerInfoRequest(respW, req)
require.Error(t, err)
codedErr, ok := err.(HTTPCodedError)
require.True(t, ok, "expected a HTTPCodedError")
require.Equal(t, http.StatusBadRequest, codedErr.Code())
require.Equal(t, ErrServerOnly, codedErr.Error())
})
})
}
}

View File

@ -36,6 +36,10 @@ const (
// endpoint // endpoint
ErrEntOnly = "Nomad Enterprise only endpoint" ErrEntOnly = "Nomad Enterprise only endpoint"
// ErrServerOnly is the error text returned if accessing a server only
// endpoint
ErrServerOnly = "Server only endpoint"
// ContextKeyReqID is a unique ID for a given request // ContextKeyReqID is a unique ID for a given request
ContextKeyReqID = "requestID" ContextKeyReqID = "requestID"
@ -311,6 +315,8 @@ func (s HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/members", s.wrap(s.AgentMembersRequest)) s.mux.HandleFunc("/v1/agent/members", s.wrap(s.AgentMembersRequest))
s.mux.HandleFunc("/v1/agent/force-leave", s.wrap(s.AgentForceLeaveRequest)) s.mux.HandleFunc("/v1/agent/force-leave", s.wrap(s.AgentForceLeaveRequest))
s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest)) s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest))
s.mux.HandleFunc("/v1/agent/schedulers", s.wrap(s.AgentSchedulerWorkerInfoRequest))
s.mux.HandleFunc("/v1/agent/schedulers/config", s.wrap(s.AgentSchedulerWorkerConfigRequest))
s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest)) s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest))
s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest)) s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest))
s.mux.HandleFunc("/v1/agent/host", s.wrap(s.AgentHostRequest)) s.mux.HandleFunc("/v1/agent/host", s.wrap(s.AgentHostRequest))

View File

@ -230,9 +230,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Disable workers to free half the cores for use in the plan queue and // Disable workers to free half the cores for use in the plan queue and
// evaluation broker // evaluation broker
for _, w := range s.pausableWorkers() { s.handlePausableWorkers(true)
w.SetPause(true)
}
// Initialize and start the autopilot routine // Initialize and start the autopilot routine
s.getOrCreateAutopilotConfig() s.getOrCreateAutopilotConfig()
@ -442,6 +440,16 @@ ERR_WAIT:
} }
} }
func (s *Server) handlePausableWorkers(isLeader bool) {
for _, w := range s.pausableWorkers() {
if isLeader {
w.Pause()
} else {
w.Resume()
}
}
}
// diffNamespaces is used to perform a two-way diff between the local namespaces // diffNamespaces is used to perform a two-way diff between the local namespaces
// and the remote namespaces to determine which namespaces need to be deleted or // and the remote namespaces to determine which namespaces need to be deleted or
// updated. // updated.
@ -1081,9 +1089,7 @@ func (s *Server) revokeLeadership() error {
} }
// Unpause our worker if we paused previously // Unpause our worker if we paused previously
for _, w := range s.pausableWorkers() { s.handlePausableWorkers(false)
w.SetPause(false)
}
return nil return nil
} }

View File

@ -1328,25 +1328,31 @@ func TestLeader_PausingWorkers(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s1.RPC)
require.Len(t, s1.workers, 12) require.Len(t, s1.workers, 12)
pausedWorkers := func() int { // this satisfies the require.Eventually test interface
c := 0 checkPaused := func(count int) func() bool {
for _, w := range s1.workers { return func() bool {
w.pauseLock.Lock() pausedWorkers := func() int {
if w.paused { c := 0
c++ for _, w := range s1.workers {
if w.IsPaused() {
c++
}
}
return c
} }
w.pauseLock.Unlock()
return pausedWorkers() == count
} }
return c
} }
// pause 3/4 of the workers // acquiring leadership should have paused 3/4 of the workers
require.Equal(t, 9, pausedWorkers()) require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change")
err := s1.revokeLeadership() err := s1.revokeLeadership()
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, pausedWorkers()) // unpausing is a relatively quick activity
require.Eventually(t, checkPaused(0), 50*time.Millisecond, 10*time.Millisecond, "scheduler workers should have unpaused after losing leadership")
} }
// Test doing an inplace upgrade on a server from raft protocol 2 to 3 // Test doing an inplace upgrade on a server from raft protocol 2 to 3

View File

@ -226,7 +226,9 @@ type Server struct {
vault VaultClient vault VaultClient
// Worker used for processing // Worker used for processing
workers []*Worker workers []*Worker
workerLock sync.RWMutex
workerConfigLock sync.RWMutex
// aclCache is used to maintain the parsed ACL objects // aclCache is used to maintain the parsed ACL objects
aclCache *lru.TwoQueueCache aclCache *lru.TwoQueueCache
@ -399,7 +401,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
} }
// Initialize the scheduling workers // Initialize the scheduling workers
if err := s.setupWorkers(); err != nil { if err := s.setupWorkers(s.shutdownCtx); err != nil {
s.Shutdown() s.Shutdown()
s.logger.Error("failed to start workers", "error", err) s.logger.Error("failed to start workers", "error", err)
return nil, fmt.Errorf("Failed to start workers: %v", err) return nil, fmt.Errorf("Failed to start workers: %v", err)
@ -558,7 +560,7 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
// Check if we can reload the RPC listener // Check if we can reload the RPC listener
if s.rpcListener == nil || s.rpcCancel == nil { if s.rpcListener == nil || s.rpcCancel == nil {
s.logger.Warn("unable to reload configuration due to uninitialized rpc listner") s.logger.Warn("unable to reload configuration due to uninitialized rpc listener")
return fmt.Errorf("can't reload uninitialized RPC listener") return fmt.Errorf("can't reload uninitialized RPC listener")
} }
@ -809,6 +811,15 @@ func (s *Server) Reload(newConfig *Config) error {
s.EnterpriseState.ReloadLicense(newConfig) s.EnterpriseState.ReloadLicense(newConfig)
} }
// Because this is a new configuration, we extract the worker pool arguments without acquiring a lock
workerPoolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(newConfig)
if reload, newVals := shouldReloadSchedulers(s, workerPoolArgs); reload {
if newVals.IsValid() {
reloadSchedulers(s, newVals)
}
reloadSchedulers(s, newVals)
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
@ -1430,17 +1441,165 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
return serf.Create(conf) return serf.Create(conf)
} }
// shouldReloadSchedulers checks the new config to determine if the scheduler worker pool
// needs to be updated. If so, returns true and a pointer to a populated SchedulerWorkerPoolArgs
func shouldReloadSchedulers(s *Server, newPoolArgs *SchedulerWorkerPoolArgs) (bool, *SchedulerWorkerPoolArgs) {
s.workerConfigLock.RLock()
defer s.workerConfigLock.RUnlock()
newSchedulers := make([]string, len(newPoolArgs.EnabledSchedulers))
copy(newSchedulers, newPoolArgs.EnabledSchedulers)
sort.Strings(newSchedulers)
if s.config.NumSchedulers != newPoolArgs.NumSchedulers {
return true, newPoolArgs
}
oldSchedulers := make([]string, len(s.config.EnabledSchedulers))
copy(oldSchedulers, s.config.EnabledSchedulers)
sort.Strings(oldSchedulers)
for i, v := range newSchedulers {
if oldSchedulers[i] != v {
return true, newPoolArgs
}
}
return false, nil
}
// SchedulerWorkerPoolArgs are the two key configuration options for a Nomad server's
// scheduler worker pool. Before using, you should always verify that they are rational
// using IsValid() or IsInvalid()
type SchedulerWorkerPoolArgs struct {
NumSchedulers int
EnabledSchedulers []string
}
// IsInvalid returns true when the SchedulerWorkerPoolArgs.IsValid is false
func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool {
return !swpa.IsValid()
}
// IsValid verifies that the pool arguments are valid. That is, they have a non-negative
// numSchedulers value and the enabledSchedulers list has _core and only refers to known
// schedulers.
func (swpa SchedulerWorkerPoolArgs) IsValid() bool {
if swpa.NumSchedulers < 0 {
// the pool has to be non-negative
return false
}
// validate the scheduler list against the builtin types and _core
foundCore := false
for _, sched := range swpa.EnabledSchedulers {
if sched == structs.JobTypeCore {
foundCore = true
continue // core is not in the BuiltinSchedulers map, so we need to skip that check
}
if _, ok := scheduler.BuiltinSchedulers[sched]; !ok {
return false // found an unknown scheduler in the list; bailing out
}
}
return foundCore
}
// Copy returns a clone of a SchedulerWorkerPoolArgs struct. Concurrent access
// concerns should be managed by the caller.
func (swpa SchedulerWorkerPoolArgs) Copy() SchedulerWorkerPoolArgs {
out := SchedulerWorkerPoolArgs{
NumSchedulers: swpa.NumSchedulers,
EnabledSchedulers: make([]string, len(swpa.EnabledSchedulers)),
}
copy(out.EnabledSchedulers, swpa.EnabledSchedulers)
return out
}
func getSchedulerWorkerPoolArgsFromConfigLocked(c *Config) *SchedulerWorkerPoolArgs {
return &SchedulerWorkerPoolArgs{
NumSchedulers: c.NumSchedulers,
EnabledSchedulers: c.EnabledSchedulers,
}
}
// GetSchedulerWorkerInfo returns a slice of WorkerInfos from all of
// the running scheduler workers.
func (s *Server) GetSchedulerWorkersInfo() []WorkerInfo {
s.workerLock.RLock()
defer s.workerLock.RUnlock()
out := make([]WorkerInfo, len(s.workers))
for i := 0; i < len(s.workers); i = i + 1 {
workerInfo := s.workers[i].Info()
out[i] = workerInfo.Copy()
}
return out
}
// GetSchedulerWorkerConfig returns a clean copy of the server's current scheduler
// worker config.
func (s *Server) GetSchedulerWorkerConfig() SchedulerWorkerPoolArgs {
s.workerConfigLock.RLock()
defer s.workerConfigLock.RUnlock()
return getSchedulerWorkerPoolArgsFromConfigLocked(s.config).Copy()
}
func (s *Server) SetSchedulerWorkerConfig(newArgs SchedulerWorkerPoolArgs) SchedulerWorkerPoolArgs {
if reload, newVals := shouldReloadSchedulers(s, &newArgs); reload {
if newVals.IsValid() {
reloadSchedulers(s, newVals)
}
}
return s.GetSchedulerWorkerConfig()
}
// reloadSchedulers validates the passed scheduler worker pool arguments, locks the
// workerLock, applies the new values to the s.config, and restarts the pool
func reloadSchedulers(s *Server, newArgs *SchedulerWorkerPoolArgs) {
if newArgs == nil || newArgs.IsInvalid() {
s.logger.Info("received invalid arguments for scheduler pool reload; ignoring")
return
}
// reload will modify the server.config so it needs a write lock
s.workerConfigLock.Lock()
defer s.workerConfigLock.Unlock()
// reload modifies the worker slice so it needs a write lock
s.workerLock.Lock()
defer s.workerLock.Unlock()
// TODO: If EnabledSchedulers didn't change, we can scale rather than drain and rebuild
s.config.NumSchedulers = newArgs.NumSchedulers
s.config.EnabledSchedulers = newArgs.EnabledSchedulers
s.setupNewWorkersLocked()
}
// setupWorkers is used to start the scheduling workers // setupWorkers is used to start the scheduling workers
func (s *Server) setupWorkers() error { func (s *Server) setupWorkers(ctx context.Context) error {
poolArgs := s.GetSchedulerWorkerConfig()
// we will be writing to the worker slice
s.workerLock.Lock()
defer s.workerLock.Unlock()
return s.setupWorkersLocked(ctx, poolArgs)
}
// setupWorkersLocked directly manipulates the server.config, so it is not safe to
// call concurrently. Use setupWorkers() or call this with server.workerLock set.
func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorkerPoolArgs) error {
// Check if all the schedulers are disabled // Check if all the schedulers are disabled
if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 { if len(poolArgs.EnabledSchedulers) == 0 || poolArgs.NumSchedulers == 0 {
s.logger.Warn("no enabled schedulers") s.logger.Warn("no enabled schedulers")
return nil return nil
} }
// Check if the core scheduler is not enabled // Check if the core scheduler is not enabled
foundCore := false foundCore := false
for _, sched := range s.config.EnabledSchedulers { for _, sched := range poolArgs.EnabledSchedulers {
if sched == structs.JobTypeCore { if sched == structs.JobTypeCore {
foundCore = true foundCore = true
continue continue
@ -1454,18 +1613,58 @@ func (s *Server) setupWorkers() error {
return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore) return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore)
} }
s.logger.Info("starting scheduling worker(s)", "num_workers", poolArgs.NumSchedulers, "schedulers", poolArgs.EnabledSchedulers)
// Start the workers // Start the workers
for i := 0; i < s.config.NumSchedulers; i++ { for i := 0; i < s.config.NumSchedulers; i++ {
if w, err := NewWorker(s); err != nil { if w, err := NewWorker(ctx, s, poolArgs); err != nil {
return err return err
} else { } else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)
s.workers = append(s.workers, w) s.workers = append(s.workers, w)
} }
} }
s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers) s.logger.Info("started scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers)
return nil return nil
} }
// setupNewWorkersLocked directly manipulates the server.config, so it is not safe to
// call concurrently. Use reloadWorkers() or call this with server.workerLock set.
func (s *Server) setupNewWorkersLocked() error {
// make a copy of the s.workers array so we can safely stop those goroutines asynchronously
oldWorkers := make([]*Worker, len(s.workers))
defer s.stopOldWorkers(oldWorkers)
for i, w := range s.workers {
oldWorkers[i] = w
}
s.logger.Info(fmt.Sprintf("marking %v current schedulers for shutdown", len(oldWorkers)))
// build a clean backing array and call setupWorkersLocked like setupWorkers
// does in the normal startup path
s.workers = make([]*Worker, 0, s.config.NumSchedulers)
poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s.config).Copy()
err := s.setupWorkersLocked(s.shutdownCtx, poolArgs)
if err != nil {
return err
}
// if we're the leader, we need to pause all of the pausable workers.
s.handlePausableWorkers(s.IsLeader())
return nil
}
// stopOldWorkers is called once setupNewWorkers has created the new worker
// array to asynchronously stop each of the old workers individually.
func (s *Server) stopOldWorkers(oldWorkers []*Worker) {
workerCount := len(oldWorkers)
for i, w := range oldWorkers {
s.logger.Debug("stopping old scheduling worker", "id", w.ID(), "index", i+1, "of", workerCount)
go w.Stop()
}
}
// numPeers is used to check on the number of known peers, including the local // numPeers is used to check on the number of known peers, including the local
// node. // node.
func (s *Server) numPeers() (int, error) { func (s *Server) numPeers() (int, error) {

View File

@ -1,6 +1,7 @@
package nomad package nomad
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -540,13 +541,13 @@ func TestServer_InvalidSchedulers(t *testing.T) {
} }
config.EnabledSchedulers = []string{"batch"} config.EnabledSchedulers = []string{"batch"}
err := s.setupWorkers() err := s.setupWorkers(s.shutdownCtx)
require.NotNil(err) require.NotNil(err)
require.Contains(err.Error(), "scheduler not enabled") require.Contains(err.Error(), "scheduler not enabled")
// Set the config to have an unknown scheduler // Set the config to have an unknown scheduler
config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"} config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"}
err = s.setupWorkers() err = s.setupWorkers(s.shutdownCtx)
require.NotNil(err) require.NotNil(err)
require.Contains(err.Error(), "foo") require.Contains(err.Error(), "foo")
} }
@ -577,3 +578,69 @@ func TestServer_RPCNameAndRegionValidation(t *testing.T) {
tc.name, tc.region, tc.expected) tc.name, tc.region, tc.expected)
} }
} }
func TestServer_ReloadSchedulers_NumSchedulers(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 8
})
defer cleanupS1()
require.Equal(t, s1.config.NumSchedulers, len(s1.workers))
config := DefaultConfig()
config.NumSchedulers = 4
require.NoError(t, s1.Reload(config))
time.Sleep(1 * time.Second)
require.Equal(t, config.NumSchedulers, len(s1.workers))
}
func TestServer_ReloadSchedulers_EnabledSchedulers(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnabledSchedulers = []string{structs.JobTypeCore, structs.JobTypeSystem}
})
defer cleanupS1()
require.Equal(t, s1.config.NumSchedulers, len(s1.workers))
config := DefaultConfig()
config.EnabledSchedulers = []string{structs.JobTypeCore, structs.JobTypeSystem, structs.JobTypeBatch}
require.NoError(t, s1.Reload(config))
time.Sleep(1 * time.Second)
require.Equal(t, config.NumSchedulers, len(s1.workers))
require.ElementsMatch(t, config.EnabledSchedulers, s1.GetSchedulerWorkerConfig().EnabledSchedulers)
}
func TestServer_ReloadSchedulers_InvalidSchedulers(t *testing.T) {
t.Parallel()
// Set the config to not have the core scheduler
config := DefaultConfig()
logger := testlog.HCLogger(t)
s := &Server{
config: config,
logger: logger,
}
s.config.NumSchedulers = 0
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()
config.EnabledSchedulers = []string{"_core", "batch"}
err := s.setupWorkers(s.shutdownCtx)
require.Nil(t, err)
origWC := s.GetSchedulerWorkerConfig()
reloadSchedulers(s, &SchedulerWorkerPoolArgs{NumSchedulers: config.NumSchedulers, EnabledSchedulers: []string{"batch"}})
currentWC := s.GetSchedulerWorkerConfig()
require.Equal(t, origWC, currentWC)
// Set the config to have an unknown scheduler
reloadSchedulers(s, &SchedulerWorkerPoolArgs{NumSchedulers: config.NumSchedulers, EnabledSchedulers: []string{"_core", "foo"}})
currentWC = s.GetSchedulerWorkerConfig()
require.Equal(t, origWC, currentWC)
}

View File

@ -56,7 +56,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
nodeNum := atomic.AddUint32(&nodeNumber, 1) nodeNum := atomic.AddUint32(&nodeNumber, 1)
config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)
// configer logger // configure logger
level := hclog.Trace level := hclog.Trace
if envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL"); envLogLevel != "" { if envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL"); envLogLevel != "" {
level = hclog.LevelFromString(envLogLevel) level = hclog.LevelFromString(envLogLevel)

View File

@ -2,6 +2,7 @@ package nomad
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
@ -10,6 +11,7 @@ import (
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/nomad/scheduler"
@ -46,6 +48,35 @@ const (
dequeueErrGrace = 10 * time.Second dequeueErrGrace = 10 * time.Second
) )
type WorkerStatus int
//go:generate stringer -trimprefix=Worker -output worker_string_workerstatus.go -linecomment -type=WorkerStatus
const (
WorkerUnknownStatus WorkerStatus = iota // Unknown
WorkerStarting
WorkerStarted
WorkerPausing
WorkerPaused
WorkerResuming
WorkerStopping
WorkerStopped
)
type SchedulerWorkerStatus int
//go:generate stringer -trimprefix=Workload -output worker_string_schedulerworkerstatus.go -linecomment -type=SchedulerWorkerStatus
const (
WorkloadUnknownStatus SchedulerWorkerStatus = iota
WorkloadRunning
WorkloadWaitingToDequeue
WorkloadWaitingForRaft
WorkloadScheduling
WorkloadSubmitting
WorkloadBackoff
WorkloadStopped
WorkloadPaused
)
// Worker is a single threaded scheduling worker. There may be multiple // Worker is a single threaded scheduling worker. There may be multiple
// running per server (leader or follower). They are responsible for dequeuing // running per server (leader or follower). They are responsible for dequeuing
// pending evaluations, invoking schedulers, plan submission and the // pending evaluations, invoking schedulers, plan submission and the
@ -55,13 +86,25 @@ type Worker struct {
srv *Server srv *Server
logger log.Logger logger log.Logger
start time.Time start time.Time
id string
paused bool status WorkerStatus
workloadStatus SchedulerWorkerStatus
statusLock sync.RWMutex
pauseFlag bool
pauseLock sync.Mutex pauseLock sync.Mutex
pauseCond *sync.Cond pauseCond *sync.Cond
ctx context.Context
cancelFn context.CancelFunc
failures uint // the Server.Config.EnabledSchedulers value is not safe for concurrent access, so
// the worker needs a cached copy of it. Workers are stopped if this value changes.
enabledSchedulers []string
// failures is the count of errors encountered while dequeueing evaluations
// and is used to calculate backoff.
failures uint
evalToken string evalToken string
// snapshotIndex is the index of the snapshot in which the scheduler was // snapshotIndex is the index of the snapshot in which the scheduler was
@ -70,70 +113,321 @@ type Worker struct {
snapshotIndex uint64 snapshotIndex uint64
} }
// NewWorker starts a new worker associated with the given server // NewWorker starts a new scheduler worker associated with the given server
func NewWorker(srv *Server) (*Worker, error) { func NewWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) (*Worker, error) {
w := &Worker{ w := newWorker(ctx, srv, args)
srv: srv, w.Start()
logger: srv.logger.ResetNamed("worker"),
start: time.Now(),
}
w.pauseCond = sync.NewCond(&w.pauseLock)
go w.run()
return w, nil return w, nil
} }
// SetPause is used to pause or unpause a worker // _newWorker creates a worker without calling its Start func. This is useful for testing.
func (w *Worker) SetPause(p bool) { func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) *Worker {
w.pauseLock.Lock() w := &Worker{
w.paused = p id: uuid.Generate(),
w.pauseLock.Unlock() srv: srv,
if !p { start: time.Now(),
status: WorkerStarting,
enabledSchedulers: make([]string, len(args.EnabledSchedulers)),
}
copy(w.enabledSchedulers, args.EnabledSchedulers)
w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id)
w.pauseCond = sync.NewCond(&w.pauseLock)
w.ctx, w.cancelFn = context.WithCancel(ctx)
return w
}
// ID returns a string ID for the worker.
func (w *Worker) ID() string {
return w.id
}
// Start transitions a worker to the starting state. Check
// to see if it paused using IsStarted()
func (w *Worker) Start() {
w.setStatus(WorkerStarting)
go w.run()
}
// Pause transitions a worker to the pausing state. Check
// to see if it paused using IsPaused()
func (w *Worker) Pause() {
if w.isPausable() {
w.setStatus(WorkerPausing)
w.setPauseFlag(true)
}
}
// Resume transitions a worker to the resuming state. Check
// to see if the worker restarted by calling IsStarted()
func (w *Worker) Resume() {
if w.IsPaused() {
w.setStatus(WorkerResuming)
w.setPauseFlag(false)
w.pauseCond.Broadcast() w.pauseCond.Broadcast()
} }
} }
// checkPaused is used to park the worker when paused // Resume transitions a worker to the stopping state. Check
func (w *Worker) checkPaused() { // to see if the worker stopped by calling IsStopped()
func (w *Worker) Stop() {
w.setStatus(WorkerStopping)
w.shutdown()
}
// IsStarted returns a boolean indicating if this worker has been started.
func (w *Worker) IsStarted() bool {
return w.GetStatus() == WorkerStarted
}
// IsPaused returns a boolean indicating if this worker has been paused.
func (w *Worker) IsPaused() bool {
return w.GetStatus() == WorkerPaused
}
// IsStopped returns a boolean indicating if this worker has been stopped.
func (w *Worker) IsStopped() bool {
return w.GetStatus() == WorkerStopped
}
func (w *Worker) isPausable() bool {
w.statusLock.RLock()
defer w.statusLock.RUnlock()
switch w.status {
case WorkerPausing, WorkerPaused, WorkerStopping, WorkerStopped:
return false
default:
return true
}
}
// GetStatus returns the status of the Worker
func (w *Worker) GetStatus() WorkerStatus {
w.statusLock.RLock()
defer w.statusLock.RUnlock()
return w.status
}
// setStatuses is used internally to the worker to update the
// status of the worker and workload at one time, since some
// transitions need to update both values using the same lock.
func (w *Worker) setStatuses(newWorkerStatus WorkerStatus, newWorkloadStatus SchedulerWorkerStatus) {
w.statusLock.Lock()
defer w.statusLock.Unlock()
w.setWorkerStatusLocked(newWorkerStatus)
w.setWorkloadStatusLocked(newWorkloadStatus)
}
// setStatus is used internally to the worker to update the
// status of the worker based on calls to the Worker API. For
// atomically updating the scheduler status and the workload
// status, use `setStatuses`.
func (w *Worker) setStatus(newStatus WorkerStatus) {
w.statusLock.Lock()
defer w.statusLock.Unlock()
w.setWorkerStatusLocked(newStatus)
}
func (w *Worker) setWorkerStatusLocked(newStatus WorkerStatus) {
if newStatus == w.status {
return
}
w.logger.Trace("changed worker status", "from", w.status, "to", newStatus)
w.status = newStatus
}
// GetStatus returns the status of the Worker's Workload.
func (w *Worker) GetWorkloadStatus() SchedulerWorkerStatus {
w.statusLock.RLock()
defer w.statusLock.RUnlock()
return w.workloadStatus
}
// setWorkloadStatus is used internally to the worker to update the
// status of the worker based updates from the workload.
func (w *Worker) setWorkloadStatus(newStatus SchedulerWorkerStatus) {
w.statusLock.Lock()
defer w.statusLock.Unlock()
w.setWorkloadStatusLocked(newStatus)
}
func (w *Worker) setWorkloadStatusLocked(newStatus SchedulerWorkerStatus) {
if newStatus == w.workloadStatus {
return
}
w.logger.Trace("changed workload status", "from", w.workloadStatus, "to", newStatus)
w.workloadStatus = newStatus
}
type WorkerInfo struct {
ID string `json:"id"`
EnabledSchedulers []string `json:"enabled_schedulers"`
Started time.Time `json:"started"`
Status string `json:"status"`
WorkloadStatus string `json:"workload_status"`
}
func (w WorkerInfo) Copy() WorkerInfo {
out := WorkerInfo{
ID: w.ID,
EnabledSchedulers: make([]string, len(w.EnabledSchedulers)),
Started: w.Started,
Status: w.Status,
WorkloadStatus: w.WorkloadStatus,
}
copy(out.EnabledSchedulers, w.EnabledSchedulers)
return out
}
func (w WorkerInfo) String() string {
// lazy implementation of WorkerInfo to string
out, _ := json.Marshal(w)
return string(out)
}
func (w *Worker) Info() WorkerInfo {
w.pauseLock.Lock() w.pauseLock.Lock()
for w.paused { defer w.pauseLock.Unlock()
out := WorkerInfo{
ID: w.id,
Status: w.status.String(),
WorkloadStatus: w.workloadStatus.String(),
EnabledSchedulers: make([]string, len(w.enabledSchedulers)),
}
out.Started = w.start
copy(out.EnabledSchedulers, w.enabledSchedulers)
return out
}
// ----------------------------------
// Pause Implementation
// These functions are used to support the worker's pause behaviors.
// ----------------------------------
func (w *Worker) setPauseFlag(pause bool) {
w.pauseLock.Lock()
defer w.pauseLock.Unlock()
w.pauseFlag = pause
}
// maybeWait is responsible for making the transition from `pausing`
// to `paused`, waiting, and then transitioning back to the running
// values.
func (w *Worker) maybeWait() {
w.pauseLock.Lock()
defer w.pauseLock.Unlock()
if !w.pauseFlag {
return
}
w.statusLock.Lock()
w.status = WorkerPaused
originalWorkloadStatus := w.workloadStatus
w.workloadStatus = WorkloadPaused
w.logger.Trace("changed workload status", "from", originalWorkloadStatus, "to", w.workloadStatus)
w.statusLock.Unlock()
for w.pauseFlag {
w.pauseCond.Wait() w.pauseCond.Wait()
} }
w.pauseLock.Unlock()
w.statusLock.Lock()
w.logger.Trace("changed workload status", "from", w.workloadStatus, "to", originalWorkloadStatus)
w.workloadStatus = originalWorkloadStatus
// only reset the worker status if the worker is not resuming to stop the paused workload.
if w.status != WorkerStopping {
w.logger.Trace("changed worker status", "from", w.status, "to", WorkerStarted)
w.status = WorkerStarted
}
w.statusLock.Unlock()
} }
// Shutdown is used to signal that the worker should shutdown.
func (w *Worker) shutdown() {
w.pauseLock.Lock()
wasPaused := w.pauseFlag
w.pauseFlag = false
w.pauseLock.Unlock()
w.logger.Trace("shutdown request received")
w.cancelFn()
if wasPaused {
w.pauseCond.Broadcast()
}
}
// markStopped is used to mark the worker and workload as stopped. It should be called in a
// defer immediately upon entering the run() function.
func (w *Worker) markStopped() {
w.setStatuses(WorkerStopped, WorkloadStopped)
w.logger.Debug("stopped")
}
func (w *Worker) workerShuttingDown() bool {
select {
case <-w.ctx.Done():
return true
default:
return false
}
}
// ----------------------------------
// Workload behavior code
// ----------------------------------
// run is the long-lived goroutine which is used to run the worker // run is the long-lived goroutine which is used to run the worker
func (w *Worker) run() { func (w *Worker) run() {
defer func() {
w.markStopped()
}()
w.setStatuses(WorkerStarted, WorkloadRunning)
w.logger.Debug("running")
for { for {
// Check to see if the context has been cancelled. Server shutdown and Shutdown()
// should do this.
if w.workerShuttingDown() {
return
}
// Dequeue a pending evaluation // Dequeue a pending evaluation
eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout) eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout)
if shutdown { if shutdown {
return return
} }
// Check for a shutdown // since dequeue takes time, we could have shutdown the server after getting an eval that
// needs to be nacked before we exit. Explicitly checking the server to allow this eval
// to be processed on worker shutdown.
if w.srv.IsShutdown() { if w.srv.IsShutdown() {
w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval)) w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval))
w.sendNack(eval.ID, token) w.sendNack(eval, token)
return return
} }
// Wait for the raft log to catchup to the evaluation // Wait for the raft log to catchup to the evaluation
w.setWorkloadStatus(WorkloadWaitingForRaft)
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit) snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
if err != nil { if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendNack(eval.ID, token) w.sendNack(eval, token)
continue continue
} }
// Invoke the scheduler to determine placements // Invoke the scheduler to determine placements
w.setWorkloadStatus(WorkloadScheduling)
if err := w.invokeScheduler(snap, eval, token); err != nil { if err := w.invokeScheduler(snap, eval, token); err != nil {
w.logger.Error("error invoking scheduler", "error", err) w.logger.Error("error invoking scheduler", "error", err)
w.sendNack(eval.ID, token) w.sendNack(eval, token)
continue continue
} }
// Complete the evaluation // Complete the evaluation
w.sendAck(eval.ID, token) w.sendAck(eval, token)
} }
} }
@ -143,7 +437,7 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) (
eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) { eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) {
// Setup the request // Setup the request
req := structs.EvalDequeueRequest{ req := structs.EvalDequeueRequest{
Schedulers: w.srv.config.EnabledSchedulers, Schedulers: w.enabledSchedulers,
Timeout: timeout, Timeout: timeout,
SchedulerVersion: scheduler.SchedulerVersion, SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{ WriteRequest: structs.WriteRequest{
@ -153,15 +447,20 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) (
var resp structs.EvalDequeueResponse var resp structs.EvalDequeueResponse
REQ: REQ:
// Check if we are paused // Wait inside this function if the worker is paused.
w.checkPaused() w.maybeWait()
// Immediately check to see if the worker has been shutdown.
if w.workerShuttingDown() {
return nil, "", 0, true
}
// Make a blocking RPC // Make a blocking RPC
start := time.Now() start := time.Now()
w.setWorkloadStatus(WorkloadWaitingToDequeue)
err := w.srv.RPC("Eval.Dequeue", &req, &resp) err := w.srv.RPC("Eval.Dequeue", &req, &resp)
metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start) metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start)
if err != nil { if err != nil {
if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() { if time.Since(w.start) > dequeueErrGrace && !w.workerShuttingDown() {
w.logger.Error("failed to dequeue evaluation", "error", err) w.logger.Error("failed to dequeue evaluation", "error", err)
} }
@ -182,25 +481,21 @@ REQ:
// Check if we got a response // Check if we got a response
if resp.Eval != nil { if resp.Eval != nil {
w.logger.Debug("dequeued evaluation", "eval_id", resp.Eval.ID) w.logger.Debug("dequeued evaluation", "eval_id", resp.Eval.ID, "type", resp.Eval.Type, "namespace", resp.Eval.Namespace, "job_id", resp.Eval.JobID, "node_id", resp.Eval.NodeID, "triggered_by", resp.Eval.TriggeredBy)
return resp.Eval, resp.Token, resp.GetWaitIndex(), false return resp.Eval, resp.Token, resp.GetWaitIndex(), false
} }
// Check for potential shutdown
if w.srv.IsShutdown() {
return nil, "", 0, true
}
goto REQ goto REQ
} }
// sendAcknowledgement should not be called directly. Call `sendAck` or `sendNack` instead. // sendAcknowledgement should not be called directly. Call `sendAck` or `sendNack` instead.
// This function implements `ack`ing or `nack`ing the evaluation generally. // This function implements `ack`ing or `nack`ing the evaluation generally.
// Any errors are logged but swallowed. // Any errors are logged but swallowed.
func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) { func (w *Worker) sendAcknowledgement(eval *structs.Evaluation, token string, ack bool) {
defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now())
// Setup the request // Setup the request
req := structs.EvalAckRequest{ req := structs.EvalAckRequest{
EvalID: evalID, EvalID: eval.ID,
Token: token, Token: token,
WriteRequest: structs.WriteRequest{ WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region, Region: w.srv.config.Region,
@ -219,28 +514,28 @@ func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) {
// Make the RPC call // Make the RPC call
err := w.srv.RPC(endpoint, &req, &resp) err := w.srv.RPC(endpoint, &req, &resp)
if err != nil { if err != nil {
w.logger.Error(fmt.Sprintf("failed to %s evaluation", verb), "eval_id", evalID, "error", err) w.logger.Error(fmt.Sprintf("failed to %s evaluation", verb), "eval_id", eval.ID, "error", err)
} else { } else {
w.logger.Debug(fmt.Sprintf("%s evaluation", verb), "eval_id", evalID) w.logger.Debug(fmt.Sprintf("%s evaluation", verb), "eval_id", eval.ID, "type", eval.Type, "namespace", eval.Namespace, "job_id", eval.JobID, "node_id", eval.NodeID, "triggered_by", eval.TriggeredBy)
} }
} }
// sendNack makes a best effort to nack the evaluation. // sendNack makes a best effort to nack the evaluation.
// Any errors are logged but swallowed. // Any errors are logged but swallowed.
func (w *Worker) sendNack(evalID, token string) { func (w *Worker) sendNack(eval *structs.Evaluation, token string) {
w.sendAcknowledgement(evalID, token, false) w.sendAcknowledgement(eval, token, false)
} }
// sendAck makes a best effort to ack the evaluation. // sendAck makes a best effort to ack the evaluation.
// Any errors are logged but swallowed. // Any errors are logged but swallowed.
func (w *Worker) sendAck(evalID, token string) { func (w *Worker) sendAck(eval *structs.Evaluation, token string) {
w.sendAcknowledgement(evalID, token, true) w.sendAcknowledgement(eval, token, true)
} }
// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block. // snapshotMinIndex times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now() start := time.Now()
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout) ctx, cancel := context.WithTimeout(w.ctx, timeout)
snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex) snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
cancel() cancel()
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
@ -288,7 +583,8 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
// SubmitPlan is used to submit a plan for consideration. This allows // SubmitPlan is used to submit a plan for consideration. This allows
// the worker to act as the planner for the scheduler. // the worker to act as the planner for the scheduler.
func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) { func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) {
// Check for a shutdown before plan submission // Check for a shutdown before plan submission. Checking server state rather than
// worker state to allow work in flight to complete before stopping.
if w.srv.IsShutdown() { if w.srv.IsShutdown() {
return nil, nil, fmt.Errorf("shutdown while planning") return nil, nil, fmt.Errorf("shutdown while planning")
} }
@ -358,7 +654,8 @@ SUBMIT:
// UpdateEval is used to submit an updated evaluation. This allows // UpdateEval is used to submit an updated evaluation. This allows
// the worker to act as the planner for the scheduler. // the worker to act as the planner for the scheduler.
func (w *Worker) UpdateEval(eval *structs.Evaluation) error { func (w *Worker) UpdateEval(eval *structs.Evaluation) error {
// Check for a shutdown before plan submission // Check for a shutdown before plan submission. Checking server state rather than
// worker state to allow a workers work in flight to complete before stopping.
if w.srv.IsShutdown() { if w.srv.IsShutdown() {
return fmt.Errorf("shutdown while planning") return fmt.Errorf("shutdown while planning")
} }
@ -396,7 +693,8 @@ SUBMIT:
// CreateEval is used to create a new evaluation. This allows // CreateEval is used to create a new evaluation. This allows
// the worker to act as the planner for the scheduler. // the worker to act as the planner for the scheduler.
func (w *Worker) CreateEval(eval *structs.Evaluation) error { func (w *Worker) CreateEval(eval *structs.Evaluation) error {
// Check for a shutdown before plan submission // Check for a shutdown before plan submission. This consults the server Shutdown state
// instead of the worker's to prevent aborting work in flight.
if w.srv.IsShutdown() { if w.srv.IsShutdown() {
return fmt.Errorf("shutdown while planning") return fmt.Errorf("shutdown while planning")
} }
@ -437,7 +735,8 @@ SUBMIT:
// ReblockEval is used to reinsert a blocked evaluation into the blocked eval // ReblockEval is used to reinsert a blocked evaluation into the blocked eval
// tracker. This allows the worker to act as the planner for the scheduler. // tracker. This allows the worker to act as the planner for the scheduler.
func (w *Worker) ReblockEval(eval *structs.Evaluation) error { func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
// Check for a shutdown before plan submission // Check for a shutdown before plan submission. This checks the server state rather than
// the worker's to prevent erroring on work in flight that would complete otherwise.
if w.srv.IsShutdown() { if w.srv.IsShutdown() {
return fmt.Errorf("shutdown while planning") return fmt.Errorf("shutdown while planning")
} }
@ -514,7 +813,10 @@ func (w *Worker) shouldResubmit(err error) bool {
// backoffErr is used to do an exponential back off on error. This is // backoffErr is used to do an exponential back off on error. This is
// maintained statefully for the worker. Returns if attempts should be // maintained statefully for the worker. Returns if attempts should be
// abandoned due to shutdown. // abandoned due to shutdown.
// This uses the worker's context in order to immediately stop the
// backoff if the server or the worker is shutdown.
func (w *Worker) backoffErr(base, limit time.Duration) bool { func (w *Worker) backoffErr(base, limit time.Duration) bool {
w.setWorkloadStatus(WorkloadBackoff)
backoff := (1 << (2 * w.failures)) * base backoff := (1 << (2 * w.failures)) * base
if backoff > limit { if backoff > limit {
backoff = limit backoff = limit
@ -524,7 +826,7 @@ func (w *Worker) backoffErr(base, limit time.Duration) bool {
select { select {
case <-time.After(backoff): case <-time.After(backoff):
return false return false
case <-w.srv.shutdownCh: case <-w.ctx.Done():
return true return true
} }
} }

View File

@ -0,0 +1,31 @@
// Code generated by "stringer -trimprefix=Workload -output worker_string_schedulerworkerstatus.go -linecomment -type=SchedulerWorkerStatus"; DO NOT EDIT.
package nomad
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[WorkloadUnknownStatus-0]
_ = x[WorkloadRunning-1]
_ = x[WorkloadWaitingToDequeue-2]
_ = x[WorkloadWaitingForRaft-3]
_ = x[WorkloadScheduling-4]
_ = x[WorkloadSubmitting-5]
_ = x[WorkloadBackoff-6]
_ = x[WorkloadStopped-7]
_ = x[WorkloadPaused-8]
}
const _SchedulerWorkerStatus_name = "UnknownStatusRunningWaitingToDequeueWaitingForRaftSchedulingSubmittingBackoffStoppedPaused"
var _SchedulerWorkerStatus_index = [...]uint8{0, 13, 20, 36, 50, 60, 70, 77, 84, 90}
func (i SchedulerWorkerStatus) String() string {
if i < 0 || i >= SchedulerWorkerStatus(len(_SchedulerWorkerStatus_index)-1) {
return "SchedulerWorkerStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _SchedulerWorkerStatus_name[_SchedulerWorkerStatus_index[i]:_SchedulerWorkerStatus_index[i+1]]
}

View File

@ -0,0 +1,30 @@
// Code generated by "stringer -trimprefix=Worker -output worker_string_workerstatus.go -linecomment -type=WorkerStatus"; DO NOT EDIT.
package nomad
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[WorkerUnknownStatus-0]
_ = x[WorkerStarting-1]
_ = x[WorkerStarted-2]
_ = x[WorkerPausing-3]
_ = x[WorkerPaused-4]
_ = x[WorkerResuming-5]
_ = x[WorkerStopping-6]
_ = x[WorkerStopped-7]
}
const _WorkerStatus_name = "UnknownStartingStartedPausingPausedResumingStoppingStopped"
var _WorkerStatus_index = [...]uint8{0, 7, 15, 22, 29, 35, 43, 51, 58}
func (i WorkerStatus) String() string {
if i < 0 || i >= WorkerStatus(len(_WorkerStatus_index)-1) {
return "WorkerStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _WorkerStatus_name[_WorkerStatus_index[i]:_WorkerStatus_index[i+1]]
}

View File

@ -1,6 +1,7 @@
package nomad package nomad
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
@ -11,6 +12,7 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -47,6 +49,19 @@ func init() {
} }
} }
// NewTestWorker returns the worker without calling it's run method.
func NewTestWorker(shutdownCtx context.Context, srv *Server) *Worker {
w := &Worker{
srv: srv,
start: time.Now(),
id: uuid.Generate(),
}
w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id)
w.pauseCond = sync.NewCond(&w.pauseLock)
w.ctx, w.cancelFn = context.WithCancel(shutdownCtx)
return w
}
func TestWorker_dequeueEvaluation(t *testing.T) { func TestWorker_dequeueEvaluation(t *testing.T) {
t.Parallel() t.Parallel()
@ -62,7 +77,8 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval1)
// Create a worker // Create a worker
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w, _ := NewWorker(s1.shutdownCtx, s1, poolArgs)
// Attempt dequeue // Attempt dequeue
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
@ -108,7 +124,8 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
s1.evalBroker.Enqueue(eval2) s1.evalBroker.Enqueue(eval2)
// Create a worker // Create a worker
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
// Attempt dequeue // Attempt dequeue
eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
@ -133,7 +150,7 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
} }
// Send the Ack // Send the Ack
w.sendAck(eval1.ID, token) w.sendAck(eval1, token)
// Attempt second dequeue // Attempt second dequeue
eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
@ -168,15 +185,16 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval1)
// Create a worker // Create a worker
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.pauseCond = sync.NewCond(&w.pauseLock) w.pauseCond = sync.NewCond(&w.pauseLock)
// PAUSE the worker // PAUSE the worker
w.SetPause(true) w.Pause()
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
w.SetPause(false) w.Resume()
}() }()
// Attempt dequeue // Attempt dequeue
@ -212,7 +230,8 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s1.RPC)
// Create a worker // Create a worker
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
go func() { go func() {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -231,6 +250,57 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
} }
} }
func TestWorker_Shutdown(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
go func() {
time.Sleep(10 * time.Millisecond)
w.Stop()
}()
// Attempt dequeue
eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
require.True(t, shutdown)
require.Nil(t, eval)
}
func TestWorker_Shutdown_paused(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w, _ := NewWorker(s1.shutdownCtx, s1, poolArgs)
w.Pause()
// pausing can take up to 500ms because of the blocking query timeout in dequeueEvaluation.
require.Eventually(t, w.IsPaused, 550*time.Millisecond, 10*time.Millisecond, "should pause")
go func() {
w.Stop()
}()
// transitioning to stopped from paused should be very quick,
// but might not be immediate.
require.Eventually(t, w.IsStopped, 100*time.Millisecond, 10*time.Millisecond, "should stop when paused")
}
func TestWorker_sendAck(t *testing.T) { func TestWorker_sendAck(t *testing.T) {
t.Parallel() t.Parallel()
@ -246,7 +316,8 @@ func TestWorker_sendAck(t *testing.T) {
s1.evalBroker.Enqueue(eval1) s1.evalBroker.Enqueue(eval1)
// Create a worker // Create a worker
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
// Attempt dequeue // Attempt dequeue
eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond) eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond)
@ -258,7 +329,7 @@ func TestWorker_sendAck(t *testing.T) {
} }
// Send the Nack // Send the Nack
w.sendNack(eval.ID, token) w.sendNack(eval, token)
// Check the depth is 1, nothing unacked // Check the depth is 1, nothing unacked
stats = s1.evalBroker.Stats() stats = s1.evalBroker.Stats()
@ -270,7 +341,7 @@ func TestWorker_sendAck(t *testing.T) {
eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond) eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond)
// Send the Ack // Send the Ack
w.sendAck(eval.ID, token) w.sendAck(eval, token)
// Check the depth is 0 // Check the depth is 0
stats = s1.evalBroker.Stats() stats = s1.evalBroker.Stats()
@ -301,7 +372,8 @@ func TestWorker_waitForIndex(t *testing.T) {
}() }()
// Wait for a future index // Wait for a future index
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
snap, err := w.snapshotMinIndex(index+1, time.Second) snap, err := w.snapshotMinIndex(index+1, time.Second)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, snap) require.NotNil(t, snap)
@ -327,7 +399,8 @@ func TestWorker_invokeScheduler(t *testing.T) {
}) })
defer cleanupS1() defer cleanupS1()
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
eval := mock.Eval() eval := mock.Eval()
eval.Type = "noop" eval.Type = "noop"
@ -380,7 +453,10 @@ func TestWorker_SubmitPlan(t *testing.T) {
} }
// Attempt to submit a plan // Attempt to submit a plan
w := &Worker{srv: s1, logger: s1.logger, evalToken: token} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.evalToken = token
result, state, err := w.SubmitPlan(plan) result, state, err := w.SubmitPlan(plan)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -442,7 +518,8 @@ func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
// Attempt to submit a plan // Attempt to submit a plan
w := &Worker{srv: s1, logger: s1.logger} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.SubmitPlan(plan) w.SubmitPlan(plan)
assert.Equal(t, &structs.Allocation{ assert.Equal(t, &structs.Allocation{
@ -499,7 +576,10 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) {
} }
// Attempt to submit a plan // Attempt to submit a plan
w := &Worker{srv: s1, logger: s1.logger, evalToken: token} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.evalToken = token
result, state, err := w.SubmitPlan(plan) result, state, err := w.SubmitPlan(plan)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -556,7 +636,10 @@ func TestWorker_UpdateEval(t *testing.T) {
eval2.Status = structs.EvalStatusComplete eval2.Status = structs.EvalStatusComplete
// Attempt to update eval // Attempt to update eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.evalToken = token
err = w.UpdateEval(eval2) err = w.UpdateEval(eval2)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -605,7 +688,10 @@ func TestWorker_CreateEval(t *testing.T) {
eval2.PreviousEval = eval1.ID eval2.PreviousEval = eval1.ID
// Attempt to create eval // Attempt to create eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.evalToken = token
err = w.CreateEval(eval2) err = w.CreateEval(eval2)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -667,14 +753,17 @@ func TestWorker_ReblockEval(t *testing.T) {
eval2.QueuedAllocations = map[string]int{"web": 50} eval2.QueuedAllocations = map[string]int{"web": 50}
// Attempt to reblock eval // Attempt to reblock eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token} poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
w := newWorker(s1.shutdownCtx, s1, poolArgs)
w.evalToken = token
err = w.ReblockEval(eval2) err = w.ReblockEval(eval2)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Ack the eval // Ack the eval
w.sendAck(evalOut.ID, token) w.sendAck(evalOut, token)
// Check that it is blocked // Check that it is blocked
bStats := s1.blockedEvals.Stats() bStats := s1.blockedEvals.Stats()
@ -713,3 +802,125 @@ func TestWorker_ReblockEval(t *testing.T) {
reblockedEval.SnapshotIndex, w.snapshotIndex) reblockedEval.SnapshotIndex, w.snapshotIndex)
} }
} }
func TestWorker_Info(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy()
// Create a worker
w := newWorker(s1.shutdownCtx, s1, poolArgs)
require.Equal(t, WorkerStarting, w.GetStatus())
workerInfo := w.Info()
require.Equal(t, WorkerStarting.String(), workerInfo.Status)
}
const (
longWait = 100 * time.Millisecond
tinyWait = 10 * time.Millisecond
)
func TestWorker_SetPause(t *testing.T) {
t.Parallel()
logger := testlog.HCLogger(t)
srv := &Server{
logger: logger,
shutdownCtx: context.Background(),
}
args := SchedulerWorkerPoolArgs{
EnabledSchedulers: []string{structs.JobTypeCore, structs.JobTypeBatch, structs.JobTypeSystem},
}
w := newWorker(context.Background(), srv, args)
w._start(testWorkload)
require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have started")
go func() {
time.Sleep(tinyWait)
w.Pause()
}()
require.Eventually(t, w.IsPaused, longWait, tinyWait, "should have paused")
go func() {
time.Sleep(tinyWait)
w.Pause()
}()
require.Eventually(t, w.IsPaused, longWait, tinyWait, "pausing a paused should be okay")
go func() {
time.Sleep(tinyWait)
w.Resume()
}()
require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have restarted from pause")
go func() {
time.Sleep(tinyWait)
w.Stop()
}()
require.Eventually(t, w.IsStopped, longWait, tinyWait, "should have shutdown")
}
func TestWorker_SetPause_OutOfOrderEvents(t *testing.T) {
t.Parallel()
logger := testlog.HCLogger(t)
srv := &Server{
logger: logger,
shutdownCtx: context.Background(),
}
args := SchedulerWorkerPoolArgs{
EnabledSchedulers: []string{structs.JobTypeCore, structs.JobTypeBatch, structs.JobTypeSystem},
}
w := newWorker(context.Background(), srv, args)
w._start(testWorkload)
require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have started")
go func() {
time.Sleep(tinyWait)
w.Pause()
}()
require.Eventually(t, w.IsPaused, longWait, tinyWait, "should have paused")
go func() {
time.Sleep(tinyWait)
w.Stop()
}()
require.Eventually(t, w.IsStopped, longWait, tinyWait, "stop from pause should have shutdown")
go func() {
time.Sleep(tinyWait)
w.Pause()
}()
require.Eventually(t, w.IsStopped, longWait, tinyWait, "pausing a stopped should stay stopped")
}
// _start is a test helper function used to start a worker with an alternate workload
func (w *Worker) _start(inFunc func(w *Worker)) {
w.setStatus(WorkerStarting)
go inFunc(w)
}
// testWorkload is a very simple function that performs the same status updating behaviors that the
// real workload does.
func testWorkload(w *Worker) {
defer w.markStopped()
w.setStatuses(WorkerStarted, WorkloadRunning)
w.logger.Debug("testWorkload running")
for {
// ensure state variables are happy after resuming.
w.maybeWait()
if w.workerShuttingDown() {
w.logger.Debug("testWorkload stopped")
return
}
// do some fake work
time.Sleep(10 * time.Millisecond)
}
}

View File

@ -725,3 +725,204 @@ $ curl -O -J \
go tool trace trace go tool trace trace
``` ```
## Fetch all scheduler worker's status
The `/agent/schedulers` endpoint allow Nomad operators to inspect the state of
a Nomad server agent's scheduler workers.
| Method | Path | Produces |
| ------ | ------------------- | ------------------ |
| `GET` | `/agent/schedulers` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api-docs#blocking-queries) and
[required ACLs](/api-docs#acls).
| Blocking Queries | ACL Required |
| ---------------- | ------------ |
| `NO` | `agent:read` |
### Parameters
This endpoint accepts no additional parameters.
### Sample Request
```shell-session
$ curl \
https://localhost:4646/v1/agent/schedulers
```
### Sample Response
```json
{
"schedulers": [
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"id": "5669d6fa-0def-7369-6558-a47c35fdc675",
"started": "2021-12-21T19:25:00.911883Z",
"status": "Paused",
"workload_status": "Paused"
},
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"id": "c919709d-6d14-66bf-b425-80b8167a267e",
"started": "2021-12-21T19:25:00.91189Z",
"status": "Paused",
"workload_status": "Paused"
},
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"id": "f5edb69a-6122-be8f-b32a-23cd8511dba5",
"started": "2021-12-21T19:25:00.911961Z",
"status": "Paused",
"workload_status": "Paused"
},
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"id": "458816ae-83cf-0710-d8d4-35d2ad2e42d7",
"started": "2021-12-21T19:25:00.912119Z",
"status": "Started",
"workload_status": "WaitingToDequeue"
}
],
"server_id": "server1.global"
}
```
## Read scheduler worker configuration
This endpoint returns data about the agent's scheduler configuration from
the perspective of the agent. This is only applicable for servers.
| Method | Path | Produces |
| ------ | -------------------------- | ------------------ |
| `GET` | `/agent/schedulers/config` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api-docs#blocking-queries) and
[required ACLs](/api-docs#acls).
| Blocking Queries | ACL Required |
| ---------------- | ------------ |
| `NO` | `agent:read` |
### Parameters
This endpoint accepts no additional parameters.
### Sample Request
```shell-session
$ curl \
--request PUT \
--data @payload.json \
https://localhost:4646/v1/jobs
```
### Sample Response
```json
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"num_schedulers": 8,
"server_id": "server1.global"
}
```
## Update scheduler worker configuration
This allows a Nomad operator to modify the server's running scheduler
configuration, which will remain in effect until another update or until the
node is restarted. For durable changes to this value, set the corresponding
values—[`num_schedulers`][] and [`enabled_schedulers`][]—in the node's
configuration file. The response contains the configuration after attempting
to apply the provided values. This is only applicable for servers.
| Method | Path | Produces |
| ------ | -------------------------- | ------------------ |
| `PUT` | `/agent/schedulers/config` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api-docs#blocking-queries) and
[required ACLs](/api-docs#acls).
| Blocking Queries | ACL Required |
| ---------------- | ------------- |
| `NO` | `agent:write` |
### Sample Payload
```json
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"num_schedulers": 12
}
```
### Sample Request
```shell-session
$ curl \
--request PUT \
--data @payload.json \
https://localhost:4646/v1/jobs
```
### Sample Response
```json
{
"enabled_schedulers": [
"service",
"batch",
"system",
"sysbatch",
"_core"
],
"num_schedulers": 12,
"server_id": "server1.global"
}
```
[`enabled_schedulers`]: /docs/configuration/server#enabled_schedulers
[`num_schedulers`]: /docs/configuration/server#num_schedulers