open-nomad/api/api.go

493 lines
12 KiB
Go

package api
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/hashicorp/go-cleanhttp"
)
// QueryOptions are used to parameterize a query
type QueryOptions struct {
// Providing a datacenter overwrites the region provided
// by the Config
Region string
// AllowStale allows any Nomad server (non-leader) to service
// a read. This allows for lower latency and higher throughput
AllowStale bool
// WaitIndex is used to enable a blocking query. Waits
// until the timeout or the next index is reached
WaitIndex uint64
// WaitTime is used to bound the duration of a wait.
// Defaults to that of the Config, but can be overridden.
WaitTime time.Duration
// If set, used as prefix for resource list searches
Prefix string
// Set HTTP parameters on the query.
Params map[string]string
}
// WriteOptions are used to parameterize a write
type WriteOptions struct {
// Providing a datacenter overwrites the region provided
// by the Config
Region string
}
// QueryMeta is used to return meta data about a query
type QueryMeta struct {
// LastIndex. This can be used as a WaitIndex to perform
// a blocking query
LastIndex uint64
// Time of last contact from the leader for the
// server servicing the request
LastContact time.Duration
// Is there a known leader
KnownLeader bool
// How long did the request take
RequestTime time.Duration
}
// WriteMeta is used to return meta data about a write
type WriteMeta struct {
// LastIndex. This can be used as a WaitIndex to perform
// a blocking query
LastIndex uint64
// How long did the request take
RequestTime time.Duration
}
// HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication
type HttpBasicAuth struct {
// Username to use for HTTP Basic Authentication
Username string
// Password to use for HTTP Basic Authentication
Password string
}
// Config is used to configure the creation of a client
type Config struct {
// Address is the address of the Nomad agent
Address string
// Region to use. If not provided, the default agent region is used.
Region string
// HttpClient is the client to use. Default will be
// used if not provided.
HttpClient *http.Client
// HttpAuth is the auth info to use for http access.
HttpAuth *HttpBasicAuth
// WaitTime limits how long a Watch will block. If not provided,
// the agent default values will be used.
WaitTime time.Duration
}
// DefaultConfig returns a default configuration for the client
func DefaultConfig() *Config {
config := &Config{
Address: "http://127.0.0.1:4646",
HttpClient: cleanhttp.DefaultClient(),
}
if addr := os.Getenv("NOMAD_ADDR"); addr != "" {
config.Address = addr
}
if auth := os.Getenv("NOMAD_HTTP_AUTH"); auth != "" {
var username, password string
if strings.Contains(auth, ":") {
split := strings.SplitN(auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = auth
}
config.HttpAuth = &HttpBasicAuth{
Username: username,
Password: password,
}
}
return config
}
// Client provides a client to the Nomad API
type Client struct {
config Config
}
// NewClient returns a new client
func NewClient(config *Config) (*Client, error) {
// bootstrap the config
defConfig := DefaultConfig()
if config.Address == "" {
config.Address = defConfig.Address
} else if _, err := url.Parse(config.Address); err != nil {
return nil, fmt.Errorf("invalid address '%s': %v", config.Address, err)
}
if config.HttpClient == nil {
config.HttpClient = defConfig.HttpClient
}
client := &Client{
config: *config,
}
return client, nil
}
// SetRegion sets the region to forward API requests to.
func (c *Client) SetRegion(region string) {
c.config.Region = region
}
// request is used to help build up a request
type request struct {
config *Config
method string
url *url.URL
params url.Values
body io.Reader
obj interface{}
}
// setQueryOptions is used to annotate the request with
// additional query options
func (r *request) setQueryOptions(q *QueryOptions) {
if q == nil {
return
}
if q.Region != "" {
r.params.Set("region", q.Region)
}
if q.AllowStale {
r.params.Set("stale", "")
}
if q.WaitIndex != 0 {
r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
}
if q.WaitTime != 0 {
r.params.Set("wait", durToMsec(q.WaitTime))
}
if q.Prefix != "" {
r.params.Set("prefix", q.Prefix)
}
for k, v := range q.Params {
r.params.Set(k, v)
}
}
// durToMsec converts a duration to a millisecond specified string
func durToMsec(dur time.Duration) string {
return fmt.Sprintf("%dms", dur/time.Millisecond)
}
// setWriteOptions is used to annotate the request with
// additional write options
func (r *request) setWriteOptions(q *WriteOptions) {
if q == nil {
return
}
if q.Region != "" {
r.params.Set("region", q.Region)
}
}
// toHTTP converts the request to an HTTP request
func (r *request) toHTTP() (*http.Request, error) {
// Encode the query parameters
r.url.RawQuery = r.params.Encode()
// Check if we should encode the body
if r.body == nil && r.obj != nil {
if b, err := encodeBody(r.obj); err != nil {
return nil, err
} else {
r.body = b
}
}
// Create the HTTP request
req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
if err != nil {
return nil, err
}
// Optionally configure HTTP basic authentication
if r.url.User != nil {
username := r.url.User.Username()
password, _ := r.url.User.Password()
req.SetBasicAuth(username, password)
} else if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
}
req.Header.Add("Accept-Encoding", "gzip")
req.URL.Host = r.url.Host
req.URL.Scheme = r.url.Scheme
req.Host = r.url.Host
return req, nil
}
// newRequest is used to create a new request
func (c *Client) newRequest(method, path string) *request {
base, _ := url.Parse(c.config.Address)
u, _ := url.Parse(path)
r := &request{
config: &c.config,
method: method,
url: &url.URL{
Scheme: base.Scheme,
User: base.User,
Host: base.Host,
Path: u.Path,
},
params: make(map[string][]string),
}
if c.config.Region != "" {
r.params.Set("region", c.config.Region)
}
if c.config.WaitTime != 0 {
r.params.Set("wait", durToMsec(r.config.WaitTime))
}
// Add in the query parameters, if any
for key, values := range u.Query() {
for _, value := range values {
r.params.Add(key, value)
}
}
return r
}
// multiCloser is to wrap a ReadCloser such that when close is called, multiple
// Closes occur.
type multiCloser struct {
reader io.Reader
inorderClose []io.Closer
}
func (m *multiCloser) Close() error {
for _, c := range m.inorderClose {
if err := c.Close(); err != nil {
return err
}
}
return nil
}
func (m *multiCloser) Read(p []byte) (int, error) {
return m.reader.Read(p)
}
// doRequest runs a request with our client
func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
req, err := r.toHTTP()
if err != nil {
return 0, nil, err
}
start := time.Now()
resp, err := c.config.HttpClient.Do(req)
diff := time.Now().Sub(start)
// If the response is compressed, we swap the body's reader.
if resp != nil && resp.Header != nil {
var reader io.ReadCloser
switch resp.Header.Get("Content-Encoding") {
case "gzip":
greader, err := gzip.NewReader(resp.Body)
if err != nil {
return 0, nil, err
}
// The gzip reader doesn't close the wrapped reader so we use
// multiCloser.
reader = &multiCloser{
reader: greader,
inorderClose: []io.Closer{greader, resp.Body},
}
default:
reader = resp.Body
}
resp.Body = reader
}
return diff, resp, err
}
// rawQuery makes a GET request to the specified endpoint but returns just the
// response body.
func (c *Client) rawQuery(endpoint string, q *QueryOptions) (io.ReadCloser, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
_, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
return resp.Body, nil
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Nomad conventions.
func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}
// write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Nomad conventions.
func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
r := c.newRequest("PUT", endpoint)
r.setWriteOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
parseWriteMeta(resp, wm)
if out != nil {
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
}
return wm, nil
}
// write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Nomad conventions.
func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
r := c.newRequest("DELETE", endpoint)
r.setWriteOptions(q)
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
parseWriteMeta(resp, wm)
if out != nil {
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
}
return wm, nil
}
// parseQueryMeta is used to help parse query meta-data
func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
header := resp.Header
// Parse the X-Nomad-Index
index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64)
if err != nil {
return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err)
}
q.LastIndex = index
// Parse the X-Nomad-LastContact
last, err := strconv.ParseUint(header.Get("X-Nomad-LastContact"), 10, 64)
if err != nil {
return fmt.Errorf("Failed to parse X-Nomad-LastContact: %v", err)
}
q.LastContact = time.Duration(last) * time.Millisecond
// Parse the X-Nomad-KnownLeader
switch header.Get("X-Nomad-KnownLeader") {
case "true":
q.KnownLeader = true
default:
q.KnownLeader = false
}
return nil
}
// parseWriteMeta is used to help parse write meta-data
func parseWriteMeta(resp *http.Response, q *WriteMeta) error {
header := resp.Header
// Parse the X-Nomad-Index
index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64)
if err != nil {
return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err)
}
q.LastIndex = index
return nil
}
// decodeBody is used to JSON decode a body
func decodeBody(resp *http.Response, out interface{}) error {
dec := json.NewDecoder(resp.Body)
return dec.Decode(out)
}
// encodeBody is used to encode a request body
func encodeBody(obj interface{}) (io.Reader, error) {
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
if err := enc.Encode(obj); err != nil {
return nil, err
}
return buf, nil
}
// requireOK is used to wrap doRequest and check for a 200
func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
if e != nil {
if resp != nil {
resp.Body.Close()
}
return d, nil, e
}
if resp.StatusCode != 200 {
var buf bytes.Buffer
io.Copy(&buf, resp.Body)
resp.Body.Close()
return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
}
return d, resp, nil
}