Job endpoint handles periodic jobs

This commit is contained in:
Alex Dadgar 2015-12-01 11:40:40 -08:00
parent 144fc539bb
commit 8c484875ce
2 changed files with 178 additions and 4 deletions

View File

@ -49,6 +49,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if args.Job.IsPeriodic() {
return nil
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
@ -73,10 +81,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
// Setup the reply
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}
@ -116,6 +123,10 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
return fmt.Errorf("job not found")
}
if job.IsPeriodic() {
return fmt.Errorf("can't evaluate periodic job")
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
@ -153,6 +164,24 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
@ -160,6 +189,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if job.IsPeriodic() {
return nil
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
@ -185,10 +222,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// Setup the reply
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}

View File

@ -233,6 +233,55 @@ func TestJobEndpoint_Register_GC_Set(t *testing.T) {
}
}
func TestJobEndpoint_Register_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request for a periodic job.
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
serviceName := out.TaskGroups[0].Tasks[0].Services[0].Name
expectedServiceName := "web-frontend"
if serviceName != expectedServiceName {
t.Fatalf("Expected Service Name: %s, Actual: %s", expectedServiceName, serviceName)
}
if resp.EvalID != "" {
t.Fatalf("Register created an eval for a periodic job")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@ -304,6 +353,44 @@ func TestJobEndpoint_Evaluate(t *testing.T) {
}
}
func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Force a re-evaluation
reEval := &structs.JobEvaluateRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err == nil {
t.Fatal("expect an err")
}
}
func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@ -380,6 +467,57 @@ func TestJobEndpoint_Deregister(t *testing.T) {
}
}
func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Deregister
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected job")
}
if resp.EvalID != "" {
t.Fatalf("Deregister created an eval for a periodic job")
}
}
func TestJobEndpoint_GetJob(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()