diff --git a/api/agent.go b/api/agent.go new file mode 100644 index 000000000..ae2eb1a61 --- /dev/null +++ b/api/agent.go @@ -0,0 +1,157 @@ +package api + +import ( + "fmt" + "net/url" +) + +// Agent encapsulates an API client which talks to Nomad's +// agent endpoints for a specific node. +type Agent struct { + client *Client + + // Cache static agent info + nodeName string + datacenter string + region string +} + +// Agent returns a new agent which can be used to query +// the agent-specific endpoints. +func (c *Client) Agent() *Agent { + return &Agent{client: c} +} + +// Self is used to query the /v1/agent/self endpoint and +// returns information specific to the running agent. +func (a *Agent) Self() (map[string]map[string]interface{}, error) { + var out map[string]map[string]interface{} + + // Query the self endpoint on the agent + _, err := a.client.query("/v1/agent/self", &out, nil) + if err != nil { + return nil, fmt.Errorf("failed querying self endpoint: %s", err) + } + + // Populate the cache for faster queries + a.populateCache(out) + + return out, nil +} + +// populateCache is used to insert various pieces of static +// data into the agent handle. This is used during subsequent +// lookups for the same data later on to save the round trip. +func (a *Agent) populateCache(info map[string]map[string]interface{}) { + if a.nodeName == "" { + a.nodeName, _ = info["member"]["Name"].(string) + } + if tags, ok := info["member"]["Tags"].(map[string]interface{}); ok { + if a.datacenter == "" { + a.datacenter, _ = tags["dc"].(string) + } + if a.region == "" { + a.region, _ = tags["region"].(string) + } + } +} + +// NodeName is used to query the Nomad agent for its node name. +func (a *Agent) NodeName() (string, error) { + // Return from cache if we have it + if a.nodeName != "" { + return a.nodeName, nil + } + + // Query the node name + _, err := a.Self() + return a.nodeName, err +} + +// Datacenter is used to return the name of the datacenter which +// the agent is a member of. +func (a *Agent) Datacenter() (string, error) { + // Return from cache if we have it + if a.datacenter != "" { + return a.datacenter, nil + } + + // Query the agent for the DC + _, err := a.Self() + return a.datacenter, err +} + +// Region is used to look up the region the agent is in. +func (a *Agent) Region() (string, error) { + // Return from cache if we have it + if a.region != "" { + return a.region, nil + } + + // Query the agent for the region + _, err := a.Self() + return a.region, err +} + +// Join is used to instruct a server node to join another server +// via the gossip protocol. Multiple addresses may be specified. +// We attempt to join all of the hosts in the list. If one or +// more nodes have a successful result, no error is returned. +func (a *Agent) Join(addrs ...string) error { + // Accumulate the addresses + v := url.Values{} + for _, addr := range addrs { + v.Add("address", addr) + } + + // Send the join request + var resp joinResponse + _, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil) + if err != nil { + return fmt.Errorf("failed joining: %s", err) + } + if resp.Error != "" { + return fmt.Errorf("failed joining: %s", resp.Error) + } + return nil +} + +// Members is used to query all of the known server members +func (a *Agent) Members() ([]*AgentMember, error) { + var resp []*AgentMember + + // Query the known members + _, err := a.client.query("/v1/agent/members", &resp, nil) + if err != nil { + return nil, err + } + return resp, nil +} + +// ForceLeave is used to eject an existing node from the cluster. +func (a *Agent) ForceLeave(node string) error { + _, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil) + return err +} + +// joinResponse is used to decode the response we get while +// sending a member join request. +type joinResponse struct { + NumNodes int `json:"num_nodes"` + Error string `json:"error"` +} + +// AgentMember represents a cluster member known to the agent +type AgentMember struct { + Name string + Addr string + Port uint16 + Tags map[string]string + Status int + ProtocolMin uint8 + ProtocolMax uint8 + ProtocolCur uint8 + DelegateMin uint8 + DelegateMax uint8 + DelegateCur uint8 +} diff --git a/api/agent_test.go b/api/agent_test.go new file mode 100644 index 000000000..7eb891ea1 --- /dev/null +++ b/api/agent_test.go @@ -0,0 +1,111 @@ +package api + +import ( + "testing" +) + +func TestAgent_Self(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // Get a handle on the Agent endpoints + a := c.Agent() + + // Query the endpoint + res, err := a.Self() + if err != nil { + t.Fatalf("err: %s", err) + } + + // Check that we got a valid response + if name, ok := res["member"]["Name"]; !ok || name == "" { + t.Fatalf("bad member name in response: %#v", res) + } + + // Local cache was populated + if a.nodeName == "" || a.datacenter == "" || a.region == "" { + t.Fatalf("cache should be populated, got: %#v", a) + } +} + +func TestAgent_NodeName(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + // Query the agent for the node name + res, err := a.NodeName() + if err != nil { + t.Fatalf("err: %s", err) + } + if res == "" { + t.Fatalf("expected node name, got nothing") + } +} + +func TestAgent_Datacenter(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + // Query the agent for the datacenter + dc, err := a.Datacenter() + if err != nil { + t.Fatalf("err: %s", err) + } + if dc != "dc1" { + t.Fatalf("expected dc1, got: %q", dc) + } +} + +func TestAgent_Join(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + // Attempting to join a non-existent host returns error + if err := a.Join("nope"); err == nil { + t.Fatalf("expected error, got nothing") + } + + // TODO(ryanuber): This is pretty much a worthless test, + // since we are just joining ourselves. Once the agent + // respects config options, change this to actually make + // two nodes and join them. + if err := a.Join("127.0.0.1"); err != nil { + t.Fatalf("err: %s", err) + } +} + +func TestAgent_Members(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + // Query nomad for all the known members + mem, err := a.Members() + if err != nil { + t.Fatalf("err: %s", err) + } + + // Check that we got the expected result + if n := len(mem); n != 1 { + t.Fatalf("expected 1 member, got: %d", n) + } + if m := mem[0]; m.Name == "" || m.Addr == "" || m.Port == 0 { + t.Fatalf("bad member: %#v", m) + } +} + +func TestAgent_ForceLeave(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + // Force-leave on a non-existent node does not error + if err := a.ForceLeave("nope"); err != nil { + t.Fatalf("err: %s", err) + } + + // TODO: test force-leave on an existing node +} diff --git a/api/allocations.go b/api/allocations.go new file mode 100644 index 000000000..f247fed2a --- /dev/null +++ b/api/allocations.go @@ -0,0 +1,45 @@ +package api + +// Allocations is used to query the alloc-related endpoints. +type Allocations struct { + client *Client +} + +// Allocations returns a handle on the allocs endpoints. +func (c *Client) Allocations() *Allocations { + return &Allocations{client: c} +} + +// List returns a list of all of the allocations. +func (a *Allocations) List(q *QueryOptions) ([]*Allocation, *QueryMeta, error) { + var resp []*Allocation + qm, err := a.client.query("/v1/allocations", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + +// Info is used to retrieve a single allocation. +func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *QueryMeta, error) { + var resp Allocation + qm, err := a.client.query("/v1/allocation/"+allocID, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Allocation is used for serialization of allocations. +type Allocation struct { + ID string + EvalID string + Name string + NodeID string + JobID string + TaskGroup string + DesiredStatus string + DesiredDescription string + ClientStatus string + ClientDescription string +} diff --git a/api/allocations_test.go b/api/allocations_test.go new file mode 100644 index 000000000..be1860eee --- /dev/null +++ b/api/allocations_test.go @@ -0,0 +1,51 @@ +package api + +import ( + "testing" +) + +func TestAllocations_List(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Allocations() + + // Querying when no allocs exist returns nothing + allocs, qm, err := a.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(allocs); n != 0 { + t.Fatalf("expected 0 allocs, got: %d", n) + } + + // TODO: do something that causes an allocation to actually happen + // so we can query for them. + return + + job := &Job{ + ID: "job1", + Name: "Job #1", + Type: JobTypeService, + } + eval, _, err := c.Jobs().Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + + // List the allocations again + allocs, qm, err = a.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex == 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + + // Check that we got the allocation back + if len(allocs) == 0 || allocs[0].EvalID != eval { + t.Fatalf("bad: %#v", allocs) + } +} diff --git a/api/api.go b/api/api.go index 48e4bdf5b..e1bb8e520 100644 --- a/api/api.go +++ b/api/api.go @@ -195,13 +195,14 @@ func (r *request) toHTTP() (*http.Request, error) { // newRequest is used to create a new request func (c *Client) newRequest(method, path string) *request { base, _ := url.Parse(c.config.URL) + u, _ := url.Parse(path) r := &request{ config: &c.config, method: method, url: &url.URL{ Scheme: base.Scheme, Host: base.Host, - Path: path, + Path: u.Path, }, params: make(map[string][]string), } @@ -211,6 +212,14 @@ func (c *Client) newRequest(method, path string) *request { if c.config.WaitTime != 0 { r.params.Set("wait", durToMsec(r.config.WaitTime)) } + + // Add in the query parameters, if any + for key, values := range u.Query() { + for _, value := range values { + r.params.Add(key, value) + } + } + return r } diff --git a/api/api_test.go b/api/api_test.go index 466d6e011..e9114cf31 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -22,7 +22,7 @@ func makeClient(t *testing.T, cb1 configCallback, // Create server server := testutil.NewTestServer(t, cb2) - conf.URL = server.HTTPAddr + conf.URL = "http://" + server.HTTPAddr // Create client client, err := NewClient(conf) @@ -155,3 +155,22 @@ func TestParseWriteMeta(t *testing.T) { t.Fatalf("Bad: %v", wm) } } + +func TestQueryString(t *testing.T) { + // TODO t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + + r := c.newRequest("PUT", "/v1/abc?foo=bar&baz=zip") + q := &WriteOptions{Region: "foo"} + r.setWriteOptions(q) + + req, err := r.toHTTP() + if err != nil { + t.Fatalf("err: %s", err) + } + + if uri := req.URL.RequestURI(); uri != "/v1/abc?baz=zip&foo=bar®ion=foo" { + t.Fatalf("bad uri: %q", uri) + } +} diff --git a/api/compose_test.go b/api/compose_test.go new file mode 100644 index 000000000..b8f6d781a --- /dev/null +++ b/api/compose_test.go @@ -0,0 +1,121 @@ +package api + +import ( + "reflect" + "testing" +) + +func TestCompose(t *testing.T) { + // Compose a task + task := NewTask("task1", "exec"). + SetConfig("foo", "bar"). + SetMeta("foo", "bar"). + Constrain(HardConstraint("kernel.name", "=", "linux")). + Require(&Resources{ + CPU: 1.25, + MemoryMB: 1024, + DiskMB: 2048, + IOPS: 1024, + Networks: []*NetworkResource{ + &NetworkResource{ + CIDR: "0.0.0.0/0", + MBits: 100, + ReservedPorts: []int{80, 443}, + }, + }, + }) + + // Compose a task group + grp := NewTaskGroup("grp1", 2). + Constrain(HardConstraint("kernel.name", "=", "linux")). + SetMeta("foo", "bar"). + AddTask(task) + + // Compose a job + job := NewServiceJob("job1", "myjob", 2). + SetMeta("foo", "bar"). + AddDatacenter("dc1"). + Constrain(HardConstraint("kernel.name", "=", "linux")). + AddTaskGroup(grp) + + // Check that the composed result looks correct + expect := &Job{ + ID: "job1", + Name: "myjob", + Type: JobTypeService, + Priority: 2, + Datacenters: []string{ + "dc1", + }, + Meta: map[string]string{ + "foo": "bar", + }, + Constraints: []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "linux", + Operand: "=", + Weight: 0, + }, + }, + TaskGroups: []*TaskGroup{ + &TaskGroup{ + Name: "grp1", + Count: 2, + Constraints: []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "linux", + Operand: "=", + Weight: 0, + }, + }, + Tasks: []*Task{ + &Task{ + Name: "task1", + Driver: "exec", + Resources: &Resources{ + CPU: 1.25, + MemoryMB: 1024, + DiskMB: 2048, + IOPS: 1024, + Networks: []*NetworkResource{ + &NetworkResource{ + CIDR: "0.0.0.0/0", + MBits: 100, + ReservedPorts: []int{ + 80, + 443, + }, + }, + }, + }, + Constraints: []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "linux", + Operand: "=", + Weight: 0, + }, + }, + Config: map[string]string{ + "foo": "bar", + }, + Meta: map[string]string{ + "foo": "bar", + }, + }, + }, + Meta: map[string]string{ + "foo": "bar", + }, + }, + }, + } + if !reflect.DeepEqual(job, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, job) + } +} diff --git a/api/constraint.go b/api/constraint.go new file mode 100644 index 000000000..1a0f3233a --- /dev/null +++ b/api/constraint.go @@ -0,0 +1,33 @@ +package api + +// Constraint is used to serialize a job placement constraint. +type Constraint struct { + Hard bool + LTarget string + RTarget string + Operand string + Weight int +} + +// HardConstraint is used to create a new hard constraint. +func HardConstraint(left, operand, right string) *Constraint { + return constraint(left, operand, right, true, 0) +} + +// SoftConstraint is used to create a new soft constraint. It +// takes an additional weight parameter to allow balancing +// multiple soft constraints amongst eachother. +func SoftConstraint(left, operand, right string, weight int) *Constraint { + return constraint(left, operand, right, false, weight) +} + +// constraint generates a new job placement constraint. +func constraint(left, operand, right string, hard bool, weight int) *Constraint { + return &Constraint{ + Hard: hard, + LTarget: left, + RTarget: right, + Operand: operand, + Weight: weight, + } +} diff --git a/api/constraint_test.go b/api/constraint_test.go new file mode 100644 index 000000000..a8c7b9838 --- /dev/null +++ b/api/constraint_test.go @@ -0,0 +1,32 @@ +package api + +import ( + "reflect" + "testing" +) + +func TestCompose_Constraints(t *testing.T) { + c := HardConstraint("kernel.name", "=", "darwin") + expect := &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "darwin", + Operand: "=", + Weight: 0, + } + if !reflect.DeepEqual(c, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, c) + } + + c = SoftConstraint("memory.totalbytes", ">=", "250000000", 5) + expect = &Constraint{ + Hard: false, + LTarget: "memory.totalbytes", + RTarget: "250000000", + Operand: ">=", + Weight: 5, + } + if !reflect.DeepEqual(c, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, c) + } +} diff --git a/api/evaluations.go b/api/evaluations.go new file mode 100644 index 000000000..2956664b4 --- /dev/null +++ b/api/evaluations.go @@ -0,0 +1,63 @@ +package api + +import ( + "time" +) + +// Evaluations is used to query the evaluation endpoints. +type Evaluations struct { + client *Client +} + +// Evaluations returns a new handle on the evaluations. +func (c *Client) Evaluations() *Evaluations { + return &Evaluations{client: c} +} + +// List is used to dump all of the evaluations. +func (e *Evaluations) List(q *QueryOptions) ([]*Evaluation, *QueryMeta, error) { + var resp []*Evaluation + qm, err := e.client.query("/v1/evaluations", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + +// Info is used to query a single evaluation by its ID. +func (e *Evaluations) Info(evalID string, q *QueryOptions) (*Evaluation, *QueryMeta, error) { + var resp Evaluation + qm, err := e.client.query("/v1/evaluation/"+evalID, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Allocations is used to retrieve a set of allocations given +// an evaluation ID. +func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*Allocation, *QueryMeta, error) { + var resp []*Allocation + qm, err := e.client.query("/v1/evaluation/"+evalID+"/allocations", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + +// Evaluation is used to serialize an evaluation. +type Evaluation struct { + ID string + Priority int + Type string + TriggeredBy string + JobID string + JobModifyIndex uint64 + NodeID string + NodeModifyIndex uint64 + Status string + StatusDescription string + Wait time.Duration + NextEval string + PreviousEval string +} diff --git a/api/evaluations_test.go b/api/evaluations_test.go new file mode 100644 index 000000000..03bd1be53 --- /dev/null +++ b/api/evaluations_test.go @@ -0,0 +1,96 @@ +package api + +import ( + "strings" + "testing" +) + +func TestEvaluations_List(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + e := c.Evaluations() + + // Listing when nothing exists returns empty + result, qm, err := e.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(result); n != 0 { + t.Fatalf("expected 0 evaluations, got: %d", n) + } + + // Register a job. This will create an evaluation. + jobs := c.Jobs() + job := testJob() + evalID, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Check the evaluations again + result, qm, err = e.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check if we have the right list + if len(result) != 1 || result[0].ID != evalID { + t.Fatalf("bad: %#v", result) + } +} + +func TestEvaluations_Info(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + e := c.Evaluations() + + // Querying a non-existent evaluation returns error + _, _, err := e.Info("8E231CF4-CA48-43FF-B694-5801E69E22FA", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %s", err) + } + + // Register a job. Creates a new evaluation. + jobs := c.Jobs() + job := testJob() + evalID, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Try looking up by the new eval ID + result, qm, err := e.Info(evalID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that we got the right result + if result == nil || result.ID != evalID { + t.Fatalf("expected eval %q, got: %#v", evalID, result) + } +} + +func TestEvaluations_Allocations(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + e := c.Evaluations() + + // Returns empty if no allocations + allocs, qm, err := e.Allocations("8E231CF4-CA48-43FF-B694-5801E69E22FA", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(allocs); n != 0 { + t.Fatalf("expected 0 allocs, got: %d", n) + } +} diff --git a/api/jobs.go b/api/jobs.go new file mode 100644 index 000000000..6898b6278 --- /dev/null +++ b/api/jobs.go @@ -0,0 +1,169 @@ +package api + +const ( + // JobTypeService indicates a long-running processes + JobTypeService = "service" + + // JobTypeBatch indicates a short-lived process + JobTypeBatch = "batch" +) + +// Jobs is used to access the job-specific endpoints. +type Jobs struct { + client *Client +} + +// Jobs returns a handle on the jobs endpoints. +func (c *Client) Jobs() *Jobs { + return &Jobs{client: c} +} + +// Register is used to register a new job. It returns the ID +// of the evaluation, along with any errors encountered. +func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) { + var resp registerJobResponse + + req := ®isterJobRequest{job} + wm, err := j.client.write("/v1/jobs", req, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// List is used to list all of the existing jobs. +func (j *Jobs) List(q *QueryOptions) ([]*Job, *QueryMeta, error) { + var resp []*Job + qm, err := j.client.query("/v1/jobs", &resp, q) + if err != nil { + return nil, qm, err + } + return resp, qm, nil +} + +// Info is used to retrieve information about a particular +// job given its unique ID. +func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { + var resp Job + qm, err := j.client.query("/v1/job/"+jobID, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// Allocations is used to return the allocs for a given job ID. +func (j *Jobs) Allocations(jobID string, q *QueryOptions) ([]*Allocation, *QueryMeta, error) { + var resp []*Allocation + qm, err := j.client.query("/v1/job/"+jobID+"/allocations", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + +// Evaluations is used to query the evaluations associated with +// the given job ID. +func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *QueryMeta, error) { + var resp []*Evaluation + qm, err := j.client.query("/v1/job/"+jobID+"/evaluations", &resp, q) + if err != nil { + return nil, nil, err + } + return resp, qm, nil +} + +// Delete is used to remove an existing job. +func (j *Jobs) Delete(jobID string, q *WriteOptions) (*WriteMeta, error) { + wm, err := j.client.delete("/v1/job/"+jobID, nil, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// ForceEvaluate is used to force-evaluate an existing job. +func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) { + var resp registerJobResponse + wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", nil, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// Job is used to serialize a job. +type Job struct { + ID string + Name string + Type string + Priority int + AllAtOnce bool + Datacenters []string + Constraints []*Constraint + TaskGroups []*TaskGroup + Meta map[string]string + Status string + StatusDescription string +} + +// NewServiceJob creates and returns a new service-style job +// for long-lived processes using the provided name, ID, and +// relative job priority. +func NewServiceJob(id, name string, pri int) *Job { + return newJob(id, name, JobTypeService, pri) +} + +// NewBatchJob creates and returns a new batch-style job for +// short-lived processes using the provided name and ID along +// with the relative job priority. +func NewBatchJob(id, name string, pri int) *Job { + return newJob(id, name, JobTypeBatch, pri) +} + +// newJob is used to create a new Job struct. +func newJob(jobID, jobName, jobType string, pri int) *Job { + return &Job{ + ID: jobID, + Name: jobName, + Type: jobType, + Priority: pri, + } +} + +// SetMeta is used to set arbitrary k/v pairs of metadata on a job. +func (j *Job) SetMeta(key, val string) *Job { + if j.Meta == nil { + j.Meta = make(map[string]string) + } + j.Meta[key] = val + return j +} + +// AddDatacenter is used to add a datacenter to a job. +func (j *Job) AddDatacenter(dc string) *Job { + j.Datacenters = append(j.Datacenters, dc) + return j +} + +// Constrain is used to add a constraint to a job. +func (j *Job) Constrain(c *Constraint) *Job { + j.Constraints = append(j.Constraints, c) + return j +} + +// AddTaskGroup adds a task group to an existing job. +func (j *Job) AddTaskGroup(grp *TaskGroup) *Job { + j.TaskGroups = append(j.TaskGroups, grp) + return j +} + +// registerJobRequest is used to serialize a job registration +type registerJobRequest struct { + Job *Job +} + +// registerJobResponse is used to deserialize a job response +type registerJobResponse struct { + EvalID string +} diff --git a/api/jobs_test.go b/api/jobs_test.go new file mode 100644 index 000000000..8b9d1366c --- /dev/null +++ b/api/jobs_test.go @@ -0,0 +1,304 @@ +package api + +import ( + "reflect" + "strings" + "testing" +) + +func TestJobs_Register(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, qm, err := jobs.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(resp); n != 0 { + t.Fatalf("expected 0 jobs, got: %d", n) + } + + // Create a job and attempt to register it + job := testJob() + eval, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if eval == "" { + t.Fatalf("missing eval id") + } + assertWriteMeta(t, wm) + + // Query the jobs back out again + resp, qm, err = jobs.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that we got the expected response + expect := []*Job{job} + if !reflect.DeepEqual(resp, expect) { + t.Fatalf("bad: %#v", resp[0]) + } +} + +func TestJobs_Info(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Trying to retrieve a job by ID before it exists + // returns an error + _, _, err := jobs.Info("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Register the job + job := testJob() + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Query the job again and ensure it exists + result, qm, err := jobs.Info("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that the result is what we expect + if !reflect.DeepEqual(result, job) { + t.Fatalf("expect: %#v, got: %#v", job, result) + } +} + +func TestJobs_Allocations(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Looking up by a non-existent job returns nothing + allocs, qm, err := jobs.Allocations("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(allocs); n != 0 { + t.Fatalf("expected 0 allocs, got: %d", n) + } + + // TODO: do something here to create some allocations for + // an existing job, lookup again. +} + +func TestJobs_Evaluations(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Looking up by a non-existent job ID returns nothing + evals, qm, err := jobs.Evaluations("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(evals); n != 0 { + t.Fatalf("expected 0 evals, got: %d", n) + } + + // Insert a job. This also creates an evaluation so we should + // be able to query that out after. + job := testJob() + evalID, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Look up the evaluations again. + evals, qm, err = jobs.Evaluations("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that we got the evals back + if n := len(evals); n == 0 || evals[0].ID != evalID { + t.Fatalf("expected 1 eval (%s), got: %#v", evalID, evals) + } +} + +func TestJobs_Delete(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Register a new job + job := testJob() + _, wm, err := jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Attempting delete on non-existing job does not error + wm2, err := jobs.Delete("nope", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm2) + + // Deleting an existing job works + wm3, err := jobs.Delete("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm3) + + // Check that the job is really gone + result, qm, err := jobs.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + if n := len(result); n != 0 { + t.Fatalf("expected 0 jobs, got: %d", n) + } +} + +func TestJobs_ForceEvaluate(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Force-eval on a non-existent job fails + _, _, err := jobs.ForceEvaluate("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Create a new job + _, wm, err := jobs.Register(testJob(), nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Try force-eval again + evalID, wm, err := jobs.ForceEvaluate("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + // Retrieve the evals and see if we get a matching one + evals, qm, err := jobs.Evaluations("job1", nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + for _, eval := range evals { + if eval.ID == evalID { + return + } + } + t.Fatalf("evaluation %q missing", evalID) +} + +func TestJobs_NewBatchJob(t *testing.T) { + job := NewBatchJob("job1", "myjob", 5) + expect := &Job{ + ID: "job1", + Name: "myjob", + Type: JobTypeBatch, + Priority: 5, + } + if !reflect.DeepEqual(job, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, job) + } +} + +func TestJobs_NewServiceJob(t *testing.T) { + job := NewServiceJob("job1", "myjob", 5) + expect := &Job{ + ID: "job1", + Name: "myjob", + Type: JobTypeService, + Priority: 5, + } + if !reflect.DeepEqual(job, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, job) + } +} + +func TestJobs_SetMeta(t *testing.T) { + job := &Job{Meta: nil} + + // Initializes a nil map + out := job.SetMeta("foo", "bar") + if job.Meta == nil { + t.Fatalf("should initialize metadata") + } + + // Check that the job was returned + if job != out { + t.Fatalf("expect: %#v, got: %#v", job, out) + } + + // Setting another pair is additive + job.SetMeta("baz", "zip") + expect := map[string]string{"foo": "bar", "baz": "zip"} + if !reflect.DeepEqual(job.Meta, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, job.Meta) + } +} + +func TestJobs_Constrain(t *testing.T) { + job := &Job{Constraints: nil} + + // Create and add a constraint + out := job.Constrain(HardConstraint("kernel.name", "=", "darwin")) + if n := len(job.Constraints); n != 1 { + t.Fatalf("expected 1 constraint, got: %d", n) + } + + // Check that the job was returned + if job != out { + t.Fatalf("expect: %#v, got: %#v", job, out) + } + + // Adding another constraint preserves the original + job.Constrain(SoftConstraint("memory.totalbytes", ">=", "128000000", 2)) + expect := []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "darwin", + Operand: "=", + Weight: 0, + }, + &Constraint{ + Hard: false, + LTarget: "memory.totalbytes", + RTarget: "128000000", + Operand: ">=", + Weight: 2, + }, + } + if !reflect.DeepEqual(job.Constraints, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, job.Constraints) + } +} diff --git a/api/resources.go b/api/resources.go new file mode 100644 index 000000000..64739df5a --- /dev/null +++ b/api/resources.go @@ -0,0 +1,20 @@ +package api + +// Resources encapsulates the required resources of +// a given task or task group. +type Resources struct { + CPU float64 + MemoryMB int + DiskMB int + IOPS int + Networks []*NetworkResource +} + +// NetworkResource is used to describe required network +// resources of a given task. +type NetworkResource struct { + Public bool + CIDR string + ReservedPorts []int + MBits int +} diff --git a/api/tasks.go b/api/tasks.go new file mode 100644 index 000000000..23e9ca638 --- /dev/null +++ b/api/tasks.go @@ -0,0 +1,88 @@ +package api + +// TaskGroup is the unit of scheduling. +type TaskGroup struct { + Name string + Count int + Constraints []*Constraint + Tasks []*Task + Meta map[string]string +} + +// NewTaskGroup creates a new TaskGroup. +func NewTaskGroup(name string, count int) *TaskGroup { + return &TaskGroup{ + Name: name, + Count: count, + } +} + +// Constrain is used to add a constraint to a task group. +func (g *TaskGroup) Constrain(c *Constraint) *TaskGroup { + g.Constraints = append(g.Constraints, c) + return g +} + +// AddMeta is used to add a meta k/v pair to a task group +func (g *TaskGroup) SetMeta(key, val string) *TaskGroup { + if g.Meta == nil { + g.Meta = make(map[string]string) + } + g.Meta[key] = val + return g +} + +// AddTask is used to add a new task to a task group. +func (g *TaskGroup) AddTask(t *Task) *TaskGroup { + g.Tasks = append(g.Tasks, t) + return g +} + +// Task is a single process in a task group. +type Task struct { + Name string + Driver string + Config map[string]string + Constraints []*Constraint + Resources *Resources + Meta map[string]string +} + +// NewTask creates and initializes a new Task. +func NewTask(name, driver string) *Task { + return &Task{ + Name: name, + Driver: driver, + } +} + +// Configure is used to configure a single k/v pair on +// the task. +func (t *Task) SetConfig(key, val string) *Task { + if t.Config == nil { + t.Config = make(map[string]string) + } + t.Config[key] = val + return t +} + +// SetMeta is used to add metadata k/v pairs to the task. +func (t *Task) SetMeta(key, val string) *Task { + if t.Meta == nil { + t.Meta = make(map[string]string) + } + t.Meta[key] = val + return t +} + +// Require is used to add resource requirements to a task. +func (t *Task) Require(r *Resources) *Task { + t.Resources = r + return t +} + +// Constraint adds a new constraints to a single task. +func (t *Task) Constrain(c *Constraint) *Task { + t.Constraints = append(t.Constraints, c) + return t +} diff --git a/api/tasks_test.go b/api/tasks_test.go new file mode 100644 index 000000000..f5442ee98 --- /dev/null +++ b/api/tasks_test.go @@ -0,0 +1,227 @@ +package api + +import ( + "reflect" + "testing" +) + +func TestTaskGroup_NewTaskGroup(t *testing.T) { + grp := NewTaskGroup("grp1", 2) + expect := &TaskGroup{ + Name: "grp1", + Count: 2, + } + if !reflect.DeepEqual(grp, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, grp) + } +} + +func TestTaskGroup_Constrain(t *testing.T) { + grp := NewTaskGroup("grp1", 1) + + // Add a constraint to the group + out := grp.Constrain(HardConstraint("kernel.name", "=", "darwin")) + if n := len(grp.Constraints); n != 1 { + t.Fatalf("expected 1 constraint, got: %d", n) + } + + // Check that the group was returned + if out != grp { + t.Fatalf("expected: %#v, got: %#v", grp, out) + } + + // Add a second constraint + grp.Constrain(SoftConstraint("memory.totalbytes", ">=", "128000000", 2)) + expect := []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "darwin", + Operand: "=", + Weight: 0, + }, + &Constraint{ + Hard: false, + LTarget: "memory.totalbytes", + RTarget: "128000000", + Operand: ">=", + Weight: 2, + }, + } + if !reflect.DeepEqual(grp.Constraints, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, grp.Constraints) + } +} + +func TestTaskGroup_SetMeta(t *testing.T) { + grp := NewTaskGroup("grp1", 1) + + // Initializes an empty map + out := grp.SetMeta("foo", "bar") + if grp.Meta == nil { + t.Fatalf("should be initialized") + } + + // Check that we returned the group + if out != grp { + t.Fatalf("expect: %#v, got: %#v", grp, out) + } + + // Add a second meta k/v + grp.SetMeta("baz", "zip") + expect := map[string]string{"foo": "bar", "baz": "zip"} + if !reflect.DeepEqual(grp.Meta, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, grp.Meta) + } +} + +func TestTaskGroup_AddTask(t *testing.T) { + grp := NewTaskGroup("grp1", 1) + + // Add the task to the task group + out := grp.AddTask(NewTask("task1", "java")) + if n := len(grp.Tasks); n != 1 { + t.Fatalf("expected 1 task, got: %d", n) + } + + // Check that we returned the group + if out != grp { + t.Fatalf("expect: %#v, got: %#v", grp, out) + } + + // Add a second task + grp.AddTask(NewTask("task2", "exec")) + expect := []*Task{ + &Task{ + Name: "task1", + Driver: "java", + }, + &Task{ + Name: "task2", + Driver: "exec", + }, + } + if !reflect.DeepEqual(grp.Tasks, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, grp.Tasks) + } +} + +func TestTask_NewTask(t *testing.T) { + task := NewTask("task1", "exec") + expect := &Task{ + Name: "task1", + Driver: "exec", + } + if !reflect.DeepEqual(task, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, task) + } +} + +func TestTask_SetConfig(t *testing.T) { + task := NewTask("task1", "exec") + + // Initializes an empty map + out := task.SetConfig("foo", "bar") + if task.Config == nil { + t.Fatalf("should be initialized") + } + + // Check that we returned the task + if out != task { + t.Fatalf("expect: %#v, got: %#v", task, out) + } + + // Set another config value + task.SetConfig("baz", "zip") + expect := map[string]string{"foo": "bar", "baz": "zip"} + if !reflect.DeepEqual(task.Config, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, task.Config) + } +} + +func TestTask_SetMeta(t *testing.T) { + task := NewTask("task1", "exec") + + // Initializes an empty map + out := task.SetMeta("foo", "bar") + if task.Meta == nil { + t.Fatalf("should be initialized") + } + + // Check that we returned the task + if out != task { + t.Fatalf("expect: %#v, got: %#v", task, out) + } + + // Set another meta k/v + task.SetMeta("baz", "zip") + expect := map[string]string{"foo": "bar", "baz": "zip"} + if !reflect.DeepEqual(task.Meta, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, task.Meta) + } +} + +func TestTask_Require(t *testing.T) { + task := NewTask("task1", "exec") + + // Create some require resources + resources := &Resources{ + CPU: 1.25, + MemoryMB: 128, + DiskMB: 2048, + IOPS: 1024, + Networks: []*NetworkResource{ + &NetworkResource{ + CIDR: "0.0.0.0/0", + MBits: 100, + ReservedPorts: []int{80, 443}, + }, + }, + } + out := task.Require(resources) + if !reflect.DeepEqual(task.Resources, resources) { + t.Fatalf("expect: %#v, got: %#v", resources, task.Resources) + } + + // Check that we returned the task + if out != task { + t.Fatalf("expect: %#v, got: %#v", task, out) + } +} + +func TestTask_Constrain(t *testing.T) { + task := NewTask("task1", "exec") + + // Add a constraint to the task + out := task.Constrain(HardConstraint("kernel.name", "=", "darwin")) + if n := len(task.Constraints); n != 1 { + t.Fatalf("expected 1 constraint, got: %d", n) + } + + // Check that the task was returned + if out != task { + t.Fatalf("expected: %#v, got: %#v", task, out) + } + + // Add a second constraint + task.Constrain(SoftConstraint("memory.totalbytes", ">=", "128000000", 2)) + expect := []*Constraint{ + &Constraint{ + Hard: true, + LTarget: "kernel.name", + RTarget: "darwin", + Operand: "=", + Weight: 0, + }, + &Constraint{ + Hard: false, + LTarget: "memory.totalbytes", + RTarget: "128000000", + Operand: ">=", + Weight: 2, + }, + } + if !reflect.DeepEqual(task.Constraints, expect) { + t.Fatalf("expect: %#v, got: %#v", expect, task.Constraints) + } +} diff --git a/api/util_test.go b/api/util_test.go new file mode 100644 index 000000000..87066444b --- /dev/null +++ b/api/util_test.go @@ -0,0 +1,35 @@ +package api + +import ( + "testing" +) + +func assertQueryMeta(t *testing.T, qm *QueryMeta) { + if qm.LastIndex == 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if qm.RequestTime == 0 { + t.Fatalf("bad request time: %d", qm.RequestTime) + } + if !qm.KnownLeader { + t.Fatalf("expected known leader, got none") + } +} + +func assertWriteMeta(t *testing.T, wm *WriteMeta) { + if wm.LastIndex == 0 { + t.Fatalf("bad index: %d", wm.LastIndex) + } + if wm.RequestTime == 0 { + t.Fatalf("bad request time: %d", wm.RequestTime) + } +} + +func testJob() *Job { + return &Job{ + ID: "job1", + Name: "redis", + Type: JobTypeService, + Priority: 1, + } +} diff --git a/scripts/build.sh b/scripts/build.sh index 109b792de..015187610 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -46,7 +46,7 @@ go get \ # Build! echo "--> Building..." go build \ - -ldflags "${CGO_LDFLAGS} -X main.GitCommit ${GIT_COMMIT}${GIT_DIRTY} -X main.GitDescribe ${GIT_DESCRIBE}" \ + -ldflags "${CGO_LDFLAGS} -X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.GitDescribe=${GIT_DESCRIBE}" \ -v \ -o bin/nomad${EXTENSION} cp bin/nomad${EXTENSION} ${GOPATHSINGLE}/bin