Merge pull request #2634 from hashicorp/f-update-block
New Update block syntax
This commit is contained in:
commit
4503e2d1f7
|
@ -27,7 +27,7 @@ func TestEvaluations_List(t *testing.T) {
|
|||
// Register a job. This will create an evaluation.
|
||||
jobs := c.Jobs()
|
||||
job := testJob()
|
||||
evalID, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -43,8 +43,8 @@ func TestEvaluations_List(t *testing.T) {
|
|||
// if the eval fails fast there can be more than 1
|
||||
// but they are in order of most recent first, so look at the last one
|
||||
idx := len(result) - 1
|
||||
if len(result) == 0 || result[idx].ID != evalID {
|
||||
t.Fatalf("expected eval (%s), got: %#v", evalID, result[idx])
|
||||
if len(result) == 0 || result[idx].ID != resp.EvalID {
|
||||
t.Fatalf("expected eval (%s), got: %#v", resp.EvalID, result[idx])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,21 +68,21 @@ func TestEvaluations_PrefixList(t *testing.T) {
|
|||
// Register a job. This will create an evaluation.
|
||||
jobs := c.Jobs()
|
||||
job := testJob()
|
||||
evalID, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
||||
// Check the evaluations again
|
||||
result, qm, err = e.PrefixList(evalID[:4])
|
||||
result, qm, err = e.PrefixList(resp.EvalID[:4])
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
assertQueryMeta(t, qm)
|
||||
|
||||
// Check if we have the right list
|
||||
if len(result) != 1 || result[0].ID != evalID {
|
||||
if len(result) != 1 || result[0].ID != resp.EvalID {
|
||||
t.Fatalf("bad: %#v", result)
|
||||
}
|
||||
}
|
||||
|
@ -101,22 +101,22 @@ func TestEvaluations_Info(t *testing.T) {
|
|||
// Register a job. Creates a new evaluation.
|
||||
jobs := c.Jobs()
|
||||
job := testJob()
|
||||
evalID, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
||||
// Try looking up by the new eval ID
|
||||
result, qm, err := e.Info(evalID, nil)
|
||||
result, qm, err := e.Info(resp.EvalID, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
assertQueryMeta(t, qm)
|
||||
|
||||
// Check that we got the right result
|
||||
if result == nil || result.ID != evalID {
|
||||
t.Fatalf("expected eval %q, got: %#v", evalID, result)
|
||||
if result == nil || result.ID != resp.EvalID {
|
||||
t.Fatalf("expected eval %q, got: %#v", resp.EvalID, result)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
136
api/jobs.go
136
api/jobs.go
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/gorhill/cronexpr"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -50,20 +51,20 @@ func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *Write
|
|||
|
||||
// Register is used to register a new job. It returns the ID
|
||||
// of the evaluation, along with any errors encountered.
|
||||
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
|
||||
func (j *Jobs) Register(job *Job, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
|
||||
|
||||
var resp JobRegisterResponse
|
||||
|
||||
req := &RegisterJobRequest{Job: job}
|
||||
wm, err := j.client.write("/v1/jobs", req, &resp, q)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return resp.EvalID, wm, nil
|
||||
return &resp, wm, nil
|
||||
}
|
||||
|
||||
// EnforceRegister is used to register a job enforcing its job modify index.
|
||||
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {
|
||||
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
|
||||
|
||||
var resp JobRegisterResponse
|
||||
|
||||
|
@ -74,9 +75,9 @@ func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (s
|
|||
}
|
||||
wm, err := j.client.write("/v1/jobs", req, &resp, q)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
return resp.EvalID, wm, nil
|
||||
return &resp, wm, nil
|
||||
}
|
||||
|
||||
// List is used to list all of the existing jobs.
|
||||
|
@ -247,10 +248,111 @@ type periodicForceResponse struct {
|
|||
EvalID string
|
||||
}
|
||||
|
||||
// UpdateStrategy is for serializing update strategy for a job.
|
||||
// UpdateStrategy defines a task groups update strategy.
|
||||
type UpdateStrategy struct {
|
||||
Stagger time.Duration
|
||||
MaxParallel int `mapstructure:"max_parallel"`
|
||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||
Stagger time.Duration `mapstructure:"stagger"`
|
||||
MaxParallel *int `mapstructure:"max_parallel"`
|
||||
HealthCheck *string `mapstructure:"health_check"`
|
||||
MinHealthyTime *time.Duration `mapstructure:"min_healthy_time"`
|
||||
HealthyDeadline *time.Duration `mapstructure:"healthy_deadline"`
|
||||
AutoRevert *bool `mapstructure:"auto_revert"`
|
||||
Canary *int `mapstructure:"canary"`
|
||||
}
|
||||
|
||||
func (u *UpdateStrategy) Copy() *UpdateStrategy {
|
||||
if u == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
copy := new(UpdateStrategy)
|
||||
|
||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||
copy.Stagger = u.Stagger
|
||||
|
||||
if u.MaxParallel != nil {
|
||||
copy.MaxParallel = helper.IntToPtr(*u.MaxParallel)
|
||||
}
|
||||
|
||||
if u.HealthCheck != nil {
|
||||
copy.HealthCheck = helper.StringToPtr(*u.HealthCheck)
|
||||
}
|
||||
|
||||
if u.MinHealthyTime != nil {
|
||||
copy.MinHealthyTime = helper.TimeToPtr(*u.MinHealthyTime)
|
||||
}
|
||||
|
||||
if u.HealthyDeadline != nil {
|
||||
copy.HealthyDeadline = helper.TimeToPtr(*u.HealthyDeadline)
|
||||
}
|
||||
|
||||
if u.AutoRevert != nil {
|
||||
copy.AutoRevert = helper.BoolToPtr(*u.AutoRevert)
|
||||
}
|
||||
|
||||
if u.Canary != nil {
|
||||
copy.Canary = helper.IntToPtr(*u.Canary)
|
||||
}
|
||||
|
||||
return copy
|
||||
}
|
||||
|
||||
func (u *UpdateStrategy) Merge(o *UpdateStrategy) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if o.MaxParallel != nil {
|
||||
u.MaxParallel = helper.IntToPtr(*o.MaxParallel)
|
||||
}
|
||||
|
||||
if o.HealthCheck != nil {
|
||||
u.HealthCheck = helper.StringToPtr(*o.HealthCheck)
|
||||
}
|
||||
|
||||
if o.MinHealthyTime != nil {
|
||||
u.MinHealthyTime = helper.TimeToPtr(*o.MinHealthyTime)
|
||||
}
|
||||
|
||||
if o.HealthyDeadline != nil {
|
||||
u.HealthyDeadline = helper.TimeToPtr(*o.HealthyDeadline)
|
||||
}
|
||||
|
||||
if o.AutoRevert != nil {
|
||||
u.AutoRevert = helper.BoolToPtr(*o.AutoRevert)
|
||||
}
|
||||
|
||||
if o.Canary != nil {
|
||||
u.Canary = helper.IntToPtr(*o.Canary)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UpdateStrategy) Canonicalize() {
|
||||
if u.MaxParallel == nil {
|
||||
u.MaxParallel = helper.IntToPtr(0)
|
||||
}
|
||||
|
||||
d := structs.DefaultUpdateStrategy
|
||||
|
||||
if u.HealthCheck == nil {
|
||||
u.HealthCheck = helper.StringToPtr(d.HealthCheck)
|
||||
}
|
||||
|
||||
if u.HealthyDeadline == nil {
|
||||
u.HealthyDeadline = helper.TimeToPtr(d.HealthyDeadline)
|
||||
}
|
||||
|
||||
if u.MinHealthyTime == nil {
|
||||
u.MinHealthyTime = helper.TimeToPtr(d.MinHealthyTime)
|
||||
}
|
||||
|
||||
if u.AutoRevert == nil {
|
||||
u.AutoRevert = helper.BoolToPtr(d.AutoRevert)
|
||||
}
|
||||
|
||||
if u.Canary == nil {
|
||||
u.Canary = helper.IntToPtr(d.Canary)
|
||||
}
|
||||
}
|
||||
|
||||
// PeriodicConfig is for serializing periodic config for a job.
|
||||
|
@ -399,6 +501,9 @@ func (j *Job) Canonicalize() {
|
|||
if j.Periodic != nil {
|
||||
j.Periodic.Canonicalize()
|
||||
}
|
||||
if j.Update != nil {
|
||||
j.Update.Canonicalize()
|
||||
}
|
||||
|
||||
for _, tg := range j.TaskGroups {
|
||||
tg.Canonicalize(j)
|
||||
|
@ -556,6 +661,10 @@ type JobValidateResponse struct {
|
|||
|
||||
// Error is a string version of any error that may have occured
|
||||
Error string
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
}
|
||||
|
||||
// JobRevertRequest is used to revert a job to a prior version.
|
||||
|
@ -597,6 +706,11 @@ type JobRegisterResponse struct {
|
|||
EvalID string
|
||||
EvalCreateIndex uint64
|
||||
JobModifyIndex uint64
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
|
@ -621,6 +735,10 @@ type JobPlanResponse struct {
|
|||
Annotations *PlanAnnotations
|
||||
FailedTGAllocs map[string]*AllocationMetric
|
||||
NextPeriodicLaunch time.Time
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
}
|
||||
|
||||
type JobDiff struct {
|
||||
|
|
186
api/jobs_test.go
186
api/jobs_test.go
|
@ -31,11 +31,11 @@ func TestJobs_Register(t *testing.T) {
|
|||
|
||||
// Create a job and attempt to register it
|
||||
job := testJob()
|
||||
eval, wm, err := jobs.Register(job, nil)
|
||||
resp2, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp2 == nil || resp2.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
@ -209,8 +209,7 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Datacenters: []string{"dc1"},
|
||||
Type: helper.StringToPtr("service"),
|
||||
Update: &UpdateStrategy{
|
||||
Stagger: 10 * time.Second,
|
||||
MaxParallel: 1,
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
|
@ -294,8 +293,12 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
JobModifyIndex: helper.Uint64ToPtr(0),
|
||||
Datacenters: []string{"dc1"},
|
||||
Update: &UpdateStrategy{
|
||||
Stagger: 10 * time.Second,
|
||||
MaxParallel: 1,
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(0),
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
|
@ -312,6 +315,15 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
Migrate: helper.BoolToPtr(false),
|
||||
SizeMB: helper.IntToPtr(300),
|
||||
},
|
||||
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(0),
|
||||
},
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "redis",
|
||||
|
@ -406,6 +418,138 @@ func TestJobs_Canonicalize(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "update_merge",
|
||||
input: &Job{
|
||||
Name: helper.StringToPtr("foo"),
|
||||
ID: helper.StringToPtr("bar"),
|
||||
ParentID: helper.StringToPtr("lol"),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(0),
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: helper.StringToPtr("bar"),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(2),
|
||||
HealthCheck: helper.StringToPtr("manual"),
|
||||
MinHealthyTime: helper.TimeToPtr(1 * time.Second),
|
||||
AutoRevert: helper.BoolToPtr(true),
|
||||
Canary: helper.IntToPtr(1),
|
||||
},
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "task1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: helper.StringToPtr("baz"),
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "task1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &Job{
|
||||
ID: helper.StringToPtr("bar"),
|
||||
Name: helper.StringToPtr("foo"),
|
||||
Region: helper.StringToPtr("global"),
|
||||
Type: helper.StringToPtr("service"),
|
||||
ParentID: helper.StringToPtr("lol"),
|
||||
Priority: helper.IntToPtr(50),
|
||||
AllAtOnce: helper.BoolToPtr(false),
|
||||
VaultToken: helper.StringToPtr(""),
|
||||
Stop: helper.BoolToPtr(false),
|
||||
Stable: helper.BoolToPtr(false),
|
||||
Version: helper.Uint64ToPtr(0),
|
||||
Status: helper.StringToPtr(""),
|
||||
StatusDescription: helper.StringToPtr(""),
|
||||
CreateIndex: helper.Uint64ToPtr(0),
|
||||
ModifyIndex: helper.Uint64ToPtr(0),
|
||||
JobModifyIndex: helper.Uint64ToPtr(0),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(0),
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: helper.StringToPtr("bar"),
|
||||
Count: helper.IntToPtr(1),
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
Sticky: helper.BoolToPtr(false),
|
||||
Migrate: helper.BoolToPtr(false),
|
||||
SizeMB: helper.IntToPtr(300),
|
||||
},
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Delay: helper.TimeToPtr(15 * time.Second),
|
||||
Attempts: helper.IntToPtr(2),
|
||||
Interval: helper.TimeToPtr(1 * time.Minute),
|
||||
Mode: helper.StringToPtr("delay"),
|
||||
},
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(2),
|
||||
HealthCheck: helper.StringToPtr("manual"),
|
||||
MinHealthyTime: helper.TimeToPtr(1 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(true),
|
||||
Canary: helper.IntToPtr(1),
|
||||
},
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "task1",
|
||||
LogConfig: DefaultLogConfig(),
|
||||
Resources: MinResources(),
|
||||
KillTimeout: helper.TimeToPtr(5 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: helper.StringToPtr("baz"),
|
||||
Count: helper.IntToPtr(1),
|
||||
EphemeralDisk: &EphemeralDisk{
|
||||
Sticky: helper.BoolToPtr(false),
|
||||
Migrate: helper.BoolToPtr(false),
|
||||
SizeMB: helper.IntToPtr(300),
|
||||
},
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Delay: helper.TimeToPtr(15 * time.Second),
|
||||
Attempts: helper.IntToPtr(2),
|
||||
Interval: helper.TimeToPtr(1 * time.Minute),
|
||||
Mode: helper.StringToPtr("delay"),
|
||||
},
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(1),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(0),
|
||||
},
|
||||
Tasks: []*Task{
|
||||
{
|
||||
Name: "task1",
|
||||
LogConfig: DefaultLogConfig(),
|
||||
Resources: MinResources(),
|
||||
KillTimeout: helper.TimeToPtr(5 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
@ -438,17 +582,17 @@ func TestJobs_EnforceRegister(t *testing.T) {
|
|||
|
||||
// Create a job and attempt to register it with an incorrect index.
|
||||
job := testJob()
|
||||
eval, wm, err := jobs.EnforceRegister(job, 10, nil)
|
||||
resp2, wm, err := jobs.EnforceRegister(job, 10, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
|
||||
t.Fatalf("expected enforcement error: %v", err)
|
||||
}
|
||||
|
||||
// Register
|
||||
eval, wm, err = jobs.EnforceRegister(job, 0, nil)
|
||||
resp2, wm, err = jobs.EnforceRegister(job, 0, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp2 == nil || resp2.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
@ -471,17 +615,17 @@ func TestJobs_EnforceRegister(t *testing.T) {
|
|||
curIndex := resp[0].JobModifyIndex
|
||||
|
||||
// Fail at incorrect index
|
||||
eval, wm, err = jobs.EnforceRegister(job, 123456, nil)
|
||||
resp2, wm, err = jobs.EnforceRegister(job, 123456, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
|
||||
t.Fatalf("expected enforcement error: %v", err)
|
||||
}
|
||||
|
||||
// Works at correct index
|
||||
eval, wm, err = jobs.EnforceRegister(job, curIndex, nil)
|
||||
resp3, wm, err := jobs.EnforceRegister(job, curIndex, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp3 == nil || resp3.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
@ -494,20 +638,20 @@ func TestJobs_Revert(t *testing.T) {
|
|||
|
||||
// Register twice
|
||||
job := testJob()
|
||||
eval, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp == nil || resp.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
||||
eval, wm, err = jobs.Register(job, nil)
|
||||
resp, wm, err = jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp == nil || resp.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
@ -717,7 +861,7 @@ func TestJobs_Evaluations(t *testing.T) {
|
|||
// Insert a job. This also creates an evaluation so we should
|
||||
// be able to query that out after.
|
||||
job := testJob()
|
||||
evalID, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -733,8 +877,8 @@ func TestJobs_Evaluations(t *testing.T) {
|
|||
// Check that we got the evals back, evals are in order most recent to least recent
|
||||
// so the last eval is the original registered eval
|
||||
idx := len(evals) - 1
|
||||
if n := len(evals); n == 0 || evals[idx].ID != evalID {
|
||||
t.Fatalf("expected >= 1 eval (%s), got: %#v", evalID, evals[idx])
|
||||
if n := len(evals); n == 0 || evals[idx].ID != resp.EvalID {
|
||||
t.Fatalf("expected >= 1 eval (%s), got: %#v", resp.EvalID, evals[idx])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -895,11 +1039,11 @@ func TestJobs_Plan(t *testing.T) {
|
|||
|
||||
// Create a job and attempt to register it
|
||||
job := testJob()
|
||||
eval, wm, err := jobs.Register(job, nil)
|
||||
resp, wm, err := jobs.Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if eval == "" {
|
||||
if resp == nil || resp.EvalID == "" {
|
||||
t.Fatalf("missing eval id")
|
||||
}
|
||||
assertWriteMeta(t, wm)
|
||||
|
|
17
api/tasks.go
17
api/tasks.go
|
@ -143,6 +143,7 @@ type TaskGroup struct {
|
|||
Tasks []*Task
|
||||
RestartPolicy *RestartPolicy
|
||||
EphemeralDisk *EphemeralDisk
|
||||
Update *UpdateStrategy
|
||||
Meta map[string]string
|
||||
}
|
||||
|
||||
|
@ -170,6 +171,22 @@ func (g *TaskGroup) Canonicalize(job *Job) {
|
|||
g.EphemeralDisk.Canonicalize()
|
||||
}
|
||||
|
||||
// Merge the update policy from the job
|
||||
if ju, tu := job.Update != nil, g.Update != nil; ju && tu {
|
||||
// Merge the jobs and task groups definition of the update strategy
|
||||
jc := job.Update.Copy()
|
||||
jc.Merge(g.Update)
|
||||
g.Update = jc
|
||||
} else if ju {
|
||||
// Inherit the jobs
|
||||
jc := job.Update.Copy()
|
||||
g.Update = jc
|
||||
}
|
||||
|
||||
if g.Update != nil {
|
||||
g.Update.Canonicalize()
|
||||
}
|
||||
|
||||
var defaultRestartPolicy *RestartPolicy
|
||||
switch *job.Type {
|
||||
case "service", "system":
|
||||
|
|
|
@ -465,10 +465,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
|
|||
}
|
||||
}
|
||||
|
||||
// COMPAT: Remove in 0.7.0. Update has been pushed into the task groups
|
||||
if job.Update != nil {
|
||||
j.Update = structs.UpdateStrategy{
|
||||
Stagger: job.Update.Stagger,
|
||||
MaxParallel: job.Update.MaxParallel,
|
||||
Stagger: job.Update.Stagger,
|
||||
}
|
||||
if job.Update.MaxParallel != nil {
|
||||
j.Update.MaxParallel = *job.Update.MaxParallel
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -532,6 +535,17 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
|
|||
Migrate: *taskGroup.EphemeralDisk.Migrate,
|
||||
}
|
||||
|
||||
if taskGroup.Update != nil {
|
||||
tg.Update = &structs.UpdateStrategy{
|
||||
MaxParallel: *taskGroup.Update.MaxParallel,
|
||||
HealthCheck: *taskGroup.Update.HealthCheck,
|
||||
MinHealthyTime: *taskGroup.Update.MinHealthyTime,
|
||||
HealthyDeadline: *taskGroup.Update.HealthyDeadline,
|
||||
AutoRevert: *taskGroup.Update.AutoRevert,
|
||||
Canary: *taskGroup.Update.Canary,
|
||||
}
|
||||
}
|
||||
|
||||
if l := len(taskGroup.Tasks); l != 0 {
|
||||
tg.Tasks = make([]*structs.Task, l)
|
||||
for l, task := range taskGroup.Tasks {
|
||||
|
|
|
@ -708,16 +708,16 @@ func TestHTTP_PeriodicForce(t *testing.T) {
|
|||
func TestHTTP_JobPlan(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Create the job
|
||||
job := mock.Job()
|
||||
args := structs.JobPlanRequest{
|
||||
job := api.MockJob()
|
||||
args := api.JobPlanRequest{
|
||||
Job: job,
|
||||
Diff: true,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
WriteRequest: api.WriteRequest{Region: "global"},
|
||||
}
|
||||
buf := encodeReq(args)
|
||||
|
||||
// Make the HTTP request
|
||||
req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/plan", buf)
|
||||
req, err := http.NewRequest("PUT", "/v1/job/"+*job.ID+"/plan", buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -858,8 +858,13 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
},
|
||||
},
|
||||
Update: &api.UpdateStrategy{
|
||||
Stagger: 1 * time.Second,
|
||||
MaxParallel: 5,
|
||||
Stagger: 1 * time.Second,
|
||||
MaxParallel: helper.IntToPtr(5),
|
||||
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Manual),
|
||||
MinHealthyTime: helper.TimeToPtr(1 * time.Minute),
|
||||
HealthyDeadline: helper.TimeToPtr(3 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(1),
|
||||
},
|
||||
Periodic: &api.PeriodicConfig{
|
||||
Enabled: helper.BoolToPtr(true),
|
||||
|
@ -899,6 +904,13 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
Sticky: helper.BoolToPtr(true),
|
||||
Migrate: helper.BoolToPtr(true),
|
||||
},
|
||||
Update: &api.UpdateStrategy{
|
||||
HealthCheck: helper.StringToPtr(structs.UpdateStrategyHealthCheck_Checks),
|
||||
MinHealthyTime: helper.TimeToPtr(2 * time.Minute),
|
||||
HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(true),
|
||||
},
|
||||
|
||||
Meta: map[string]string{
|
||||
"key": "value",
|
||||
},
|
||||
|
@ -1078,6 +1090,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
|
|||
Sticky: true,
|
||||
Migrate: true,
|
||||
},
|
||||
Update: &structs.UpdateStrategy{
|
||||
MaxParallel: 5,
|
||||
HealthCheck: structs.UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 2 * time.Minute,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: true,
|
||||
Canary: 1,
|
||||
},
|
||||
Meta: map[string]string{
|
||||
"key": "value",
|
||||
},
|
||||
|
|
|
@ -102,11 +102,11 @@ func TestAllocStatusCommand_Run(t *testing.T) {
|
|||
|
||||
jobID := "job1_sfx"
|
||||
job1 := testJob(jobID)
|
||||
evalId1, _, err := client.Jobs().Register(job1, nil)
|
||||
resp, _, err := client.Jobs().Register(job1, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if code := waitForSuccess(ui, client, fullId, t, evalId1); code != 0 {
|
||||
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
|
||||
t.Fatalf("status code non zero saw %d", code)
|
||||
}
|
||||
// get an alloc id
|
||||
|
|
|
@ -178,7 +178,7 @@ func TestMonitor_Monitor(t *testing.T) {
|
|||
|
||||
// Submit a job - this creates a new evaluation we can monitor
|
||||
job := testJob("job1")
|
||||
evalID, _, err := client.Jobs().Register(job, nil)
|
||||
resp, _, err := client.Jobs().Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func TestMonitor_Monitor(t *testing.T) {
|
|||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
code = mon.monitor(evalID, false)
|
||||
code = mon.monitor(resp.EvalID, false)
|
||||
}()
|
||||
|
||||
// Wait for completion
|
||||
|
@ -206,7 +206,7 @@ func TestMonitor_Monitor(t *testing.T) {
|
|||
|
||||
// Check the output
|
||||
out := ui.OutputWriter.String()
|
||||
if !strings.Contains(out, evalID) {
|
||||
if !strings.Contains(out, resp.EvalID) {
|
||||
t.Fatalf("missing eval\n\n%s", out)
|
||||
}
|
||||
if !strings.Contains(out, "finished with status") {
|
||||
|
@ -224,7 +224,7 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
|
|||
|
||||
// Submit a job - this creates a new evaluation we can monitor
|
||||
job := testJob("job1")
|
||||
evalID, _, err := client.Jobs().Register(job, nil)
|
||||
resp, _, err := client.Jobs().Register(job, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
@ -234,7 +234,7 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
|
|||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
code = mon.monitor(evalID[:8], true)
|
||||
code = mon.monitor(resp.EvalID[:8], true)
|
||||
}()
|
||||
|
||||
// Wait for completion
|
||||
|
@ -252,10 +252,10 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
|
|||
|
||||
// Check the output
|
||||
out := ui.OutputWriter.String()
|
||||
if !strings.Contains(out, evalID[:8]) {
|
||||
if !strings.Contains(out, resp.EvalID[:8]) {
|
||||
t.Fatalf("missing eval\n\n%s", out)
|
||||
}
|
||||
if strings.Contains(out, evalID) {
|
||||
if strings.Contains(out, resp.EvalID) {
|
||||
t.Fatalf("expected truncated eval id, got: %s", out)
|
||||
}
|
||||
if !strings.Contains(out, "finished with status") {
|
||||
|
@ -263,7 +263,7 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
|
|||
}
|
||||
|
||||
// Fail on identifier with too few characters
|
||||
code = mon.monitor(evalID[:1], true)
|
||||
code = mon.monitor(resp.EvalID[:1], true)
|
||||
if code != 1 {
|
||||
t.Fatalf("expect exit 1, got: %d", code)
|
||||
}
|
||||
|
@ -272,7 +272,7 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
|
|||
}
|
||||
ui.ErrorWriter.Reset()
|
||||
|
||||
code = mon.monitor(evalID[:3], true)
|
||||
code = mon.monitor(resp.EvalID[:3], true)
|
||||
if code != 2 {
|
||||
t.Fatalf("expect exit 2, got: %d", code)
|
||||
}
|
||||
|
|
|
@ -134,6 +134,12 @@ func (c *PlanCommand) Run(args []string) int {
|
|||
c.Ui.Output(c.Colorize().Color(formatDryRun(resp, job)))
|
||||
c.Ui.Output("")
|
||||
|
||||
// Print any warnings if there are any
|
||||
if resp.Warnings != "" {
|
||||
c.Ui.Output(
|
||||
c.Colorize().Color(fmt.Sprintf("[bold][yellow]Job Warnings:\n%s[reset]\n", resp.Warnings)))
|
||||
}
|
||||
|
||||
// Print the job index info
|
||||
c.Ui.Output(c.Colorize().Color(formatJobModifyIndex(resp.JobModifyIndex, path)))
|
||||
return getExitCode(resp)
|
||||
|
|
|
@ -165,19 +165,6 @@ func (c *RunCommand) Run(args []string) int {
|
|||
job.VaultToken = helper.StringToPtr(vaultToken)
|
||||
}
|
||||
|
||||
// COMPAT 0.4.1 -> 0.5 Remove in 0.6
|
||||
OUTSIDE:
|
||||
for _, tg := range job.TaskGroups {
|
||||
for _, task := range tg.Tasks {
|
||||
if task.Resources != nil {
|
||||
if task.Resources.DiskMB != nil {
|
||||
c.Ui.Error("WARNING: disk attribute is deprecated in the resources block. See https://www.nomadproject.io/docs/job-specification/ephemeral_disk.html")
|
||||
break OUTSIDE
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if output {
|
||||
req := api.RegisterJobRequest{Job: job}
|
||||
buf, err := json.MarshalIndent(req, "", " ")
|
||||
|
@ -198,11 +185,11 @@ OUTSIDE:
|
|||
}
|
||||
|
||||
// Submit the job
|
||||
var evalID string
|
||||
var resp *api.JobRegisterResponse
|
||||
if enforce {
|
||||
evalID, _, err = client.Jobs().EnforceRegister(job, checkIndex, nil)
|
||||
resp, _, err = client.Jobs().EnforceRegister(job, checkIndex, nil)
|
||||
} else {
|
||||
evalID, _, err = client.Jobs().Register(job, nil)
|
||||
resp, _, err = client.Jobs().Register(job, nil)
|
||||
}
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), api.RegisterEnforceIndexErrPrefix) {
|
||||
|
@ -220,6 +207,14 @@ OUTSIDE:
|
|||
return 1
|
||||
}
|
||||
|
||||
// Print any warnings if there are any
|
||||
if resp.Warnings != "" {
|
||||
c.Ui.Output(
|
||||
c.Colorize().Color(fmt.Sprintf("[bold][yellow]Job Warnings:\n%s[reset]\n", resp.Warnings)))
|
||||
}
|
||||
|
||||
evalID := resp.EvalID
|
||||
|
||||
// Check if we should enter monitor mode
|
||||
if detach || periodic || paramjob {
|
||||
c.Ui.Output("Job registration successful")
|
||||
|
|
|
@ -37,20 +37,20 @@ func TestStatusCommand_Run(t *testing.T) {
|
|||
|
||||
// Register two jobs
|
||||
job1 := testJob("job1_sfx")
|
||||
evalId1, _, err := client.Jobs().Register(job1, nil)
|
||||
resp, _, err := client.Jobs().Register(job1, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if code := waitForSuccess(ui, client, fullId, t, evalId1); code != 0 {
|
||||
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
|
||||
t.Fatalf("status code non zero saw %d", code)
|
||||
}
|
||||
|
||||
job2 := testJob("job2_sfx")
|
||||
evalId2, _, err := client.Jobs().Register(job2, nil)
|
||||
resp2, _, err := client.Jobs().Register(job2, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if code := waitForSuccess(ui, client, fullId, t, evalId2); code != 0 {
|
||||
if code := waitForSuccess(ui, client, fullId, t, resp2.EvalID); code != 0 {
|
||||
t.Fatalf("status code non zero saw %d", code)
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ func TestStatusCommand_Run(t *testing.T) {
|
|||
if strings.Contains(out, "Allocations") {
|
||||
t.Fatalf("should not dump allocations")
|
||||
}
|
||||
if strings.Contains(out, evalId1) {
|
||||
if strings.Contains(out, resp.EvalID) {
|
||||
t.Fatalf("should not contain full identifiers, got %s", out)
|
||||
}
|
||||
ui.OutputWriter.Reset()
|
||||
|
@ -159,7 +159,7 @@ func TestStatusCommand_Run(t *testing.T) {
|
|||
t.Fatalf("expected exit 0, got: %d", code)
|
||||
}
|
||||
out = ui.OutputWriter.String()
|
||||
if !strings.Contains(out, evalId1) {
|
||||
if !strings.Contains(out, resp.EvalID) {
|
||||
t.Fatalf("should contain full identifiers, got %s", out)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,12 @@ func (c *ValidateCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
// Print any warnings if there are any
|
||||
if jr.Warnings != "" {
|
||||
c.Ui.Output(
|
||||
c.Colorize().Color(fmt.Sprintf("[bold][yellow]Job Warnings:\n%s[reset]\n", jr.Warnings)))
|
||||
}
|
||||
|
||||
// Done!
|
||||
c.Ui.Output(
|
||||
c.Colorize().Color("[bold][green]Job validation successful[reset]"))
|
||||
|
@ -112,5 +118,9 @@ func (c *ValidateCommand) validateLocal(aj *api.Job) (*api.JobValidateResponse,
|
|||
}
|
||||
}
|
||||
|
||||
if warnings := job.Warnings(); warnings != nil {
|
||||
out.Warnings = warnings.Error()
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
|
|
|
@ -272,6 +272,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
|
|||
"meta",
|
||||
"task",
|
||||
"ephemeral_disk",
|
||||
"update",
|
||||
"vault",
|
||||
}
|
||||
if err := checkHCLKeys(listVal, valid); err != nil {
|
||||
|
@ -287,6 +288,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
|
|||
delete(m, "task")
|
||||
delete(m, "restart")
|
||||
delete(m, "ephemeral_disk")
|
||||
delete(m, "update")
|
||||
delete(m, "vault")
|
||||
|
||||
// Build the group with the basic decode
|
||||
|
@ -318,6 +320,13 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
|
|||
}
|
||||
}
|
||||
|
||||
// If we have an update strategy, then parse that
|
||||
if o := listVal.Filter("update"); len(o.Items) > 0 {
|
||||
if err := parseUpdate(&g.Update, o); err != nil {
|
||||
return multierror.Prefix(err, "update ->")
|
||||
}
|
||||
}
|
||||
|
||||
// Parse out meta fields. These are in HCL as a list so we need
|
||||
// to iterate over them and merge them.
|
||||
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
|
||||
|
@ -1104,7 +1113,7 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error {
|
|||
func parseUpdate(result **api.UpdateStrategy, list *ast.ObjectList) error {
|
||||
list = list.Elem()
|
||||
if len(list.Items) > 1 {
|
||||
return fmt.Errorf("only one 'update' block allowed per job")
|
||||
return fmt.Errorf("only one 'update' block allowed")
|
||||
}
|
||||
|
||||
// Get our resource object
|
||||
|
@ -1117,8 +1126,14 @@ func parseUpdate(result **api.UpdateStrategy, list *ast.ObjectList) error {
|
|||
|
||||
// Check for invalid keys
|
||||
valid := []string{
|
||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||
"stagger",
|
||||
"max_parallel",
|
||||
"health_check",
|
||||
"min_healthy_time",
|
||||
"healthy_deadline",
|
||||
"auto_revert",
|
||||
"canary",
|
||||
}
|
||||
if err := checkHCLKeys(o.Val, valid); err != nil {
|
||||
return err
|
||||
|
|
|
@ -46,8 +46,13 @@ func TestParse(t *testing.T) {
|
|||
},
|
||||
|
||||
Update: &api.UpdateStrategy{
|
||||
Stagger: 60 * time.Second,
|
||||
MaxParallel: 2,
|
||||
Stagger: 60 * time.Second,
|
||||
MaxParallel: helper.IntToPtr(2),
|
||||
HealthCheck: helper.StringToPtr("manual"),
|
||||
MinHealthyTime: helper.TimeToPtr(10 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(10 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(true),
|
||||
Canary: helper.IntToPtr(1),
|
||||
},
|
||||
|
||||
TaskGroups: []*api.TaskGroup{
|
||||
|
@ -92,6 +97,14 @@ func TestParse(t *testing.T) {
|
|||
Sticky: helper.BoolToPtr(true),
|
||||
SizeMB: helper.IntToPtr(150),
|
||||
},
|
||||
Update: &api.UpdateStrategy{
|
||||
MaxParallel: helper.IntToPtr(3),
|
||||
HealthCheck: helper.StringToPtr("checks"),
|
||||
MinHealthyTime: helper.TimeToPtr(1 * time.Second),
|
||||
HealthyDeadline: helper.TimeToPtr(1 * time.Minute),
|
||||
AutoRevert: helper.BoolToPtr(false),
|
||||
Canary: helper.IntToPtr(2),
|
||||
},
|
||||
Tasks: []*api.Task{
|
||||
&api.Task{
|
||||
Name: "binstore",
|
||||
|
|
|
@ -18,6 +18,11 @@ job "binstore-storagelocker" {
|
|||
update {
|
||||
stagger = "60s"
|
||||
max_parallel = 2
|
||||
health_check = "manual"
|
||||
min_healthy_time = "10s"
|
||||
healthy_deadline = "10m"
|
||||
auto_revert = true
|
||||
canary = 1
|
||||
}
|
||||
|
||||
task "outside" {
|
||||
|
@ -47,6 +52,15 @@ job "binstore-storagelocker" {
|
|||
size = 150
|
||||
}
|
||||
|
||||
update {
|
||||
max_parallel = 3
|
||||
health_check = "checks"
|
||||
min_healthy_time = "1s"
|
||||
healthy_deadline = "1m"
|
||||
auto_revert = false
|
||||
canary = 2
|
||||
}
|
||||
|
||||
task "binstore" {
|
||||
driver = "docker"
|
||||
user = "bob"
|
||||
|
|
24
nomad/fsm.go
24
nomad/fsm.go
|
@ -251,11 +251,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
// COMPAT: Remove in 0.6
|
||||
// Empty maps and slices should be treated as nil to avoid
|
||||
// un-intended destructive updates in scheduler since we use
|
||||
// reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||
// the incoming job.
|
||||
/* Handle upgrade paths:
|
||||
* - Empty maps and slices should be treated as nil to avoid
|
||||
* un-intended destructive updates in scheduler since we use
|
||||
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||
* the incoming job.
|
||||
* - Migrate from old style upgrade stanza that used only a stagger.
|
||||
*/
|
||||
req.Job.Canonicalize()
|
||||
|
||||
if err := n.state.UpsertJob(index, req.Job); err != nil {
|
||||
|
@ -635,11 +637,13 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// COMPAT: Remove in 0.5
|
||||
// Empty maps and slices should be treated as nil to avoid
|
||||
// un-intended destructive updates in scheduler since we use
|
||||
// reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||
// the incoming job.
|
||||
/* Handle upgrade paths:
|
||||
* - Empty maps and slices should be treated as nil to avoid
|
||||
* un-intended destructive updates in scheduler since we use
|
||||
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanatizes
|
||||
* the incoming job.
|
||||
* - Migrate from old style upgrade stanza that used only a stagger.
|
||||
*/
|
||||
job.Canonicalize()
|
||||
|
||||
if err := restore.JobRestore(job); err != nil {
|
||||
|
|
|
@ -61,9 +61,12 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
|
|||
// Add implicit constraints
|
||||
setImplicitConstraints(args.Job)
|
||||
|
||||
// Validate the job.
|
||||
if err := validateJob(args.Job); err != nil {
|
||||
// Validate the job and capture any warnings
|
||||
err, warnings := validateJob(args.Job)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if warnings != nil {
|
||||
reply.Warnings = warnings.Error()
|
||||
}
|
||||
|
||||
if args.EnforceIndex {
|
||||
|
@ -289,7 +292,8 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
|
|||
func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValidateResponse) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "job", "validate"}, time.Now())
|
||||
|
||||
if err := validateJob(args.Job); err != nil {
|
||||
err, warnings := validateJob(args.Job)
|
||||
if err != nil {
|
||||
if merr, ok := err.(*multierror.Error); ok {
|
||||
for _, err := range merr.Errors {
|
||||
reply.ValidationErrors = append(reply.ValidationErrors, err.Error())
|
||||
|
@ -301,6 +305,10 @@ func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValid
|
|||
}
|
||||
}
|
||||
|
||||
if warnings != nil {
|
||||
reply.Warnings = warnings.Error()
|
||||
}
|
||||
|
||||
reply.DriverConfigValidated = true
|
||||
return nil
|
||||
}
|
||||
|
@ -723,9 +731,12 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
|
|||
// Add implicit constraints
|
||||
setImplicitConstraints(args.Job)
|
||||
|
||||
// Validate the job.
|
||||
if err := validateJob(args.Job); err != nil {
|
||||
// Validate the job and capture any warnings
|
||||
err, warnings := validateJob(args.Job)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if warnings != nil {
|
||||
reply.Warnings = warnings.Error()
|
||||
}
|
||||
|
||||
// Acquire a snapshot of the state
|
||||
|
@ -818,12 +829,15 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
|
|||
// validateJob validates a Job and task drivers and returns an error if there is
|
||||
// a validation problem or if the Job is of a type a user is not allowed to
|
||||
// submit.
|
||||
func validateJob(job *structs.Job) error {
|
||||
func validateJob(job *structs.Job) (invalid, warnings error) {
|
||||
validationErrors := new(multierror.Error)
|
||||
if err := job.Validate(); err != nil {
|
||||
multierror.Append(validationErrors, err)
|
||||
}
|
||||
|
||||
// Get any warnings
|
||||
warnings = job.Warnings()
|
||||
|
||||
// Get the signals required
|
||||
signals := job.RequiredSignals()
|
||||
|
||||
|
@ -873,7 +887,7 @@ func validateJob(job *structs.Job) error {
|
|||
multierror.Append(validationErrors, fmt.Errorf("job can't be submitted with a payload, only dispatched"))
|
||||
}
|
||||
|
||||
return validationErrors.ErrorOrNil()
|
||||
return validationErrors.ErrorOrNil(), warnings
|
||||
}
|
||||
|
||||
// Dispatch a parameterized job.
|
||||
|
|
|
@ -119,6 +119,36 @@ func TestJobEndpoint_Register_InvalidDriverConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Register_UpdateWarning(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request with a job containing an invalid driver
|
||||
// config
|
||||
job := mock.Job()
|
||||
job.Update.Stagger = 1 * time.Second
|
||||
job.Update.MaxParallel = 1
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(resp.Warnings, "Update stagger deprecated") {
|
||||
t.Fatalf("expected a deprecation warning but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Register_Payload(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
|
@ -2240,9 +2270,14 @@ func TestJobEndpoint_ValidateJob_InvalidDriverConf(t *testing.T) {
|
|||
"foo": "bar",
|
||||
}
|
||||
|
||||
if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "-> config") {
|
||||
err, warnings := validateJob(job)
|
||||
if err == nil || !strings.Contains(err.Error(), "-> config") {
|
||||
t.Fatalf("Expected config error; got %v", err)
|
||||
}
|
||||
|
||||
if warnings != nil {
|
||||
t.Fatalf("got unexpected warnings: %v", warnings)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) {
|
||||
|
@ -2255,9 +2290,30 @@ func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) {
|
|||
ChangeSignal: "SIGUSR1",
|
||||
}
|
||||
|
||||
if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "support sending signals") {
|
||||
err, warnings := validateJob(job)
|
||||
if err == nil || !strings.Contains(err.Error(), "support sending signals") {
|
||||
t.Fatalf("Expected signal feasibility error; got %v", err)
|
||||
}
|
||||
|
||||
if warnings != nil {
|
||||
t.Fatalf("got unexpected warnings: %v", warnings)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_ValidateJob_UpdateWarning(t *testing.T) {
|
||||
// Create a mock job with an invalid config
|
||||
job := mock.Job()
|
||||
job.Update.Stagger = 1 * time.Second
|
||||
job.Update.MaxParallel = 1
|
||||
|
||||
err, warnings := validateJob(job)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected validation error; got %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(warnings.Error(), "Update stagger deprecated") {
|
||||
t.Fatalf("expected a deprecation warning but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
|
|
|
@ -57,25 +57,22 @@ type JobDiff struct {
|
|||
// diffable. If contextual diff is enabled, objects within the job will contain
|
||||
// field information even if unchanged.
|
||||
func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
|
||||
// COMPAT: Remove "Update" in 0.7.0. Update pushed down to task groups
|
||||
// in 0.6.0
|
||||
diff := &JobDiff{Type: DiffTypeNone}
|
||||
var oldPrimitiveFlat, newPrimitiveFlat map[string]string
|
||||
filter := []string{"ID", "Status", "StatusDescription", "Version", "Stable", "CreateIndex",
|
||||
"ModifyIndex", "JobModifyIndex"}
|
||||
|
||||
// Have to treat this special since it is a struct literal, not a pointer
|
||||
var jUpdate, otherUpdate *UpdateStrategy
|
||||
"ModifyIndex", "JobModifyIndex", "Update"}
|
||||
|
||||
if j == nil && other == nil {
|
||||
return diff, nil
|
||||
} else if j == nil {
|
||||
j = &Job{}
|
||||
otherUpdate = &other.Update
|
||||
diff.Type = DiffTypeAdded
|
||||
newPrimitiveFlat = flatmap.Flatten(other, filter, true)
|
||||
diff.ID = other.ID
|
||||
} else if other == nil {
|
||||
other = &Job{}
|
||||
jUpdate = &j.Update
|
||||
diff.Type = DiffTypeDeleted
|
||||
oldPrimitiveFlat = flatmap.Flatten(j, filter, true)
|
||||
diff.ID = j.ID
|
||||
|
@ -84,8 +81,6 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
|
|||
return nil, fmt.Errorf("can not diff jobs with different IDs: %q and %q", j.ID, other.ID)
|
||||
}
|
||||
|
||||
jUpdate = &j.Update
|
||||
otherUpdate = &other.Update
|
||||
oldPrimitiveFlat = flatmap.Flatten(j, filter, true)
|
||||
newPrimitiveFlat = flatmap.Flatten(other, filter, true)
|
||||
diff.ID = other.ID
|
||||
|
@ -117,11 +112,6 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
|
|||
}
|
||||
diff.TaskGroups = tgs
|
||||
|
||||
// Update diff
|
||||
if uDiff := primitiveObjectDiff(jUpdate, otherUpdate, nil, "Update", contextual); uDiff != nil {
|
||||
diff.Objects = append(diff.Objects, uDiff)
|
||||
}
|
||||
|
||||
// Periodic diff
|
||||
if pDiff := primitiveObjectDiff(j.Periodic, other.Periodic, nil, "Periodic", contextual); pDiff != nil {
|
||||
diff.Objects = append(diff.Objects, pDiff)
|
||||
|
@ -250,6 +240,12 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
|
|||
diff.Objects = append(diff.Objects, diskDiff)
|
||||
}
|
||||
|
||||
// Update diff
|
||||
// COMPAT: Remove "Stagger" in 0.7.0.
|
||||
if uDiff := primitiveObjectDiff(tg.Update, other.Update, []string{"Stagger"}, "Update", contextual); uDiff != nil {
|
||||
diff.Objects = append(diff.Objects, uDiff)
|
||||
}
|
||||
|
||||
// Tasks diff
|
||||
tasks, err := taskDiffs(tg.Tasks, other.Tasks, contextual)
|
||||
if err != nil {
|
||||
|
|
|
@ -187,26 +187,6 @@ func TestJobDiff(t *testing.T) {
|
|||
New: "",
|
||||
},
|
||||
},
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "MaxParallel",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Stagger",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -270,26 +250,6 @@ func TestJobDiff(t *testing.T) {
|
|||
New: "batch",
|
||||
},
|
||||
},
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "MaxParallel",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Stagger",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -460,83 +420,6 @@ func TestJobDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited
|
||||
Old: &Job{
|
||||
Update: UpdateStrategy{
|
||||
Stagger: 10 * time.Second,
|
||||
MaxParallel: 5,
|
||||
},
|
||||
},
|
||||
New: &Job{
|
||||
Update: UpdateStrategy{
|
||||
Stagger: 60 * time.Second,
|
||||
MaxParallel: 10,
|
||||
},
|
||||
},
|
||||
Expected: &JobDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "MaxParallel",
|
||||
Old: "5",
|
||||
New: "10",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Stagger",
|
||||
Old: "10000000000",
|
||||
New: "60000000000",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited with context
|
||||
Contextual: true,
|
||||
Old: &Job{
|
||||
Update: UpdateStrategy{
|
||||
Stagger: 10 * time.Second,
|
||||
MaxParallel: 5,
|
||||
},
|
||||
},
|
||||
New: &Job{
|
||||
Update: UpdateStrategy{
|
||||
Stagger: 60 * time.Second,
|
||||
MaxParallel: 5,
|
||||
},
|
||||
},
|
||||
Expected: &JobDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "MaxParallel",
|
||||
Old: "5",
|
||||
New: "5",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Stagger",
|
||||
Old: "10000000000",
|
||||
New: "60000000000",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Periodic added
|
||||
Old: &Job{},
|
||||
|
@ -1611,6 +1494,247 @@ func TestTaskGroupDiff(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy deleted
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
AutoRevert: true,
|
||||
},
|
||||
},
|
||||
New: &TaskGroup{},
|
||||
Expected: &TaskGroupDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "AutoRevert",
|
||||
Old: "true",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "Canary",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "HealthyDeadline",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "MaxParallel",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeDeleted,
|
||||
Name: "MinHealthyTime",
|
||||
Old: "0",
|
||||
New: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy added
|
||||
Old: &TaskGroup{},
|
||||
New: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
AutoRevert: true,
|
||||
},
|
||||
},
|
||||
Expected: &TaskGroupDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "AutoRevert",
|
||||
Old: "",
|
||||
New: "true",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "Canary",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "HealthyDeadline",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "MaxParallel",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeAdded,
|
||||
Name: "MinHealthyTime",
|
||||
Old: "",
|
||||
New: "0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 5,
|
||||
HealthCheck: "foo",
|
||||
MinHealthyTime: 1 * time.Second,
|
||||
HealthyDeadline: 30 * time.Second,
|
||||
AutoRevert: true,
|
||||
Canary: 2,
|
||||
},
|
||||
},
|
||||
New: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 7,
|
||||
HealthCheck: "bar",
|
||||
MinHealthyTime: 2 * time.Second,
|
||||
HealthyDeadline: 31 * time.Second,
|
||||
AutoRevert: false,
|
||||
Canary: 1,
|
||||
},
|
||||
},
|
||||
Expected: &TaskGroupDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "AutoRevert",
|
||||
Old: "true",
|
||||
New: "false",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Canary",
|
||||
Old: "2",
|
||||
New: "1",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "HealthCheck",
|
||||
Old: "foo",
|
||||
New: "bar",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "HealthyDeadline",
|
||||
Old: "30000000000",
|
||||
New: "31000000000",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "MaxParallel",
|
||||
Old: "5",
|
||||
New: "7",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "MinHealthyTime",
|
||||
Old: "1000000000",
|
||||
New: "2000000000",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Update strategy edited with context
|
||||
Contextual: true,
|
||||
Old: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 5,
|
||||
HealthCheck: "foo",
|
||||
MinHealthyTime: 1 * time.Second,
|
||||
HealthyDeadline: 30 * time.Second,
|
||||
AutoRevert: true,
|
||||
Canary: 2,
|
||||
},
|
||||
},
|
||||
New: &TaskGroup{
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 7,
|
||||
HealthCheck: "foo",
|
||||
MinHealthyTime: 1 * time.Second,
|
||||
HealthyDeadline: 30 * time.Second,
|
||||
AutoRevert: true,
|
||||
Canary: 2,
|
||||
},
|
||||
},
|
||||
Expected: &TaskGroupDiff{
|
||||
Type: DiffTypeEdited,
|
||||
Objects: []*ObjectDiff{
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "Update",
|
||||
Fields: []*FieldDiff{
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "AutoRevert",
|
||||
Old: "true",
|
||||
New: "true",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "Canary",
|
||||
Old: "2",
|
||||
New: "2",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "HealthCheck",
|
||||
Old: "foo",
|
||||
New: "foo",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "HealthyDeadline",
|
||||
Old: "30000000000",
|
||||
New: "30000000000",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeEdited,
|
||||
Name: "MaxParallel",
|
||||
Old: "5",
|
||||
New: "7",
|
||||
},
|
||||
{
|
||||
Type: DiffTypeNone,
|
||||
Name: "MinHealthyTime",
|
||||
Old: "1000000000",
|
||||
New: "1000000000",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// EphemeralDisk added
|
||||
Old: &TaskGroup{},
|
||||
|
|
|
@ -503,6 +503,11 @@ type JobRegisterResponse struct {
|
|||
EvalID string
|
||||
EvalCreateIndex uint64
|
||||
JobModifyIndex uint64
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
|
@ -525,6 +530,10 @@ type JobValidateResponse struct {
|
|||
|
||||
// Error is a string version of any error that may have occured
|
||||
Error string
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
}
|
||||
|
||||
// NodeUpdateResponse is used to respond to a node update
|
||||
|
@ -639,6 +648,10 @@ type JobPlanResponse struct {
|
|||
// submitted.
|
||||
NextPeriodicLaunch time.Time
|
||||
|
||||
// Warnings contains any warnings about the given job. These may include
|
||||
// deprecation warnings.
|
||||
Warnings string
|
||||
|
||||
WriteMeta
|
||||
}
|
||||
|
||||
|
@ -1200,7 +1213,7 @@ type Job struct {
|
|||
// to run. Each task group is an atomic unit of scheduling and placement.
|
||||
TaskGroups []*TaskGroup
|
||||
|
||||
// Update is used to control the update strategy
|
||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||
Update UpdateStrategy
|
||||
|
||||
// Periodic is used to define the interval the job is run at.
|
||||
|
@ -1263,6 +1276,45 @@ func (j *Job) Canonicalize() {
|
|||
if j.Periodic != nil {
|
||||
j.Periodic.Canonicalize()
|
||||
}
|
||||
|
||||
// COMPAT: Remove in 0.7.0
|
||||
// Rewrite any job that has an update block with pre 0.6.0 syntax.
|
||||
if j.Update.Stagger > 0 && j.Update.MaxParallel > 0 {
|
||||
// Build an appropriate update block and copy it down to each task group
|
||||
base := DefaultUpdateStrategy.Copy()
|
||||
base.MaxParallel = j.Update.MaxParallel
|
||||
base.MinHealthyTime = j.Update.Stagger
|
||||
|
||||
// Add to each task group, modifying as needed
|
||||
l := len(j.TaskGroups)
|
||||
for _, tg := range j.TaskGroups {
|
||||
// The task group doesn't need upgrading if it has an update block with the new syntax
|
||||
u := tg.Update
|
||||
if u != nil && u.Stagger == 0 && u.MaxParallel > 0 &&
|
||||
u.HealthCheck != "" && u.MinHealthyTime > 0 && u.HealthyDeadline > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// The MaxParallel for the job should be 10% of the total count
|
||||
// unless there is just one task group then we can infer the old
|
||||
// max parallel should be the new
|
||||
tgu := base.Copy()
|
||||
if l != 1 {
|
||||
// RoundTo 10%
|
||||
var percent float64 = float64(tg.Count) * 0.1
|
||||
tgu.MaxParallel = int(percent + 0.5)
|
||||
}
|
||||
|
||||
// Safety guards
|
||||
if tgu.MaxParallel == 0 {
|
||||
tgu.MaxParallel = 1
|
||||
} else if tgu.MaxParallel > tg.Count {
|
||||
tgu.MaxParallel = tg.Count
|
||||
}
|
||||
|
||||
tg.Update = tgu
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Copy returns a deep copy of the Job. It is expected that callers use recover.
|
||||
|
@ -1376,6 +1428,18 @@ func (j *Job) Validate() error {
|
|||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Warnings returns a list of warnings that may be from dubious settings or
|
||||
// deprecation warnings.
|
||||
func (j *Job) Warnings() error {
|
||||
var mErr multierror.Error
|
||||
|
||||
if j.Update.Stagger > 0 {
|
||||
multierror.Append(&mErr, fmt.Errorf("Update stagger deprecated. A best effort conversion to new syntax will be applied. Please update upgrade stanza before v0.7.0"))
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// LookupTaskGroup finds a task group by name
|
||||
func (j *Job) LookupTaskGroup(name string) *TaskGroup {
|
||||
for _, tg := range j.TaskGroups {
|
||||
|
@ -1605,15 +1669,105 @@ type TaskGroupSummary struct {
|
|||
Lost int
|
||||
}
|
||||
|
||||
const (
|
||||
// Checks uses any registered health check state in combination with task
|
||||
// states to determine if a allocation is healthy.
|
||||
UpdateStrategyHealthCheck_Checks = "checks"
|
||||
|
||||
// TaskStates uses the task states of an allocation to determine if the
|
||||
// allocation is healthy.
|
||||
UpdateStrategyHealthCheck_TaskStates = "task_states"
|
||||
|
||||
// Manual allows the operator to manually signal to Nomad when an
|
||||
// allocations is healthy. This allows more advanced health checking that is
|
||||
// outside of the scope of Nomad.
|
||||
UpdateStrategyHealthCheck_Manual = "manual"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultUpdateStrategy provides a baseline that can be used to upgrade
|
||||
// jobs with the old policy or for populating field defaults.
|
||||
DefaultUpdateStrategy = &UpdateStrategy{
|
||||
MaxParallel: 0,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
}
|
||||
)
|
||||
|
||||
// UpdateStrategy is used to modify how updates are done
|
||||
type UpdateStrategy struct {
|
||||
// Stagger is the amount of time between the updates
|
||||
// COMPAT: Remove in 0.7.0. Stagger is deprecated in 0.6.0.
|
||||
Stagger time.Duration
|
||||
|
||||
// MaxParallel is how many updates can be done in parallel
|
||||
MaxParallel int
|
||||
|
||||
// HealthCheck specifies the mechanism in which allocations are marked
|
||||
// healthy or unhealthy as part of a deployment.
|
||||
HealthCheck string
|
||||
|
||||
// MinHealthyTime is the minimum time an allocation must be in the healthy
|
||||
// state before it is marked as healthy, unblocking more alllocations to be
|
||||
// rolled.
|
||||
MinHealthyTime time.Duration
|
||||
|
||||
// HealthyDeadline is the time in which an allocation must be marked as
|
||||
// healthy before it is automatically transistioned to unhealthy. This time
|
||||
// period doesn't count against the MinHealthyTime.
|
||||
HealthyDeadline time.Duration
|
||||
|
||||
// AutoRevert declares that if a deployment fails because of unhealthy
|
||||
// allocations, there should be an attempt to auto-revert the job to a
|
||||
// stable version.
|
||||
AutoRevert bool
|
||||
|
||||
// Canary is the number of canaries to deploy when a change to the task
|
||||
// group is detected.
|
||||
Canary int
|
||||
}
|
||||
|
||||
func (u *UpdateStrategy) Copy() *UpdateStrategy {
|
||||
if u == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
copy := new(UpdateStrategy)
|
||||
*copy = *u
|
||||
return copy
|
||||
}
|
||||
|
||||
func (u *UpdateStrategy) Validate() error {
|
||||
if u == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var mErr multierror.Error
|
||||
switch u.HealthCheck {
|
||||
case UpdateStrategyHealthCheck_Checks, UpdateStrategyHealthCheck_TaskStates, UpdateStrategyHealthCheck_Manual:
|
||||
default:
|
||||
multierror.Append(&mErr, fmt.Errorf("Invalid health check given: %q", u.HealthCheck))
|
||||
}
|
||||
|
||||
if u.MaxParallel < 0 {
|
||||
multierror.Append(&mErr, fmt.Errorf("Max parallel can not be less than zero: %d < 0", u.MaxParallel))
|
||||
}
|
||||
if u.Canary < 0 {
|
||||
multierror.Append(&mErr, fmt.Errorf("Canary count can not be less than zero: %d < 0", u.Canary))
|
||||
}
|
||||
if u.MinHealthyTime < 0 {
|
||||
multierror.Append(&mErr, fmt.Errorf("Minimum healthy time may not be less than zero: %v", u.MinHealthyTime))
|
||||
}
|
||||
if u.HealthyDeadline <= 0 {
|
||||
multierror.Append(&mErr, fmt.Errorf("Healthy deadline must be greater than zero: %v", u.HealthyDeadline))
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// TODO(alexdadgar): Remove once no longer used by the scheduler.
|
||||
// Rolling returns if a rolling strategy should be used
|
||||
func (u *UpdateStrategy) Rolling() bool {
|
||||
return u.Stagger > 0 && u.MaxParallel > 0
|
||||
|
@ -1961,6 +2115,9 @@ type TaskGroup struct {
|
|||
// be scheduled.
|
||||
Count int
|
||||
|
||||
// Update is used to control the update strategy for this task group
|
||||
Update *UpdateStrategy
|
||||
|
||||
// Constraints can be specified at a task group level and apply to
|
||||
// all the tasks contained.
|
||||
Constraints []*Constraint
|
||||
|
@ -1985,8 +2142,8 @@ func (tg *TaskGroup) Copy() *TaskGroup {
|
|||
}
|
||||
ntg := new(TaskGroup)
|
||||
*ntg = *tg
|
||||
ntg.Update = ntg.Update.Copy()
|
||||
ntg.Constraints = CopySliceConstraints(ntg.Constraints)
|
||||
|
||||
ntg.RestartPolicy = ntg.RestartPolicy.Copy()
|
||||
|
||||
if tg.Tasks != nil {
|
||||
|
@ -2076,6 +2233,23 @@ func (tg *TaskGroup) Validate() error {
|
|||
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have an ephemeral disk object", tg.Name))
|
||||
}
|
||||
|
||||
// Validate the update strategy
|
||||
if u := tg.Update; u != nil {
|
||||
if err := u.Validate(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
|
||||
// Validate the counts are appropriate
|
||||
if u.MaxParallel > tg.Count {
|
||||
mErr.Errors = append(mErr.Errors,
|
||||
fmt.Errorf("Update max parallel count is greater than task group count: %d > %d", u.MaxParallel, tg.Count))
|
||||
}
|
||||
if u.Canary > tg.Count {
|
||||
mErr.Errors = append(mErr.Errors,
|
||||
fmt.Errorf("Update canary count is greater than task group count: %d > %d", u.Canary, tg.Count))
|
||||
}
|
||||
}
|
||||
|
||||
// Check for duplicate tasks and that there is only leader task if any
|
||||
tasks := make(map[string]int)
|
||||
leaderTasks := 0
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/kr/pretty"
|
||||
)
|
||||
|
||||
func TestJob_Validate(t *testing.T) {
|
||||
|
@ -95,6 +96,205 @@ func TestJob_Validate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJob_Warnings(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Job *Job
|
||||
Expected []string
|
||||
}{
|
||||
{
|
||||
Name: "Old Update spec",
|
||||
Job: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
},
|
||||
Expected: []string{"Update stagger deprecated"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.Name, func(t *testing.T) {
|
||||
warnings := c.Job.Warnings()
|
||||
if warnings == nil {
|
||||
if len(c.Expected) == 0 {
|
||||
return
|
||||
} else {
|
||||
t.Fatal("Got no warnings when they were expected")
|
||||
}
|
||||
}
|
||||
|
||||
a := warnings.Error()
|
||||
for _, e := range c.Expected {
|
||||
if !strings.Contains(a, e) {
|
||||
t.Fatalf("Got warnings %q; didn't contain %q", a, e)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJob_Canonicalize_Update(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Job *Job
|
||||
Expected *Job
|
||||
}{
|
||||
{
|
||||
Name: "One task group",
|
||||
Job: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
Expected: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
EphemeralDisk: DefaultEphemeralDisk(),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "One task group; too high of parallelism",
|
||||
Job: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 200,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
Expected: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 200,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
EphemeralDisk: DefaultEphemeralDisk(),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Multiple task group; rounding",
|
||||
Job: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Count: 14,
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 26,
|
||||
},
|
||||
},
|
||||
},
|
||||
Expected: &Job{
|
||||
Update: UpdateStrategy{
|
||||
MaxParallel: 2,
|
||||
Stagger: 10 * time.Second,
|
||||
},
|
||||
TaskGroups: []*TaskGroup{
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 2,
|
||||
EphemeralDisk: DefaultEphemeralDisk(),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 1,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Count: 14,
|
||||
EphemeralDisk: DefaultEphemeralDisk(),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 1,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Count: 26,
|
||||
EphemeralDisk: DefaultEphemeralDisk(),
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 3,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Checks,
|
||||
MinHealthyTime: 10 * time.Second,
|
||||
HealthyDeadline: 5 * time.Minute,
|
||||
AutoRevert: false,
|
||||
Canary: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.Name, func(t *testing.T) {
|
||||
c.Job.Canonicalize()
|
||||
if !reflect.DeepEqual(c.Job, c.Expected) {
|
||||
t.Fatalf("Got %# v; want %# v", pretty.Formatter(c.Job), pretty.Formatter(c.Expected))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testJob() *Job {
|
||||
return &Job{
|
||||
Region: "global",
|
||||
|
@ -429,6 +629,14 @@ func TestTaskGroup_Validate(t *testing.T) {
|
|||
Attempts: 10,
|
||||
Mode: RestartPolicyModeDelay,
|
||||
},
|
||||
Update: &UpdateStrategy{
|
||||
MaxParallel: 3,
|
||||
HealthCheck: UpdateStrategyHealthCheck_Manual,
|
||||
MinHealthyTime: 1 * time.Second,
|
||||
HealthyDeadline: 1 * time.Second,
|
||||
AutoRevert: false,
|
||||
Canary: 3,
|
||||
},
|
||||
}
|
||||
|
||||
err = tg.Validate()
|
||||
|
@ -436,16 +644,22 @@ func TestTaskGroup_Validate(t *testing.T) {
|
|||
if !strings.Contains(mErr.Errors[0].Error(), "should have an ephemeral disk object") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[1].Error(), "2 redefines 'web' from task 1") {
|
||||
if !strings.Contains(mErr.Errors[1].Error(), "max parallel count is greater") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") {
|
||||
if !strings.Contains(mErr.Errors[2].Error(), "canary count is greater") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[3].Error(), "Only one task may be marked as leader") {
|
||||
if !strings.Contains(mErr.Errors[3].Error(), "2 redefines 'web' from task 1") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[4].Error(), "Task web validation failed") {
|
||||
if !strings.Contains(mErr.Errors[4].Error(), "Task 3 missing name") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[5].Error(), "Only one task may be marked as leader") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[6].Error(), "Task web validation failed") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -833,6 +1047,35 @@ func TestConstraint_Validate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpdateStrategy_Validate(t *testing.T) {
|
||||
u := &UpdateStrategy{
|
||||
MaxParallel: -1,
|
||||
HealthCheck: "foo",
|
||||
MinHealthyTime: -10,
|
||||
HealthyDeadline: -10,
|
||||
AutoRevert: false,
|
||||
Canary: -1,
|
||||
}
|
||||
|
||||
err := u.Validate()
|
||||
mErr := err.(*multierror.Error)
|
||||
if !strings.Contains(mErr.Errors[0].Error(), "Invalid health check given") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[1].Error(), "Max parallel can not be less than zero") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[2].Error(), "Canary count can not be less than zero") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[3].Error(), "Minimum healthy time may not be less than zero") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !strings.Contains(mErr.Errors[4].Error(), "Healthy deadline must be greater than zero") {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResource_NetIndex(t *testing.T) {
|
||||
r := &Resources{
|
||||
Networks: []*NetworkResource{
|
||||
|
|
Loading…
Reference in New Issue