Job stability

This commit is contained in:
Alex Dadgar 2017-07-06 12:49:13 -07:00
parent d07a5a2008
commit 5457bb7962
10 changed files with 362 additions and 14 deletions

View File

@ -266,6 +266,23 @@ func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64,
return &resp, wm, nil
}
// Stable is used to mark a job version's stability.
func (j *Jobs) Stable(jobID string, version uint64, stable bool,
q *WriteOptions) (*JobStabilityResponse, *WriteMeta, error) {
var resp JobStabilityResponse
req := &JobStabilityRequest{
JobID: jobID,
JobVersion: version,
Stable: stable,
}
wm, err := j.client.write("/v1/job/"+jobID+"/stable", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
@ -842,3 +859,20 @@ type JobVersionsResponse struct {
Diffs []*JobDiff
QueryMeta
}
// JobStabilityRequest is used to marked a job as stable.
type JobStabilityRequest struct {
// Job to set the stability on
JobID string
JobVersion uint64
// Set the stability
Stable bool
WriteRequest
}
// JobStabilityResponse is the response when marking a job as stable.
type JobStabilityResponse struct {
JobModifyIndex uint64
WriteMeta
}

View File

@ -76,6 +76,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/deployment"):
jobName := strings.TrimSuffix(path, "/deployment")
return s.jobLatestDeployment(resp, req, jobName)
case strings.HasSuffix(path, "/stable"):
jobName := strings.TrimSuffix(path, "/stable")
return s.jobStable(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
@ -455,6 +458,35 @@ func (s *HTTPServer) jobRevert(resp http.ResponseWriter, req *http.Request,
return out, nil
}
func (s *HTTPServer) jobStable(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
var stableRequest structs.JobStabilityRequest
if err := decodeBody(req, &stableRequest); err != nil {
return nil, CodedError(400, err.Error())
}
if stableRequest.JobID == "" {
return nil, CodedError(400, "JobID must be specified")
}
if stableRequest.JobID != jobName {
return nil, CodedError(400, "Job ID does not match")
}
s.parseRegion(req, &stableRequest.Region)
var out structs.JobStabilityResponse
if err := s.agent.RPC("Job.Stable", &stableRequest, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}
func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,

View File

@ -923,6 +923,57 @@ func TestHTTP_JobRevert(t *testing.T) {
})
}
func TestHTTP_JobStable(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job and register it twice
job := mock.Job()
regReq := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var regResp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}
args := structs.JobStabilityRequest{
JobID: job.ID,
JobVersion: 0,
Stable: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf := encodeReq(args)
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/stable", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
stableResp := obj.(structs.JobStabilityResponse)
if stableResp.JobModifyIndex == 0 {
t.Fatalf("bad: %v", stableResp)
}
// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
})
}
func TestJobs_ApiJobToStructsJob(t *testing.T) {
apiJob := &api.Job{
Stop: helper.BoolToPtr(true),

View File

@ -165,6 +165,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyDeploymentAllocHealth(buf[1:], log.Index)
case structs.DeploymentDeleteRequestType:
return n.applyDeploymentDelete(buf[1:], log.Index)
case structs.JobStabilityRequestType:
return n.applyJobStability(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -651,6 +653,22 @@ func (n *nomadFSM) applyDeploymentDelete(buf []byte, index uint64) interface{} {
return nil
}
// applyJobStability is used to set the stability of a job
func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_job_stability"}, time.Now())
var req structs.JobStabilityRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateJobStability(index, req.JobID, req.JobVersion, req.Stable); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateJobStability failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()

View File

@ -1210,6 +1210,43 @@ func TestFSM_DeploymentStatusUpdate(t *testing.T) {
}
}
func TestFSM_JobStabilityUpdate(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
state := fsm.State()
// Upsert a deployment
job := mock.Job()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a request to update the job to stable
req := &structs.JobStabilityRequest{
JobID: job.ID,
JobVersion: job.Version,
Stable: true,
}
buf, err := structs.Encode(structs.JobStabilityRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Check that the stability was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.ID, job.Version)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil || !jout.Stable {
t.Fatalf("bad: %#v", jout)
}
}
func TestFSM_DeploymentPromotion(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)

View File

@ -340,7 +340,7 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
return fmt.Errorf("missing job ID for revert")
}
// Lookup the job by version
@ -389,6 +389,46 @@ func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterR
return j.Register(reg, reply)
}
// Stable is used to mark the job version as stable
func (j *Job) Stable(args *structs.JobStabilityRequest, reply *structs.JobStabilityResponse) error {
if done, err := j.srv.forward("Job.Stable", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "stable"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for marking job as stable")
}
// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion)
if err != nil {
return err
}
if jobV == nil {
return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion)
}
// Commit this evaluation via Raft
_, modifyIndex, err := j.srv.raftApply(structs.JobStabilityRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}
// Setup the reply
reply.JobModifyIndex = modifyIndex
reply.Index = modifyIndex
return nil
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
@ -460,7 +500,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
return fmt.Errorf("missing job ID for deregistering")
}
// Lookup the job

View File

@ -895,6 +895,65 @@ func TestJobEndpoint_Revert(t *testing.T) {
}
}
func TestJobEndpoint_Stable(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the initial register request
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Create stablility request
stableReq := &structs.JobStabilityRequest{
JobID: job.ID,
JobVersion: 0,
Stable: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var stableResp structs.JobStabilityResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &stableResp); err != nil {
t.Fatalf("err: %v", err)
}
if stableResp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check that the job is marked stable
state := s1.fsm.State()
ws := memdb.NewWatchSet()
out, err := state.JobByID(ws, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if !out.Stable {
t.Fatalf("Job is not marked stable")
}
if out.JobModifyIndex != stableResp.JobModifyIndex {
t.Fatalf("got job modify index %d; want %d", out.JobModifyIndex, stableResp.JobModifyIndex)
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue

View File

@ -238,7 +238,7 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
// If the deployment is being marked as complete, set the job to stable.
if deployment.Status == structs.DeploymentStatusSuccessful {
if err := s.updateJobStability(index, deployment, txn); err != nil {
if err := s.updateJobStabilityImpl(index, deployment.JobID, deployment.JobVersion, true, txn); err != nil {
return fmt.Errorf("failed to update job stability: %v", err)
}
}
@ -1891,7 +1891,7 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
// If the deployment is being marked as complete, set the job to stable.
if copy.Status == structs.DeploymentStatusSuccessful {
if err := s.updateJobStability(index, copy, txn); err != nil {
if err := s.updateJobStabilityImpl(index, copy.JobID, copy.JobVersion, true, txn); err != nil {
return fmt.Errorf("failed to update job stability: %v", err)
}
}
@ -1899,16 +1899,24 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
return nil
}
// updateJobStability updates the job version referenced by a successful
// deployment to stable.
func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error {
// Hot-path
if deployment.Status != structs.DeploymentStatusSuccessful {
return nil
// UpdateJobStability updates the stability of the given job and version to the
// desired status.
func (s *StateStore) UpdateJobStability(index uint64, jobID string, jobVersion uint64, stable bool) error {
txn := s.db.Txn(true)
defer txn.Abort()
if err := s.updateJobStabilityImpl(index, jobID, jobVersion, stable, txn); err != nil {
return err
}
txn.Commit()
return nil
}
// updateJobStabilityImpl updates the stability of the given job and version
func (s *StateStore) updateJobStabilityImpl(index uint64, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error {
// Get the job that is referenced
job, err := s.jobByIDAndVersionImpl(nil, deployment.JobID, deployment.JobVersion, txn)
job, err := s.jobByIDAndVersionImpl(nil, jobID, jobVersion, txn)
if err != nil {
return err
}
@ -1918,13 +1926,13 @@ func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deploy
return nil
}
// If the job is already stable, nothing to do
if job.Stable {
// If the job already has the desired stability, nothing to do
if job.Stable == stable {
return nil
}
copy := job.Copy()
copy.Stable = true
copy.Stable = stable
return s.upsertJobImpl(index, copy, true, txn)
}

View File

@ -4847,6 +4847,57 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) {
}
}
func TestStateStore_UpdateJobStability(t *testing.T) {
state := testStateStore(t)
// Insert a job twice to get two versions
job := mock.Job()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("bad: %v", err)
}
if err := state.UpsertJob(2, job); err != nil {
t.Fatalf("bad: %v", err)
}
// Update the stability to true
err := state.UpdateJobStability(3, job.ID, 0, true)
if err != nil {
t.Fatalf("bad: %v", err)
}
// Check that the job was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if !jout.Stable {
t.Fatalf("job not marked stable %#v", jout)
}
// Update the stability to false
err = state.UpdateJobStability(3, job.ID, 0, false)
if err != nil {
t.Fatalf("bad: %v", err)
}
// Check that the job was updated properly
jout, _ = state.JobByIDAndVersion(ws, job.ID, 0)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
if jout.Stable {
t.Fatalf("job marked stable %#v", jout)
}
}
// Test that non-existant deployment can't be promoted
func TestStateStore_UpsertDeploymentPromotion_NonExistant(t *testing.T) {
state := testStateStore(t)

View File

@ -58,6 +58,7 @@ const (
DeploymentPromoteRequestType
DeploymentAllocHealthRequestType
DeploymentDeleteRequestType
JobStabilityRequestType
)
const (
@ -318,6 +319,23 @@ type JobRevertRequest struct {
WriteRequest
}
// JobStabilityRequest is used to marked a job as stable.
type JobStabilityRequest struct {
// Job to set the stability on
JobID string
JobVersion uint64
// Set the stability
Stable bool
WriteRequest
}
// JobStabilityResponse is the response when marking a job as stable.
type JobStabilityResponse struct {
JobModifyIndex uint64
WriteMeta
}
// NodeListRequest is used to parameterize a list request
type NodeListRequest struct {
QueryOptions