From 9b4871fece4a32963a0d485c1956ae289bdad5e0 Mon Sep 17 00:00:00 2001 From: Juana De La Cuesta Date: Tue, 4 Apr 2023 18:17:10 +0200 Subject: [PATCH] Prevent kill_timeout greater than progress_deadline (#16761) * func: add validation for kill timeout smaller than progress dealine * style: add changelog * style: typo in changelog * style: remove refactored test * Update .changelog/16761.txt Co-authored-by: James Rasell * Update nomad/structs/structs.go Co-authored-by: James Rasell --------- Co-authored-by: James Rasell --- .changelog/16761.txt | 3 + nomad/structs/structs.go | 17 + nomad/structs/structs_test.go | 744 +++++++++++++++++++--------------- 3 files changed, 432 insertions(+), 332 deletions(-) create mode 100644 .changelog/16761.txt diff --git a/.changelog/16761.txt b/.changelog/16761.txt new file mode 100644 index 000000000..d6bdb1839 --- /dev/null +++ b/.changelog/16761.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core: Prevent `task.kill_timeout` being greater than `update.progress_deadline` +``` diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 490c96358..8e2bae74b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5136,6 +5136,12 @@ func (u *UpdateStrategy) IsEmpty() bool { return true } + // When the Job is transformed from api to struct, the Update Strategy block is + // copied into the existing task groups, the only things that are passed along + // are MaxParallel and Stagger, because they are enforced at job level. + // That is why checking if MaxParallel is zero is enough to know if the + // update block is empty. + return u.MaxParallel == 0 } @@ -6708,6 +6714,8 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, outer) } + isTypeService := j.Type == JobTypeService + // Validate the tasks for _, task := range tg.Tasks { // Validate the task does not reference undefined volume mounts @@ -6727,6 +6735,15 @@ func (tg *TaskGroup) Validate(j *Job) error { outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) mErr.Errors = append(mErr.Errors, outer) } + + // Validate the group's Update Strategy does not conflict with the Task's kill_timeout for service type jobs + if isTypeService && tg.Update != nil { + if task.KillTimeout > tg.Update.ProgressDeadline { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a kill timout (%s) longer than the group's progress deadline (%s)", + task.Name, task.KillTimeout.String(), tg.Update.ProgressDeadline.String())) + } + } + } return mErr.ErrorOrNil() diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 772fa49ff..b9769012f 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -22,81 +22,109 @@ import ( func TestJob_Validate(t *testing.T) { ci.Parallel(t) - j := &Job{} - err := j.Validate() - requireErrors(t, err, - "datacenters", - "job ID", - "job name", - "job region", - "job type", - "namespace", - "task groups", - ) - - j = &Job{ - Type: "invalid-job-type", - } - err = j.Validate() - if expected := `Invalid job type: "invalid-job-type"`; !strings.Contains(err.Error(), expected) { - t.Errorf("expected %s but found: %v", expected, err) - } - - j = &Job{ - Type: JobTypeService, - Periodic: &PeriodicConfig{ - Enabled: true, + tests := []struct { + name string + job *Job + expErr []string + }{ + { + name: "job is empty", + job: &Job{}, + expErr: []string{ + "datacenters", + "job ID", + "job name", + "job region", + "job type", + "namespace", + "task groups", + }, }, - } - err = j.Validate() - require.Error(t, err, "Periodic") - - j = &Job{ - Region: "global", - ID: uuid.Generate(), - Namespace: "test", - Name: "my-job", - Type: JobTypeService, - Priority: JobDefaultPriority, - Datacenters: []string{"*"}, - TaskGroups: []*TaskGroup{ - { - Name: "web", - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, + { + name: "job type is invalid", + job: &Job{ + Type: "invalid-job-type", + }, + expErr: []string{ + `Invalid job type: "invalid-job-type"`, + }, + }, + { + name: "job periodic specification type is missing", + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: true, }, }, - { - Name: "web", - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, + expErr: []string{ + `Unknown periodic specification type ""`, + "Must specify a spec", + }, + }, + { + name: "job datacenters is empty", + job: &Job{ + Datacenters: []string{""}, + }, + expErr: []string{ + "datacenter must be non-empty string", + }, + }, + { + name: "job task group is type invalid", + job: &Job{ + Region: "global", + ID: uuid.Generate(), + Namespace: "test", + Name: "my-job", + Type: JobTypeService, + Priority: JobDefaultPriority, + Datacenters: []string{"*"}, + TaskGroups: []*TaskGroup{ + { + Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, + { + Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, + { + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, }, }, - { - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, - }, + expErr: []string{ + "2 redefines 'web' from group 1", + "group 3 missing name", + "Task group web validation failed", + "Missing tasks for task group", + "Unsupported restart mode", + "Task Group web should have a reschedule policy", + "Task Group web should have an ephemeral disk object", }, }, } - err = j.Validate() - requireErrors(t, err, - "2 redefines 'web' from group 1", - "group 3 missing name", - "Task group web validation failed", - ) - // test for invalid datacenters - j = &Job{ - Datacenters: []string{""}, + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.job.Validate() + requireErrors(t, err, tc.expErr...) + }) } - err = j.Validate() - require.Error(t, err, "datacenter must be non-empty string") + } func TestJob_ValidateScaling(t *testing.T) { @@ -1007,322 +1035,374 @@ func TestTaskGroup_UsesConnect(t *testing.T) { func TestTaskGroup_Validate(t *testing.T) { ci.Parallel(t) - j := testJob() - tg := &TaskGroup{ - Count: -1, - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, - Mode: RestartPolicyModeDelay, - }, - ReschedulePolicy: &ReschedulePolicy{ - Interval: 5 * time.Minute, - Attempts: 5, - Delay: 5 * time.Second, - }, - } - err := tg.Validate(j) - requireErrors(t, err, - "group name", - "count can't be negative", - "Missing tasks", - ) - - tg = &TaskGroup{ - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{ - Networks: []*NetworkResource{ - { - ReservedPorts: []Port{{Label: "foo", Value: 123}}, - }, - }, + tests := []struct { + name string + tg *TaskGroup + expErr []string + jobType string + }{ + { + name: "task group is missing basic specs", + tg: &TaskGroup{ + Count: -1, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 5, + Delay: 5 * time.Second, }, }, - { - Name: "task-b", - Resources: &Resources{ - Networks: []*NetworkResource{ - { - ReservedPorts: []Port{{Label: "foo", Value: 123}}, + expErr: []string{ + "group name", + "count can't be negative", + "Missing tasks", + }, + jobType: JobTypeService, + }, + { + name: "two tasks using same port", + tg: &TaskGroup{ + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{{Label: "foo", Value: 123}}, + }, + }, }, }, - }, - }, - }, - } - err = tg.Validate(&Job{}) - expected := `Static port 123 already reserved by task-a:foo` - if !strings.Contains(err.Error(), expected) { - t.Errorf("expected %s but found: %v", expected, err) - } - - tg = &TaskGroup{ - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{ - Networks: []*NetworkResource{ - { - ReservedPorts: []Port{ - {Label: "foo", Value: 123}, - {Label: "bar", Value: 123}, + { + Name: "task-b", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{{Label: "foo", Value: 123}}, + }, }, }, }, }, }, - }, - } - err = tg.Validate(&Job{}) - expected = `Static port 123 already reserved by task-a:foo` - if !strings.Contains(err.Error(), expected) { - t.Errorf("expected %s but found: %v", expected, err) - } - - tg = &TaskGroup{ - Name: "web", - Count: 1, - Tasks: []*Task{ - {Name: "web", Leader: true}, - {Name: "web", Leader: true}, - {}, - }, - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, - Mode: RestartPolicyModeDelay, - }, - ReschedulePolicy: &ReschedulePolicy{ - Interval: 5 * time.Minute, - Attempts: 10, - Delay: 5 * time.Second, - DelayFunction: "constant", - }, - } - - err = tg.Validate(j) - requireErrors(t, err, - "should have an ephemeral disk object", - "2 redefines 'web' from task 1", - "Task 3 missing name", - "Only one task may be marked as leader", - "Task web validation failed", - ) - - tg = &TaskGroup{ - Name: "web", - Count: 1, - Tasks: []*Task{ - {Name: "web", Leader: true}, - }, - Update: DefaultUpdateStrategy.Copy(), - } - j.Type = JobTypeBatch - err = tg.Validate(j) - require.Error(t, err, "does not allow update block") - - tg = &TaskGroup{ - Count: -1, - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, - Mode: RestartPolicyModeDelay, - }, - ReschedulePolicy: &ReschedulePolicy{ - Interval: 5 * time.Minute, - Attempts: 5, - Delay: 5 * time.Second, - }, - } - j.Type = JobTypeSystem - err = tg.Validate(j) - if !strings.Contains(err.Error(), "System jobs should not have a reschedule policy") { - t.Fatalf("err: %s", err) - } - - tg = &TaskGroup{ - Networks: []*NetworkResource{ - { - DynamicPorts: []Port{{"http", 0, 80, ""}}, + expErr: []string{ + "Static port 123 already reserved by task-a:foo", }, + jobType: JobTypeService, }, - Tasks: []*Task{ - { - Resources: &Resources{ - Networks: []*NetworkResource{ - { - DynamicPorts: []Port{{"http", 0, 80, ""}}, + { + name: "one task using same port twice", + tg: &TaskGroup{ + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{ + {Label: "foo", Value: 123}, + {Label: "bar", Value: 123}, + }, + }, + }, }, }, }, }, - }, - } - err = tg.Validate(j) - require.Contains(t, err.Error(), "Port label http already in use") - - tg = &TaskGroup{ - Volumes: map[string]*VolumeRequest{ - "foo": { - Type: "nothost", - Source: "foo", + expErr: []string{ + "Static port 123 already reserved by task-a:foo", }, + jobType: JobTypeService, }, - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{}, + { + name: "multiple leaders defined and one empty task", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + {Name: "web", Leader: true}, + {Name: "web", Leader: true}, + {}, + }, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 10, + Delay: 5 * time.Second, + DelayFunction: "constant", + }, }, - }, - } - err = tg.Validate(&Job{}) - require.Contains(t, err.Error(), `volume has unrecognized type nothost`) - - tg = &TaskGroup{ - Volumes: map[string]*VolumeRequest{ - "foo": { - Type: "host", + expErr: []string{ + "should have an ephemeral disk object", + "2 redefines 'web' from task 1", + "Task 3 missing name", + "Only one task may be marked as leader", + "Task web validation failed", }, + jobType: JobTypeService, }, - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{}, + { + name: "invalid update block for batch job", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + {Name: "web", Leader: true}, + }, + Update: DefaultUpdateStrategy.Copy(), }, - }, - } - err = tg.Validate(&Job{}) - require.Contains(t, err.Error(), `volume has an empty source`) - - tg = &TaskGroup{ - Name: "group-a", - Update: &UpdateStrategy{ - Canary: 1, - }, - Volumes: map[string]*VolumeRequest{ - "foo": { - Type: "csi", - PerAlloc: true, + expErr: []string{ + "does not allow update block", }, + jobType: JobTypeBatch, }, - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{}, + { + name: "invalid reschedule policy for system job", + tg: &TaskGroup{ + Count: -1, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 5, + Delay: 5 * time.Second, + }, }, - }, - } - err = tg.Validate(&Job{}) - require.Contains(t, err.Error(), `volume has an empty source`) - require.Contains(t, err.Error(), `volume cannot be per_alloc when canaries are in use`) - require.Contains(t, err.Error(), `CSI volumes must have an attachment mode`) - require.Contains(t, err.Error(), `CSI volumes must have an access mode`) - - tg = &TaskGroup{ - Volumes: map[string]*VolumeRequest{ - "foo": { - Type: "host", + expErr: []string{ + "System jobs should not have a reschedule policy", }, + jobType: JobTypeSystem, }, - Tasks: []*Task{ - { - Name: "task-a", - Resources: &Resources{}, - VolumeMounts: []*VolumeMount{ + { + name: "duplicated por label", + tg: &TaskGroup{ + Networks: []*NetworkResource{ { - Volume: "", + DynamicPorts: []Port{{"http", 0, 80, ""}}, + }, + }, + Tasks: []*Task{ + { + Resources: &Resources{ + Networks: []*NetworkResource{ + { + DynamicPorts: []Port{{"http", 0, 80, ""}}, + }, + }, + }, }, }, }, - { - Name: "task-b", - Resources: &Resources{}, - VolumeMounts: []*VolumeMount{ + expErr: []string{ + "Port label http already in use", + }, + jobType: JobTypeService, + }, + { + name: "invalid volume type", + tg: &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "nothost", + Source: "foo", + }, + }, + Tasks: []*Task{ { - Volume: "foob", + Name: "task-a", + Resources: &Resources{}, }, }, }, + expErr: []string{ + "volume has unrecognized type nothost", + }, + jobType: JobTypeService, }, - } - err = tg.Validate(&Job{}) - expected = `Task task-a has a volume mount (0) referencing an empty volume` - require.Contains(t, err.Error(), expected) - - expected = `Task task-b has a volume mount (0) referencing undefined volume foob` - require.Contains(t, err.Error(), expected) - - taskA := &Task{Name: "task-a"} - tg = &TaskGroup{ - Name: "group-a", - Services: []*Service{ - { - Name: "service-a", - Provider: "consul", - Checks: []*ServiceCheck{ + { + name: "invalid volume with wrong CSI and canary specs", + tg: &TaskGroup{ + Name: "group-a", + Update: &UpdateStrategy{ + Canary: 1, + }, + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "csi", + PerAlloc: true, + }, + }, + Tasks: []*Task{ { - Name: "check-a", - Type: "tcp", - TaskName: "task-b", - PortLabel: "http", - Interval: time.Duration(1 * time.Second), - Timeout: time.Duration(1 * time.Second), + Name: "task-a", + Resources: &Resources{}, }, }, }, - }, - Tasks: []*Task{taskA}, - } - err = tg.Validate(&Job{}) - expected = `Check check-a invalid: refers to non-existent task task-b` - require.Contains(t, err.Error(), expected) - - tg = &TaskGroup{ - Name: "group-a", - Services: []*Service{ - { - Name: "service-a", - Provider: "nomad", - }, - { - Name: "service-b", - Provider: "consul", + expErr: []string{ + `volume has an empty source`, + `volume cannot be per_alloc when canaries are in use`, + `CSI volumes must have an attachment mode`, + `CSI volumes must have an access mode`, }, + jobType: JobTypeService, }, - Tasks: []*Task{{Name: "task-a"}}, - } - err = tg.Validate(&Job{}) - expected = "Multiple service providers used: task group services must use the same provider" - require.Contains(t, err.Error(), expected) - - tg = &TaskGroup{ - Name: "group-a", - Services: []*Service{ - { - Name: "service-a", - Provider: "nomad", - }, - }, - Tasks: []*Task{ - { - Name: "task-a", + { + name: "invalid task referencing non existent task", + tg: &TaskGroup{ + Name: "group-a", Services: []*Service{ + { + Name: "service-a", + Provider: "consul", + Checks: []*ServiceCheck{ + { + Name: "check-a", + Type: "tcp", + TaskName: "task-b", + PortLabel: "http", + Interval: time.Duration(1 * time.Second), + Timeout: time.Duration(1 * time.Second), + }, + }, + }, + }, + Tasks: []*Task{ + {Name: "task-a"}, + }, + }, + expErr: []string{ + "Check check-a invalid: refers to non-existent task task-b", + }, + jobType: JobTypeService, + }, + { + name: "invalid volume for tasks", + tg: &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "host", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "", + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "foob", + }, + }, + }, + }, + }, + expErr: []string{ + `Task task-a has a volume mount (0) referencing an empty volume`, + `Task task-b has a volume mount (0) referencing undefined volume foob`, + }, + jobType: JobTypeService, + }, + { + name: "services inside group using different providers", + tg: &TaskGroup{ + Name: "group-a", + Services: []*Service{ + { + Name: "service-a", + Provider: "nomad", + }, { Name: "service-b", Provider: "consul", }, }, + Tasks: []*Task{{Name: "task-a"}}, }, + expErr: []string{ + "Multiple service providers used: task group services must use the same provider", + }, + jobType: JobTypeService, + }, + { + name: "conflicting progress deadline and kill timeout", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + { + Name: "web", + Leader: true, + KillTimeout: DefaultUpdateStrategy.ProgressDeadline + 25*time.Minute, + }, + }, + Update: DefaultUpdateStrategy.Copy(), + }, + expErr: []string{ + "Task web has a kill timout (35m0s) longer than the group's progress deadline (10m0s)", + }, + jobType: JobTypeService, + }, + { + name: "service and task using different provider", + tg: &TaskGroup{ + Name: "group-a", + Services: []*Service{ + { + Name: "service-a", + Provider: "nomad", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Services: []*Service{ + { + Name: "service-b", + Provider: "consul", + }, + }, + }, + }, + }, + expErr: []string{ + "Multiple service providers used: task group services must use the same provider", + }, + jobType: JobTypeService, }, } - err = tg.Validate(&Job{}) - expected = "Multiple service providers used: task group services must use the same provider" - require.Contains(t, err.Error(), expected) + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + j := testJob() + j.Type = tc.jobType + + err := tc.tg.Validate(j) + requireErrors(t, err, tc.expErr...) + }) + } } func TestTaskGroupNetwork_Validate(t *testing.T) {