From 80dd30b03d3759a981ab0fc56b4b1bdf47e3b529 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jan 2016 10:19:53 -0800 Subject: [PATCH 1/5] Add force spawn endpoint --- api/jobs.go | 6 ++ api/periodic.go | 26 ++++++++ api/periodic_test.go | 61 ++++++++++++++++++ api/util_test.go | 9 +++ command/agent/http.go | 2 + command/agent/periodic_endpoint.go | 40 ++++++++++++ command/agent/periodic_endpoint_test.go | 49 +++++++++++++++ nomad/leader.go | 2 +- nomad/periodic.go | 39 +++++++----- nomad/periodic_endpoint.go | 55 ++++++++++++++++ nomad/periodic_endpoint_test.go | 83 +++++++++++++++++++++++++ nomad/periodic_test.go | 8 +-- nomad/server.go | 17 ++--- nomad/structs/structs.go | 15 ++++- 14 files changed, 382 insertions(+), 30 deletions(-) create mode 100644 api/periodic.go create mode 100644 api/periodic_test.go create mode 100644 command/agent/periodic_endpoint.go create mode 100644 command/agent/periodic_endpoint_test.go create mode 100644 nomad/periodic_endpoint.go create mode 100644 nomad/periodic_endpoint_test.go diff --git a/api/jobs.go b/api/jobs.go index c0f849044..e45e67990 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -221,6 +221,12 @@ func (j *Job) AddTaskGroup(grp *TaskGroup) *Job { return j } +// AddPeriodicConfig adds a periodic config to an existing job. +func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job { + j.Periodic = cfg + return j +} + // registerJobRequest is used to serialize a job registration type registerJobRequest struct { Job *Job diff --git a/api/periodic.go b/api/periodic.go new file mode 100644 index 000000000..c4ec1e781 --- /dev/null +++ b/api/periodic.go @@ -0,0 +1,26 @@ +package api + +// Periodic is used to access periodic job endpoints +type Periodic struct { + client *Client +} + +// Regions returns a handle on the allocs endpoints. +func (c *Client) PeriodicJobs() *Periodic { + return &Periodic{client: c} +} + +// List returns a list of all of the regions. +func (p *Periodic) Force(jobID string, q *WriteOptions) (string, *WriteMeta, error) { + var resp periodicForceResponse + wm, err := p.client.write("/v1/periodic/"+jobID+"/force", nil, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// periodicForceResponse is used to deserialize a force response +type periodicForceResponse struct { + EvalID string +} diff --git a/api/periodic_test.go b/api/periodic_test.go new file mode 100644 index 000000000..f65d67719 --- /dev/null +++ b/api/periodic_test.go @@ -0,0 +1,61 @@ +package api + +import ( + "strings" + "testing" + + "github.com/hashicorp/nomad/testutil" +) + +func TestPeriodicForce(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + periodic := c.PeriodicJobs() + + // Force-eval on a non-existent job fails + _, _, err := periodic.Force("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Create a new job + job := testPeriodicJob() + _, _, err = jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + + testutil.WaitForResult(func() (bool, error) { + out, _, err := jobs.Info(job.ID, nil) + if err != nil || out == nil || out.ID != job.ID { + return false, err + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Try force again + evalID, wm, err := periodic.Force(job.ID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + if evalID == "" { + t.Fatalf("empty evalID") + } + + // Retrieve the eval + evals := c.Evaluations() + eval, qm, err := evals.Info(evalID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + if eval.ID == evalID { + return + } + t.Fatalf("evaluation %q missing", evalID) +} diff --git a/api/util_test.go b/api/util_test.go index dfd0e95d0..408d93a54 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -32,3 +32,12 @@ func testJob() *Job { return job } + +func testPeriodicJob() *Job { + job := testJob().AddPeriodicConfig(&PeriodicConfig{ + Enabled: true, + Spec: "*/30 * * * *", + SpecType: "cron", + }) + return job +} diff --git a/command/agent/http.go b/command/agent/http.go index 4c46ee8b4..e1ab90a22 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -114,6 +114,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest)) s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) + s.mux.HandleFunc("/v1/periodic/", s.wrap(s.PeriodicSpecificReqeust)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/periodic_endpoint.go b/command/agent/periodic_endpoint.go new file mode 100644 index 000000000..eec622127 --- /dev/null +++ b/command/agent/periodic_endpoint.go @@ -0,0 +1,40 @@ +package agent + +import ( + "net/http" + "strings" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) PeriodicSpecificReqeust(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + path := strings.TrimPrefix(req.URL.Path, "/v1/periodic/") + switch { + case strings.HasSuffix(path, "/force"): + jobName := strings.TrimSuffix(path, "/force") + return s.periodicForceRequest(resp, req, jobName) + default: + return nil, CodedError(405, ErrInvalidMethod) + + } + +} + +func (s *HTTPServer) periodicForceRequest(resp http.ResponseWriter, req *http.Request, + jobName string) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.PeriodicForceRequest{ + JobID: jobName, + } + s.parseRegion(req, &args.Region) + + var out structs.PeriodicForceResponse + if err := s.agent.RPC("Periodic.Force", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} diff --git a/command/agent/periodic_endpoint_test.go b/command/agent/periodic_endpoint_test.go new file mode 100644 index 000000000..500034cdf --- /dev/null +++ b/command/agent/periodic_endpoint_test.go @@ -0,0 +1,49 @@ +package agent + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestHTTP_PeriodicForce(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create and register a periodic job. + job := mock.PeriodicJob() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/periodic/"+job.ID+"/force", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.PeriodicSpecificReqeust(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + + // Check the response + r := obj.(structs.PeriodicForceResponse) + if r.EvalID == "" { + t.Fatalf("bad: %#v", r) + } + }) +} diff --git a/nomad/leader.go b/nomad/leader.go index 5969a9239..b11ff81f0 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -211,7 +211,7 @@ func (s *Server) restorePeriodicDispatcher() error { continue } - if err := s.periodicDispatcher.ForceRun(job.ID); err != nil { + if _, err := s.periodicDispatcher.ForceRun(job.ID); err != nil { msg := fmt.Sprintf("force run of periodic job %q failed: %v", job.ID, err) s.logger.Printf("[ERR] nomad.periodic: %s", msg) return errors.New(msg) diff --git a/nomad/periodic.go b/nomad/periodic.go index 0609f1f79..3e20357cc 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -34,21 +34,21 @@ type PeriodicDispatch struct { // for them. type JobEvalDispatcher interface { // DispatchJob takes a job a new, untracked job and creates an evaluation - // for it. - DispatchJob(job *structs.Job) error + // for it and returns the eval. + DispatchJob(job *structs.Job) (*structs.Evaluation, error) // RunningChildren returns whether the passed job has any running children. RunningChildren(job *structs.Job) (bool, error) } // DispatchJob creates an evaluation for the passed job and commits both the -// evaluation and the job to the raft log. -func (s *Server) DispatchJob(job *structs.Job) error { +// evaluation and the job to the raft log. It returns the eval. +func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { // Commit this update via Raft req := structs.JobRegisterRequest{Job: job} _, index, err := s.raftApply(structs.JobRegisterRequestType, req) if err != nil { - return err + return nil, err } // Create a new evaluation @@ -68,12 +68,15 @@ func (s *Server) DispatchJob(job *structs.Job) error { // Commit this evaluation via Raft // XXX: There is a risk of partial failure where the JobRegister succeeds // but that the EvalUpdate does not. - _, _, err = s.raftApply(structs.EvalUpdateRequestType, update) + _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) if err != nil { - return err + return nil, err } - return nil + // Update its indexes. + eval.CreateIndex = evalIndex + eval.ModifyIndex = evalIndex + return eval, nil } // RunningChildren checks whether the passed job has any running children. @@ -262,18 +265,19 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error { return nil } -// ForceRun causes the periodic job to be evaluated immediately. -func (p *PeriodicDispatch) ForceRun(jobID string) error { +// ForceRun causes the periodic job to be evaluated immediately and returns the +// subsequent eval. +func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) { p.l.Lock() // Do nothing if not enabled if !p.enabled { - return fmt.Errorf("periodic dispatch disabled") + return nil, fmt.Errorf("periodic dispatch disabled") } job, tracked := p.tracked[jobID] if !tracked { - return fmt.Errorf("can't force run non-tracked job %v", jobID) + return nil, fmt.Errorf("can't force run non-tracked job %v", jobID) } p.l.Unlock() @@ -370,18 +374,19 @@ func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) { // createEval instantiates a job based on the passed periodic job and submits an // evaluation for it. This should not be called with the lock held. -func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error { +func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) { derived, err := p.deriveJob(periodicJob, time) if err != nil { - return err + return nil, err } - if err := p.dispatcher.DispatchJob(derived); err != nil { + eval, err := p.dispatcher.DispatchJob(derived) + if err != nil { p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err) - return err + return nil, err } - return nil + return eval, nil } // deriveJob instantiates a new job based on the passed periodic job and the diff --git a/nomad/periodic_endpoint.go b/nomad/periodic_endpoint.go new file mode 100644 index 000000000..f7aea0612 --- /dev/null +++ b/nomad/periodic_endpoint.go @@ -0,0 +1,55 @@ +package nomad + +import ( + "fmt" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Job endpoint is used for job interactions +type Periodic struct { + srv *Server +} + +// Evaluate is used to force a job for re-evaluation +func (p *Periodic) Force(args *structs.PeriodicForceRequest, reply *structs.PeriodicForceResponse) error { + if done, err := p.srv.forward("Periodic.Force", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "periodic", "force"}, time.Now()) + + // Validate the arguments + if args.JobID == "" { + return fmt.Errorf("missing job ID for evaluation") + } + + // Lookup the job + snap, err := p.srv.fsm.State().Snapshot() + if err != nil { + return err + } + job, err := snap.JobByID(args.JobID) + if err != nil { + return err + } + if job == nil { + return fmt.Errorf("job not found") + } + + if !job.IsPeriodic() { + return fmt.Errorf("can't force launch non-periodic job") + } + + // Force run the job. + eval, err := p.srv.periodicDispatcher.ForceRun(job.ID) + if err != nil { + return fmt.Errorf("force launch for job %q failed: %v", job.ID, err) + } + + reply.EvalID = eval.ID + reply.EvalCreateIndex = eval.CreateIndex + reply.Index = eval.CreateIndex + return nil +} diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go new file mode 100644 index 000000000..295070162 --- /dev/null +++ b/nomad/periodic_endpoint_test.go @@ -0,0 +1,83 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestPeriodicEndpoint_Force(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + state := s1.fsm.State() + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create and insert a periodic job. + job := mock.PeriodicJob() + job.Periodic.ProhibitOverlap = true // Shouldn't affect anything. + if err := state.UpsertJob(100, job); err != nil { + t.Fatalf("err: %v", err) + } + s1.periodicDispatcher.Add(job) + + // Force launch it. + req := &structs.PeriodicForceRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.PeriodicForceResponse + if err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Lookup the evaluation + eval, err := state.EvalByID(resp.EvalID) + if err != nil { + t.Fatalf("err: %v", err) + } + if eval == nil { + t.Fatalf("expected eval") + } + if eval.CreateIndex != resp.EvalCreateIndex { + t.Fatalf("index mis-match") + } +} + +func TestPeriodicEndpoint_Force_NonPeriodic(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + state := s1.fsm.State() + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create and insert a non-periodic job. + job := mock.Job() + if err := state.UpsertJob(100, job); err != nil { + t.Fatalf("err: %v", err) + } + + // Force launch it. + req := &structs.PeriodicForceRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.PeriodicForceResponse + if err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp); err == nil { + t.Fatalf("Force on non-perodic job should err") + } +} diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 04acd54c8..10219d0ad 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -27,11 +27,11 @@ func NewMockJobEvalDispatcher() *MockJobEvalDispatcher { return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)} } -func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error { +func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { m.lock.Lock() defer m.lock.Unlock() m.Jobs[job.ID] = job - return nil + return nil, nil } func (m *MockJobEvalDispatcher) RunningChildren(parent *structs.Job) (bool, error) { @@ -266,7 +266,7 @@ func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() - if err := p.ForceRun("foo"); err == nil { + if _, err := p.ForceRun("foo"); err == nil { t.Fatal("ForceRun of untracked job should fail") } } @@ -284,7 +284,7 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { } // ForceRun the job - if err := p.ForceRun(job.ID); err != nil { + if _, err := p.ForceRun(job.ID); err != nil { t.Fatalf("ForceRun failed %v", err) } diff --git a/nomad/server.go b/nomad/server.go index 3a17c8224..b6c13826d 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -132,13 +132,14 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Status *Status - Node *Node - Job *Job - Eval *Eval - Plan *Plan - Alloc *Alloc - Region *Region + Status *Status + Node *Node + Job *Job + Eval *Eval + Plan *Plan + Alloc *Alloc + Region *Region + Periodic *Periodic } // NewServer is used to construct a new Nomad server from the @@ -362,6 +363,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Plan = &Plan{s} s.endpoints.Alloc = &Alloc{s} s.endpoints.Region = &Region{s} + s.endpoints.Periodic = &Periodic{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) @@ -371,6 +373,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Plan) s.rpcServer.Register(s.endpoints.Alloc) s.rpcServer.Register(s.endpoints.Region) + s.rpcServer.Register(s.endpoints.Periodic) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 68b7e6e8d..f9e802ee5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -123,7 +123,7 @@ type QueryMeta struct { KnownLeader bool } -// WriteMeta allows a write response to includ e potentially +// WriteMeta allows a write response to include potentially // useful metadata about the write type WriteMeta struct { // This is the index associated with the write @@ -272,6 +272,12 @@ type AllocSpecificRequest struct { QueryOptions } +// PeriodicForceReqeuest is used to force a specific periodic job. +type PeriodicForceRequest struct { + JobID string + WriteRequest +} + // GenericRequest is used to request where no // specific information is needed. type GenericRequest struct { @@ -415,6 +421,13 @@ type EvalAllocationsResponse struct { QueryMeta } +// PeriodicForceResponse is used to respond to a periodic job force launch +type PeriodicForceResponse struct { + EvalID string + EvalCreateIndex uint64 + WriteMeta +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready" From 999db4cd0f6ee6a4c670e91a2b8c8a9c7ee79819 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jan 2016 10:47:22 -0800 Subject: [PATCH 2/5] Docs --- website/source/docs/http/periodic.html.md | 48 +++++++++++++++++++++++ website/source/docs/jobspec/index.html.md | 7 ++-- website/source/layouts/http.erb | 4 ++ 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 website/source/docs/http/periodic.html.md diff --git a/website/source/docs/http/periodic.html.md b/website/source/docs/http/periodic.html.md new file mode 100644 index 000000000..156df9331 --- /dev/null +++ b/website/source/docs/http/periodic.html.md @@ -0,0 +1,48 @@ +--- +layout: "http" +page_title: "HTTP API: /v1/periodic" +sidebar_current: "docs-http-periodic" +description: > + The '/v1/periodic' endpoint is used to interact with periodic jobs. +--- + +# /v1/periodic + +The `periodic` endpoint is used to interact with a single periodic job. By +default, the agent's local region is used; another region can be specified using +the `?region=` query parameter. + +## PUT / POST + +
+
Description
+
+ Forces a new instance of the periodic job. A new instance will be created + even if it violates the job's + [`prohibit_overlap`](/docs/jobspec/index.html#prohibit_overlap) settings. As + such, this should be only used to immediately run a periodic job. +
+ +
Method
+
PUT or POST
+ +
URL
+
`/v1/periodic//force`
+ +
Parameters
+
+ None +
+ +
Returns
+
+ + ```javascript + { + "EvalCreateIndex": 7, + "EvalID": "57983ddd-7fcf-3e3a-fd24-f699ccfb36f4" + } + ``` + +
+
diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index 0923d3255..139a00835 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -184,9 +184,10 @@ The `job` object supports the following keys: [here](https://github.com/gorhill/cronexpr#implementation) for full documentation of supported cron specs and the predefined expressions. - * `prohibit_overlap` - `prohibit_overlap` can be set to true to enforce that - the periodic job doesn't spawn a new instance of the job if any of the - previous jobs are still running. It is defaulted to false. + * `prohibit_overlap` - `prohibit_overlap` can + be set to true to enforce that the periodic job doesn't spawn a new + instance of the job if any of the previous jobs are still running. It is + defaulted to false. An example `periodic` block: diff --git a/website/source/layouts/http.erb b/website/source/layouts/http.erb index f22413041..a64893c17 100644 --- a/website/source/layouts/http.erb +++ b/website/source/layouts/http.erb @@ -97,6 +97,10 @@ Status + > + Periodic Jobs + + <% end %> From a7b986328a2930099ee40e533c5704ee236a1193 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 15 Jan 2016 11:44:11 -0800 Subject: [PATCH 3/5] Fix comments --- api/periodic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/periodic.go b/api/periodic.go index c4ec1e781..c9c05fc40 100644 --- a/api/periodic.go +++ b/api/periodic.go @@ -5,12 +5,12 @@ type Periodic struct { client *Client } -// Regions returns a handle on the allocs endpoints. +// Periodic returns a handle to access periodic job endpoints. func (c *Client) PeriodicJobs() *Periodic { return &Periodic{client: c} } -// List returns a list of all of the regions. +// Force spawns a new instance of the periodic job and returns the eval ID func (p *Periodic) Force(jobID string, q *WriteOptions) (string, *WriteMeta, error) { var resp periodicForceResponse wm, err := p.client.write("/v1/periodic/"+jobID+"/force", nil, &resp, q) From 5e900b94d003224b29748f948cd78c2481e11eb3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jan 2016 11:09:36 -0800 Subject: [PATCH 4/5] Move endpoint to be under job --- api/jobs.go | 15 ++++++ api/jobs_test.go | 54 ++++++++++++++++++++ api/periodic.go | 26 ---------- api/periodic_test.go | 61 ----------------------- command/agent/http.go | 2 - command/agent/job_endpoint.go | 22 ++++++++ command/agent/job_endpoint_test.go | 39 +++++++++++++++ command/agent/periodic_endpoint.go | 40 --------------- command/agent/periodic_endpoint_test.go | 49 ------------------ website/source/docs/http/job.html.md | 33 ++++++++++++ website/source/docs/http/periodic.html.md | 48 ------------------ website/source/layouts/http.erb | 4 -- 12 files changed, 163 insertions(+), 230 deletions(-) delete mode 100644 api/periodic.go delete mode 100644 api/periodic_test.go delete mode 100644 command/agent/periodic_endpoint.go delete mode 100644 command/agent/periodic_endpoint_test.go delete mode 100644 website/source/docs/http/periodic.html.md diff --git a/api/jobs.go b/api/jobs.go index e45e67990..a00c3a169 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -106,6 +106,21 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, return resp.EvalID, wm, nil } +// PeriodicForce spawns a new instance of the periodic job and returns the eval ID +func (j *Jobs) PeriodicForce(jobID string, q *WriteOptions) (string, *WriteMeta, error) { + var resp periodicForceResponse + wm, err := j.client.write("/v1/job/"+jobID+"/periodic/force", nil, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// periodicForceResponse is used to deserialize a force response +type periodicForceResponse struct { + EvalID string +} + // UpdateStrategy is for serializing update strategy for a job. type UpdateStrategy struct { Stagger time.Duration diff --git a/api/jobs_test.go b/api/jobs_test.go index 3ab49a185..cb998ac89 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -5,6 +5,8 @@ import ( "sort" "strings" "testing" + + "github.com/hashicorp/nomad/testutil" ) func TestJobs_Register(t *testing.T) { @@ -295,6 +297,58 @@ func TestJobs_ForceEvaluate(t *testing.T) { t.Fatalf("evaluation %q missing", evalID) } +func TestJobs_PeriodicForce(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Force-eval on a non-existent job fails + _, _, err := jobs.PeriodicForce("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Create a new job + job := testPeriodicJob() + _, _, err = jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + + testutil.WaitForResult(func() (bool, error) { + out, _, err := jobs.Info(job.ID, nil) + if err != nil || out == nil || out.ID != job.ID { + return false, err + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Try force again + evalID, wm, err := jobs.PeriodicForce(job.ID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + if evalID == "" { + t.Fatalf("empty evalID") + } + + // Retrieve the eval + evals := c.Evaluations() + eval, qm, err := evals.Info(evalID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + if eval.ID == evalID { + return + } + t.Fatalf("evaluation %q missing", evalID) +} + func TestJobs_NewBatchJob(t *testing.T) { job := NewBatchJob("job1", "myjob", "region1", 5) expect := &Job{ diff --git a/api/periodic.go b/api/periodic.go deleted file mode 100644 index c9c05fc40..000000000 --- a/api/periodic.go +++ /dev/null @@ -1,26 +0,0 @@ -package api - -// Periodic is used to access periodic job endpoints -type Periodic struct { - client *Client -} - -// Periodic returns a handle to access periodic job endpoints. -func (c *Client) PeriodicJobs() *Periodic { - return &Periodic{client: c} -} - -// Force spawns a new instance of the periodic job and returns the eval ID -func (p *Periodic) Force(jobID string, q *WriteOptions) (string, *WriteMeta, error) { - var resp periodicForceResponse - wm, err := p.client.write("/v1/periodic/"+jobID+"/force", nil, &resp, q) - if err != nil { - return "", nil, err - } - return resp.EvalID, wm, nil -} - -// periodicForceResponse is used to deserialize a force response -type periodicForceResponse struct { - EvalID string -} diff --git a/api/periodic_test.go b/api/periodic_test.go deleted file mode 100644 index f65d67719..000000000 --- a/api/periodic_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package api - -import ( - "strings" - "testing" - - "github.com/hashicorp/nomad/testutil" -) - -func TestPeriodicForce(t *testing.T) { - c, s := makeClient(t, nil, nil) - defer s.Stop() - jobs := c.Jobs() - periodic := c.PeriodicJobs() - - // Force-eval on a non-existent job fails - _, _, err := periodic.Force("job1", nil) - if err == nil || !strings.Contains(err.Error(), "not found") { - t.Fatalf("expected not found error, got: %#v", err) - } - - // Create a new job - job := testPeriodicJob() - _, _, err = jobs.Register(job, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - - testutil.WaitForResult(func() (bool, error) { - out, _, err := jobs.Info(job.ID, nil) - if err != nil || out == nil || out.ID != job.ID { - return false, err - } - return true, nil - }, func(err error) { - t.Fatalf("err: %s", err) - }) - - // Try force again - evalID, wm, err := periodic.Force(job.ID, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - assertWriteMeta(t, wm) - - if evalID == "" { - t.Fatalf("empty evalID") - } - - // Retrieve the eval - evals := c.Evaluations() - eval, qm, err := evals.Info(evalID, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - assertQueryMeta(t, qm) - if eval.ID == evalID { - return - } - t.Fatalf("evaluation %q missing", evalID) -} diff --git a/command/agent/http.go b/command/agent/http.go index e1ab90a22..4c46ee8b4 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -114,8 +114,6 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest)) s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) - s.mux.HandleFunc("/v1/periodic/", s.wrap(s.PeriodicSpecificReqeust)) - if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 67672d26b..d66954a3c 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -48,6 +48,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/evaluations"): jobName := strings.TrimSuffix(path, "/evaluations") return s.jobEvaluations(resp, req, jobName) + case strings.HasSuffix(path, "/periodic/force"): + jobName := strings.TrimSuffix(path, "/periodic/force") + return s.periodicForceRequest(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -71,6 +74,25 @@ func (s *HTTPServer) jobForceEvaluate(resp http.ResponseWriter, req *http.Reques return out, nil } +func (s *HTTPServer) periodicForceRequest(resp http.ResponseWriter, req *http.Request, + jobName string) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.PeriodicForceRequest{ + JobID: jobName, + } + s.parseRegion(req, &args.Region) + + var out structs.PeriodicForceResponse + if err := s.agent.RPC("Periodic.Force", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} + func (s *HTTPServer) jobAllocations(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) { if req.Method != "GET" { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index cdb1bc88f..62643ac51 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -443,3 +443,42 @@ func TestHTTP_JobAllocations(t *testing.T) { } }) } + +func TestHTTP_PeriodicForce(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create and register a periodic job. + job := mock.PeriodicJob() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/job/"+job.ID+"periodic/force", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + + // Check the response + r := obj.(structs.PeriodicForceResponse) + if r.EvalID == "" { + t.Fatalf("bad: %#v", r) + } + }) +} diff --git a/command/agent/periodic_endpoint.go b/command/agent/periodic_endpoint.go deleted file mode 100644 index eec622127..000000000 --- a/command/agent/periodic_endpoint.go +++ /dev/null @@ -1,40 +0,0 @@ -package agent - -import ( - "net/http" - "strings" - - "github.com/hashicorp/nomad/nomad/structs" -) - -func (s *HTTPServer) PeriodicSpecificReqeust(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - path := strings.TrimPrefix(req.URL.Path, "/v1/periodic/") - switch { - case strings.HasSuffix(path, "/force"): - jobName := strings.TrimSuffix(path, "/force") - return s.periodicForceRequest(resp, req, jobName) - default: - return nil, CodedError(405, ErrInvalidMethod) - - } - -} - -func (s *HTTPServer) periodicForceRequest(resp http.ResponseWriter, req *http.Request, - jobName string) (interface{}, error) { - if req.Method != "PUT" && req.Method != "POST" { - return nil, CodedError(405, ErrInvalidMethod) - } - - args := structs.PeriodicForceRequest{ - JobID: jobName, - } - s.parseRegion(req, &args.Region) - - var out structs.PeriodicForceResponse - if err := s.agent.RPC("Periodic.Force", &args, &out); err != nil { - return nil, err - } - setIndex(resp, out.Index) - return out, nil -} diff --git a/command/agent/periodic_endpoint_test.go b/command/agent/periodic_endpoint_test.go deleted file mode 100644 index 500034cdf..000000000 --- a/command/agent/periodic_endpoint_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package agent - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" -) - -func TestHTTP_PeriodicForce(t *testing.T) { - httpTest(t, nil, func(s *TestServer) { - // Create and register a periodic job. - job := mock.PeriodicJob() - args := structs.JobRegisterRequest{ - Job: job, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - var resp structs.JobRegisterResponse - if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { - t.Fatalf("err: %v", err) - } - - // Make the HTTP request - req, err := http.NewRequest("POST", "/v1/periodic/"+job.ID+"/force", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() - - // Make the request - obj, err := s.Server.PeriodicSpecificReqeust(respW, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check for the index - if respW.HeaderMap.Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - - // Check the response - r := obj.(structs.PeriodicForceResponse) - if r.EvalID == "" { - t.Fatalf("bad: %#v", r) - } - }) -} diff --git a/website/source/docs/http/job.html.md b/website/source/docs/http/job.html.md index 05484aec9..867389bcc 100644 --- a/website/source/docs/http/job.html.md +++ b/website/source/docs/http/job.html.md @@ -313,6 +313,39 @@ region is used; another region can be specified using the `?region=` query param +
+
Description
+
+ Forces a new instance of the periodic job. A new instance will be created + even if it violates the job's + [`prohibit_overlap`](/docs/jobspec/index.html#prohibit_overlap) settings. As + such, this should be only used to immediately run a periodic job. +
+ +
Method
+
PUT or POST
+ +
URL
+
`/v1/job//periodic/force`
+ +
Parameters
+
+ None +
+ +
Returns
+
+ + ```javascript + { + "EvalCreateIndex": 7, + "EvalID": "57983ddd-7fcf-3e3a-fd24-f699ccfb36f4" + } + ``` + +
+
+ ## DELETE
diff --git a/website/source/docs/http/periodic.html.md b/website/source/docs/http/periodic.html.md deleted file mode 100644 index 156df9331..000000000 --- a/website/source/docs/http/periodic.html.md +++ /dev/null @@ -1,48 +0,0 @@ ---- -layout: "http" -page_title: "HTTP API: /v1/periodic" -sidebar_current: "docs-http-periodic" -description: > - The '/v1/periodic' endpoint is used to interact with periodic jobs. ---- - -# /v1/periodic - -The `periodic` endpoint is used to interact with a single periodic job. By -default, the agent's local region is used; another region can be specified using -the `?region=` query parameter. - -## PUT / POST - -
-
Description
-
- Forces a new instance of the periodic job. A new instance will be created - even if it violates the job's - [`prohibit_overlap`](/docs/jobspec/index.html#prohibit_overlap) settings. As - such, this should be only used to immediately run a periodic job. -
- -
Method
-
PUT or POST
- -
URL
-
`/v1/periodic//force`
- -
Parameters
-
- None -
- -
Returns
-
- - ```javascript - { - "EvalCreateIndex": 7, - "EvalID": "57983ddd-7fcf-3e3a-fd24-f699ccfb36f4" - } - ``` - -
-
diff --git a/website/source/layouts/http.erb b/website/source/layouts/http.erb index a64893c17..f22413041 100644 --- a/website/source/layouts/http.erb +++ b/website/source/layouts/http.erb @@ -97,10 +97,6 @@ Status - > - Periodic Jobs - - <% end %> From cffc04fa919cca63f8b5983a4f7c8f38ebe08524 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jan 2016 11:18:03 -0800 Subject: [PATCH 5/5] Missing forward slash --- command/agent/job_endpoint_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 62643ac51..4c71e7f8c 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -458,7 +458,7 @@ func TestHTTP_PeriodicForce(t *testing.T) { } // Make the HTTP request - req, err := http.NewRequest("POST", "/v1/job/"+job.ID+"periodic/force", nil) + req, err := http.NewRequest("POST", "/v1/job/"+job.ID+"/periodic/force", nil) if err != nil { t.Fatalf("err: %v", err) }