From b22b2bf9689d951784f2cc7aa62db800f8e42dbb Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 02:57:50 +0000 Subject: [PATCH 1/6] wip: developmental test to preserve existing task group counts during job update --- api/jobs.go | 2 + api/jobs_test.go | 142 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) diff --git a/api/jobs.go b/api/jobs.go index 578659511..ce5ec58d8 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -87,6 +87,7 @@ type RegisterOptions struct { EnforceIndex bool ModifyIndex uint64 PolicyOverride bool + PreserveCounts bool } // Register is used to register a new job. It returns the ID @@ -1054,6 +1055,7 @@ type RegisterJobRequest struct { EnforceIndex bool `json:",omitempty"` JobModifyIndex uint64 `json:",omitempty"` PolicyOverride bool `json:",omitempty"` + PreserveCounts bool `json:",omitempty"` } // JobRegisterResponse is used to respond to a job registration diff --git a/api/jobs_test.go b/api/jobs_test.go index b5760e7a9..34ebb91d7 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -44,6 +44,148 @@ func TestJobs_Register(t *testing.T) { } } +func TestJobs_Register_PreserveCounts(t *testing.T) { + t.Parallel() + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, _, err := jobs.List(nil) + require.Nil(err) + require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp)) + + // Create a job + task := NewTask("task", "exec"). + SetConfig("command", "/bin/sleep"). + Require(&Resources{ + CPU: intToPtr(100), + MemoryMB: intToPtr(256), + }). + SetLogConfig(&LogConfig{ + MaxFiles: intToPtr(1), + MaxFileSizeMB: intToPtr(2), + }) + + group1 := NewTaskGroup("group1", 1). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + group2 := NewTaskGroup("group2", 2). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + + job := NewBatchJob("job", "redis", "global", 1). + AddDatacenter("dc1"). + AddTaskGroup(group1). + AddTaskGroup(group2) + + // Create a job and register it + resp2, wm, err := jobs.Register(job, nil) + require.Nil(err) + require.NotNil(resp2) + require.NotEmpty(resp2.EvalID) + assertWriteMeta(t, wm) + + // Update the job, new groups to test PreserveCounts + group1.Count = nil + group2.Count = intToPtr(0) + group3 := NewTaskGroup("group3", 3). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + job.AddTaskGroup(group3) + + // Update the job, with PreserveCounts = true + _, _, err = jobs.RegisterOpts(job, &RegisterOptions{ + PreserveCounts: true, + }, nil) + require.NoError(err) + + // Query the job scale status + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Equal(1, status.TaskGroups["group1"].Desired) // present and nil => preserved + require.Equal(2, status.TaskGroups["group2"].Desired) // present and specified => preserved + require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specific in job spec +} + +func TestJobs_Register_NoPreserveCounts(t *testing.T) { + t.Parallel() + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, _, err := jobs.List(nil) + require.Nil(err) + require.Emptyf(resp, "expected 0 jobs, got: %d", len(resp)) + + // Create a job + task := NewTask("task", "exec"). + SetConfig("command", "/bin/sleep"). + Require(&Resources{ + CPU: intToPtr(100), + MemoryMB: intToPtr(256), + }). + SetLogConfig(&LogConfig{ + MaxFiles: intToPtr(1), + MaxFileSizeMB: intToPtr(2), + }) + + group1 := NewTaskGroup("group1", 1). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + group2 := NewTaskGroup("group2", 2). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + + job := NewBatchJob("job", "redis", "global", 1). + AddDatacenter("dc1"). + AddTaskGroup(group1). + AddTaskGroup(group2) + + // Create a job and register it + resp2, wm, err := jobs.Register(job, nil) + require.Nil(err) + require.NotNil(resp2) + require.NotEmpty(resp2.EvalID) + assertWriteMeta(t, wm) + + // Update the job, new groups to test PreserveCounts + group1.Count = intToPtr(0) + group2.Count = nil + group3 := NewTaskGroup("group3", 3). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: intToPtr(25), + }) + job.AddTaskGroup(group3) + + // Update the job, with PreserveCounts = default [false] + _, _, err = jobs.Register(job, nil) + require.NoError(err) + + // Query the job scale status + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Equal(0, status.TaskGroups["group1"].Desired) // present => as specified + require.Equal(1, status.TaskGroups["group2"].Desired) // nil => default (1) + require.Equal(3, status.TaskGroups["group3"].Desired) // new => as specified +} + func TestJobs_Validate(t *testing.T) { t.Parallel() c, s := makeClient(t, nil, nil) From 1e3563e08c066a876590b8eefeaf104901b231e7 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 15:30:04 +0000 Subject: [PATCH 2/6] wip: added PreserveCounts to struct.JobRegisterRequest, development test for Job.Register --- command/agent/job_endpoint.go | 1 + nomad/job_endpoint_test.go | 61 +++++++++++++++++++ nomad/structs/structs.go | 5 ++ vendor/github.com/hashicorp/nomad/api/api.go | 5 +- vendor/github.com/hashicorp/nomad/api/jobs.go | 8 ++- .../github.com/hashicorp/nomad/api/scaling.go | 15 ++--- 6 files changed, 84 insertions(+), 11 deletions(-) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index d650ffef6..b7671b4ac 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -413,6 +413,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, EnforceIndex: args.EnforceIndex, JobModifyIndex: args.JobModifyIndex, PolicyOverride: args.PolicyOverride, + PreserveCounts: args.PreserveCounts, WriteRequest: structs.WriteRequest{ Region: sJob.Region, AuthToken: args.WriteRequest.SecretID, diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index d15f8aa67..8db4085c8 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -108,6 +108,67 @@ func TestJobEndpoint_Register(t *testing.T) { } } +func TestJobEndpoint_Register_PreserveCounts(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.TaskGroups[0].Name = "group1" + job.TaskGroups[0].Count = 10 + job.Canonicalize() + + // Register the job + require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{})) + + // Check the job in the FSM state + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.Equal(10, out.TaskGroups[0].Count) + + // New version: + // new "group2" with 2 instances + // "group1" goes from 10 -> 0 in the spec + job = job.Copy() + job.TaskGroups[0].Count = 0 // 10 -> 0 in the job spec + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[1].Name = "group2" + job.TaskGroups[1].Count = 2 + + // Perform the update + require.NoError(msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + PreserveCounts: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{})) + + // Check the job in the FSM state + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.Equal(10, out.TaskGroups[0].Count) // should not change + require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec +} + + func TestJobEndpoint_Register_Connect(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 922c4804d..4949f0fb6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -549,6 +549,11 @@ type JobRegisterRequest struct { EnforceIndex bool JobModifyIndex uint64 + // PreserveCounts indicates that during job update, existing task group + // counts should be preserved, over those specified in the new job spec + // PreserveCounts is ignored for newly created jobs. + PreserveCounts bool + // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool diff --git a/vendor/github.com/hashicorp/nomad/api/api.go b/vendor/github.com/hashicorp/nomad/api/api.go index 83ccf6bd2..9633efb0e 100644 --- a/vendor/github.com/hashicorp/nomad/api/api.go +++ b/vendor/github.com/hashicorp/nomad/api/api.go @@ -943,7 +943,10 @@ func decodeBody(resp *http.Response, out interface{}) error { } } -// encodeBody is used to encode a request body +// encodeBody prepares the reader to serve as the request body. +// +// Returns the `obj` input if it is a raw io.Reader object; otherwise +// returns a reader of the json format of the passed argument. func encodeBody(obj interface{}) (io.Reader, error) { if reader, ok := obj.(io.Reader); ok { return reader, nil diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 578659511..6607717a1 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -87,6 +87,7 @@ type RegisterOptions struct { EnforceIndex bool ModifyIndex uint64 PolicyOverride bool + PreserveCounts bool } // Register is used to register a new job. It returns the ID @@ -1041,9 +1042,10 @@ type JobRegisterRequest struct { // If EnforceIndex is set then the job will only be registered if the passed // JobModifyIndex matches the current Jobs index. If the index is zero, the // register only occurs if the job is new. - EnforceIndex bool - JobModifyIndex uint64 - PolicyOverride bool + EnforceIndex bool `json:",omitempty"` + JobModifyIndex uint64 `json:",omitempty"` + PolicyOverride bool `json:",omitempty"` + PreserveCounts bool `json:",omitempty"` WriteRequest } diff --git a/vendor/github.com/hashicorp/nomad/api/scaling.go b/vendor/github.com/hashicorp/nomad/api/scaling.go index 1b3c3badf..3a31abd89 100644 --- a/vendor/github.com/hashicorp/nomad/api/scaling.go +++ b/vendor/github.com/hashicorp/nomad/api/scaling.go @@ -92,11 +92,12 @@ type TaskGroupScaleStatus struct { } type ScalingEvent struct { - Count *int64 - Error bool - Message string - Meta map[string]interface{} - EvalID *string - Time uint64 - CreateIndex uint64 + Count *int64 + PreviousCount int64 + Error bool + Message string + Meta map[string]interface{} + EvalID *string + Time uint64 + CreateIndex uint64 } From 377f881fbdfaf3fc038a8c1e4d7d2bcd6abcd666 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 03:39:44 +0000 Subject: [PATCH 3/6] removed api.RegisterJobRequest in favor of api.JobRegisterRequest modified `job inspect` and `job run -output` to use anonymous struct to keep previous behavior --- api/jobs.go | 16 ++++------------ command/job_inspect.go | 6 +++++- command/job_run.go | 6 +++++- vendor/github.com/hashicorp/nomad/api/jobs.go | 12 ++---------- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index ce5ec58d8..8e4f0173b 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -106,7 +106,7 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (* // of the evaluation, along with any errors encountered. func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { // Format the request - req := &RegisterJobRequest{ + req := &JobRegisterRequest{ Job: job, } if opts != nil { @@ -1036,26 +1036,18 @@ type JobRevertRequest struct { WriteRequest } -// JobUpdateRequest is used to update a job +// JobRegisterRequest is used to update a job type JobRegisterRequest struct { Job *Job // If EnforceIndex is set then the job will only be registered if the passed // JobModifyIndex matches the current Jobs index. If the index is zero, the // register only occurs if the job is new. - EnforceIndex bool - JobModifyIndex uint64 - PolicyOverride bool - - WriteRequest -} - -// RegisterJobRequest is used to serialize a job registration -type RegisterJobRequest struct { - Job *Job EnforceIndex bool `json:",omitempty"` JobModifyIndex uint64 `json:",omitempty"` PolicyOverride bool `json:",omitempty"` PreserveCounts bool `json:",omitempty"` + + WriteRequest } // JobRegisterResponse is used to respond to a job registration diff --git a/command/job_inspect.go b/command/job_inspect.go index c95a9e247..b3de0f4bb 100644 --- a/command/job_inspect.go +++ b/command/job_inspect.go @@ -162,7 +162,11 @@ func (c *JobInspectCommand) Run(args []string) int { } // Print the contents of the job - req := api.RegisterJobRequest{Job: job} + req := struct { + Job *api.Job + }{ + Job: job, + } f, err := DataFormat("json", "") if err != nil { c.Ui.Error(fmt.Sprintf("Error getting formatter: %s", err)) diff --git a/command/job_run.go b/command/job_run.go index 3f0bcebfa..fe125cecb 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -208,7 +208,11 @@ func (c *JobRunCommand) Run(args []string) int { } if output { - req := api.RegisterJobRequest{Job: job} + req := struct { + Job *api.Job + }{ + Job: job, + } buf, err := json.MarshalIndent(req, "", " ") if err != nil { c.Ui.Error(fmt.Sprintf("Error converting job: %s", err)) diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 6607717a1..8e4f0173b 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -106,7 +106,7 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (* // of the evaluation, along with any errors encountered. func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { // Format the request - req := &RegisterJobRequest{ + req := &JobRegisterRequest{ Job: job, } if opts != nil { @@ -1036,7 +1036,7 @@ type JobRevertRequest struct { WriteRequest } -// JobUpdateRequest is used to update a job +// JobRegisterRequest is used to update a job type JobRegisterRequest struct { Job *Job // If EnforceIndex is set then the job will only be registered if the passed @@ -1050,14 +1050,6 @@ type JobRegisterRequest struct { WriteRequest } -// RegisterJobRequest is used to serialize a job registration -type RegisterJobRequest struct { - Job *Job - EnforceIndex bool `json:",omitempty"` - JobModifyIndex uint64 `json:",omitempty"` - PolicyOverride bool `json:",omitempty"` -} - // JobRegisterResponse is used to respond to a job registration type JobRegisterResponse struct { EvalID string From 9fc66bc1aa9aeb04b5b30b444b3600f4c79d8c38 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 17:52:47 +0000 Subject: [PATCH 4/6] support in API client and Job.Register RPC for PreserveCounts --- api/jobs.go | 5 ++--- nomad/job_endpoint.go | 13 +++++++++++++ vendor/github.com/hashicorp/nomad/api/jobs.go | 5 ++--- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 8e4f0173b..b8f56acbe 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -114,9 +114,8 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (* req.EnforceIndex = true req.JobModifyIndex = opts.ModifyIndex } - if opts.PolicyOverride { - req.PolicyOverride = true - } + req.PolicyOverride = opts.PolicyOverride + req.PreserveCounts = opts.PreserveCounts } var resp JobRegisterResponse diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5f60b842f..1d68217aa 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -278,6 +278,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Clear the Consul token args.Job.ConsulToken = "" + // Preserve the existing task group counts, if so requested + if existingJob != nil && args.PreserveCounts { + prevCounts := make(map[string]int) + for _, tg := range existingJob.TaskGroups { + prevCounts[tg.Name] = tg.Count + } + for _, tg := range args.Job.TaskGroups { + if count, ok := prevCounts[tg.Name]; ok { + tg.Count = count + } + } + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { // Set the submit time diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index 8e4f0173b..b8f56acbe 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -114,9 +114,8 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (* req.EnforceIndex = true req.JobModifyIndex = opts.ModifyIndex } - if opts.PolicyOverride { - req.PolicyOverride = true - } + req.PolicyOverride = opts.PolicyOverride + req.PreserveCounts = opts.PreserveCounts } var resp JobRegisterResponse From de8a46b0f818af76f381568f12f63287d7f9c9de Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 17:53:13 +0000 Subject: [PATCH 5/6] added -preserve-counts to `job run` CLI, updated website --- command/job_run.go | 12 ++++++++---- website/pages/docs/commands/job/run.mdx | 3 +++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/command/job_run.go b/command/job_run.go index fe125cecb..b52ae2b0a 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -86,6 +86,9 @@ Run Options: -policy-override Sets the flag to force override any soft mandatory Sentinel policies. + -preserve-counts + If set, the existing task group counts will be preserved when updating a job. + -consul-token If set, the passed Consul token is stored in the job before sending to the Nomad servers. This allows passing the Consul token without storing it in @@ -118,6 +121,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags { "-vault-token": complete.PredictAnything, "-output": complete.PredictNothing, "-policy-override": complete.PredictNothing, + "-preserve-counts": complete.PredictNothing, }) } @@ -128,7 +132,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor { func (c *JobRunCommand) Name() string { return "job run" } func (c *JobRunCommand) Run(args []string) int { - var detach, verbose, output, override bool + var detach, verbose, output, override, preserveCounts bool var checkIndexStr, consulToken, vaultToken string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) @@ -137,6 +141,7 @@ func (c *JobRunCommand) Run(args []string) int { flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&output, "output", false, "") flags.BoolVar(&override, "policy-override", false, "") + flags.BoolVar(&preserveCounts, "preserve-counts", false, "") flags.StringVar(&checkIndexStr, "check-index", "", "") flags.StringVar(&consulToken, "consul-token", "", "") flags.StringVar(&vaultToken, "vault-token", "", "") @@ -236,9 +241,8 @@ func (c *JobRunCommand) Run(args []string) int { opts.EnforceIndex = true opts.ModifyIndex = checkIndex } - if override { - opts.PolicyOverride = true - } + opts.PolicyOverride = override + opts.PreserveCounts = preserveCounts // Submit the job resp, _, err := client.Jobs().RegisterOpts(job, opts, nil) diff --git a/website/pages/docs/commands/job/run.mdx b/website/pages/docs/commands/job/run.mdx index 4e039c3b5..b57cf9de7 100644 --- a/website/pages/docs/commands/job/run.mdx +++ b/website/pages/docs/commands/job/run.mdx @@ -70,6 +70,9 @@ precedence, going from highest to lowest: the `-vault-token` flag, the - `-policy-override`: Sets the flag to force override any soft mandatory Sentinel policies. +- `-preserve-counts`: If set, the existing task group counts will be preserved + when updating a job. + - `-consul-token`: If set, the passed Consul token is stored in the job before sending to the Nomad servers. This allows passing the Consul token without storing it in the job file. This overrides the token found in the \$CONSUL_HTTP_TOKEN From 6a43aa31a598747d048de921b5b86bafdd81782a Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Tue, 16 Jun 2020 17:54:37 +0000 Subject: [PATCH 6/6] changelog for -persist-counts, #8168 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b495308e5..d6ec094f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ FEATURES: IMPROVEMENTS: +* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)] * api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)] * build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]