http: basic framework
This commit is contained in:
parent
ae3979818a
commit
4bf257e951
|
@ -0,0 +1,38 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var nextPort uint32 = 17000
|
||||
|
||||
func getPort() int {
|
||||
return int(atomic.AddUint32(&nextPort, 1))
|
||||
}
|
||||
|
||||
func tmpDir(t *testing.T) string {
|
||||
dir, err := ioutil.TempDir("", "nomad")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
return dir
|
||||
}
|
||||
|
||||
func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) {
|
||||
dir := tmpDir(t)
|
||||
conf := DevConfig()
|
||||
|
||||
if cb != nil {
|
||||
cb(conf)
|
||||
}
|
||||
|
||||
agent, err := NewAgent(conf, os.Stderr)
|
||||
if err != nil {
|
||||
os.RemoveAll(dir)
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
return dir, agent
|
||||
}
|
|
@ -132,14 +132,16 @@ type Telemetry struct {
|
|||
|
||||
// DevConfig is a Config that is used for dev mode of Nomad.
|
||||
func DevConfig() *Config {
|
||||
conf := DefaultConfig()
|
||||
conf.LogLevel = "DEBUG"
|
||||
conf.Client.Enabled = true
|
||||
conf.Server.Enabled = true
|
||||
conf.DevMode = true
|
||||
conf.EnableDebug = true
|
||||
conf.DisableAnonymousSignature = true
|
||||
return conf
|
||||
return &Config{
|
||||
LogLevel: "DEBUG",
|
||||
Client: &ClientConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
Server: &ServerConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
LogLevel: "DEBUG",
|
||||
DevMode: true,
|
||||
EnableDebug: true,
|
||||
DisableAnonymousSignature: true,
|
||||
|
@ -149,7 +151,15 @@ func DevConfig() *Config {
|
|||
// DefaultConfig is a the baseline configuration for Nomad
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
LogLevel: "INFO",
|
||||
LogLevel: "INFO",
|
||||
Region: "region1",
|
||||
Datacenter: "dc1",
|
||||
Client: &ClientConfig{
|
||||
Enabled: false,
|
||||
},
|
||||
Server: &ServerConfig{
|
||||
Enabled: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
// HTTPServer is used to wrap an Agent and expose it over an HTTP interface
|
||||
|
@ -52,6 +58,9 @@ func (s *HTTPServer) Shutdown() {
|
|||
|
||||
// registerHandlers is used to attach our handlers to the mux
|
||||
func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsList))
|
||||
s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest))
|
||||
|
||||
if enableDebug {
|
||||
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||
|
@ -59,3 +68,166 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
|||
s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPCodedError is used to provide the HTTP error code
|
||||
type HTTPCodedError interface {
|
||||
error
|
||||
Code() int
|
||||
}
|
||||
|
||||
func CodedError(c int, s string) HTTPCodedError {
|
||||
return &codedError{s, c}
|
||||
}
|
||||
|
||||
type codedError struct {
|
||||
s string
|
||||
code int
|
||||
}
|
||||
|
||||
func (e *codedError) Error() string {
|
||||
return e.s
|
||||
}
|
||||
|
||||
func (e *codedError) Code() int {
|
||||
return e.code
|
||||
}
|
||||
|
||||
// wrap is used to wrap functions to make them more convenient
|
||||
func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
|
||||
f := func(resp http.ResponseWriter, req *http.Request) {
|
||||
// Invoke the handler
|
||||
reqURL := req.URL.String()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
s.logger.Printf("[DEBUG] http: Request %v (%v)", reqURL, time.Now().Sub(start))
|
||||
}()
|
||||
obj, err := handler(resp, req)
|
||||
|
||||
// Check for an error
|
||||
HAS_ERR:
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] http: Request %v, error: %v", reqURL, err)
|
||||
code := 500
|
||||
if http, ok := err.(HTTPCodedError); ok {
|
||||
code = http.Code()
|
||||
}
|
||||
resp.WriteHeader(code)
|
||||
resp.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
prettyPrint := false
|
||||
if _, ok := req.URL.Query()["pretty"]; ok {
|
||||
prettyPrint = true
|
||||
}
|
||||
|
||||
// Write out the JSON object
|
||||
if obj != nil {
|
||||
var buf []byte
|
||||
if prettyPrint {
|
||||
buf, err = json.MarshalIndent(obj, "", " ")
|
||||
} else {
|
||||
buf, err = json.Marshal(obj)
|
||||
}
|
||||
if err != nil {
|
||||
goto HAS_ERR
|
||||
}
|
||||
resp.Header().Set("Content-Type", "application/json")
|
||||
resp.Write(buf)
|
||||
}
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// decodeBody is used to decode a JSON request body
|
||||
func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error) error {
|
||||
var raw interface{}
|
||||
dec := json.NewDecoder(req.Body)
|
||||
if err := dec.Decode(&raw); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Invoke the callback prior to decode
|
||||
if cb != nil {
|
||||
if err := cb(raw); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return mapstructure.Decode(raw, out)
|
||||
}
|
||||
|
||||
// setIndex is used to set the index response header
|
||||
func setIndex(resp http.ResponseWriter, index uint64) {
|
||||
resp.Header().Set("X-Nomad-Index", strconv.FormatUint(index, 10))
|
||||
}
|
||||
|
||||
// setKnownLeader is used to set the known leader header
|
||||
func setKnownLeader(resp http.ResponseWriter, known bool) {
|
||||
s := "true"
|
||||
if !known {
|
||||
s = "false"
|
||||
}
|
||||
resp.Header().Set("X-Nomad-KnownLeader", s)
|
||||
}
|
||||
|
||||
// setLastContact is used to set the last contact header
|
||||
func setLastContact(resp http.ResponseWriter, last time.Duration) {
|
||||
lastMsec := uint64(last / time.Millisecond)
|
||||
resp.Header().Set("X-Nomad-LastContact", strconv.FormatUint(lastMsec, 10))
|
||||
}
|
||||
|
||||
// setMeta is used to set the query response meta data
|
||||
func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
|
||||
setIndex(resp, m.Index)
|
||||
setLastContact(resp, m.LastContact)
|
||||
setKnownLeader(resp, m.KnownLeader)
|
||||
}
|
||||
|
||||
// parseWait is used to parse the ?wait and ?index query params
|
||||
// Returns true on error
|
||||
func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
|
||||
query := req.URL.Query()
|
||||
if wait := query.Get("wait"); wait != "" {
|
||||
dur, err := time.ParseDuration(wait)
|
||||
if err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte("Invalid wait time"))
|
||||
return true
|
||||
}
|
||||
b.MaxQueryTime = dur
|
||||
}
|
||||
if idx := query.Get("index"); idx != "" {
|
||||
index, err := strconv.ParseUint(idx, 10, 64)
|
||||
if err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte("Invalid index"))
|
||||
return true
|
||||
}
|
||||
b.MinQueryIndex = index
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// parseConsistency is used to parse the ?stale query params.
|
||||
func parseConsistency(req *http.Request, b *structs.QueryOptions) {
|
||||
query := req.URL.Query()
|
||||
if _, ok := query["stale"]; ok {
|
||||
b.AllowStale = true
|
||||
}
|
||||
}
|
||||
|
||||
// parseRegion is used to parse the ?region query param
|
||||
func (s *HTTPServer) parseRegion(req *http.Request, r *string) {
|
||||
if other := req.URL.Query().Get("region"); other != "" {
|
||||
*r = other
|
||||
} else if *r == "" {
|
||||
*r = s.agent.config.Region
|
||||
}
|
||||
}
|
||||
|
||||
// parse is a convenience method for endpoints that need to parse multiple flags
|
||||
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, r *string, b *structs.QueryOptions) bool {
|
||||
s.parseRegion(req, r)
|
||||
parseConsistency(req, b)
|
||||
return parseWait(resp, req, b)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,302 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
type TestServer struct {
|
||||
T *testing.T
|
||||
Dir string
|
||||
Agent *Agent
|
||||
Server *HTTPServer
|
||||
}
|
||||
|
||||
func (s *TestServer) Cleanup() {
|
||||
s.Server.Shutdown()
|
||||
s.Agent.Shutdown()
|
||||
os.RemoveAll(s.Dir)
|
||||
}
|
||||
|
||||
func makeHTTPServer(t *testing.T, cb func(c *Config)) *TestServer {
|
||||
dir, agent := makeAgent(t, cb)
|
||||
srv, err := NewHTTPServer(agent, agent.config, agent.logOutput)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
s := &TestServer{
|
||||
T: t,
|
||||
Dir: dir,
|
||||
Agent: agent,
|
||||
Server: srv,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func TestSetIndex(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
setIndex(resp, 1000)
|
||||
header := resp.Header().Get("X-Nomad-Index")
|
||||
if header != "1000" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
setIndex(resp, 2000)
|
||||
if v := resp.Header()["X-Nomad-Index"]; len(v) != 1 {
|
||||
t.Fatalf("bad: %#v", v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetKnownLeader(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
setKnownLeader(resp, true)
|
||||
header := resp.Header().Get("X-Nomad-KnownLeader")
|
||||
if header != "true" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
resp = httptest.NewRecorder()
|
||||
setKnownLeader(resp, false)
|
||||
header = resp.Header().Get("X-Nomad-KnownLeader")
|
||||
if header != "false" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetLastContact(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
setLastContact(resp, 123456*time.Microsecond)
|
||||
header := resp.Header().Get("X-Nomad-LastContact")
|
||||
if header != "123" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetMeta(t *testing.T) {
|
||||
meta := structs.QueryMeta{
|
||||
Index: 1000,
|
||||
KnownLeader: true,
|
||||
LastContact: 123456 * time.Microsecond,
|
||||
}
|
||||
resp := httptest.NewRecorder()
|
||||
setMeta(resp, &meta)
|
||||
header := resp.Header().Get("X-Nomad-Index")
|
||||
if header != "1000" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
header = resp.Header().Get("X-Nomad-KnownLeader")
|
||||
if header != "true" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
header = resp.Header().Get("X-Nomad-LastContact")
|
||||
if header != "123" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentTypeIsJSON(t *testing.T) {
|
||||
s := makeHTTPServer(t, nil)
|
||||
defer s.Cleanup()
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
|
||||
handler := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
return &structs.Job{Name: "foo"}, nil
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/kv/key", nil)
|
||||
s.Server.wrap(handler)(resp, req)
|
||||
|
||||
contentType := resp.Header().Get("Content-Type")
|
||||
|
||||
if contentType != "application/json" {
|
||||
t.Fatalf("Content-Type header was not 'application/json'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrettyPrint(t *testing.T) {
|
||||
testPrettyPrint("pretty=1", t)
|
||||
}
|
||||
|
||||
func TestPrettyPrintBare(t *testing.T) {
|
||||
testPrettyPrint("pretty", t)
|
||||
}
|
||||
|
||||
func testPrettyPrint(pretty string, t *testing.T) {
|
||||
s := makeHTTPServer(t, nil)
|
||||
defer s.Cleanup()
|
||||
|
||||
r := &structs.Job{Name: "foo"}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
handler := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
urlStr := "/v1/job/foo?" + pretty
|
||||
req, _ := http.NewRequest("GET", urlStr, nil)
|
||||
s.Server.wrap(handler)(resp, req)
|
||||
|
||||
expected, _ := json.MarshalIndent(r, "", " ")
|
||||
actual, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(expected, actual) {
|
||||
t.Fatalf("bad: %q", string(actual))
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseWait(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
var b structs.QueryOptions
|
||||
|
||||
req, err := http.NewRequest("GET",
|
||||
"/v1/catalog/nodes?wait=60s&index=1000", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if d := parseWait(resp, req, &b); d {
|
||||
t.Fatalf("unexpected done")
|
||||
}
|
||||
|
||||
if b.MinQueryIndex != 1000 {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
if b.MaxQueryTime != 60*time.Second {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseWait_InvalidTime(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
var b structs.QueryOptions
|
||||
|
||||
req, err := http.NewRequest("GET",
|
||||
"/v1/catalog/nodes?wait=60foo&index=1000", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if d := parseWait(resp, req, &b); !d {
|
||||
t.Fatalf("expected done")
|
||||
}
|
||||
|
||||
if resp.Code != 400 {
|
||||
t.Fatalf("bad code: %v", resp.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseWait_InvalidIndex(t *testing.T) {
|
||||
resp := httptest.NewRecorder()
|
||||
var b structs.QueryOptions
|
||||
|
||||
req, err := http.NewRequest("GET",
|
||||
"/v1/catalog/nodes?wait=60s&index=foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if d := parseWait(resp, req, &b); !d {
|
||||
t.Fatalf("expected done")
|
||||
}
|
||||
|
||||
if resp.Code != 400 {
|
||||
t.Fatalf("bad code: %v", resp.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConsistency(t *testing.T) {
|
||||
var b structs.QueryOptions
|
||||
|
||||
req, err := http.NewRequest("GET",
|
||||
"/v1/catalog/nodes?stale", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
parseConsistency(req, &b)
|
||||
if !b.AllowStale {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
|
||||
b = structs.QueryOptions{}
|
||||
req, err = http.NewRequest("GET",
|
||||
"/v1/catalog/nodes?consistent", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
parseConsistency(req, &b)
|
||||
if b.AllowStale {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRegion(t *testing.T) {
|
||||
s := makeHTTPServer(t, nil)
|
||||
defer s.Cleanup()
|
||||
|
||||
req, err := http.NewRequest("GET",
|
||||
"/v1/jobs?region=foo", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
var region string
|
||||
s.Server.parseRegion(req, ®ion)
|
||||
if region != "foo" {
|
||||
t.Fatalf("bad %s", region)
|
||||
}
|
||||
|
||||
region = ""
|
||||
req, err = http.NewRequest("GET", "/v1/jobs", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
s.Server.parseRegion(req, ®ion)
|
||||
if region != "region1" {
|
||||
t.Fatalf("bad %s", region)
|
||||
}
|
||||
}
|
||||
|
||||
// assertIndex tests that X-Nomad-Index is set and non-zero
|
||||
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
|
||||
header := resp.Header().Get("X-Nomad-Index")
|
||||
if header == "" || header == "0" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
}
|
||||
|
||||
// checkIndex is like assertIndex but returns an error
|
||||
func checkIndex(resp *httptest.ResponseRecorder) error {
|
||||
header := resp.Header().Get("X-Nomad-Index")
|
||||
if header == "" || header == "0" {
|
||||
return fmt.Errorf("Bad: %v", header)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getIndex parses X-Nomad-Index
|
||||
func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
|
||||
header := resp.Header().Get("X-Nomad-Index")
|
||||
if header == "" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
val, err := strconv.Atoi(header)
|
||||
if err != nil {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
return uint64(val)
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func (s *HTTPServer) JobsList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var args structs.RegisterRequest
|
||||
if err := decodeBody(req, &args, nil); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Setup the default DC if not provided
|
||||
if args.Datacenter == "" {
|
||||
args.Datacenter = s.agent.config.Datacenter
|
||||
}
|
||||
|
||||
// Forward to the servers
|
||||
var out struct{}
|
||||
if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
var args structs.RegisterRequest
|
||||
if err := decodeBody(req, &args, nil); err != nil {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Setup the default DC if not provided
|
||||
if args.Datacenter == "" {
|
||||
args.Datacenter = s.agent.config.Datacenter
|
||||
}
|
||||
|
||||
// Forward to the servers
|
||||
var out struct{}
|
||||
if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
Loading…
Reference in New Issue