handle upgrading old update block syntax
This commit is contained in:
parent
6232b66ea7
commit
50eec3ef35
24
nomad/fsm.go
24
nomad/fsm.go
|
@ -247,11 +247,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// COMPAT: Remove in 0.6
|
/* Handle upgrade paths:
|
||||||
// Empty maps and slices should be treated as nil to avoid
|
* - Empty maps and slices should be treated as nil to avoid
|
||||||
// un-intended destructive updates in scheduler since we use
|
* un-intended destructive updates in scheduler since we use
|
||||||
// reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||||
// the incoming job.
|
* the incoming job.
|
||||||
|
* - Migrate from old style upgrade stanza that used only a stagger.
|
||||||
|
*/
|
||||||
req.Job.Canonicalize()
|
req.Job.Canonicalize()
|
||||||
|
|
||||||
if err := n.state.UpsertJob(index, req.Job); err != nil {
|
if err := n.state.UpsertJob(index, req.Job); err != nil {
|
||||||
|
@ -615,11 +617,13 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// COMPAT: Remove in 0.5
|
/* Handle upgrade paths:
|
||||||
// Empty maps and slices should be treated as nil to avoid
|
* - Empty maps and slices should be treated as nil to avoid
|
||||||
// un-intended destructive updates in scheduler since we use
|
* un-intended destructive updates in scheduler since we use
|
||||||
// reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||||
// the incoming job.
|
* the incoming job.
|
||||||
|
* - Migrate from old style upgrade stanza that used only a stagger.
|
||||||
|
*/
|
||||||
job.Canonicalize()
|
job.Canonicalize()
|
||||||
|
|
||||||
if err := restore.JobRestore(job); err != nil {
|
if err := restore.JobRestore(job); err != nil {
|
||||||
|
|
|
@ -1257,6 +1257,45 @@ func (j *Job) Canonicalize() {
|
||||||
if j.Periodic != nil {
|
if j.Periodic != nil {
|
||||||
j.Periodic.Canonicalize()
|
j.Periodic.Canonicalize()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// COMPAT: Remove in 0.7.0
|
||||||
|
// Rewrite any job that has an update block with pre 0.6.0 syntax.
|
||||||
|
if j.Update.Stagger > 0 && j.Update.MaxParallel > 0 {
|
||||||
|
// Build an appropriate update block and copy it down to each task group
|
||||||
|
base := DefaultUpdateStrategy.Copy()
|
||||||
|
base.MaxParallel = j.Update.MaxParallel
|
||||||
|
base.MinHealthyTime = j.Update.Stagger
|
||||||
|
|
||||||
|
// Add to each task group, modifying as needed
|
||||||
|
l := len(j.TaskGroups)
|
||||||
|
for _, tg := range j.TaskGroups {
|
||||||
|
// The task group doesn't need upgrading if it has an update block with the new syntax
|
||||||
|
u := tg.Update
|
||||||
|
if u != nil && u.Stagger == 0 && u.MaxParallel > 0 &&
|
||||||
|
u.HealthCheck != "" && u.MinHealthyTime > 0 && u.HealthyDeadline > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// The MaxParallel for the job should be 10% of the total count
|
||||||
|
// unless there is just one task group then we can infer the old
|
||||||
|
// max parallel should be the new
|
||||||
|
tgu := base.Copy()
|
||||||
|
if l != 1 {
|
||||||
|
// RoundTo 10%
|
||||||
|
var percent float64 = float64(tg.Count) * 0.1
|
||||||
|
tgu.MaxParallel = int(percent + 0.5)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safety guards
|
||||||
|
if tgu.MaxParallel == 0 {
|
||||||
|
tgu.MaxParallel = 1
|
||||||
|
} else if tgu.MaxParallel > tg.Count {
|
||||||
|
tgu.MaxParallel = tg.Count
|
||||||
|
}
|
||||||
|
|
||||||
|
tg.Update = tgu
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy returns a deep copy of the Job. It is expected that callers use recover.
|
// Copy returns a deep copy of the Job. It is expected that callers use recover.
|
||||||
|
@ -1626,6 +1665,19 @@ const (
|
||||||
UpdateStrategyHealthCheck_Manual = "manual"
|
UpdateStrategyHealthCheck_Manual = "manual"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// DefaultUpdateStrategy provides a baseline that can be used to upgrade
|
||||||
|
// jobs with the old policy
|
||||||
|
DefaultUpdateStrategy = &UpdateStrategy{
|
||||||
|
MaxParallel: 0,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// UpdateStrategy is used to modify how updates are done
|
// UpdateStrategy is used to modify how updates are done
|
||||||
type UpdateStrategy struct {
|
type UpdateStrategy struct {
|
||||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
|
"github.com/kr/pretty"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestJob_Validate(t *testing.T) {
|
func TestJob_Validate(t *testing.T) {
|
||||||
|
@ -134,6 +135,166 @@ func TestJob_Warnings(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJob_Canonicalize_Update(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
Name string
|
||||||
|
Job *Job
|
||||||
|
Expected *Job
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Name: "One task group",
|
||||||
|
Job: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expected: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
EphemeralDisk: DefaultEphemeralDisk(),
|
||||||
|
Update: &UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "One task group; too high of parallelism",
|
||||||
|
Job: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 200,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expected: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 200,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
EphemeralDisk: DefaultEphemeralDisk(),
|
||||||
|
Update: &UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Multiple task group; rounding",
|
||||||
|
Job: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "bar",
|
||||||
|
Count: 14,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 26,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Expected: &Job{
|
||||||
|
Update: UpdateStrategy{
|
||||||
|
MaxParallel: 2,
|
||||||
|
Stagger: 10 * time.Second,
|
||||||
|
},
|
||||||
|
TaskGroups: []*TaskGroup{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 2,
|
||||||
|
EphemeralDisk: DefaultEphemeralDisk(),
|
||||||
|
Update: &UpdateStrategy{
|
||||||
|
MaxParallel: 1,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "bar",
|
||||||
|
Count: 14,
|
||||||
|
EphemeralDisk: DefaultEphemeralDisk(),
|
||||||
|
Update: &UpdateStrategy{
|
||||||
|
MaxParallel: 1,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Count: 26,
|
||||||
|
EphemeralDisk: DefaultEphemeralDisk(),
|
||||||
|
Update: &UpdateStrategy{
|
||||||
|
MaxParallel: 3,
|
||||||
|
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||||
|
MinHealthyTime: 10 * time.Second,
|
||||||
|
HealthyDeadline: 5 * time.Minute,
|
||||||
|
AutoRevert: false,
|
||||||
|
Canary: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.Name, func(t *testing.T) {
|
||||||
|
c.Job.Canonicalize()
|
||||||
|
if !reflect.DeepEqual(c.Job, c.Expected) {
|
||||||
|
t.Fatalf("Got %# v; want %# v", pretty.Formatter(c.Job), pretty.Formatter(c.Expected))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testJob() *Job {
|
func testJob() *Job {
|
||||||
return &Job{
|
return &Job{
|
||||||
Region: "global",
|
Region: "global",
|
||||||
|
|
Loading…
Reference in a new issue