remove backcompat support for non-atomic job registration (#16305)
In Nomad 0.12.1 we introduced atomic job registration/deregistration, where the new eval was written in the same raft entry. Backwards-compatibility checks were supposed to have been removed in Nomad 1.1.0, but we missed that. This is long safe to remove.
This commit is contained in:
parent
40494e64a9
commit
3c0eaba9db
|
@ -0,0 +1,3 @@
|
|||
```release-note:breaking-change
|
||||
api: job register and register requests from API clients older than version 0.12.1 will not longer emit an evaluation
|
||||
```
|
|
@ -365,9 +365,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
|
|||
|
||||
if existingJob == nil || specChanged {
|
||||
|
||||
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
|
||||
// 0.12.1 introduced atomic eval job registration
|
||||
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
|
||||
if eval != nil {
|
||||
args.Eval = eval
|
||||
submittedEval = true
|
||||
}
|
||||
|
@ -887,10 +885,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
|
|||
reply.EvalID = eval.ID
|
||||
}
|
||||
|
||||
// COMPAT(1.1.0): remove conditional and always set args.Eval
|
||||
if ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
|
||||
args.Eval = eval
|
||||
}
|
||||
args.Eval = eval
|
||||
|
||||
// Commit the job update via Raft
|
||||
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
|
||||
|
@ -904,27 +899,6 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
|
|||
reply.EvalCreateIndex = index
|
||||
reply.Index = index
|
||||
|
||||
// COMPAT(1.1.0) - Remove entire conditional block
|
||||
// 0.12.1 introduced atomic job deregistration eval
|
||||
if eval != nil && args.Eval == nil {
|
||||
// Create a new evaluation
|
||||
eval.JobModifyIndex = index
|
||||
update := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
WriteRequest: structs.WriteRequest{Region: args.Region},
|
||||
}
|
||||
|
||||
// Commit this evaluation via Raft
|
||||
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
|
||||
if err != nil {
|
||||
j.logger.Error("eval create failed", "error", err, "method", "deregister")
|
||||
return err
|
||||
}
|
||||
|
||||
reply.EvalCreateIndex = evalIndex
|
||||
reply.Index = evalIndex
|
||||
}
|
||||
|
||||
err = j.multiregionStop(job, args, reply)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -1998,8 +1998,8 @@ func TestJobEndpoint_Register_SemverConstraint(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// TestJobEndpoint_Register_EvalCreation_Modern asserts that job register creates an eval
|
||||
// atomically with the registration
|
||||
// TestJobEndpoint_Register_EvalCreation asserts that job register creates an
|
||||
// eval atomically with the registration
|
||||
func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
|
@ -2119,150 +2119,6 @@ func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job register creates an eval
|
||||
// atomically with the registration, but handle legacy clients by adding a new eval update
|
||||
func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 2
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
|
||||
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 2
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
|
||||
// simulate presense of a server that doesn't handle
|
||||
// new registration eval
|
||||
c.Build = "0.12.0"
|
||||
})
|
||||
defer cleanupS2()
|
||||
|
||||
TestJoin(t, s1, s2)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
testutil.WaitForLeader(t, s2.RPC)
|
||||
|
||||
// keep s1 as the leader
|
||||
if leader, _ := s1.getLeader(); !leader {
|
||||
s1, s2 = s2, s1
|
||||
}
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
// Create the register request
|
||||
t.Run("job registration always create evals", func(t *testing.T) {
|
||||
job := mock.Job()
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
//// initial registration should create the job and a new eval
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, resp.Index)
|
||||
require.NotEmpty(t, resp.EvalID)
|
||||
|
||||
// Check for the job in the FSM
|
||||
state := s1.fsm.State()
|
||||
out, err := state.JobByID(nil, job.Namespace, job.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
require.Equal(t, resp.JobModifyIndex, out.CreateIndex)
|
||||
|
||||
// Lookup the evaluation
|
||||
eval, err := state.EvalByID(nil, resp.EvalID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, eval)
|
||||
require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex)
|
||||
|
||||
raftEval := evalUpdateFromRaft(t, s1, eval.ID)
|
||||
require.Equal(t, eval, raftEval)
|
||||
|
||||
//// re-registration should create a new eval, but leave the job untouched
|
||||
var resp2 structs.JobRegisterResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, resp2.Index)
|
||||
require.NotEmpty(t, resp2.EvalID)
|
||||
require.NotEqual(t, resp.EvalID, resp2.EvalID)
|
||||
|
||||
// Check for the job in the FSM
|
||||
state = s1.fsm.State()
|
||||
out, err = state.JobByID(nil, job.Namespace, job.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
require.Equal(t, resp2.JobModifyIndex, out.CreateIndex)
|
||||
require.Equal(t, out.CreateIndex, out.JobModifyIndex)
|
||||
|
||||
// Lookup the evaluation
|
||||
eval, err = state.EvalByID(nil, resp2.EvalID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, eval)
|
||||
require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex)
|
||||
|
||||
// this raft eval is the one found above
|
||||
raftEval = evalUpdateFromRaft(t, s1, eval.ID)
|
||||
require.Equal(t, eval, raftEval)
|
||||
|
||||
//// an update should update the job and create a new eval
|
||||
req.Job.TaskGroups[0].Name += "a"
|
||||
var resp3 structs.JobRegisterResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, resp3.Index)
|
||||
require.NotEmpty(t, resp3.EvalID)
|
||||
require.NotEqual(t, resp.EvalID, resp3.EvalID)
|
||||
|
||||
// Check for the job in the FSM
|
||||
state = s1.fsm.State()
|
||||
out, err = state.JobByID(nil, job.Namespace, job.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex)
|
||||
|
||||
// Lookup the evaluation
|
||||
eval, err = state.EvalByID(nil, resp3.EvalID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, eval)
|
||||
require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex)
|
||||
|
||||
raftEval = evalUpdateFromRaft(t, s1, eval.ID)
|
||||
require.Equal(t, eval, raftEval)
|
||||
})
|
||||
|
||||
// Registering a parameterized job shouldn't create an eval
|
||||
t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) {
|
||||
job := mock.PeriodicJob()
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, resp.Index)
|
||||
require.Empty(t, resp.EvalID)
|
||||
|
||||
// Check for the job in the FSM
|
||||
state := s1.fsm.State()
|
||||
out, err := state.JobByID(nil, job.Namespace, job.ID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
require.Equal(t, resp.JobModifyIndex, out.CreateIndex)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
|
@ -3786,9 +3642,9 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestJobEndpoint_Deregister_EvalCreation_Modern asserts that job deregister creates an eval
|
||||
// atomically with the registration
|
||||
func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) {
|
||||
// TestJobEndpoint_Deregister_EvalCreation asserts that job deregister creates
|
||||
// an eval atomically with the registration
|
||||
func TestJobEndpoint_Deregister_EvalCreation(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
|
@ -3866,105 +3722,6 @@ func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// TestJobEndpoint_Deregister_EvalCreation_Legacy asserts that job deregister
|
||||
// creates an eval atomically with the registration, but handle legacy clients
|
||||
// by adding a new eval update
|
||||
func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 2
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer cleanupS1()
|
||||
|
||||
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
||||
c.BootstrapExpect = 2
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
|
||||
// simulate presense of a server that doesn't handle
|
||||
// new registration eval
|
||||
c.Build = "0.12.0"
|
||||
})
|
||||
defer cleanupS2()
|
||||
|
||||
TestJoin(t, s1, s2)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
testutil.WaitForLeader(t, s2.RPC)
|
||||
|
||||
// keep s1 as the leader
|
||||
if leader, _ := s1.getLeader(); !leader {
|
||||
s1, s2 = s2, s1
|
||||
}
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
// Create the register request
|
||||
t.Run("job registration always create evals", func(t *testing.T) {
|
||||
job := mock.Job()
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
require.NoError(t, err)
|
||||
|
||||
dereg := &structs.JobDeregisterRequest{
|
||||
JobID: job.ID,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp2 structs.JobDeregisterResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, resp2.EvalID)
|
||||
|
||||
state := s1.fsm.State()
|
||||
eval, err := state.EvalByID(nil, resp2.EvalID)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, eval)
|
||||
require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex)
|
||||
|
||||
raftEval := evalUpdateFromRaft(t, s1, eval.ID)
|
||||
require.Equal(t, eval, raftEval)
|
||||
})
|
||||
|
||||
// Registering a parameterized job shouldn't create an eval
|
||||
t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) {
|
||||
job := mock.PeriodicJob()
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, resp.Index)
|
||||
|
||||
dereg := &structs.JobDeregisterRequest{
|
||||
JobID: job.ID,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp2 structs.JobDeregisterResponse
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, resp2.EvalID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
require := require.New(t)
|
||||
|
|
|
@ -45,8 +45,6 @@ var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0"))
|
|||
|
||||
var minClusterIDVersion = version.Must(version.NewVersion("0.10.4"))
|
||||
|
||||
var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1"))
|
||||
|
||||
var minOneTimeAuthenticationTokenVersion = version.Must(version.NewVersion("1.1.0"))
|
||||
|
||||
// minACLRoleVersion is the Nomad version at which the ACL role table was
|
||||
|
|
|
@ -76,27 +76,6 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
|
|||
eval.CreateIndex = index
|
||||
eval.ModifyIndex = index
|
||||
|
||||
// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
|
||||
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minJobRegisterAtomicEvalVersion, false) {
|
||||
// Create a new evaluation
|
||||
eval.JobModifyIndex = index
|
||||
update := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
}
|
||||
|
||||
// Commit this evaluation via Raft
|
||||
// There is a risk of partial failure where the JobRegister succeeds
|
||||
// but that the EvalUpdate does not, before Nomad 0.12.1
|
||||
_, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update its indexes.
|
||||
eval.CreateIndex = evalIndex
|
||||
eval.ModifyIndex = evalIndex
|
||||
}
|
||||
|
||||
return eval, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue