diff --git a/api/jobs.go b/api/jobs.go index 7687e9b7d..bbfaa1bcc 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -266,6 +266,23 @@ func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64, return &resp, wm, nil } +// Stable is used to mark a job version's stability. +func (j *Jobs) Stable(jobID string, version uint64, stable bool, + q *WriteOptions) (*JobStabilityResponse, *WriteMeta, error) { + + var resp JobStabilityResponse + req := &JobStabilityRequest{ + JobID: jobID, + JobVersion: version, + Stable: stable, + } + wm, err := j.client.write("/v1/job/"+jobID+"/stable", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // periodicForceResponse is used to deserialize a force response type periodicForceResponse struct { EvalID string @@ -842,3 +859,20 @@ type JobVersionsResponse struct { Diffs []*JobDiff QueryMeta } + +// JobStabilityRequest is used to mark a job version as stable or unstable. +type JobStabilityRequest struct { + // Job to set the stability on + JobID string + JobVersion uint64 + + // Set the stability + Stable bool + WriteRequest +} + +// JobStabilityResponse is the response when marking a job as stable. 
+type JobStabilityResponse struct { + JobModifyIndex uint64 + WriteMeta +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index df2c630e1..4eabb91d4 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -76,6 +76,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/deployment"): jobName := strings.TrimSuffix(path, "/deployment") return s.jobLatestDeployment(resp, req, jobName) + case strings.HasSuffix(path, "/stable"): + jobName := strings.TrimSuffix(path, "/stable") + return s.jobStable(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -455,6 +458,35 @@ func (s *HTTPServer) jobRevert(resp http.ResponseWriter, req *http.Request, return out, nil } +func (s *HTTPServer) jobStable(resp http.ResponseWriter, req *http.Request, + jobName string) (interface{}, error) { + + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + var stableRequest structs.JobStabilityRequest + if err := decodeBody(req, &stableRequest); err != nil { + return nil, CodedError(400, err.Error()) + } + if stableRequest.JobID == "" { + return nil, CodedError(400, "JobID must be specified") + } + if stableRequest.JobID != jobName { + return nil, CodedError(400, "Job ID does not match") + } + + s.parseRegion(req, &stableRequest.Region) + + var out structs.JobStabilityResponse + if err := s.agent.RPC("Job.Stable", &stableRequest, &out); err != nil { + return nil, err + } + + setIndex(resp, out.Index) + return out, nil +} + func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) { args := structs.JobSummaryRequest{ JobID: name, diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 6e71235d6..af5ade693 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -923,6 +923,57 @@ func 
TestHTTP_JobRevert(t *testing.T) { }) } +func TestHTTP_JobStable(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create the job and register it twice + job := mock.Job() + regReq := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var regResp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil { + t.Fatalf("err: %v", err) + } + + if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil { + t.Fatalf("err: %v", err) + } + + args := structs.JobStabilityRequest{ + JobID: job.ID, + JobVersion: 0, + Stable: true, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + buf := encodeReq(args) + + // Make the HTTP request + req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/stable", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the response + stableResp := obj.(structs.JobStabilityResponse) + if stableResp.JobModifyIndex == 0 { + t.Fatalf("bad: %v", stableResp) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + }) +} + func TestJobs_ApiJobToStructsJob(t *testing.T) { apiJob := &api.Job{ Stop: helper.BoolToPtr(true), diff --git a/nomad/fsm.go b/nomad/fsm.go index e4bc3db2d..fd8f294e2 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -165,6 +165,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyDeploymentAllocHealth(buf[1:], log.Index) case structs.DeploymentDeleteRequestType: return n.applyDeploymentDelete(buf[1:], log.Index) + case structs.JobStabilityRequestType: + return n.applyJobStability(buf[1:], log.Index) default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -651,6 +653,22 @@ func (n *nomadFSM) 
applyDeploymentDelete(buf []byte, index uint64) interface{} { return nil } +// applyJobStability is used to set the stability of a job +func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_job_stability"}, time.Now()) + var req structs.JobStabilityRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpdateJobStability(index, req.JobID, req.JobVersion, req.Stable); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpdateJobStability failed: %v", err) + return err + } + + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b73696e97..a79d6aa1b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1210,6 +1210,43 @@ func TestFSM_DeploymentStatusUpdate(t *testing.T) { } } +func TestFSM_JobStabilityUpdate(t *testing.T) { + fsm := testFSM(t) + fsm.evalBroker.SetEnabled(true) + state := fsm.State() + + // Upsert a job + job := mock.Job() + if err := state.UpsertJob(1, job); err != nil { + t.Fatalf("bad: %v", err) + } + + // Create a request to update the job to stable + req := &structs.JobStabilityRequest{ + JobID: job.ID, + JobVersion: job.Version, + Stable: true, + } + buf, err := structs.Encode(structs.JobStabilityRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Check that the stability was updated properly + ws := memdb.NewWatchSet() + jout, err := state.JobByIDAndVersion(ws, job.ID, job.Version) + if err != nil { + t.Fatalf("bad: %v", err) + } + if jout == nil || !jout.Stable { + t.Fatalf("bad: %#v", jout) + } +} + func TestFSM_DeploymentPromotion(t *testing.T) { fsm := testFSM(t) fsm.evalBroker.SetEnabled(true) diff --git 
a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 3d1f608c6..1f6b2faf1 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -340,7 +340,7 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR // Validate the arguments if args.JobID == "" { - return fmt.Errorf("missing job ID for evaluation") + return fmt.Errorf("missing job ID for revert") } // Lookup the job by version @@ -389,6 +389,46 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR return j.Register(reg, reply) } +// Stable is used to mark the job version as stable or unstable +func (j *Job) Stable(args *structs.JobStabilityRequest, reply *structs.JobStabilityResponse) error { + if done, err := j.srv.forward("Job.Stable", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "stable"}, time.Now()) + + // Validate the arguments + if args.JobID == "" { + return fmt.Errorf("missing job ID for marking job as stable") + } + + // Lookup the job by version + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion) + if err != nil { + return err + } + if jobV == nil { + return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion) + } + + // Commit this stability request via Raft + _, modifyIndex, err := j.srv.raftApply(structs.JobStabilityRequestType, args) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: Job stability request failed: %v", err) + return err + } + + // Setup the reply + reply.JobModifyIndex = modifyIndex + reply.Index = modifyIndex + return nil +} + // Evaluate is used to force a job for re-evaluation func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done { @@ -460,7 +500,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply 
*structs.JobD // Validate the arguments if args.JobID == "" { - return fmt.Errorf("missing job ID for evaluation") + return fmt.Errorf("missing job ID for deregistering") } // Lookup the job diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 233bf3ad5..ddc6f2357 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -895,6 +895,65 @@ func TestJobEndpoint_Revert(t *testing.T) { } } +func TestJobEndpoint_Stable(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the initial register request + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Create stability request + stableReq := &structs.JobStabilityRequest{ + JobID: job.ID, + JobVersion: 0, + Stable: true, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var stableResp structs.JobStabilityResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &stableResp); err != nil { + t.Fatalf("err: %v", err) + } + if stableResp.Index == 0 { + t.Fatalf("bad index: %d", stableResp.Index) + } + + // Check that the job is marked stable + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if !out.Stable { + t.Fatalf("Job is not marked stable") + } + if out.JobModifyIndex != stableResp.JobModifyIndex { + t.Fatalf("got job modify index %d; want %d", out.JobModifyIndex, stableResp.JobModifyIndex) + } +} + func 
TestJobEndpoint_Evaluate(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 580ade621..a2f4e2fb8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -238,7 +238,7 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl // If the deployment is being marked as complete, set the job to stable. if deployment.Status == structs.DeploymentStatusSuccessful { - if err := s.updateJobStability(index, deployment, txn); err != nil { + if err := s.updateJobStabilityImpl(index, deployment.JobID, deployment.JobVersion, true, txn); err != nil { return fmt.Errorf("failed to update job stability: %v", err) } } @@ -1891,7 +1891,7 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym // If the deployment is being marked as complete, set the job to stable. if copy.Status == structs.DeploymentStatusSuccessful { - if err := s.updateJobStability(index, copy, txn); err != nil { + if err := s.updateJobStabilityImpl(index, copy.JobID, copy.JobVersion, true, txn); err != nil { return fmt.Errorf("failed to update job stability: %v", err) } } @@ -1899,16 +1899,24 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym return nil } -// updateJobStability updates the job version referenced by a successful -// deployment to stable. -func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error { - // Hot-path - if deployment.Status != structs.DeploymentStatusSuccessful { - return nil +// UpdateJobStability updates the stability of the given job and version to the +// desired status. 
+func (s *StateStore) UpdateJobStability(index uint64, jobID string, jobVersion uint64, stable bool) error { + txn := s.db.Txn(true) + defer txn.Abort() + + if err := s.updateJobStabilityImpl(index, jobID, jobVersion, stable, txn); err != nil { + return err } + txn.Commit() + return nil +} + +// updateJobStabilityImpl updates the stability of the given job and version +func (s *StateStore) updateJobStabilityImpl(index uint64, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error { // Get the job that is referenced - job, err := s.jobByIDAndVersionImpl(nil, deployment.JobID, deployment.JobVersion, txn) + job, err := s.jobByIDAndVersionImpl(nil, jobID, jobVersion, txn) if err != nil { return err } @@ -1918,13 +1926,13 @@ func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deploy return nil } - // If the job is already stable, nothing to do - if job.Stable { + // If the job already has the desired stability, nothing to do + if job.Stable == stable { return nil } copy := job.Copy() - copy.Stable = true + copy.Stable = stable return s.upsertJobImpl(index, copy, true, txn) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ae78483ba..ac16f9c1a 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4847,6 +4847,57 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) { } } +func TestStateStore_UpdateJobStability(t *testing.T) { + state := testStateStore(t) + + // Insert a job twice to get two versions + job := mock.Job() + if err := state.UpsertJob(1, job); err != nil { + t.Fatalf("bad: %v", err) + } + + if err := state.UpsertJob(2, job); err != nil { + t.Fatalf("bad: %v", err) + } + + // Update the stability to true + err := state.UpdateJobStability(3, job.ID, 0, true) + if err != nil { + t.Fatalf("bad: %v", err) + } + + // Check that the job was updated properly + ws := memdb.NewWatchSet() + jout, err := state.JobByIDAndVersion(ws, job.ID, 0) + if 
err != nil { + t.Fatalf("bad: %v", err) + } + if jout == nil { + t.Fatalf("bad: %#v", jout) + } + if !jout.Stable { + t.Fatalf("job not marked stable %#v", jout) + } + + // Update the stability to false + err = state.UpdateJobStability(3, job.ID, 0, false) + if err != nil { + t.Fatalf("bad: %v", err) + } + + // Check that the job was updated properly + jout, err = state.JobByIDAndVersion(ws, job.ID, 0) + if err != nil { + t.Fatalf("bad: %v", err) + } + if jout == nil { + t.Fatalf("bad: %#v", jout) + } + if jout.Stable { + t.Fatalf("job marked stable %#v", jout) + } +} + // Test that non-existant deployment can't be promoted func TestStateStore_UpsertDeploymentPromotion_NonExistant(t *testing.T) { state := testStateStore(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6586db483..3b67f2f27 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -58,6 +58,7 @@ const ( DeploymentPromoteRequestType DeploymentAllocHealthRequestType DeploymentDeleteRequestType + JobStabilityRequestType ) const ( @@ -318,6 +319,23 @@ type JobRevertRequest struct { WriteRequest } +// JobStabilityRequest is used to mark a job version as stable or unstable. +type JobStabilityRequest struct { + // Job to set the stability on + JobID string + JobVersion uint64 + + // Set the stability + Stable bool + WriteRequest +} + +// JobStabilityResponse is the response when marking a job as stable. +type JobStabilityResponse struct { + JobModifyIndex uint64 + WriteMeta +} + // NodeListRequest is used to parameterize a list request type NodeListRequest struct { QueryOptions