Merge pull request #8168 from hashicorp/f-8158-add-preserve-counts

add `PreserveCounts` to `Job.Register`
This commit is contained in:
Chris Baker 2020-06-17 07:28:35 -05:00 committed by GitHub
commit a7e0f021bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 272 additions and 44 deletions

View File

@ -6,6 +6,7 @@ FEATURES:
IMPROVEMENTS:
* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
* api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]

View File

@ -87,6 +87,7 @@ type RegisterOptions struct {
EnforceIndex bool
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
}
// Register is used to register a new job. It returns the ID
@ -105,7 +106,7 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
// of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &RegisterJobRequest{
req := &JobRegisterRequest{
Job: job,
}
if opts != nil {
@ -113,9 +114,8 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
req.EnforceIndex = true
req.JobModifyIndex = opts.ModifyIndex
}
if opts.PolicyOverride {
req.PolicyOverride = true
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
}
var resp JobRegisterResponse
@ -1035,25 +1035,18 @@ type JobRevertRequest struct {
WriteRequest
}
// JobUpdateRequest is used to update a job
// JobRegisterRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
// If EnforceIndex is set then the job will only be registered if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero, the
// register only occurs if the job is new.
EnforceIndex bool
JobModifyIndex uint64
PolicyOverride bool
WriteRequest
}
// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
EnforceIndex bool `json:",omitempty"`
JobModifyIndex uint64 `json:",omitempty"`
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`
WriteRequest
}
// JobRegisterResponse is used to respond to a job registration

View File

@ -44,6 +44,148 @@ func TestJobs_Register(t *testing.T) {
}
}
func TestJobs_Register_PreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
// Listing jobs before registering returns nothing
resp, _, err := jobs.List(nil)
require.Nil(err)
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))
// Create a job
task := NewTask("task", "exec").
SetConfig("command", "/bin/sleep").
Require(&Resources{
CPU: intToPtr(100),
MemoryMB: intToPtr(256),
}).
SetLogConfig(&LogConfig{
MaxFiles: intToPtr(1),
MaxFileSizeMB: intToPtr(2),
})
group1 := NewTaskGroup("group1", 1).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
group2 := NewTaskGroup("group2", 2).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job := NewBatchJob("job", "redis", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(group1).
AddTaskGroup(group2)
// Create a job and register it
resp2, wm, err := jobs.Register(job, nil)
require.Nil(err)
require.NotNil(resp2)
require.NotEmpty(resp2.EvalID)
assertWriteMeta(t, wm)
// Update the job, new groups to test PreserveCounts
group1.Count = nil
group2.Count = intToPtr(0)
group3 := NewTaskGroup("group3", 3).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job.AddTaskGroup(group3)
// Update the job, with PreserveCounts = true
_, _, err = jobs.RegisterOpts(job, &RegisterOptions{
PreserveCounts: true,
}, nil)
require.NoError(err)
// Query the job scale status
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Equal(1, status.TaskGroups["group1"].Desired) // present and nil => preserved
require.Equal(2, status.TaskGroups["group2"].Desired) // present and specified => preserved
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specific in job spec
}
func TestJobs_Register_NoPreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
// Listing jobs before registering returns nothing
resp, _, err := jobs.List(nil)
require.Nil(err)
require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp))
// Create a job
task := NewTask("task", "exec").
SetConfig("command", "/bin/sleep").
Require(&Resources{
CPU: intToPtr(100),
MemoryMB: intToPtr(256),
}).
SetLogConfig(&LogConfig{
MaxFiles: intToPtr(1),
MaxFileSizeMB: intToPtr(2),
})
group1 := NewTaskGroup("group1", 1).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
group2 := NewTaskGroup("group2", 2).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job := NewBatchJob("job", "redis", "global", 1).
AddDatacenter("dc1").
AddTaskGroup(group1).
AddTaskGroup(group2)
// Create a job and register it
resp2, wm, err := jobs.Register(job, nil)
require.Nil(err)
require.NotNil(resp2)
require.NotEmpty(resp2.EvalID)
assertWriteMeta(t, wm)
// Update the job, new groups to test PreserveCounts
group1.Count = intToPtr(0)
group2.Count = nil
group3 := NewTaskGroup("group3", 3).
AddTask(task).
RequireDisk(&EphemeralDisk{
SizeMB: intToPtr(25),
})
job.AddTaskGroup(group3)
// Update the job, with PreserveCounts = default [false]
_, _, err = jobs.Register(job, nil)
require.NoError(err)
// Query the job scale status
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Equal(0, status.TaskGroups["group1"].Desired) // present => as specified
require.Equal(1, status.TaskGroups["group2"].Desired) // nil => default (1)
require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified
}
func TestJobs_Validate(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)

View File

@ -413,6 +413,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
WriteRequest: structs.WriteRequest{
Region: sJob.Region,
AuthToken: args.WriteRequest.SecretID,

View File

@ -162,7 +162,11 @@ func (c *JobInspectCommand) Run(args []string) int {
}
// Print the contents of the job
req := api.RegisterJobRequest{Job: job}
req := struct {
Job *api.Job
}{
Job: job,
}
f, err := DataFormat("json", "")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error getting formatter: %s", err))

View File

@ -86,6 +86,9 @@ Run Options:
-policy-override
Sets the flag to force override any soft mandatory Sentinel policies.
-preserve-counts
If set, the existing task group counts will be preserved when updating a job.
-consul-token
If set, the passed Consul token is stored in the job before sending to the
Nomad servers. This allows passing the Consul token without storing it in
@ -118,6 +121,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags {
"-vault-token": complete.PredictAnything,
"-output": complete.PredictNothing,
"-policy-override": complete.PredictNothing,
"-preserve-counts": complete.PredictNothing,
})
}
@ -128,7 +132,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor {
func (c *JobRunCommand) Name() string { return "job run" }
func (c *JobRunCommand) Run(args []string) int {
var detach, verbose, output, override bool
var detach, verbose, output, override, preserveCounts bool
var checkIndexStr, consulToken, vaultToken string
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
@ -137,6 +141,7 @@ func (c *JobRunCommand) Run(args []string) int {
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.BoolVar(&override, "policy-override", false, "")
flags.BoolVar(&preserveCounts, "preserve-counts", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")
flags.StringVar(&consulToken, "consul-token", "", "")
flags.StringVar(&vaultToken, "vault-token", "", "")
@ -208,7 +213,11 @@ func (c *JobRunCommand) Run(args []string) int {
}
if output {
req := api.RegisterJobRequest{Job: job}
req := struct {
Job *api.Job
}{
Job: job,
}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
@ -232,9 +241,8 @@ func (c *JobRunCommand) Run(args []string) int {
opts.EnforceIndex = true
opts.ModifyIndex = checkIndex
}
if override {
opts.PolicyOverride = true
}
opts.PolicyOverride = override
opts.PreserveCounts = preserveCounts
// Submit the job
resp, _, err := client.Jobs().RegisterOpts(job, opts, nil)

View File

@ -278,6 +278,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Clear the Consul token
args.Job.ConsulToken = ""
// Preserve the existing task group counts, if so requested
if existingJob != nil && args.PreserveCounts {
prevCounts := make(map[string]int)
for _, tg := range existingJob.TaskGroups {
prevCounts[tg.Name] = tg.Count
}
for _, tg := range args.Job.TaskGroups {
if count, ok := prevCounts[tg.Name]; ok {
tg.Count = count
}
}
}
// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time

View File

@ -108,6 +108,67 @@ func TestJobEndpoint_Register(t *testing.T) {
}
}
func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.TaskGroups[0].Name = "group1"
job.TaskGroups[0].Count = 10
job.Canonicalize()
// Register the job
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &structs.JobRegisterResponse{}))
// Check the job in the FSM state
state := s1.fsm.State()
out, err := state.JobByID(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(10, out.TaskGroups[0].Count)
// New version:
// new "group2" with 2 instances
// "group1" goes from 10 -> 0 in the spec
job = job.Copy()
job.TaskGroups[0].Count = 0 // 10 -> 0 in the job spec
job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy())
job.TaskGroups[1].Name = "group2"
job.TaskGroups[1].Count = 2
// Perform the update
require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
Job: job,
PreserveCounts: true,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &structs.JobRegisterResponse{}))
// Check the job in the FSM state
out, err = state.JobByID(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(out)
require.Equal(10, out.TaskGroups[0].Count) // should not change
require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec
}
func TestJobEndpoint_Register_Connect(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@ -549,6 +549,11 @@ type JobRegisterRequest struct {
EnforceIndex bool
JobModifyIndex uint64
// PreserveCounts indicates that during job update, existing task group
// counts should be preserved, over those specified in the new job spec
// PreserveCounts is ignored for newly created jobs.
PreserveCounts bool
// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool

View File

@ -943,7 +943,10 @@ func decodeBody(resp *http.Response, out interface{}) error {
}
}
// encodeBody is used to encode a request body
// encodeBody prepares the reader to serve as the request body.
//
// Returns the `obj` input if it is a raw io.Reader object; otherwise
// returns a reader of the json format of the passed argument.
func encodeBody(obj interface{}) (io.Reader, error) {
if reader, ok := obj.(io.Reader); ok {
return reader, nil

View File

@ -87,6 +87,7 @@ type RegisterOptions struct {
EnforceIndex bool
ModifyIndex uint64
PolicyOverride bool
PreserveCounts bool
}
// Register is used to register a new job. It returns the ID
@ -105,7 +106,7 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*
// of the evaluation, along with any errors encountered.
func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
// Format the request
req := &RegisterJobRequest{
req := &JobRegisterRequest{
Job: job,
}
if opts != nil {
@ -113,9 +114,8 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
req.EnforceIndex = true
req.JobModifyIndex = opts.ModifyIndex
}
if opts.PolicyOverride {
req.PolicyOverride = true
}
req.PolicyOverride = opts.PolicyOverride
req.PreserveCounts = opts.PreserveCounts
}
var resp JobRegisterResponse
@ -1035,25 +1035,18 @@ type JobRevertRequest struct {
WriteRequest
}
// JobUpdateRequest is used to update a job
// JobRegisterRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
// If EnforceIndex is set then the job will only be registered if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero, the
// register only occurs if the job is new.
EnforceIndex bool
JobModifyIndex uint64
PolicyOverride bool
WriteRequest
}
// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
EnforceIndex bool `json:",omitempty"`
JobModifyIndex uint64 `json:",omitempty"`
PolicyOverride bool `json:",omitempty"`
PreserveCounts bool `json:",omitempty"`
WriteRequest
}
// JobRegisterResponse is used to respond to a job registration

View File

@ -92,11 +92,12 @@ type TaskGroupScaleStatus struct {
}
type ScalingEvent struct {
Count *int64
Error bool
Message string
Meta map[string]interface{}
EvalID *string
Time uint64
CreateIndex uint64
Count *int64
PreviousCount int64
Error bool
Message string
Meta map[string]interface{}
EvalID *string
Time uint64
CreateIndex uint64
}

View File

@ -70,6 +70,9 @@ precedence, going from highest to lowest: the `-vault-token` flag, the
- `-policy-override`: Sets the flag to force override any soft mandatory
Sentinel policies.
- `-preserve-counts`: If set, the existing task group counts will be preserved
when updating a job.
- `-consul-token`: If set, the passed Consul token is stored in the job before
sending to the Nomad servers. This allows passing the Consul token without
storing it in the job file. This overrides the token found in the \$CONSUL_HTTP_TOKEN