Retrieve job information for resources endpoint
requires further refactoring and logic for more contexts
This commit is contained in:
parent
255aab7139
commit
4dd6b46198
|
@ -145,6 +145,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
|||
s.mux.HandleFunc("/v1/evaluations", s.wrap(s.EvalsRequest))
|
||||
s.mux.HandleFunc("/v1/evaluation/", s.wrap(s.EvalSpecificRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/resources/", s.wrap(s.ResourcesRequest))
|
||||
|
||||
s.mux.HandleFunc("/v1/deployments", s.wrap(s.DeploymentsRequest))
|
||||
s.mux.HandleFunc("/v1/deployment/", s.wrap(s.DeploymentSpecificRequest))
|
||||
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (s *HTTPServer) ResourcesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
return s.resourcesRequest(resp, req)
|
||||
default:
|
||||
return nil, CodedError(405, ErrInvalidMethod)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *HTTPServer) resourcesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// TODO test a failure case for this?
|
||||
args := structs.ResourcesRequest{}
|
||||
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.ResourcesResponse
|
||||
if err := s.agent.RPC("Resources.List", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &out.Resources, nil
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestHTTP_ResourcesWithIllegalMethod(t *testing.T) {
|
||||
t.Parallel()
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
req, err := http.NewRequest("DELETE", "/v1/resources", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
respW := httptest.NewRecorder()
|
||||
|
||||
_, err = s.Server.ResourcesRequest(respW, req)
|
||||
assert.NotNil(t, err, "HTTP DELETE should not be accepted for this endpoint")
|
||||
})
|
||||
}
|
||||
|
||||
func createJobForTest(jobID string, s *TestAgent, t *testing.T) {
|
||||
job := mock.Job()
|
||||
job.ID = jobID
|
||||
job.TaskGroups[0].Count = 1
|
||||
args := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTP_ResourcesWithSingleJob(t *testing.T) {
|
||||
testJob := "aaaaaaaa-e8f7-fd38-c855-ab94ceb89706"
|
||||
t.Parallel()
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
createJobForTest(testJob, s, t)
|
||||
|
||||
endpoint := fmt.Sprintf("/v1/resources?context=job&prefix=%s", testJob)
|
||||
req, err := http.NewRequest("GET", endpoint, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
respW := httptest.NewRecorder()
|
||||
|
||||
resp, err := s.Server.ResourcesRequest(respW, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
res := resp.(*structs.ResourcesListStub)
|
||||
if len(res.Matches) != 1 {
|
||||
t.Fatalf("No expected key values in resources list")
|
||||
}
|
||||
|
||||
j := res.Matches["jobs"]
|
||||
if j == nil || len(j) != 1 {
|
||||
t.Fatalf("The number of jobs that were returned does not equal the number of jobs we expected (1)", j)
|
||||
}
|
||||
|
||||
// TODO verify that the job we are getting is the same that we created
|
||||
// assert.Equal(t, j[0], testJob)
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
//func TestHTTP_ResourcesWithNoJob(t *testing.T) {
|
||||
//}
|
|
@ -0,0 +1,55 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
type Resources struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// List is used to list the jobs registered in the system
|
||||
// TODO logic to determine context, to return only that context if needed
|
||||
// TODO if no context, return all
|
||||
// TODO return top n matches
|
||||
// TODO refactor to prevent duplication
|
||||
func (r *Resources) List(args *structs.ResourcesRequest,
|
||||
reply *structs.ResourcesResponse) error {
|
||||
|
||||
resources := structs.ResourcesListStub{}
|
||||
resources.Matches = make(map[string][]string)
|
||||
|
||||
// Setup the blocking query
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
run: func(ws memdb.WatchSet, state *state.StateStore) error {
|
||||
|
||||
// return jobs matching given prefix
|
||||
var err error
|
||||
var iter memdb.ResultIterator
|
||||
iter, err = state.JobsByIDPrefix(ws, args.QueryOptions.Prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var jobs []string
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
job := raw.(*structs.Job)
|
||||
jobs = append(jobs, job.ID)
|
||||
}
|
||||
|
||||
resources.Matches["jobs"] = jobs
|
||||
reply.Resources = resources
|
||||
|
||||
return nil
|
||||
}}
|
||||
return r.srv.blockingRPC(&opts)
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"net/rpc"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func registerAndVerifyJob(codec rpc.ClientCodec, s *Server, t *testing.T) string {
|
||||
// Create the register request
|
||||
job := mock.Job()
|
||||
state := s.fsm.State()
|
||||
err := state.UpsertJob(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the job was created
|
||||
get := &structs.JobListRequest{
|
||||
QueryOptions: structs.QueryOptions{Region: "global"},
|
||||
}
|
||||
var resp2 structs.JobListResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp2.Index != 1000 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
|
||||
}
|
||||
|
||||
if len(resp2.Jobs) != 1 {
|
||||
t.Fatalf("bad: %#v", resp2.Jobs)
|
||||
}
|
||||
if resp2.Jobs[0].ID != job.ID {
|
||||
t.Fatalf("bad: %#v", resp2.Jobs[0])
|
||||
}
|
||||
|
||||
return job.ID
|
||||
}
|
||||
|
||||
func TestResourcesEndpoint_List(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
|
||||
defer s.Shutdown()
|
||||
codec := rpcClient(t, s)
|
||||
testutil.WaitForLeader(t, s.RPC)
|
||||
|
||||
jobID := registerAndVerifyJob(codec, s, t)
|
||||
|
||||
req := &structs.ResourcesRequest{
|
||||
QueryOptions: structs.QueryOptions{Region: "global", Prefix: jobID},
|
||||
}
|
||||
|
||||
var resp structs.ResourcesResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Resources.List", req, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
num_matches := len(resp.Resources.Matches["jobs"])
|
||||
if num_matches != 1 {
|
||||
t.Fatalf(fmt.Sprintf("err: the number of jobs expected %d does not match the number of jobs registered %d", 1, num_matches))
|
||||
}
|
||||
|
||||
assert.Equal(t, jobID, resp.Resources.Matches["jobs"][0])
|
||||
}
|
|
@ -174,6 +174,7 @@ type endpoints struct {
|
|||
Alloc *Alloc
|
||||
Deployment *Deployment
|
||||
Region *Region
|
||||
Resources *Resources
|
||||
Periodic *Periodic
|
||||
System *System
|
||||
Operator *Operator
|
||||
|
@ -725,6 +726,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
|
|||
s.endpoints.Region = &Region{s}
|
||||
s.endpoints.Status = &Status{s}
|
||||
s.endpoints.System = &System{s}
|
||||
s.endpoints.Resources = &Resources{s}
|
||||
|
||||
// Register the handlers
|
||||
s.rpcServer.Register(s.endpoints.Alloc)
|
||||
|
@ -738,6 +740,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
|
|||
s.rpcServer.Register(s.endpoints.Region)
|
||||
s.rpcServer.Register(s.endpoints.Status)
|
||||
s.rpcServer.Register(s.endpoints.System)
|
||||
s.rpcServer.Register(s.endpoints.Resources)
|
||||
|
||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
|
|
|
@ -231,6 +231,18 @@ type NodeSpecificRequest struct {
|
|||
QueryOptions
|
||||
}
|
||||
|
||||
type ResourcesResponse struct {
|
||||
Resources ResourcesListStub
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// TODO can there be a more generic Request object, if a specific one is not
|
||||
// needed?
|
||||
// ResourcesRequest is used to parameterize a resources request
|
||||
type ResourcesRequest struct {
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
// JobRegisterRequest is used for Job.Register endpoint
|
||||
// to register a job as being a schedulable entity.
|
||||
type JobRegisterRequest struct {
|
||||
|
@ -1871,6 +1883,12 @@ func (j *Job) SetSubmitTime() {
|
|||
j.SubmitTime = time.Now().UTC().UnixNano()
|
||||
}
|
||||
|
||||
// ResourcesListStub is used to return a subset of information
|
||||
// for jobs, allocations, evaluations, and nodes
|
||||
type ResourcesListStub struct {
|
||||
Matches map[string][]string
|
||||
}
|
||||
|
||||
// JobListStub is used to return a subset of job information
|
||||
// for the job list
|
||||
type JobListStub struct {
|
||||
|
|
Loading…
Reference in New Issue