Merge pull request #4403 from hashicorp/b-fix-dispatched-optional-meta
Fix dispatched optional meta correctly
This commit is contained in:
commit
f36eb14360
|
@ -32,6 +32,8 @@ IMPROVEMENTS:
|
|||
image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)]
|
||||
* driver/raw_exec: Use cgroups to manage process tree for precise cleanup of
|
||||
launched processes [[GH-4350](https://github.com/hashicorp/nomad/issues/4350)]
|
||||
* env: Default interpolation of optional meta fields of parameterized jobs to
|
||||
an empty string rather than the field key. [[GH-3720](https://github.com/hashicorp/nomad/issues/3720)]
|
||||
* ui: Show node drain, node eligibility, and node drain strategy information in the Client list and Client detail pages [[GH-4353](https://github.com/hashicorp/nomad/issues/4353)]
|
||||
* ui: Show reschedule-event information for allocations that were server-side rescheduled [[GH-4254](https://github.com/hashicorp/nomad/issues/4254)]
|
||||
* ui: Show the running deployment Progress Deadlines on the Job Detail Page [[GH-4388](https://github.com/hashicorp/nomad/issues/4388)]
|
||||
|
|
|
@ -614,6 +614,7 @@ type Job struct {
|
|||
Update *UpdateStrategy
|
||||
Periodic *PeriodicConfig
|
||||
ParameterizedJob *ParameterizedJobConfig
|
||||
Dispatched bool
|
||||
Payload []byte
|
||||
Reschedule *ReschedulePolicy
|
||||
Migrate *MigrateStrategy
|
||||
|
@ -636,7 +637,7 @@ func (j *Job) IsPeriodic() bool {
|
|||
|
||||
// IsParameterized returns whether a job is parameterized job.
|
||||
func (j *Job) IsParameterized() bool {
|
||||
return j.ParameterizedJob != nil
|
||||
return j.ParameterizedJob != nil && !j.Dispatched
|
||||
}
|
||||
|
||||
func (j *Job) Canonicalize() {
|
||||
|
|
|
@ -397,7 +397,23 @@ func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
|
|||
|
||||
// Set meta
|
||||
combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, b.taskName)
|
||||
b.taskMeta = make(map[string]string, len(combined)*2)
|
||||
// taskMetaSize is double to total meta keys to account for given and upper
|
||||
// cased values
|
||||
taskMetaSize := len(combined) * 2
|
||||
|
||||
// if job is parameterized initialize optional meta to empty strings
|
||||
if alloc.Job.Dispatched {
|
||||
optionalMetaCount := len(alloc.Job.ParameterizedJob.MetaOptional)
|
||||
b.taskMeta = make(map[string]string, taskMetaSize+optionalMetaCount*2)
|
||||
|
||||
for _, k := range alloc.Job.ParameterizedJob.MetaOptional {
|
||||
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = ""
|
||||
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = ""
|
||||
}
|
||||
} else {
|
||||
b.taskMeta = make(map[string]string, taskMetaSize)
|
||||
}
|
||||
|
||||
for k, v := range combined {
|
||||
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
|
||||
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -372,3 +373,20 @@ func TestEnvironment_UpdateTask(t *testing.T) {
|
|||
t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnvironment_InterpolateEmptyOptionalMeta asserts that in a parameterized
|
||||
// job, if an optional meta field is not set, it will get interpolated as an
|
||||
// empty string.
|
||||
func TestEnvironment_InterpolateEmptyOptionalMeta(t *testing.T) {
|
||||
require := require.New(t)
|
||||
a := mock.Alloc()
|
||||
a.Job.ParameterizedJob = &structs.ParameterizedJobConfig{
|
||||
MetaOptional: []string{"metaopt1", "metaopt2"},
|
||||
}
|
||||
a.Job.Dispatched = true
|
||||
task := a.Job.TaskGroups[0].Tasks[0]
|
||||
task.Meta = map[string]string{"metaopt1": "metaopt1val"}
|
||||
env := NewBuilder(mock.Node(), a, task, "global").Build()
|
||||
require.Equal("metaopt1val", env.ReplaceEnv("${NOMAD_META_metaopt1}"))
|
||||
require.Empty(env.ReplaceEnv("${NOMAD_META_metaopt2}"))
|
||||
}
|
||||
|
|
|
@ -124,10 +124,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
|
|||
}
|
||||
|
||||
// Validate job transitions if its an update
|
||||
if existingJob != nil {
|
||||
if err := validateJobUpdate(existingJob, args.Job); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := validateJobUpdate(existingJob, args.Job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure that the job has permissions for the requested Vault tokens
|
||||
|
@ -1327,6 +1325,14 @@ func validateJob(job *structs.Job) (invalid, warnings error) {
|
|||
|
||||
// validateJobUpdate ensures updates to a job are valid.
|
||||
func validateJobUpdate(old, new *structs.Job) error {
|
||||
// Validate Dispatch not set on new Jobs
|
||||
if old == nil {
|
||||
if new.Dispatched {
|
||||
return fmt.Errorf("job can't be submitted with 'Dispatched' set")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type transitions are disallowed
|
||||
if old.Type != new.Type {
|
||||
return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type)
|
||||
|
@ -1348,6 +1354,10 @@ func validateJobUpdate(old, new *structs.Job) error {
|
|||
return fmt.Errorf("cannot update parameterized job to being non-parameterized")
|
||||
}
|
||||
|
||||
if old.Dispatched != new.Dispatched {
|
||||
return fmt.Errorf("field 'Dispatched' is read-only")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1398,11 +1408,11 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
|||
|
||||
// Derive the child job and commit it via Raft
|
||||
dispatchJob := parameterizedJob.Copy()
|
||||
dispatchJob.ParameterizedJob = nil
|
||||
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
|
||||
dispatchJob.ParentID = parameterizedJob.ID
|
||||
dispatchJob.Name = dispatchJob.ID
|
||||
dispatchJob.SetSubmitTime()
|
||||
dispatchJob.Dispatched = true
|
||||
|
||||
// Merge in the meta data
|
||||
for k, v := range args.Meta {
|
||||
|
|
|
@ -458,6 +458,33 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Register_Dispatched(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request with a job with 'Dispatch' set to true
|
||||
job := mock.Job()
|
||||
job.Dispatched = true
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
require.Error(err)
|
||||
require.Contains(err.Error(), "job can't be submitted with 'Dispatched'")
|
||||
}
|
||||
func TestJobEndpoint_Register_EnforceIndex(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
|
@ -3959,6 +3986,7 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) {
|
|||
|
||||
func TestJobEndpoint_ValidateJobUpdate(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
old := mock.Job()
|
||||
new := mock.Job()
|
||||
|
||||
|
@ -3988,6 +4016,16 @@ func TestJobEndpoint_ValidateJobUpdate(t *testing.T) {
|
|||
} else {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
new = mock.Job()
|
||||
new.Dispatched = true
|
||||
require.Error(validateJobUpdate(old, new),
|
||||
"expected err when setting new job to dispatched")
|
||||
require.Error(validateJobUpdate(nil, new),
|
||||
"expected err when setting new job to dispatched")
|
||||
require.Error(validateJobUpdate(new, old),
|
||||
"expected err when setting dispatched to false")
|
||||
require.NoError(validateJobUpdate(nil, old))
|
||||
}
|
||||
|
||||
func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) {
|
||||
|
@ -4343,6 +4381,15 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
if out.ParentID != tc.parameterizedJob.ID {
|
||||
t.Fatalf("bad parent ID")
|
||||
}
|
||||
if !out.Dispatched {
|
||||
t.Fatal("expected dispatched job")
|
||||
}
|
||||
if out.IsParameterized() {
|
||||
t.Fatal("dispatched job should not be parameterized")
|
||||
}
|
||||
if out.ParameterizedJob == nil {
|
||||
t.Fatal("parameter job config should exist")
|
||||
}
|
||||
|
||||
if tc.noEval {
|
||||
return
|
||||
|
|
|
@ -150,6 +150,12 @@ func TestJobDiff(t *testing.T) {
|
|||
Old: "true",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Dispatched",
|
||||
Old: "false",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Meta[foo]",
|
||||
|
@ -213,6 +219,12 @@ func TestJobDiff(t *testing.T) {
|
|||
Old: "",
|
||||
New: "true",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Dispatched",
|
||||
Old: "",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Meta[foo]",
|
||||
|
|
|
@ -2018,6 +2018,10 @@ type Job struct {
|
|||
// for dispatching.
|
||||
ParameterizedJob *ParameterizedJobConfig
|
||||
|
||||
// Dispatched is used to identify if the Job has been dispatched from a
|
||||
// parameterized job.
|
||||
Dispatched bool
|
||||
|
||||
// Payload is the payload supplied when the job was dispatched.
|
||||
Payload []byte
|
||||
|
||||
|
@ -2328,7 +2332,7 @@ func (j *Job) IsPeriodicActive() bool {
|
|||
|
||||
// IsParameterized returns whether a job is parameterized job.
|
||||
func (j *Job) IsParameterized() bool {
|
||||
return j.ParameterizedJob != nil
|
||||
return j.ParameterizedJob != nil && !j.Dispatched
|
||||
}
|
||||
|
||||
// VaultPolicies returns the set of Vault policies per task group, per task
|
||||
|
|
Loading…
Reference in New Issue