diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go new file mode 100644 index 000000000..6d90037c9 --- /dev/null +++ b/command/agent/agent_test.go @@ -0,0 +1,38 @@ +package agent + +import ( + "io/ioutil" + "os" + "sync/atomic" + "testing" +) + +var nextPort uint32 = 17000 + +func getPort() int { + return int(atomic.AddUint32(&nextPort, 1)) +} + +func tmpDir(t *testing.T) string { + dir, err := ioutil.TempDir("", "nomad") + if err != nil { + t.Fatalf("err: %v", err) + } + return dir +} + +func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) { + dir := tmpDir(t) + conf := DevConfig() + + if cb != nil { + cb(conf) + } + + agent, err := NewAgent(conf, os.Stderr) + if err != nil { + os.RemoveAll(dir) + t.Fatalf("err: %v", err) + } + return dir, agent +} diff --git a/command/agent/config.go b/command/agent/config.go index 3d5a44725..fe25d59c2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -132,14 +132,16 @@ type Telemetry struct { // DevConfig is a Config that is used for dev mode of Nomad. func DevConfig() *Config { + conf := DefaultConfig() + conf.LogLevel = "DEBUG" + conf.Client.Enabled = true + conf.Server.Enabled = true + conf.DevMode = true + conf.EnableDebug = true + conf.DisableAnonymousSignature = true + return conf return &Config{ - LogLevel: "DEBUG", - Client: &ClientConfig{ - Enabled: true, - }, - Server: &ServerConfig{ - Enabled: true, - }, + LogLevel: "DEBUG", DevMode: true, EnableDebug: true, DisableAnonymousSignature: true, @@ -149,7 +151,15 @@ func DevConfig() *Config { // DefaultConfig is a the baseline configuration for Nomad func DefaultConfig() *Config { return &Config{ - LogLevel: "INFO", + LogLevel: "INFO", + Region: "region1", + Datacenter: "dc1", + Client: &ClientConfig{ + Enabled: false, + }, + Server: &ServerConfig{ + Enabled: false, + }, } } diff --git a/command/agent/http.go b/command/agent/http.go index 41d1a1a52..9df80204a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1,12 +1,18 @@ package agent import ( + "encoding/json" "fmt" "io" "log" "net" "net/http" "net/http/pprof" + "strconv" + "time" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/mapstructure" ) // HTTPServer is used to wrap an Agent and expose it over an HTTP interface @@ -52,6 +58,9 @@ func (s *HTTPServer) Shutdown() { // registerHandlers is used to attach our handlers to the mux func (s *HTTPServer) registerHandlers(enableDebug bool) { + s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsList)) + s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -59,3 +68,166 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) } } + +// HTTPCodedError is used to provide the HTTP error code +type HTTPCodedError interface { + error + Code() int +} + +func CodedError(c int, s string) HTTPCodedError { + return &codedError{s, c} +} + +type codedError struct { + s string + code int +} + +func (e *codedError) Error() string { + return e.s +} + +func (e *codedError) Code() int { + return e.code +} + +// wrap is used to wrap functions to make them more convenient +func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { + f := func(resp http.ResponseWriter, req *http.Request) { + // Invoke the handler + reqURL := req.URL.String() + start := time.Now() + defer func() { + s.logger.Printf("[DEBUG] http: Request %v (%v)", reqURL, time.Now().Sub(start)) + }() + obj, err := handler(resp, req) + + // Check for an error + HAS_ERR: + if err != nil { + s.logger.Printf("[ERR] http: Request %v, error: %v", reqURL, err) + code := 500 + if http, ok := err.(HTTPCodedError); ok { + code = http.Code() + } + resp.WriteHeader(code) + resp.Write([]byte(err.Error())) + return + } + + prettyPrint := false + if _, ok := req.URL.Query()["pretty"]; ok { + prettyPrint = true + } + + // Write out the JSON object + if obj != nil { + var buf []byte + if prettyPrint { + buf, err = json.MarshalIndent(obj, "", " ") + } else { + buf, err = json.Marshal(obj) + } + if err != nil { + goto HAS_ERR + } + resp.Header().Set("Content-Type", "application/json") + resp.Write(buf) + } + } + return f +} + +// decodeBody is used to decode a JSON request body +func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error) error { + var raw interface{} + dec := json.NewDecoder(req.Body) + if err := dec.Decode(&raw); err != nil { + return err + } + + // Invoke the callback prior to decode + if cb != nil { + if err := cb(raw); err != nil { + return err + } + } + return mapstructure.Decode(raw, out) +} + +// setIndex is used to set the index response header +func setIndex(resp http.ResponseWriter, index uint64) { + resp.Header().Set("X-Nomad-Index", strconv.FormatUint(index, 10)) +} + +// setKnownLeader is used to set the known leader header +func setKnownLeader(resp http.ResponseWriter, known bool) { + s := "true" + if !known { + s = "false" + } + resp.Header().Set("X-Nomad-KnownLeader", s) +} + +// setLastContact is used to set the last contact header +func setLastContact(resp http.ResponseWriter, last time.Duration) { + lastMsec := uint64(last / time.Millisecond) + resp.Header().Set("X-Nomad-LastContact", strconv.FormatUint(lastMsec, 10)) +} + +// setMeta is used to set the query response meta data +func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { + setIndex(resp, m.Index) + setLastContact(resp, m.LastContact) + setKnownLeader(resp, m.KnownLeader) +} + +// parseWait is used to parse the ?wait and ?index query params +// Returns true on error +func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool { + query := req.URL.Query() + if wait := query.Get("wait"); wait != "" { + dur, err := time.ParseDuration(wait) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte("Invalid wait time")) + return true + } + b.MaxQueryTime = dur + } + if idx := query.Get("index"); idx != "" { + index, err := strconv.ParseUint(idx, 10, 64) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte("Invalid index")) + return true + } + b.MinQueryIndex = index + } + return false +} + +// parseConsistency is used to parse the ?stale query params. +func parseConsistency(req *http.Request, b *structs.QueryOptions) { + query := req.URL.Query() + if _, ok := query["stale"]; ok { + b.AllowStale = true + } +} + +// parseRegion is used to parse the ?region query param +func (s *HTTPServer) parseRegion(req *http.Request, r *string) { + if other := req.URL.Query().Get("region"); other != "" { + *r = other + } else if *r == "" { + *r = s.agent.config.Region + } +} + +// parse is a convenience method for endpoints that need to parse multiple flags +func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, r *string, b *structs.QueryOptions) bool { + s.parseRegion(req, r) + parseConsistency(req, b) + return parseWait(resp, req, b) +} diff --git a/command/agent/http_test.go b/command/agent/http_test.go new file mode 100644 index 000000000..89ffa9139 --- /dev/null +++ b/command/agent/http_test.go @@ -0,0 +1,302 @@ +package agent + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "strconv" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) + +type TestServer struct { + T *testing.T + Dir string + Agent *Agent + Server *HTTPServer +} + +func (s *TestServer) Cleanup() { + s.Server.Shutdown() + s.Agent.Shutdown() + os.RemoveAll(s.Dir) +} + +func makeHTTPServer(t *testing.T, cb func(c *Config)) *TestServer { + dir, agent := makeAgent(t, cb) + srv, err := NewHTTPServer(agent, agent.config, agent.logOutput) + if err != nil { + t.Fatalf("err: %v", err) + } + s := &TestServer{ + T: t, + Dir: dir, + Agent: agent, + Server: srv, + } + return s +} + +func TestSetIndex(t *testing.T) { + resp := httptest.NewRecorder() + setIndex(resp, 1000) + header := resp.Header().Get("X-Nomad-Index") + if header != "1000" { + t.Fatalf("Bad: %v", header) + } + setIndex(resp, 2000) + if v := resp.Header()["X-Nomad-Index"]; len(v) != 1 { + t.Fatalf("bad: %#v", v) + } +} + +func TestSetKnownLeader(t *testing.T) { + resp := httptest.NewRecorder() + setKnownLeader(resp, true) + header := resp.Header().Get("X-Nomad-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + resp = httptest.NewRecorder() + setKnownLeader(resp, false) + header = resp.Header().Get("X-Nomad-KnownLeader") + if header != "false" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetLastContact(t *testing.T) { + resp := httptest.NewRecorder() + setLastContact(resp, 123456*time.Microsecond) + header := resp.Header().Get("X-Nomad-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + +func TestSetMeta(t *testing.T) { + meta := structs.QueryMeta{ + Index: 1000, + KnownLeader: true, + LastContact: 123456 * time.Microsecond, + } + resp := httptest.NewRecorder() + setMeta(resp, &meta) + header := resp.Header().Get("X-Nomad-Index") + if header != "1000" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Nomad-KnownLeader") + if header != "true" { + t.Fatalf("Bad: %v", header) + } + header = resp.Header().Get("X-Nomad-LastContact") + if header != "123" { + t.Fatalf("Bad: %v", header) + } +} + +func TestContentTypeIsJSON(t *testing.T) { + s := makeHTTPServer(t, nil) + defer s.Cleanup() + + resp := httptest.NewRecorder() + + handler := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return &structs.Job{Name: "foo"}, nil + } + + req, _ := http.NewRequest("GET", "/v1/kv/key", nil) + s.Server.wrap(handler)(resp, req) + + contentType := resp.Header().Get("Content-Type") + + if contentType != "application/json" { + t.Fatalf("Content-Type header was not 'application/json'") + } +} + +func TestPrettyPrint(t *testing.T) { + testPrettyPrint("pretty=1", t) +} + +func TestPrettyPrintBare(t *testing.T) { + testPrettyPrint("pretty", t) +} + +func testPrettyPrint(pretty string, t *testing.T) { + s := makeHTTPServer(t, nil) + defer s.Cleanup() + + r := &structs.Job{Name: "foo"} + + resp := httptest.NewRecorder() + handler := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return r, nil + } + + urlStr := "/v1/job/foo?" + pretty + req, _ := http.NewRequest("GET", urlStr, nil) + s.Server.wrap(handler)(resp, req) + + expected, _ := json.MarshalIndent(r, "", " ") + actual, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("err: %s", err) + } + + if !bytes.Equal(expected, actual) { + t.Fatalf("bad: %q", string(actual)) + } +} + +func TestParseWait(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?wait=60s&index=1000", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseWait(resp, req, &b); d { + t.Fatalf("unexpected done") + } + + if b.MinQueryIndex != 1000 { + t.Fatalf("Bad: %v", b) + } + if b.MaxQueryTime != 60*time.Second { + t.Fatalf("Bad: %v", b) + } +} + +func TestParseWait_InvalidTime(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?wait=60foo&index=1000", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseWait(resp, req, &b); !d { + t.Fatalf("expected done") + } + + if resp.Code != 400 { + t.Fatalf("bad code: %v", resp.Code) + } +} + +func TestParseWait_InvalidIndex(t *testing.T) { + resp := httptest.NewRecorder() + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?wait=60s&index=foo", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if d := parseWait(resp, req, &b); !d { + t.Fatalf("expected done") + } + + if resp.Code != 400 { + t.Fatalf("bad code: %v", resp.Code) + } +} + +func TestParseConsistency(t *testing.T) { + var b structs.QueryOptions + + req, err := http.NewRequest("GET", + "/v1/catalog/nodes?stale", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + parseConsistency(req, &b) + if !b.AllowStale { + t.Fatalf("Bad: %v", b) + } + + b = structs.QueryOptions{} + req, err = http.NewRequest("GET", + "/v1/catalog/nodes?consistent", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + parseConsistency(req, &b) + if b.AllowStale { + t.Fatalf("Bad: %v", b) + } +} + +func TestParseRegion(t *testing.T) { + s := makeHTTPServer(t, nil) + defer s.Cleanup() + + req, err := http.NewRequest("GET", + "/v1/jobs?region=foo", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + var region string + s.Server.parseRegion(req, ®ion) + if region != "foo" { + t.Fatalf("bad %s", region) + } + + region = "" + req, err = http.NewRequest("GET", "/v1/jobs", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + s.Server.parseRegion(req, ®ion) + if region != "region1" { + t.Fatalf("bad %s", region) + } +} + +// assertIndex tests that X-Nomad-Index is set and non-zero +func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { + header := resp.Header().Get("X-Nomad-Index") + if header == "" || header == "0" { + t.Fatalf("Bad: %v", header) + } +} + +// checkIndex is like assertIndex but returns an error +func checkIndex(resp *httptest.ResponseRecorder) error { + header := resp.Header().Get("X-Nomad-Index") + if header == "" || header == "0" { + return fmt.Errorf("Bad: %v", header) + } + return nil +} + +// getIndex parses X-Nomad-Index +func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 { + header := resp.Header().Get("X-Nomad-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + val, err := strconv.Atoi(header) + if err != nil { + t.Fatalf("Bad: %v", header) + } + return uint64(val) +} diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go new file mode 100644 index 000000000..fee60e85f --- /dev/null +++ b/command/agent/job_endpoint.go @@ -0,0 +1,50 @@ +package agent + +import ( + "fmt" + "net/http" + + "github.com/hashicorp/consul/consul/structs" +) + +func (s *HTTPServer) JobsList(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var args structs.RegisterRequest + if err := decodeBody(req, &args, nil); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) + return nil, nil + } + + // Setup the default DC if not provided + if args.Datacenter == "" { + args.Datacenter = s.agent.config.Datacenter + } + + // Forward to the servers + var out struct{} + if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil { + return nil, err + } + return true, nil +} + +func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var args structs.RegisterRequest + if err := decodeBody(req, &args, nil); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) + return nil, nil + } + + // Setup the default DC if not provided + if args.Datacenter == "" { + args.Datacenter = s.agent.config.Datacenter + } + + // Forward to the servers + var out struct{} + if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil { + return nil, err + } + return true, nil +}