only set args.Eval after all servers upgrade

We set the Eval field on job (de-)registration only after all servers
get upgraded, to avoid dealing with duplicate evals.
This commit is contained in:
Mahmood Ali 2020-07-15 11:10:57 -04:00
parent 910776caf0
commit a314744210
3 changed files with 41 additions and 41 deletions

2
Vagrantfile vendored
View File

@ -2,7 +2,7 @@
# vi: set ft=ruby :
#
LINUX_BASE_BOX = "bento/ubuntu-16.04"
LINUX_BASE_BOX = "bento/ubuntu-18.04"
FREEBSD_BASE_BOX = "freebsd/FreeBSD-11.3-STABLE"
LINUX_IP_ADDRESS = "10.199.0.200"

View File

@ -315,13 +315,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Create a new evaluation
now := time.Now().UnixNano()
submittedEval := false
var eval *structs.Evaluation
// Set the submit time
args.Job.SubmitTime = now
// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
args.Eval = &structs.Evaluation{
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
@ -332,12 +333,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
reply.EvalID = eval.ID
}
// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}
// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
@ -349,13 +357,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
submittedEval = true
// Populate the reply with job information
reply.JobModifyIndex = index
reply.Index = index
if args.Eval != nil {
if submittedEval {
reply.EvalCreateIndex = index
}
@ -366,17 +372,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex
if args.Eval == nil {
if eval == nil {
return nil
}
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check.
// 0.12.1 introduced atomic eval job registration
if args.Eval != nil &&
!(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) {
args.Eval.JobModifyIndex = reply.JobModifyIndex
if eval != nil && !submittedEval {
eval.JobModifyIndex = reply.JobModifyIndex
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
@ -389,11 +392,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
if !submittedEval {
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
}
// Kick off a multiregion deployment (enterprise only).
if isRunner {
@ -786,12 +787,15 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
}
var eval *structs.Evaluation
// The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UnixNano()
// If the job is periodic or parameterized, we don't create an eval.
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
args.Eval = &structs.Evaluation{
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
@ -802,7 +806,12 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
CreateTime: now,
ModifyTime: now,
}
reply.EvalID = args.Eval.ID
reply.EvalID = eval.ID
}
// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}
// Commit the job update via Raft
@ -824,23 +833,26 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
result = multierror.Append(result, err)
}
// COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval
if args.Eval != nil &&
!ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
// COMPAT(1.1.0) - Remove entire conditional block
// 0.12.1 introduced atomic job deregistration eval
if eval != nil && args.Eval == nil {
// Create a new evaluation
args.Eval.JobModifyIndex = index
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{args.Eval},
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
return result.ErrorOrNil()

View File

@ -1640,9 +1640,7 @@ func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) {
require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex)
raftEval := evalUpdateFromRaft(t, s1, eval.ID)
require.NotNil(t, raftEval)
require.Equal(t, eval.CreateIndex, raftEval.CreateIndex)
require.Equal(t, eval, raftEval)
require.Equal(t, raftEval, eval)
//// an update should update the job and create a new eval
req.Job.TaskGroups[0].Name += "a"
@ -1759,10 +1757,7 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) {
require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex)
raftEval := evalUpdateFromRaft(t, s1, eval.ID)
require.NotNil(t, raftEval)
require.Equal(t, eval.ID, raftEval.ID)
require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex)
require.Greater(t, raftEval.CreateIndex, eval.CreateIndex)
require.Equal(t, eval, raftEval)
//// re-registration should create a new eval, but leave the job untouched
var resp2 structs.JobRegisterResponse
@ -1788,7 +1783,6 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) {
// this raft eval is the one found above
raftEval = evalUpdateFromRaft(t, s1, eval.ID)
require.Equal(t, raftEval.CreateIndex, eval.CreateIndex)
require.Equal(t, eval, raftEval)
//// an update should update the job and create a new eval
@ -1814,10 +1808,7 @@ func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) {
require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex)
raftEval = evalUpdateFromRaft(t, s1, eval.ID)
require.NotNil(t, raftEval)
require.Equal(t, eval.ID, raftEval.ID)
require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex)
require.Greater(t, raftEval.CreateIndex, eval.CreateIndex)
require.Equal(t, eval, raftEval)
})
// Registering a parameterized job shouldn't create an eval
@ -3297,10 +3288,7 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) {
require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex)
raftEval := evalUpdateFromRaft(t, s1, eval.ID)
require.NotNil(t, raftEval)
require.Equal(t, eval.ID, raftEval.ID)
require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex)
require.Greater(t, raftEval.CreateIndex, eval.CreateIndex)
require.Equal(t, eval, raftEval)
})
// Registering a parameterized job shouldn't create an eval