Prevent kill_timeout greater than progress_deadline (#16761)

* func: add validation for kill timeout smaller than progress deadline

* style: add changelog

* style: typo in changelog

* style: remove refactored test

* Update .changelog/16761.txt

Co-authored-by: James Rasell <jrasell@users.noreply.github.com>

* Update nomad/structs/structs.go

Co-authored-by: James Rasell <jrasell@users.noreply.github.com>

---------

Co-authored-by: James Rasell <jrasell@users.noreply.github.com>
This commit is contained in:
Juana De La Cuesta 2023-04-04 18:17:10 +02:00 committed by GitHub
parent 15a2d912b3
commit 9b4871fece
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 432 additions and 332 deletions

3
.changelog/16761.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
core: Prevent `task.kill_timeout` being greater than `update.progress_deadline`
```

View File

@ -5136,6 +5136,12 @@ func (u *UpdateStrategy) IsEmpty() bool {
return true return true
} }
// When the Job is transformed from api to struct, the Update Strategy block is
// copied into the existing task groups, the only things that are passed along
// are MaxParallel and Stagger, because they are enforced at job level.
// That is why checking if MaxParallel is zero is enough to know if the
// update block is empty.
return u.MaxParallel == 0 return u.MaxParallel == 0
} }
@ -6708,6 +6714,8 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, outer) mErr.Errors = append(mErr.Errors, outer)
} }
isTypeService := j.Type == JobTypeService
// Validate the tasks // Validate the tasks
for _, task := range tg.Tasks { for _, task := range tg.Tasks {
// Validate the task does not reference undefined volume mounts // Validate the task does not reference undefined volume mounts
@ -6727,6 +6735,15 @@ func (tg *TaskGroup) Validate(j *Job) error {
outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err)
mErr.Errors = append(mErr.Errors, outer) mErr.Errors = append(mErr.Errors, outer)
} }
// Validate the group's Update Strategy does not conflict with the Task's kill_timeout for service type jobs
if isTypeService && tg.Update != nil {
if task.KillTimeout > tg.Update.ProgressDeadline {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a kill timout (%s) longer than the group's progress deadline (%s)",
task.Name, task.KillTimeout.String(), tg.Update.ProgressDeadline.String()))
}
}
} }
return mErr.ErrorOrNil() return mErr.ErrorOrNil()

View File

@ -22,81 +22,109 @@ import (
func TestJob_Validate(t *testing.T) { func TestJob_Validate(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
j := &Job{} tests := []struct {
err := j.Validate() name string
requireErrors(t, err, job *Job
"datacenters", expErr []string
"job ID", }{
"job name", {
"job region", name: "job is empty",
"job type", job: &Job{},
"namespace", expErr: []string{
"task groups", "datacenters",
) "job ID",
"job name",
j = &Job{ "job region",
Type: "invalid-job-type", "job type",
} "namespace",
err = j.Validate() "task groups",
if expected := `Invalid job type: "invalid-job-type"`; !strings.Contains(err.Error(), expected) { },
t.Errorf("expected %s but found: %v", expected, err)
}
j = &Job{
Type: JobTypeService,
Periodic: &PeriodicConfig{
Enabled: true,
}, },
} {
err = j.Validate() name: "job type is invalid",
require.Error(t, err, "Periodic") job: &Job{
Type: "invalid-job-type",
j = &Job{ },
Region: "global", expErr: []string{
ID: uuid.Generate(), `Invalid job type: "invalid-job-type"`,
Namespace: "test", },
Name: "my-job", },
Type: JobTypeService, {
Priority: JobDefaultPriority, name: "job periodic specification type is missing",
Datacenters: []string{"*"}, job: &Job{
TaskGroups: []*TaskGroup{ Type: JobTypeService,
{ Periodic: &PeriodicConfig{
Name: "web", Enabled: true,
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
}, },
}, },
{ expErr: []string{
Name: "web", `Unknown periodic specification type ""`,
RestartPolicy: &RestartPolicy{ "Must specify a spec",
Interval: 5 * time.Minute, },
Delay: 10 * time.Second, },
Attempts: 10, {
name: "job datacenters is empty",
job: &Job{
Datacenters: []string{""},
},
expErr: []string{
"datacenter must be non-empty string",
},
},
{
name: "job task group is type invalid",
job: &Job{
Region: "global",
ID: uuid.Generate(),
Namespace: "test",
Name: "my-job",
Type: JobTypeService,
Priority: JobDefaultPriority,
Datacenters: []string{"*"},
TaskGroups: []*TaskGroup{
{
Name: "web",
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
{
Name: "web",
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
{
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
}, },
}, },
{ expErr: []string{
RestartPolicy: &RestartPolicy{ "2 redefines 'web' from group 1",
Interval: 5 * time.Minute, "group 3 missing name",
Delay: 10 * time.Second, "Task group web validation failed",
Attempts: 10, "Missing tasks for task group",
}, "Unsupported restart mode",
"Task Group web should have a reschedule policy",
"Task Group web should have an ephemeral disk object",
}, },
}, },
} }
err = j.Validate() for _, tc := range tests {
requireErrors(t, err, t.Run(tc.name, func(t *testing.T) {
"2 redefines 'web' from group 1", err := tc.job.Validate()
"group 3 missing name", requireErrors(t, err, tc.expErr...)
"Task group web validation failed", })
)
// test for invalid datacenters
j = &Job{
Datacenters: []string{""},
} }
err = j.Validate()
require.Error(t, err, "datacenter must be non-empty string")
} }
func TestJob_ValidateScaling(t *testing.T) { func TestJob_ValidateScaling(t *testing.T) {
@ -1007,322 +1035,374 @@ func TestTaskGroup_UsesConnect(t *testing.T) {
func TestTaskGroup_Validate(t *testing.T) { func TestTaskGroup_Validate(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
j := testJob() tests := []struct {
tg := &TaskGroup{ name string
Count: -1, tg *TaskGroup
RestartPolicy: &RestartPolicy{ expErr []string
Interval: 5 * time.Minute, jobType string
Delay: 10 * time.Second, }{
Attempts: 10, {
Mode: RestartPolicyModeDelay, name: "task group is missing basic specs",
}, tg: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{ Count: -1,
Interval: 5 * time.Minute, RestartPolicy: &RestartPolicy{
Attempts: 5, Interval: 5 * time.Minute,
Delay: 5 * time.Second, Delay: 10 * time.Second,
}, Attempts: 10,
} Mode: RestartPolicyModeDelay,
err := tg.Validate(j) },
requireErrors(t, err, ReschedulePolicy: &ReschedulePolicy{
"group name", Interval: 5 * time.Minute,
"count can't be negative", Attempts: 5,
"Missing tasks", Delay: 5 * time.Second,
)
tg = &TaskGroup{
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{
Networks: []*NetworkResource{
{
ReservedPorts: []Port{{Label: "foo", Value: 123}},
},
},
}, },
}, },
{ expErr: []string{
Name: "task-b", "group name",
Resources: &Resources{ "count can't be negative",
Networks: []*NetworkResource{ "Missing tasks",
{ },
ReservedPorts: []Port{{Label: "foo", Value: 123}}, jobType: JobTypeService,
},
{
name: "two tasks using same port",
tg: &TaskGroup{
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{
Networks: []*NetworkResource{
{
ReservedPorts: []Port{{Label: "foo", Value: 123}},
},
},
}, },
}, },
}, {
}, Name: "task-b",
}, Resources: &Resources{
} Networks: []*NetworkResource{
err = tg.Validate(&Job{}) {
expected := `Static port 123 already reserved by task-a:foo` ReservedPorts: []Port{{Label: "foo", Value: 123}},
if !strings.Contains(err.Error(), expected) { },
t.Errorf("expected %s but found: %v", expected, err)
}
tg = &TaskGroup{
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{
Networks: []*NetworkResource{
{
ReservedPorts: []Port{
{Label: "foo", Value: 123},
{Label: "bar", Value: 123},
}, },
}, },
}, },
}, },
}, },
}, expErr: []string{
} "Static port 123 already reserved by task-a:foo",
err = tg.Validate(&Job{})
expected = `Static port 123 already reserved by task-a:foo`
if !strings.Contains(err.Error(), expected) {
t.Errorf("expected %s but found: %v", expected, err)
}
tg = &TaskGroup{
Name: "web",
Count: 1,
Tasks: []*Task{
{Name: "web", Leader: true},
{Name: "web", Leader: true},
{},
},
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "constant",
},
}
err = tg.Validate(j)
requireErrors(t, err,
"should have an ephemeral disk object",
"2 redefines 'web' from task 1",
"Task 3 missing name",
"Only one task may be marked as leader",
"Task web validation failed",
)
tg = &TaskGroup{
Name: "web",
Count: 1,
Tasks: []*Task{
{Name: "web", Leader: true},
},
Update: DefaultUpdateStrategy.Copy(),
}
j.Type = JobTypeBatch
err = tg.Validate(j)
require.Error(t, err, "does not allow update block")
tg = &TaskGroup{
Count: -1,
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 5,
Delay: 5 * time.Second,
},
}
j.Type = JobTypeSystem
err = tg.Validate(j)
if !strings.Contains(err.Error(), "System jobs should not have a reschedule policy") {
t.Fatalf("err: %s", err)
}
tg = &TaskGroup{
Networks: []*NetworkResource{
{
DynamicPorts: []Port{{"http", 0, 80, ""}},
}, },
jobType: JobTypeService,
}, },
Tasks: []*Task{ {
{ name: "one task using same port twice",
Resources: &Resources{ tg: &TaskGroup{
Networks: []*NetworkResource{ Tasks: []*Task{
{ {
DynamicPorts: []Port{{"http", 0, 80, ""}}, Name: "task-a",
Resources: &Resources{
Networks: []*NetworkResource{
{
ReservedPorts: []Port{
{Label: "foo", Value: 123},
{Label: "bar", Value: 123},
},
},
},
}, },
}, },
}, },
}, },
}, expErr: []string{
} "Static port 123 already reserved by task-a:foo",
err = tg.Validate(j)
require.Contains(t, err.Error(), "Port label http already in use")
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "nothost",
Source: "foo",
}, },
jobType: JobTypeService,
}, },
Tasks: []*Task{ {
{ name: "multiple leaders defined and one empty task",
Name: "task-a", tg: &TaskGroup{
Resources: &Resources{}, Name: "web",
Count: 1,
Tasks: []*Task{
{Name: "web", Leader: true},
{Name: "web", Leader: true},
{},
},
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "constant",
},
}, },
}, expErr: []string{
} "should have an ephemeral disk object",
err = tg.Validate(&Job{}) "2 redefines 'web' from task 1",
require.Contains(t, err.Error(), `volume has unrecognized type nothost`) "Task 3 missing name",
"Only one task may be marked as leader",
tg = &TaskGroup{ "Task web validation failed",
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "host",
}, },
jobType: JobTypeService,
}, },
Tasks: []*Task{ {
{ name: "invalid update block for batch job",
Name: "task-a", tg: &TaskGroup{
Resources: &Resources{}, Name: "web",
Count: 1,
Tasks: []*Task{
{Name: "web", Leader: true},
},
Update: DefaultUpdateStrategy.Copy(),
}, },
}, expErr: []string{
} "does not allow update block",
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `volume has an empty source`)
tg = &TaskGroup{
Name: "group-a",
Update: &UpdateStrategy{
Canary: 1,
},
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "csi",
PerAlloc: true,
}, },
jobType: JobTypeBatch,
}, },
Tasks: []*Task{ {
{ name: "invalid reschedule policy for system job",
Name: "task-a", tg: &TaskGroup{
Resources: &Resources{}, Count: -1,
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 5,
Delay: 5 * time.Second,
},
}, },
}, expErr: []string{
} "System jobs should not have a reschedule policy",
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `volume has an empty source`)
require.Contains(t, err.Error(), `volume cannot be per_alloc when canaries are in use`)
require.Contains(t, err.Error(), `CSI volumes must have an attachment mode`)
require.Contains(t, err.Error(), `CSI volumes must have an access mode`)
tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "host",
}, },
jobType: JobTypeSystem,
}, },
Tasks: []*Task{ {
{ name: "duplicated por label",
Name: "task-a", tg: &TaskGroup{
Resources: &Resources{}, Networks: []*NetworkResource{
VolumeMounts: []*VolumeMount{
{ {
Volume: "", DynamicPorts: []Port{{"http", 0, 80, ""}},
},
},
Tasks: []*Task{
{
Resources: &Resources{
Networks: []*NetworkResource{
{
DynamicPorts: []Port{{"http", 0, 80, ""}},
},
},
},
}, },
}, },
}, },
{ expErr: []string{
Name: "task-b", "Port label http already in use",
Resources: &Resources{}, },
VolumeMounts: []*VolumeMount{ jobType: JobTypeService,
},
{
name: "invalid volume type",
tg: &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "nothost",
Source: "foo",
},
},
Tasks: []*Task{
{ {
Volume: "foob", Name: "task-a",
Resources: &Resources{},
}, },
}, },
}, },
expErr: []string{
"volume has unrecognized type nothost",
},
jobType: JobTypeService,
}, },
} {
err = tg.Validate(&Job{}) name: "invalid volume with wrong CSI and canary specs",
expected = `Task task-a has a volume mount (0) referencing an empty volume` tg: &TaskGroup{
require.Contains(t, err.Error(), expected) Name: "group-a",
Update: &UpdateStrategy{
expected = `Task task-b has a volume mount (0) referencing undefined volume foob` Canary: 1,
require.Contains(t, err.Error(), expected) },
Volumes: map[string]*VolumeRequest{
taskA := &Task{Name: "task-a"} "foo": {
tg = &TaskGroup{ Type: "csi",
Name: "group-a", PerAlloc: true,
Services: []*Service{ },
{ },
Name: "service-a", Tasks: []*Task{
Provider: "consul",
Checks: []*ServiceCheck{
{ {
Name: "check-a", Name: "task-a",
Type: "tcp", Resources: &Resources{},
TaskName: "task-b",
PortLabel: "http",
Interval: time.Duration(1 * time.Second),
Timeout: time.Duration(1 * time.Second),
}, },
}, },
}, },
}, expErr: []string{
Tasks: []*Task{taskA}, `volume has an empty source`,
} `volume cannot be per_alloc when canaries are in use`,
err = tg.Validate(&Job{}) `CSI volumes must have an attachment mode`,
expected = `Check check-a invalid: refers to non-existent task task-b` `CSI volumes must have an access mode`,
require.Contains(t, err.Error(), expected)
tg = &TaskGroup{
Name: "group-a",
Services: []*Service{
{
Name: "service-a",
Provider: "nomad",
},
{
Name: "service-b",
Provider: "consul",
}, },
jobType: JobTypeService,
}, },
Tasks: []*Task{{Name: "task-a"}}, {
} name: "invalid task referencing non existent task",
err = tg.Validate(&Job{}) tg: &TaskGroup{
expected = "Multiple service providers used: task group services must use the same provider" Name: "group-a",
require.Contains(t, err.Error(), expected)
tg = &TaskGroup{
Name: "group-a",
Services: []*Service{
{
Name: "service-a",
Provider: "nomad",
},
},
Tasks: []*Task{
{
Name: "task-a",
Services: []*Service{ Services: []*Service{
{
Name: "service-a",
Provider: "consul",
Checks: []*ServiceCheck{
{
Name: "check-a",
Type: "tcp",
TaskName: "task-b",
PortLabel: "http",
Interval: time.Duration(1 * time.Second),
Timeout: time.Duration(1 * time.Second),
},
},
},
},
Tasks: []*Task{
{Name: "task-a"},
},
},
expErr: []string{
"Check check-a invalid: refers to non-existent task task-b",
},
jobType: JobTypeService,
},
{
name: "invalid volume for tasks",
tg: &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "host",
},
},
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{},
VolumeMounts: []*VolumeMount{
{
Volume: "",
},
},
},
{
Name: "task-b",
Resources: &Resources{},
VolumeMounts: []*VolumeMount{
{
Volume: "foob",
},
},
},
},
},
expErr: []string{
`Task task-a has a volume mount (0) referencing an empty volume`,
`Task task-b has a volume mount (0) referencing undefined volume foob`,
},
jobType: JobTypeService,
},
{
name: "services inside group using different providers",
tg: &TaskGroup{
Name: "group-a",
Services: []*Service{
{
Name: "service-a",
Provider: "nomad",
},
{ {
Name: "service-b", Name: "service-b",
Provider: "consul", Provider: "consul",
}, },
}, },
Tasks: []*Task{{Name: "task-a"}},
}, },
expErr: []string{
"Multiple service providers used: task group services must use the same provider",
},
jobType: JobTypeService,
},
{
name: "conflicting progress deadline and kill timeout",
tg: &TaskGroup{
Name: "web",
Count: 1,
Tasks: []*Task{
{
Name: "web",
Leader: true,
KillTimeout: DefaultUpdateStrategy.ProgressDeadline + 25*time.Minute,
},
},
Update: DefaultUpdateStrategy.Copy(),
},
expErr: []string{
"Task web has a kill timout (35m0s) longer than the group's progress deadline (10m0s)",
},
jobType: JobTypeService,
},
{
name: "service and task using different provider",
tg: &TaskGroup{
Name: "group-a",
Services: []*Service{
{
Name: "service-a",
Provider: "nomad",
},
},
Tasks: []*Task{
{
Name: "task-a",
Services: []*Service{
{
Name: "service-b",
Provider: "consul",
},
},
},
},
},
expErr: []string{
"Multiple service providers used: task group services must use the same provider",
},
jobType: JobTypeService,
}, },
} }
err = tg.Validate(&Job{})
expected = "Multiple service providers used: task group services must use the same provider" for _, tc := range tests {
require.Contains(t, err.Error(), expected) t.Run(tc.name, func(t *testing.T) {
j := testJob()
j.Type = tc.jobType
err := tc.tg.Validate(j)
requireErrors(t, err, tc.expErr...)
})
}
} }
func TestTaskGroupNetwork_Validate(t *testing.T) { func TestTaskGroupNetwork_Validate(t *testing.T) {