diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a56e73b6..03c5f28f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ IMPROVEMENTS: image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)] * driver/raw_exec: Use cgroups to manage process tree for precise cleanup of launched processes [[GH-4350](https://github.com/hashicorp/nomad/issues/4350)] + * env: Default interpolation of optional meta fields of parameterized jobs to + an empty string rather than the field key. [[GH-3720](https://github.com/hashicorp/nomad/issues/3720)] * ui: Show node drain, node eligibility, and node drain strategy information in the Client list and Client detail pages [[GH-4353](https://github.com/hashicorp/nomad/issues/4353)] * ui: Show reschedule-event information for allocations that were server-side rescheduled [[GH-4254](https://github.com/hashicorp/nomad/issues/4254)] * ui: Show the running deployment Progress Deadlines on the Job Detail Page [[GH-4388](https://github.com/hashicorp/nomad/issues/4388)] diff --git a/api/jobs.go b/api/jobs.go index 254322977..45a7c180b 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -614,6 +614,7 @@ type Job struct { Update *UpdateStrategy Periodic *PeriodicConfig ParameterizedJob *ParameterizedJobConfig + Dispatched bool Payload []byte Reschedule *ReschedulePolicy Migrate *MigrateStrategy @@ -636,7 +637,7 @@ func (j *Job) IsPeriodic() bool { // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { - return j.ParameterizedJob != nil + return j.ParameterizedJob != nil && !j.Dispatched } func (j *Job) Canonicalize() { diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 923ebaa65..c59ef807e 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -397,7 +397,23 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { // Set meta combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, b.taskName) - b.taskMeta = make(map[string]string, len(combined)*2) + // taskMetaSize is double to total meta keys to account for given and upper + // cased values + taskMetaSize := len(combined) * 2 + + // if job is parameterized initialize optional meta to empty strings + if alloc.Job.Dispatched { + optionalMetaCount := len(alloc.Job.ParameterizedJob.MetaOptional) + b.taskMeta = make(map[string]string, taskMetaSize+optionalMetaCount*2) + + for _, k := range alloc.Job.ParameterizedJob.MetaOptional { + b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = "" + b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = "" + } + } else { + b.taskMeta = make(map[string]string, taskMetaSize) + } + for k, v := range combined { b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = v diff --git a/client/driver/env/env_test.go b/client/driver/env/env_test.go index 6ea4e72e6..01a712e57 100644 --- a/client/driver/env/env_test.go +++ b/client/driver/env/env_test.go @@ -11,6 +11,7 @@ import ( cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) const ( @@ -372,3 +373,20 @@ func TestEnvironment_UpdateTask(t *testing.T) { t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v) } } + +// TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized +// job, if an optional meta field is not set, it will get interpolated as an +// empty string. +func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) { + require := require.New(t) + a := mock.Alloc() + a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ + MetaOptional: []string{"metaopt1", "metaopt2"}, + } + a.Job.Dispatched = true + task := a.Job.TaskGroups[0].Tasks[0] + task.Meta = map[string]string{"metaopt1": "metaopt1val"} + env := NewBuilder(mock.Node(), a, task, "global").Build() + require.Equal("metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}")) + require.Empty(env.ReplaceEnv("${NOMAD_META_metaopt2}")) +} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 164899e58..d1e2af642 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -124,10 +124,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } // Validate job transitions if its an update - if existingJob != nil { - if err := validateJobUpdate(existingJob, args.Job); err != nil { - return err - } + if err := validateJobUpdate(existingJob, args.Job); err != nil { + return err } // Ensure that the job has permissions for the requested Vault tokens @@ -1327,6 +1325,14 @@ func validateJob(job *structs.Job) (invalid, warnings error) { // validateJobUpdate ensures updates to a job are valid. func validateJobUpdate(old, new *structs.Job) error { + // Validate Dispatch not set on new Jobs + if old == nil { + if new.Dispatched { + return fmt.Errorf("job can't be submitted with 'Dispatched' set") + } + return nil + } + // Type transitions are disallowed if old.Type != new.Type { return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type) @@ -1348,6 +1354,10 @@ func validateJobUpdate(old, new *structs.Job) error { return fmt.Errorf("cannot update parameterized job to being non-parameterized") } + if old.Dispatched != new.Dispatched { + return fmt.Errorf("field 'Dispatched' is read-only") + } + return nil } @@ -1398,11 +1408,11 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // Derive the child job and commit it via Raft dispatchJob := parameterizedJob.Copy() - dispatchJob.ParameterizedJob = nil dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) dispatchJob.ParentID = parameterizedJob.ID dispatchJob.Name = dispatchJob.ID dispatchJob.SetSubmitTime() + dispatchJob.Dispatched = true // Merge in the meta data for k, v := range args.Meta { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index fe51fbabe..3d70d281c 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -458,6 +458,33 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { } } +func TestJobEndpoint_Register_Dispatched(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request with a job with 'Dispatch' set to true + job := mock.Job() + job.Dispatched = true + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.Error(err) + require.Contains(err.Error(), "job can't be submitted with 'Dispatched'") +} func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { t.Parallel() s1 := TestServer(t, func(c *Config) { @@ -3959,6 +3986,7 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) { func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { t.Parallel() + require := require.New(t) old := mock.Job() new := mock.Job() @@ -3988,6 +4016,16 @@ func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { } else { t.Log(err) } + + new = mock.Job() + new.Dispatched = true + require.Error(validateJobUpdate(old, new), + "expected err when setting new job to dispatched") + require.Error(validateJobUpdate(nil, new), + "expected err when setting new job to dispatched") + require.Error(validateJobUpdate(new, old), + "expected err when setting dispatched to false") + require.NoError(validateJobUpdate(nil, old)) } func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) { @@ -4343,6 +4381,15 @@ func TestJobEndpoint_Dispatch(t *testing.T) { if out.ParentID != tc.parameterizedJob.ID { t.Fatalf("bad parent ID") } + if !out.Dispatched { + t.Fatal("expected dispatched job") + } + if out.IsParameterized() { + t.Fatal("dispatched job should not be parameterized") + } + if out.ParameterizedJob == nil { + t.Fatal("parameter job config should exist") + } if tc.noEval { return diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 879522ecd..30a59c7bd 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -150,6 +150,12 @@ func TestJobDiff(t *testing.T) { Old: "true", New: "", }, + { + Type: DiffTypeDeleted, + Name: "Dispatched", + Old: "false", + New: "", + }, { Type: DiffTypeDeleted, Name: "Meta[foo]", @@ -213,6 +219,12 @@ func TestJobDiff(t *testing.T) { Old: "", New: "true", }, + { + Type: DiffTypeAdded, + Name: "Dispatched", + Old: "", + New: "false", + }, { Type: DiffTypeAdded, Name: "Meta[foo]", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 93ad1bdbf..969f11338 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2018,6 +2018,10 @@ type Job struct { // for dispatching. ParameterizedJob *ParameterizedJobConfig + // Dispatched is used to identify if the Job has been dispatched from a + // parameterized job. + Dispatched bool + // Payload is the payload supplied when the job was dispatched. Payload []byte @@ -2328,7 +2332,7 @@ func (j *Job) IsPeriodicActive() bool { // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { - return j.ParameterizedJob != nil + return j.ParameterizedJob != nil && !j.Dispatched } // VaultPolicies returns the set of Vault policies per task group, per task