commit
11e05e7cc0
|
@ -0,0 +1,368 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// QueryOptions are used to parameterize a query
|
||||
type QueryOptions struct {
|
||||
// Providing a datacenter overwrites the region provided
|
||||
// by the Config
|
||||
Region string
|
||||
|
||||
// AllowStale allows any Nomad server (non-leader) to service
|
||||
// a read. This allows for lower latency and higher throughput
|
||||
AllowStale bool
|
||||
|
||||
// WaitIndex is used to enable a blocking query. Waits
|
||||
// until the timeout or the next index is reached
|
||||
WaitIndex uint64
|
||||
|
||||
// WaitTime is used to bound the duration of a wait.
|
||||
// Defaults to that of the Config, but can be overriden.
|
||||
WaitTime time.Duration
|
||||
}
|
||||
|
||||
// WriteOptions are used to parameterize a write
|
||||
type WriteOptions struct {
|
||||
// Providing a datacenter overwrites the region provided
|
||||
// by the Config
|
||||
Region string
|
||||
}
|
||||
|
||||
// QueryMeta is used to return meta data about a query
|
||||
type QueryMeta struct {
|
||||
// LastIndex. This can be used as a WaitIndex to perform
|
||||
// a blocking query
|
||||
LastIndex uint64
|
||||
|
||||
// Time of last contact from the leader for the
|
||||
// server servicing the request
|
||||
LastContact time.Duration
|
||||
|
||||
// Is there a known leader
|
||||
KnownLeader bool
|
||||
|
||||
// How long did the request take
|
||||
RequestTime time.Duration
|
||||
}
|
||||
|
||||
// WriteMeta is used to return meta data about a write
|
||||
type WriteMeta struct {
|
||||
// LastIndex. This can be used as a WaitIndex to perform
|
||||
// a blocking query
|
||||
LastIndex uint64
|
||||
|
||||
// How long did the request take
|
||||
RequestTime time.Duration
|
||||
}
|
||||
|
||||
// Config is used to configure the creation of a client
|
||||
type Config struct {
|
||||
// URL is the address of the Nomad agent
|
||||
URL string
|
||||
|
||||
// Region to use. If not provided, the default agent region is used.
|
||||
Region string
|
||||
|
||||
// HttpClient is the client to use. Default will be
|
||||
// used if not provided.
|
||||
HttpClient *http.Client
|
||||
|
||||
// WaitTime limits how long a Watch will block. If not provided,
|
||||
// the agent default values will be used.
|
||||
WaitTime time.Duration
|
||||
}
|
||||
|
||||
// DefaultConfig returns a default configuration for the client
|
||||
func DefaultConfig() *Config {
|
||||
config := &Config{
|
||||
URL: "http://127.0.0.1:4646",
|
||||
HttpClient: http.DefaultClient,
|
||||
}
|
||||
if url := os.Getenv("NOMAD_HTTP_URL"); url != "" {
|
||||
config.URL = url
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// Client provides a client to the Nomad API
|
||||
type Client struct {
|
||||
config Config
|
||||
}
|
||||
|
||||
// NewClient returns a new client
|
||||
func NewClient(config *Config) (*Client, error) {
|
||||
// bootstrap the config
|
||||
defConfig := DefaultConfig()
|
||||
|
||||
if config.URL == "" {
|
||||
config.URL = defConfig.URL
|
||||
} else if _, err := url.Parse(config.URL); err != nil {
|
||||
return nil, fmt.Errorf("invalid url '%s': %v", config.URL, err)
|
||||
}
|
||||
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = defConfig.HttpClient
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
config: *config,
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// request is used to help build up a request
|
||||
type request struct {
|
||||
config *Config
|
||||
method string
|
||||
url *url.URL
|
||||
params url.Values
|
||||
body io.Reader
|
||||
obj interface{}
|
||||
}
|
||||
|
||||
// setQueryOptions is used to annotate the request with
|
||||
// additional query options
|
||||
func (r *request) setQueryOptions(q *QueryOptions) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
if q.Region != "" {
|
||||
r.params.Set("region", q.Region)
|
||||
}
|
||||
if q.AllowStale {
|
||||
r.params.Set("stale", "")
|
||||
}
|
||||
if q.WaitIndex != 0 {
|
||||
r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
|
||||
}
|
||||
if q.WaitTime != 0 {
|
||||
r.params.Set("wait", durToMsec(q.WaitTime))
|
||||
}
|
||||
}
|
||||
|
||||
// durToMsec converts a duration to a millisecond specified string
|
||||
func durToMsec(dur time.Duration) string {
|
||||
return fmt.Sprintf("%dms", dur/time.Millisecond)
|
||||
}
|
||||
|
||||
// setWriteOptions is used to annotate the request with
|
||||
// additional write options
|
||||
func (r *request) setWriteOptions(q *WriteOptions) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
if q.Region != "" {
|
||||
r.params.Set("region", q.Region)
|
||||
}
|
||||
}
|
||||
|
||||
// toHTTP converts the request to an HTTP request
|
||||
func (r *request) toHTTP() (*http.Request, error) {
|
||||
// Encode the query parameters
|
||||
r.url.RawQuery = r.params.Encode()
|
||||
|
||||
// Check if we should encode the body
|
||||
if r.body == nil && r.obj != nil {
|
||||
if b, err := encodeBody(r.obj); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
r.body = b
|
||||
}
|
||||
}
|
||||
|
||||
// Create the HTTP request
|
||||
req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.URL.Host = r.url.Host
|
||||
req.URL.Scheme = r.url.Scheme
|
||||
req.Host = r.url.Host
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// newRequest is used to create a new request
|
||||
func (c *Client) newRequest(method, path string) *request {
|
||||
base, _ := url.Parse(c.config.URL)
|
||||
r := &request{
|
||||
config: &c.config,
|
||||
method: method,
|
||||
url: &url.URL{
|
||||
Scheme: base.Scheme,
|
||||
Host: base.Host,
|
||||
Path: path,
|
||||
},
|
||||
params: make(map[string][]string),
|
||||
}
|
||||
if c.config.Region != "" {
|
||||
r.params.Set("region", c.config.Region)
|
||||
}
|
||||
if c.config.WaitTime != 0 {
|
||||
r.params.Set("wait", durToMsec(r.config.WaitTime))
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// doRequest runs a request with our client
|
||||
func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
|
||||
req, err := r.toHTTP()
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
start := time.Now()
|
||||
resp, err := c.config.HttpClient.Do(req)
|
||||
diff := time.Now().Sub(start)
|
||||
return diff, resp, err
|
||||
}
|
||||
|
||||
// Query is used to do a GET request against an endpoint
|
||||
// and deserialize the response into an interface using
|
||||
// standard Nomad conventions.
|
||||
func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
|
||||
r := c.newRequest("GET", endpoint)
|
||||
r.setQueryOptions(q)
|
||||
rtt, resp, err := requireOK(c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
qm := &QueryMeta{}
|
||||
parseQueryMeta(resp, qm)
|
||||
qm.RequestTime = rtt
|
||||
|
||||
if err := decodeBody(resp, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return qm, nil
|
||||
}
|
||||
|
||||
// write is used to do a PUT request against an endpoint
|
||||
// and serialize/deserialized using the standard Nomad conventions.
|
||||
func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
|
||||
r := c.newRequest("PUT", endpoint)
|
||||
r.setWriteOptions(q)
|
||||
r.obj = in
|
||||
rtt, resp, err := requireOK(c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{RequestTime: rtt}
|
||||
parseWriteMeta(resp, wm)
|
||||
|
||||
if out != nil {
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return wm, nil
|
||||
}
|
||||
|
||||
// write is used to do a PUT request against an endpoint
|
||||
// and serialize/deserialized using the standard Nomad conventions.
|
||||
func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
|
||||
r := c.newRequest("DELETE", endpoint)
|
||||
r.setWriteOptions(q)
|
||||
rtt, resp, err := requireOK(c.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
wm := &WriteMeta{RequestTime: rtt}
|
||||
parseWriteMeta(resp, wm)
|
||||
|
||||
if out != nil {
|
||||
if err := decodeBody(resp, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return wm, nil
|
||||
}
|
||||
|
||||
// parseQueryMeta is used to help parse query meta-data
|
||||
func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
|
||||
header := resp.Header
|
||||
|
||||
// Parse the X-Nomad-Index
|
||||
index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err)
|
||||
}
|
||||
q.LastIndex = index
|
||||
|
||||
// Parse the X-Nomad-LastContact
|
||||
last, err := strconv.ParseUint(header.Get("X-Nomad-LastContact"), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse X-Nomad-LastContact: %v", err)
|
||||
}
|
||||
q.LastContact = time.Duration(last) * time.Millisecond
|
||||
|
||||
// Parse the X-Nomad-KnownLeader
|
||||
switch header.Get("X-Nomad-KnownLeader") {
|
||||
case "true":
|
||||
q.KnownLeader = true
|
||||
default:
|
||||
q.KnownLeader = false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseWriteMeta is used to help parse write meta-data
|
||||
func parseWriteMeta(resp *http.Response, q *WriteMeta) error {
|
||||
header := resp.Header
|
||||
|
||||
// Parse the X-Nomad-Index
|
||||
index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err)
|
||||
}
|
||||
q.LastIndex = index
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeBody is used to JSON decode a body
|
||||
func decodeBody(resp *http.Response, out interface{}) error {
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
return dec.Decode(out)
|
||||
}
|
||||
|
||||
// encodeBody is used to encode a request body
|
||||
func encodeBody(obj interface{}) (io.Reader, error) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
enc := json.NewEncoder(buf)
|
||||
if err := enc.Encode(obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// requireOK is used to wrap doRequest and check for a 200
|
||||
func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
|
||||
if e != nil {
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
return d, nil, e
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
var buf bytes.Buffer
|
||||
io.Copy(&buf, resp.Body)
|
||||
resp.Body.Close()
|
||||
return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
|
||||
}
|
||||
return d, resp, nil
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
)
|
||||
|
||||
type configCallback func(c *Config)
|
||||
|
||||
func makeClient(t *testing.T, cb1 configCallback,
|
||||
cb2 testutil.ServerConfigCallback) (*Client, *testutil.TestServer) {
|
||||
|
||||
// Make client config
|
||||
conf := DefaultConfig()
|
||||
if cb1 != nil {
|
||||
cb1(conf)
|
||||
}
|
||||
|
||||
// Create server
|
||||
server := testutil.NewTestServer(t, cb2)
|
||||
conf.URL = server.HTTPAddr
|
||||
|
||||
// Create client
|
||||
client, err := NewClient(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
return client, server
|
||||
}
|
||||
|
||||
func TestDefaultConfig_env(t *testing.T) {
|
||||
t.Parallel()
|
||||
url := "http://1.2.3.4:5678"
|
||||
|
||||
os.Setenv("NOMAD_HTTP_URL", url)
|
||||
defer os.Setenv("NOMAD_HTTP_URL", "")
|
||||
|
||||
config := DefaultConfig()
|
||||
|
||||
if config.URL != url {
|
||||
t.Errorf("expected %q to be %q", config.URL, url)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetQueryOptions(t *testing.T) {
|
||||
// TODO t.Parallel()
|
||||
c, s := makeClient(t, nil, nil)
|
||||
defer s.Stop()
|
||||
|
||||
r := c.newRequest("GET", "/v1/jobs")
|
||||
q := &QueryOptions{
|
||||
Region: "foo",
|
||||
AllowStale: true,
|
||||
WaitIndex: 1000,
|
||||
WaitTime: 100 * time.Second,
|
||||
}
|
||||
r.setQueryOptions(q)
|
||||
|
||||
if r.params.Get("region") != "foo" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
if _, ok := r.params["stale"]; !ok {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
if r.params.Get("index") != "1000" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
if r.params.Get("wait") != "100000ms" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetWriteOptions(t *testing.T) {
|
||||
// TODO t.Parallel()
|
||||
c, s := makeClient(t, nil, nil)
|
||||
defer s.Stop()
|
||||
|
||||
r := c.newRequest("GET", "/v1/jobs")
|
||||
q := &WriteOptions{
|
||||
Region: "foo",
|
||||
}
|
||||
r.setWriteOptions(q)
|
||||
|
||||
if r.params.Get("region") != "foo" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestToHTTP(t *testing.T) {
|
||||
// TODO t.Parallel()
|
||||
c, s := makeClient(t, nil, nil)
|
||||
defer s.Stop()
|
||||
|
||||
r := c.newRequest("DELETE", "/v1/jobs/foo")
|
||||
q := &QueryOptions{
|
||||
Region: "foo",
|
||||
}
|
||||
r.setQueryOptions(q)
|
||||
req, err := r.toHTTP()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if req.Method != "DELETE" {
|
||||
t.Fatalf("bad: %v", req)
|
||||
}
|
||||
if req.URL.RequestURI() != "/v1/jobs/foo?region=foo" {
|
||||
t.Fatalf("bad: %v", req)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseQueryMeta(t *testing.T) {
|
||||
t.Parallel()
|
||||
resp := &http.Response{
|
||||
Header: make(map[string][]string),
|
||||
}
|
||||
resp.Header.Set("X-Nomad-Index", "12345")
|
||||
resp.Header.Set("X-Nomad-LastContact", "80")
|
||||
resp.Header.Set("X-Nomad-KnownLeader", "true")
|
||||
|
||||
qm := &QueryMeta{}
|
||||
if err := parseQueryMeta(resp, qm); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if qm.LastIndex != 12345 {
|
||||
t.Fatalf("Bad: %v", qm)
|
||||
}
|
||||
if qm.LastContact != 80*time.Millisecond {
|
||||
t.Fatalf("Bad: %v", qm)
|
||||
}
|
||||
if !qm.KnownLeader {
|
||||
t.Fatalf("Bad: %v", qm)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseWriteMeta(t *testing.T) {
|
||||
t.Parallel()
|
||||
resp := &http.Response{
|
||||
Header: make(map[string][]string),
|
||||
}
|
||||
resp.Header.Set("X-Nomad-Index", "12345")
|
||||
|
||||
wm := &WriteMeta{}
|
||||
if err := parseWriteMeta(resp, wm); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if wm.LastIndex != 12345 {
|
||||
t.Fatalf("Bad: %v", wm)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package api
|
||||
|
||||
// Raw can be used to do raw queries against custom endpoints
|
||||
type Raw struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
// Raw returns a handle to query endpoints
|
||||
func (c *Client) Raw() *Raw {
|
||||
return &Raw{c}
|
||||
}
|
||||
|
||||
// Query is used to do a GET request against an endpoint
|
||||
// and deserialize the response into an interface using
|
||||
// standard Nomad conventions.
|
||||
func (raw *Raw) Query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
|
||||
return raw.c.query(endpoint, out, q)
|
||||
}
|
||||
|
||||
// Write is used to do a PUT request against an endpoint
|
||||
// and serialize/deserialized using the standard Nomad conventions.
|
||||
func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
|
||||
return raw.c.write(endpoint, in, out, q)
|
||||
}
|
||||
|
||||
// Delete is used to do a DELETE request against an endpoint
|
||||
// and serialize/deserialized using the standard Nomad conventions.
|
||||
func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
|
||||
return raw.c.delete(endpoint, out, q)
|
||||
}
|
|
@ -164,6 +164,7 @@ func DefaultConfig() *Config {
|
|||
LogLevel: "INFO",
|
||||
Region: "region1",
|
||||
Datacenter: "dc1",
|
||||
HttpAddr: "127.0.0.1:4646",
|
||||
Client: &ClientConfig{
|
||||
Enabled: false,
|
||||
},
|
||||
|
|
|
@ -0,0 +1,254 @@
|
|||
package testutil
|
||||
|
||||
// TestServer is a test helper. It uses a fork/exec model to create
|
||||
// a test Nomad server instance in the background and initialize it
|
||||
// with some data and/or services. The test server can then be used
|
||||
// to run a unit test, and offers an easy API to tear itself down
|
||||
// when the test has completed. The only prerequisite is to have a nomad
|
||||
// binary available on the $PATH.
|
||||
//
|
||||
// This package does not use Nomad's official API client. This is
|
||||
// because we use TestServer to test the API client, which would
|
||||
// otherwise cause an import cycle.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// offset is used to atomically increment the port numbers.
|
||||
var offset uint64
|
||||
|
||||
// TestServerConfig is the main server configuration struct.
|
||||
type TestServerConfig struct {
|
||||
HTTPAddr string `json:"http_addr,omitempty"`
|
||||
Bootstrap bool `json:"bootstrap,omitempty"`
|
||||
DataDir string `json:"data_dir,omitempty"`
|
||||
Region string `json:"region,omitempty"`
|
||||
DisableCheckpoint bool `json:"disable_update_check"`
|
||||
LogLevel string `json:"log_level,omitempty"`
|
||||
Stdout, Stderr io.Writer `json:"-"`
|
||||
}
|
||||
|
||||
// ServerConfigCallback is a function interface which can be
|
||||
// passed to NewTestServerConfig to modify the server config.
|
||||
type ServerConfigCallback func(c *TestServerConfig)
|
||||
|
||||
// defaultServerConfig returns a new TestServerConfig struct
|
||||
// with all of the listen ports incremented by one.
|
||||
func defaultServerConfig() *TestServerConfig {
|
||||
idx := int(atomic.AddUint64(&offset, 1))
|
||||
|
||||
return &TestServerConfig{
|
||||
DisableCheckpoint: true,
|
||||
Bootstrap: true,
|
||||
LogLevel: "DEBUG",
|
||||
HTTPAddr: fmt.Sprintf("127.0.0.1:%d", 20000+idx),
|
||||
}
|
||||
}
|
||||
|
||||
// TestServer is the main server wrapper struct.
|
||||
type TestServer struct {
|
||||
PID int
|
||||
Config *TestServerConfig
|
||||
t *testing.T
|
||||
|
||||
HTTPAddr string
|
||||
HttpClient *http.Client
|
||||
}
|
||||
|
||||
// NewTestServerConfig creates a new TestServer, and makes a call to
|
||||
// an optional callback function to modify the configuration.
|
||||
func NewTestServer(t *testing.T, cb ServerConfigCallback) *TestServer {
|
||||
if path, err := exec.LookPath("nomad"); err != nil || path == "" {
|
||||
t.Skip("nomad not found on $PATH, skipping")
|
||||
}
|
||||
|
||||
dataDir, err := ioutil.TempDir("", "nomad")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
configFile, err := ioutil.TempFile(dataDir, "nomad")
|
||||
if err != nil {
|
||||
defer os.RemoveAll(dataDir)
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
nomadConfig := defaultServerConfig()
|
||||
nomadConfig.DataDir = dataDir
|
||||
|
||||
if cb != nil {
|
||||
cb(nomadConfig)
|
||||
}
|
||||
|
||||
configContent, err := json.Marshal(nomadConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if _, err := configFile.Write(configContent); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
configFile.Close()
|
||||
|
||||
stdout := io.Writer(os.Stdout)
|
||||
if nomadConfig.Stdout != nil {
|
||||
stdout = nomadConfig.Stdout
|
||||
}
|
||||
|
||||
stderr := io.Writer(os.Stderr)
|
||||
if nomadConfig.Stderr != nil {
|
||||
stderr = nomadConfig.Stderr
|
||||
}
|
||||
|
||||
// Start the server
|
||||
// TODO: Use "-config", configFile.Name()
|
||||
cmd := exec.Command("nomad", "agent", "-dev")
|
||||
cmd.Stdout = stdout
|
||||
cmd.Stderr = stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
var client *http.Client
|
||||
client = http.DefaultClient
|
||||
|
||||
server := &TestServer{
|
||||
Config: nomadConfig,
|
||||
PID: cmd.Process.Pid,
|
||||
t: t,
|
||||
|
||||
HTTPAddr: "127.0.0.1:4646", // TODO nomadConfig.HTTPAddr,
|
||||
HttpClient: client,
|
||||
}
|
||||
|
||||
// Wait for the server to be ready
|
||||
if nomadConfig.Bootstrap {
|
||||
server.waitForLeader()
|
||||
} else {
|
||||
server.waitForAPI()
|
||||
}
|
||||
return server
|
||||
}
|
||||
|
||||
// Stop stops the test Nomad server, and removes the Nomad data
|
||||
// directory once we are done.
|
||||
func (s *TestServer) Stop() {
|
||||
defer os.RemoveAll(s.Config.DataDir)
|
||||
|
||||
cmd := exec.Command("kill", "-9", fmt.Sprintf("%d", s.PID))
|
||||
if err := cmd.Run(); err != nil {
|
||||
s.t.Errorf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// waitForAPI waits for only the agent HTTP endpoint to start
|
||||
// responding. This is an indication that the agent has started,
|
||||
// but will likely return before a leader is elected.
|
||||
func (s *TestServer) waitForAPI() {
|
||||
WaitForResult(func() (bool, error) {
|
||||
resp, err := s.HttpClient.Get(s.url("/v1/jobs?stale"))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if err := s.requireOK(resp); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
defer s.Stop()
|
||||
s.t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
// waitForLeader waits for the Nomad server's HTTP API to become
|
||||
// available, and then waits for a known leader and an index of
|
||||
// 1 or more to be observed to confirm leader election is done.
|
||||
func (s *TestServer) waitForLeader() {
|
||||
WaitForResult(func() (bool, error) {
|
||||
// Query the API and check the status code
|
||||
resp, err := s.HttpClient.Get(s.url("/v1/jobs"))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if err := s.requireOK(resp); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Ensure we have a leader and a node registeration
|
||||
if leader := resp.Header.Get("X-Nomad-KnownLeader"); leader != "true" {
|
||||
fmt.Println(leader)
|
||||
return false, fmt.Errorf("Nomad leader status: %#v", leader)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
defer s.Stop()
|
||||
s.t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
// url is a helper function which takes a relative URL and
|
||||
// makes it into a proper URL against the local Nomad server.
|
||||
func (s *TestServer) url(path string) string {
|
||||
return fmt.Sprintf("http://%s%s", s.HTTPAddr, path)
|
||||
}
|
||||
|
||||
// requireOK checks the HTTP response code and ensures it is acceptable.
|
||||
func (s *TestServer) requireOK(resp *http.Response) error {
|
||||
if resp.StatusCode != 200 {
|
||||
return fmt.Errorf("Bad status code: %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// put performs a new HTTP PUT request.
|
||||
func (s *TestServer) put(path string, body io.Reader) *http.Response {
|
||||
req, err := http.NewRequest("PUT", s.url(path), body)
|
||||
if err != nil {
|
||||
s.t.Fatalf("err: %s", err)
|
||||
}
|
||||
resp, err := s.HttpClient.Do(req)
|
||||
if err != nil {
|
||||
s.t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := s.requireOK(resp); err != nil {
|
||||
defer resp.Body.Close()
|
||||
s.t.Fatal(err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// get performs a new HTTP GET request.
|
||||
func (s *TestServer) get(path string) *http.Response {
|
||||
resp, err := s.HttpClient.Get(s.url(path))
|
||||
if err != nil {
|
||||
s.t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err := s.requireOK(resp); err != nil {
|
||||
defer resp.Body.Close()
|
||||
s.t.Fatal(err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// encodePayload returns a new io.Reader wrapping the encoded contents
|
||||
// of the payload, suitable for passing directly to a new request.
|
||||
func (s *TestServer) encodePayload(payload interface{}) io.Reader {
|
||||
var encoded bytes.Buffer
|
||||
enc := json.NewEncoder(&encoded)
|
||||
if err := enc.Encode(payload); err != nil {
|
||||
s.t.Fatalf("err: %s", err)
|
||||
}
|
||||
return &encoded
|
||||
}
|
Loading…
Reference in New Issue