9eb2433871
In #15605 we fixed the bug where the presense of "stale" query parameter was mean to imply stale, even if the value of the parameter was "false" or malformed. In parsing, we missed the case where the slice of values would be nil which lead to a failing test case that was missed because CI didn't run against the original PR.
975 lines
32 KiB
Go
975 lines
32 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
assetfs "github.com/elazarl/go-bindata-assetfs"
|
|
"github.com/gorilla/handlers"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/hashicorp/go-connlimit"
|
|
log "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/rs/cors"
|
|
"golang.org/x/time/rate"
|
|
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/helper/noxssrw"
|
|
"github.com/hashicorp/nomad/helper/tlsutil"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
|
|
// ErrInvalidMethod is used if the HTTP method is not supported
|
|
ErrInvalidMethod = "Invalid method"
|
|
|
|
// ErrEntOnly is the error returned if accessing an enterprise only
|
|
// endpoint
|
|
ErrEntOnly = "Nomad Enterprise only endpoint"
|
|
|
|
// ErrServerOnly is the error text returned if accessing a server only
|
|
// endpoint
|
|
ErrServerOnly = "Server only endpoint"
|
|
|
|
// ContextKeyReqID is a unique ID for a given request
|
|
ContextKeyReqID = "requestID"
|
|
|
|
// MissingRequestID is a placeholder if we cannot retrieve a request
|
|
// UUID from context
|
|
MissingRequestID = "<missing request id>"
|
|
)
|
|
|
|
var (
|
|
// Set to false by stub_asset if the ui build tag isn't enabled
|
|
uiEnabled = true
|
|
|
|
// Displayed when ui is disabled, but overridden if the ui build
|
|
// tag isn't enabled
|
|
stubHTML = "<html><p>Nomad UI is disabled</p></html>"
|
|
|
|
// allowCORSWithMethods sets permissive CORS headers for a handler, used by
|
|
// wrapCORS and wrapCORSWithMethods
|
|
allowCORSWithMethods = func(methods ...string) *cors.Cors {
|
|
return cors.New(cors.Options{
|
|
AllowedOrigins: []string{"*"},
|
|
AllowedMethods: methods,
|
|
AllowedHeaders: []string{"*"},
|
|
AllowCredentials: true,
|
|
})
|
|
}
|
|
)
|
|
|
|
type handlerFn func(resp http.ResponseWriter, req *http.Request) (interface{}, error)
|
|
type handlerByteFn func(resp http.ResponseWriter, req *http.Request) ([]byte, error)
|
|
|
|
// HTTPServer is used to wrap an Agent and expose it over an HTTP interface
|
|
type HTTPServer struct {
|
|
agent *Agent
|
|
mux *http.ServeMux
|
|
listener net.Listener
|
|
listenerCh chan struct{}
|
|
logger log.Logger
|
|
Addr string
|
|
|
|
wsUpgrader *websocket.Upgrader
|
|
}
|
|
|
|
// NewHTTPServers starts an HTTP server for every address.http configured in
|
|
// the agent.
|
|
func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
|
|
var srvs []*HTTPServer
|
|
var serverInitializationErrors error
|
|
|
|
// Get connection handshake timeout limit
|
|
handshakeTimeout, err := time.ParseDuration(config.Limits.HTTPSHandshakeTimeout)
|
|
if err != nil {
|
|
return srvs, fmt.Errorf("error parsing https_handshake_timeout: %v", err)
|
|
} else if handshakeTimeout < 0 {
|
|
return srvs, fmt.Errorf("https_handshake_timeout must be >= 0")
|
|
}
|
|
|
|
// Get max connection limit
|
|
maxConns := 0
|
|
if mc := config.Limits.HTTPMaxConnsPerClient; mc != nil {
|
|
maxConns = *mc
|
|
}
|
|
if maxConns < 0 {
|
|
return srvs, fmt.Errorf("http_max_conns_per_client must be >= 0")
|
|
}
|
|
|
|
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, config.TLSConfig.VerifyHTTPSClient, true)
|
|
if err != nil && config.TLSConfig.EnableHTTP {
|
|
return srvs, fmt.Errorf("failed to initialize HTTP server TLS configuration: %s", err)
|
|
}
|
|
|
|
wsUpgrader := &websocket.Upgrader{
|
|
ReadBufferSize: 2048,
|
|
WriteBufferSize: 2048,
|
|
}
|
|
|
|
// Start the listener
|
|
for _, addr := range config.normalizedAddrs.HTTP {
|
|
lnAddr, err := net.ResolveTCPAddr("tcp", addr)
|
|
if err != nil {
|
|
serverInitializationErrors = multierror.Append(serverInitializationErrors, err)
|
|
continue
|
|
}
|
|
ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port)
|
|
if err != nil {
|
|
serverInitializationErrors = multierror.Append(serverInitializationErrors, fmt.Errorf("failed to start HTTP listener: %v", err))
|
|
continue
|
|
}
|
|
|
|
// If TLS is enabled, wrap the listener with a TLS listener
|
|
if config.TLSConfig.EnableHTTP {
|
|
tlsConfig, err := tlsConf.IncomingTLSConfig()
|
|
if err != nil {
|
|
serverInitializationErrors = multierror.Append(serverInitializationErrors, err)
|
|
continue
|
|
}
|
|
ln = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
|
|
}
|
|
|
|
// Create the server
|
|
srv := &HTTPServer{
|
|
agent: agent,
|
|
mux: http.NewServeMux(),
|
|
listener: ln,
|
|
listenerCh: make(chan struct{}),
|
|
logger: agent.httpLogger,
|
|
Addr: ln.Addr().String(),
|
|
wsUpgrader: wsUpgrader,
|
|
}
|
|
srv.registerHandlers(config.EnableDebug)
|
|
|
|
// Create HTTP server with timeouts
|
|
httpServer := http.Server{
|
|
Addr: srv.Addr,
|
|
Handler: handlers.CompressHandler(srv.mux),
|
|
ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns, srv.logger),
|
|
ErrorLog: newHTTPServerLogger(srv.logger),
|
|
}
|
|
|
|
go func() {
|
|
defer close(srv.listenerCh)
|
|
httpServer.Serve(ln)
|
|
}()
|
|
|
|
srvs = append(srvs, srv)
|
|
}
|
|
|
|
// This HTTP server is only create when running in client mode, otherwise
|
|
// the builtinDialer and builtinListener will be nil.
|
|
if agent.builtinDialer != nil && agent.builtinListener != nil {
|
|
srv := &HTTPServer{
|
|
agent: agent,
|
|
mux: http.NewServeMux(),
|
|
listener: agent.builtinListener,
|
|
listenerCh: make(chan struct{}),
|
|
logger: agent.httpLogger,
|
|
Addr: "builtin",
|
|
wsUpgrader: wsUpgrader,
|
|
}
|
|
|
|
srv.registerHandlers(config.EnableDebug)
|
|
|
|
httpServer := http.Server{
|
|
Addr: srv.Addr,
|
|
Handler: srv.mux,
|
|
ErrorLog: newHTTPServerLogger(srv.logger),
|
|
}
|
|
|
|
go func() {
|
|
defer close(srv.listenerCh)
|
|
httpServer.Serve(agent.builtinListener)
|
|
}()
|
|
|
|
srvs = append(srvs, srv)
|
|
}
|
|
|
|
if serverInitializationErrors != nil {
|
|
for _, srv := range srvs {
|
|
srv.Shutdown()
|
|
}
|
|
}
|
|
|
|
return srvs, serverInitializationErrors
|
|
}
|
|
|
|
// makeConnState returns a ConnState func for use in an http.Server. If
|
|
// isTLS=true and handshakeTimeout>0 then the handshakeTimeout will be applied
|
|
// as a connection deadline to new connections and removed when the connection
|
|
// is active (meaning it has successfully completed the TLS handshake).
|
|
//
|
|
// If limit > 0, a per-address connection limit will be enabled regardless of
|
|
// TLS. If connLimit == 0 there is no connection limit.
|
|
func makeConnState(isTLS bool, handshakeTimeout time.Duration, connLimit int, logger log.Logger) func(conn net.Conn, state http.ConnState) {
|
|
connLimiter := connLimiter(connLimit, logger)
|
|
if !isTLS || handshakeTimeout == 0 {
|
|
if connLimit > 0 {
|
|
// Still return the connection limiter
|
|
return connLimiter
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if connLimit > 0 {
|
|
// Return conn state callback with connection limiting and a
|
|
// handshake timeout.
|
|
|
|
return func(conn net.Conn, state http.ConnState) {
|
|
switch state {
|
|
case http.StateNew:
|
|
// Set deadline to prevent slow send before TLS handshake or first
|
|
// byte of request.
|
|
conn.SetDeadline(time.Now().Add(handshakeTimeout))
|
|
case http.StateActive:
|
|
// Clear read deadline. We should maybe set read timeouts more
|
|
// generally but that's a bigger task as some HTTP endpoints may
|
|
// stream large requests and responses (e.g. snapshot) so we can't
|
|
// set sensible blanket timeouts here.
|
|
conn.SetDeadline(time.Time{})
|
|
}
|
|
|
|
// Call connection limiter
|
|
connLimiter(conn, state)
|
|
}
|
|
}
|
|
|
|
// Return conn state callback with just a handshake timeout
|
|
// (connection limiting disabled).
|
|
return func(conn net.Conn, state http.ConnState) {
|
|
switch state {
|
|
case http.StateNew:
|
|
// Set deadline to prevent slow send before TLS handshake or first
|
|
// byte of request.
|
|
conn.SetDeadline(time.Now().Add(handshakeTimeout))
|
|
case http.StateActive:
|
|
// Clear read deadline. We should maybe set read timeouts more
|
|
// generally but that's a bigger task as some HTTP endpoints may
|
|
// stream large requests and responses (e.g. snapshot) so we can't
|
|
// set sensible blanket timeouts here.
|
|
conn.SetDeadline(time.Time{})
|
|
}
|
|
}
|
|
}
|
|
|
|
// connLimiter returns a connection-limiter function with a rate-limited 429-response error handler.
|
|
// The rate-limit prevents the TLS handshake necessary to write the HTTP response
|
|
// from consuming too many server resources.
|
|
func connLimiter(connLimit int, logger log.Logger) func(conn net.Conn, state http.ConnState) {
|
|
// Global rate-limit of 10 responses per second with a 100-response burst.
|
|
limiter := rate.NewLimiter(10, 100)
|
|
|
|
tooManyConnsMsg := "Your IP is issuing too many concurrent connections, please rate limit your calls\n"
|
|
tooManyRequestsResponse := []byte(fmt.Sprintf("HTTP/1.1 429 Too Many Requests\r\n"+
|
|
"Content-Type: text/plain\r\n"+
|
|
"Content-Length: %d\r\n"+
|
|
"Connection: close\r\n\r\n%s", len(tooManyConnsMsg), tooManyConnsMsg))
|
|
return connlimit.NewLimiter(connlimit.Config{
|
|
MaxConnsPerClientIP: connLimit,
|
|
}).HTTPConnStateFuncWithErrorHandler(func(err error, conn net.Conn) {
|
|
if err == connlimit.ErrPerClientIPLimitReached {
|
|
metrics.IncrCounter([]string{"nomad", "agent", "http", "exceeded"}, 1)
|
|
if n := limiter.Reserve(); n.Delay() == 0 {
|
|
logger.Warn("Too many concurrent connections", "address", conn.RemoteAddr().String(), "limit", connLimit)
|
|
conn.SetDeadline(time.Now().Add(10 * time.Millisecond))
|
|
conn.Write(tooManyRequestsResponse)
|
|
} else {
|
|
n.Cancel()
|
|
}
|
|
}
|
|
conn.Close()
|
|
})
|
|
}
|
|
|
|
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
|
|
// connections. It's used by NewHttpServer so
|
|
// dead TCP connections eventually go away.
|
|
type tcpKeepAliveListener struct {
|
|
*net.TCPListener
|
|
}
|
|
|
|
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
|
|
tc, err := ln.AcceptTCP()
|
|
if err != nil {
|
|
return
|
|
}
|
|
tc.SetKeepAlive(true)
|
|
tc.SetKeepAlivePeriod(30 * time.Second)
|
|
return tc, nil
|
|
}
|
|
|
|
// Shutdown is used to shutdown the HTTP server
|
|
func (s *HTTPServer) Shutdown() {
|
|
if s != nil {
|
|
s.logger.Debug("shutting down http server")
|
|
s.listener.Close()
|
|
<-s.listenerCh // block until http.Serve has returned.
|
|
}
|
|
}
|
|
|
|
// ResolveToken extracts the ACL token secret ID from the request and
|
|
// translates it into an ACL object. Returns nil if ACLs are disabled.
|
|
func (s *HTTPServer) ResolveToken(req *http.Request) (*acl.ACL, error) {
|
|
var secret string
|
|
s.parseToken(req, &secret)
|
|
|
|
var aclObj *acl.ACL
|
|
var err error
|
|
|
|
if srv := s.agent.Server(); srv != nil {
|
|
aclObj, err = srv.ResolveToken(secret)
|
|
} else {
|
|
// Not a Server, so use the Client for token resolution. Note
|
|
// this gets forwarded to a server with AllowStale = true if
|
|
// the local ACL cache TTL has expired (30s by default)
|
|
aclObj, err = s.agent.Client().ResolveToken(secret)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to resolve ACL token: %v", err)
|
|
}
|
|
|
|
return aclObj, nil
|
|
}
|
|
|
|
// registerHandlers is used to attach our handlers to the mux
|
|
func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
|
s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest))
|
|
s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest))
|
|
s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/nodes", s.wrap(s.NodesRequest))
|
|
s.mux.HandleFunc("/v1/node/", s.wrap(s.NodeSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/allocations", s.wrap(s.AllocsRequest))
|
|
s.mux.HandleFunc("/v1/allocation/", s.wrap(s.AllocSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/evaluations", s.wrap(s.EvalsRequest))
|
|
s.mux.HandleFunc("/v1/evaluations/count", s.wrap(s.EvalsCountRequest))
|
|
s.mux.HandleFunc("/v1/evaluation/", s.wrap(s.EvalSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/deployments", s.wrap(s.DeploymentsRequest))
|
|
s.mux.HandleFunc("/v1/deployment/", s.wrap(s.DeploymentSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/volumes", s.wrap(s.CSIVolumesRequest))
|
|
s.mux.HandleFunc("/v1/volumes/external", s.wrap(s.CSIExternalVolumesRequest))
|
|
s.mux.HandleFunc("/v1/volumes/snapshot", s.wrap(s.CSISnapshotsRequest))
|
|
s.mux.HandleFunc("/v1/volume/csi/", s.wrap(s.CSIVolumeSpecificRequest))
|
|
s.mux.HandleFunc("/v1/plugins", s.wrap(s.CSIPluginsRequest))
|
|
s.mux.HandleFunc("/v1/plugin/csi/", s.wrap(s.CSIPluginSpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/acl/policies", s.wrap(s.ACLPoliciesRequest))
|
|
s.mux.HandleFunc("/v1/acl/policy/", s.wrap(s.ACLPolicySpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/acl/token/onetime", s.wrap(s.UpsertOneTimeToken))
|
|
s.mux.HandleFunc("/v1/acl/token/onetime/exchange", s.wrap(s.ExchangeOneTimeToken))
|
|
s.mux.HandleFunc("/v1/acl/bootstrap", s.wrap(s.ACLTokenBootstrap))
|
|
s.mux.HandleFunc("/v1/acl/tokens", s.wrap(s.ACLTokensRequest))
|
|
s.mux.HandleFunc("/v1/acl/token", s.wrap(s.ACLTokenSpecificRequest))
|
|
s.mux.HandleFunc("/v1/acl/token/", s.wrap(s.ACLTokenSpecificRequest))
|
|
|
|
// Register our ACL role handlers.
|
|
s.mux.HandleFunc("/v1/acl/roles", s.wrap(s.ACLRoleListRequest))
|
|
s.mux.HandleFunc("/v1/acl/role", s.wrap(s.ACLRoleRequest))
|
|
s.mux.HandleFunc("/v1/acl/role/", s.wrap(s.ACLRoleSpecificRequest))
|
|
|
|
// Register our ACL auth-method handlers.
|
|
s.mux.HandleFunc("/v1/acl/auth-methods", s.wrap(s.ACLAuthMethodListRequest))
|
|
s.mux.HandleFunc("/v1/acl/auth-method", s.wrap(s.ACLAuthMethodRequest))
|
|
s.mux.HandleFunc("/v1/acl/auth-method/", s.wrap(s.ACLAuthMethodSpecificRequest))
|
|
|
|
// Register our ACL binding rule handlers.
|
|
s.mux.HandleFunc("/v1/acl/binding-rules", s.wrap(s.ACLBindingRuleListRequest))
|
|
s.mux.HandleFunc("/v1/acl/binding-rule", s.wrap(s.ACLBindingRuleRequest))
|
|
s.mux.HandleFunc("/v1/acl/binding-rule/", s.wrap(s.ACLBindingRuleSpecificRequest))
|
|
|
|
s.mux.Handle("/v1/client/fs/", wrapCORS(s.wrap(s.FsRequest)))
|
|
s.mux.HandleFunc("/v1/client/gc", s.wrap(s.ClientGCRequest))
|
|
s.mux.Handle("/v1/client/stats", wrapCORS(s.wrap(s.ClientStatsRequest)))
|
|
s.mux.Handle("/v1/client/allocation/", wrapCORS(s.wrap(s.ClientAllocRequest)))
|
|
|
|
s.mux.HandleFunc("/v1/agent/self", s.wrap(s.AgentSelfRequest))
|
|
s.mux.HandleFunc("/v1/agent/join", s.wrap(s.AgentJoinRequest))
|
|
s.mux.HandleFunc("/v1/agent/members", s.wrap(s.AgentMembersRequest))
|
|
s.mux.HandleFunc("/v1/agent/force-leave", s.wrap(s.AgentForceLeaveRequest))
|
|
s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest))
|
|
s.mux.HandleFunc("/v1/agent/schedulers", s.wrap(s.AgentSchedulerWorkerInfoRequest))
|
|
s.mux.HandleFunc("/v1/agent/schedulers/config", s.wrap(s.AgentSchedulerWorkerConfigRequest))
|
|
s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest))
|
|
s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest))
|
|
s.mux.HandleFunc("/v1/agent/host", s.wrap(s.AgentHostRequest))
|
|
|
|
// Register our service registration handlers.
|
|
s.mux.HandleFunc("/v1/services", s.wrap(s.ServiceRegistrationListRequest))
|
|
s.mux.HandleFunc("/v1/service/", s.wrap(s.ServiceRegistrationRequest))
|
|
|
|
// Monitor is *not* an untrusted endpoint despite the log contents
|
|
// potentially containing unsanitized user input. Monitor, like
|
|
// "/v1/client/fs/logs", explicitly sets a "text/plain" or
|
|
// "application/json" Content-Type depending on the ?plain= query
|
|
// parameter.
|
|
s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor))
|
|
|
|
s.mux.HandleFunc("/v1/agent/pprof/", s.wrapNonJSON(s.AgentPprofRequest))
|
|
|
|
s.mux.HandleFunc("/v1/metrics", s.wrap(s.MetricsRequest))
|
|
|
|
s.mux.HandleFunc("/v1/validate/job", s.wrap(s.ValidateJobRequest))
|
|
|
|
s.mux.HandleFunc("/v1/regions", s.wrap(s.RegionListRequest))
|
|
|
|
s.mux.HandleFunc("/v1/scaling/policies", s.wrap(s.ScalingPoliciesRequest))
|
|
s.mux.HandleFunc("/v1/scaling/policy/", s.wrap(s.ScalingPolicySpecificRequest))
|
|
|
|
s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest))
|
|
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
|
|
|
|
s.mux.HandleFunc("/v1/search/fuzzy", s.wrap(s.FuzzySearchRequest))
|
|
s.mux.HandleFunc("/v1/search", s.wrap(s.SearchRequest))
|
|
s.mux.HandleFunc("/v1/operator/license", s.wrap(s.LicenseRequest))
|
|
s.mux.HandleFunc("/v1/operator/raft/", s.wrap(s.OperatorRequest))
|
|
s.mux.HandleFunc("/v1/operator/keyring/", s.wrap(s.KeyringRequest))
|
|
s.mux.HandleFunc("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
|
|
s.mux.HandleFunc("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
|
|
s.mux.HandleFunc("/v1/operator/snapshot", s.wrap(s.SnapshotRequest))
|
|
|
|
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
|
|
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
|
|
|
|
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))
|
|
|
|
s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))
|
|
|
|
s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest))
|
|
s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest))
|
|
s.mux.HandleFunc("/v1/namespace/", s.wrap(s.NamespaceSpecificRequest))
|
|
|
|
s.mux.Handle("/v1/vars", wrapCORS(s.wrap(s.VariablesListRequest)))
|
|
s.mux.Handle("/v1/var/", wrapCORSWithAllowedMethods(s.wrap(s.VariableSpecificRequest), "HEAD", "GET", "PUT", "DELETE"))
|
|
|
|
uiConfigEnabled := s.agent.config.UI != nil && s.agent.config.UI.Enabled
|
|
|
|
if uiEnabled && uiConfigEnabled {
|
|
s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
|
|
s.logger.Debug("UI is enabled")
|
|
} else {
|
|
// Write the stubHTML
|
|
s.mux.HandleFunc("/ui/", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Write([]byte(stubHTML))
|
|
})
|
|
if uiEnabled && !uiConfigEnabled {
|
|
s.logger.Warn("UI is disabled")
|
|
} else {
|
|
s.logger.Debug("UI is disabled in this build")
|
|
}
|
|
}
|
|
s.mux.Handle("/", s.handleRootFallthrough())
|
|
|
|
if enableDebug {
|
|
if !s.agent.config.DevMode {
|
|
s.logger.Warn("enable_debug is set to true. This is insecure and should not be enabled in production")
|
|
}
|
|
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
|
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
|
s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
|
s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
|
s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
|
}
|
|
|
|
// Register enterprise endpoints.
|
|
s.registerEnterpriseHandlers()
|
|
}
|
|
|
|
// HTTPCodedError is used to provide the HTTP error code
|
|
type HTTPCodedError interface {
|
|
error
|
|
Code() int
|
|
}
|
|
|
|
type UIAssetWrapper struct {
|
|
FileSystem *assetfs.AssetFS
|
|
}
|
|
|
|
func (fs *UIAssetWrapper) Open(name string) (http.File, error) {
|
|
if file, err := fs.FileSystem.Open(name); err == nil {
|
|
return file, nil
|
|
} else {
|
|
// serve index.html instead of 404ing
|
|
if err == os.ErrNotExist {
|
|
return fs.FileSystem.Open("index.html")
|
|
}
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func CodedError(c int, s string) HTTPCodedError {
|
|
return &codedError{s, c}
|
|
}
|
|
|
|
type codedError struct {
|
|
s string
|
|
code int
|
|
}
|
|
|
|
func (e *codedError) Error() string {
|
|
return e.s
|
|
}
|
|
|
|
func (e *codedError) Code() int {
|
|
return e.code
|
|
}
|
|
|
|
func (s *HTTPServer) handleUI(h http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
header := w.Header()
|
|
header.Add("Content-Security-Policy", "default-src 'none'; connect-src *; img-src 'self' data:; script-src 'self'; style-src 'self' 'unsafe-inline'; form-action 'none'; frame-ancestors 'none'")
|
|
h.ServeHTTP(w, req)
|
|
})
|
|
}
|
|
|
|
func (s *HTTPServer) handleRootFallthrough() http.Handler {
|
|
return s.auditHTTPHandler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
if req.URL.Path == "/" {
|
|
url := "/ui/"
|
|
if req.URL.RawQuery != "" {
|
|
url = url + "?" + req.URL.RawQuery
|
|
}
|
|
http.Redirect(w, req, url, 307)
|
|
} else {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
}
|
|
}))
|
|
}
|
|
|
|
func errCodeFromHandler(err error) (int, string) {
|
|
if err == nil {
|
|
return 0, ""
|
|
}
|
|
|
|
code := 500
|
|
errMsg := err.Error()
|
|
if http, ok := err.(HTTPCodedError); ok {
|
|
code = http.Code()
|
|
} else if ecode, emsg, ok := structs.CodeFromRPCCodedErr(err); ok {
|
|
code = ecode
|
|
errMsg = emsg
|
|
} else {
|
|
// RPC errors get wrapped, so manually unwrap by only looking at their suffix
|
|
if strings.HasSuffix(errMsg, structs.ErrPermissionDenied.Error()) {
|
|
errMsg = structs.ErrPermissionDenied.Error()
|
|
code = 403
|
|
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
|
|
errMsg = structs.ErrTokenNotFound.Error()
|
|
code = 403
|
|
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
|
|
errMsg = structs.ErrJobRegistrationDisabled.Error()
|
|
code = 403
|
|
}
|
|
}
|
|
|
|
return code, errMsg
|
|
}
|
|
|
|
// wrap is used to wrap functions to make them more convenient
|
|
func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
|
|
f := func(resp http.ResponseWriter, req *http.Request) {
|
|
setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders)
|
|
// Invoke the handler
|
|
reqURL := req.URL.String()
|
|
start := time.Now()
|
|
defer func() {
|
|
s.logger.Debug("request complete", "method", req.Method, "path", reqURL, "duration", time.Since(start))
|
|
}()
|
|
obj, err := s.auditHandler(handler)(resp, req)
|
|
|
|
// Check for an error
|
|
HAS_ERR:
|
|
if err != nil {
|
|
code := 500
|
|
errMsg := err.Error()
|
|
if http, ok := err.(HTTPCodedError); ok {
|
|
code = http.Code()
|
|
} else if ecode, emsg, ok := structs.CodeFromRPCCodedErr(err); ok {
|
|
code = ecode
|
|
errMsg = emsg
|
|
} else {
|
|
// RPC errors get wrapped, so manually unwrap by only looking at their suffix
|
|
if strings.HasSuffix(errMsg, structs.ErrPermissionDenied.Error()) {
|
|
errMsg = structs.ErrPermissionDenied.Error()
|
|
code = 403
|
|
} else if strings.HasSuffix(errMsg, structs.ErrTokenNotFound.Error()) {
|
|
errMsg = structs.ErrTokenNotFound.Error()
|
|
code = 403
|
|
} else if strings.HasSuffix(errMsg, structs.ErrJobRegistrationDisabled.Error()) {
|
|
errMsg = structs.ErrJobRegistrationDisabled.Error()
|
|
code = 403
|
|
} else if strings.HasSuffix(errMsg, structs.ErrIncompatibleFiltering.Error()) {
|
|
errMsg = structs.ErrIncompatibleFiltering.Error()
|
|
code = 400
|
|
}
|
|
}
|
|
|
|
resp.WriteHeader(code)
|
|
resp.Write([]byte(errMsg))
|
|
if isAPIClientError(code) {
|
|
s.logger.Debug("request failed", "method", req.Method, "path", reqURL, "error", err, "code", code)
|
|
} else {
|
|
s.logger.Error("request failed", "method", req.Method, "path", reqURL, "error", err, "code", code)
|
|
}
|
|
return
|
|
}
|
|
|
|
prettyPrint := false
|
|
if v, ok := req.URL.Query()["pretty"]; ok {
|
|
if len(v) > 0 && (len(v[0]) == 0 || v[0] != "0") {
|
|
prettyPrint = true
|
|
}
|
|
}
|
|
|
|
// Write out the JSON object
|
|
if obj != nil {
|
|
var buf bytes.Buffer
|
|
if prettyPrint {
|
|
enc := codec.NewEncoder(&buf, structs.JsonHandlePretty)
|
|
err = enc.Encode(obj)
|
|
if err == nil {
|
|
buf.Write([]byte("\n"))
|
|
}
|
|
} else {
|
|
enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions)
|
|
err = enc.Encode(obj)
|
|
}
|
|
if err != nil {
|
|
goto HAS_ERR
|
|
}
|
|
resp.Header().Set("Content-Type", "application/json")
|
|
resp.Write(buf.Bytes())
|
|
}
|
|
}
|
|
return f
|
|
}
|
|
|
|
// wrapNonJSON is used to wrap functions returning non JSON
|
|
// serializeable data to make them more convenient. It is primarily
|
|
// responsible for setting nomad headers and logging.
|
|
// Handler functions are responsible for setting Content-Type Header
|
|
func (s *HTTPServer) wrapNonJSON(handler func(resp http.ResponseWriter, req *http.Request) ([]byte, error)) func(resp http.ResponseWriter, req *http.Request) {
|
|
f := func(resp http.ResponseWriter, req *http.Request) {
|
|
setHeaders(resp, s.agent.config.HTTPAPIResponseHeaders)
|
|
// Invoke the handler
|
|
reqURL := req.URL.String()
|
|
start := time.Now()
|
|
defer func() {
|
|
s.logger.Debug("request complete", "method", req.Method, "path", reqURL, "duration", time.Since(start))
|
|
}()
|
|
obj, err := s.auditNonJSONHandler(handler)(resp, req)
|
|
|
|
// Check for an error
|
|
if err != nil {
|
|
code, errMsg := errCodeFromHandler(err)
|
|
resp.WriteHeader(code)
|
|
resp.Write([]byte(errMsg))
|
|
if isAPIClientError(code) {
|
|
s.logger.Debug("request failed", "method", req.Method, "path", reqURL, "error", err, "code", code)
|
|
} else {
|
|
s.logger.Error("request failed", "method", req.Method, "path", reqURL, "error", err, "code", code)
|
|
}
|
|
return
|
|
}
|
|
|
|
// write response
|
|
if obj != nil {
|
|
resp.Write(obj)
|
|
}
|
|
}
|
|
return f
|
|
}
|
|
|
|
// isAPIClientError returns true if the passed http code represents a client error
|
|
func isAPIClientError(code int) bool {
|
|
return 400 <= code && code <= 499
|
|
}
|
|
|
|
// decodeBody is used to decode a JSON request body
|
|
func decodeBody(req *http.Request, out interface{}) error {
|
|
|
|
if req.Body == http.NoBody {
|
|
return errors.New("Request body is empty")
|
|
}
|
|
|
|
dec := json.NewDecoder(req.Body)
|
|
return dec.Decode(&out)
|
|
}
|
|
|
|
// setIndex is used to set the index response header
|
|
func setIndex(resp http.ResponseWriter, index uint64) {
|
|
resp.Header().Set("X-Nomad-Index", strconv.FormatUint(index, 10))
|
|
}
|
|
|
|
// setKnownLeader is used to set the known leader header
|
|
func setKnownLeader(resp http.ResponseWriter, known bool) {
|
|
s := "true"
|
|
if !known {
|
|
s = "false"
|
|
}
|
|
resp.Header().Set("X-Nomad-KnownLeader", s)
|
|
}
|
|
|
|
// setLastContact is used to set the last contact header
|
|
func setLastContact(resp http.ResponseWriter, last time.Duration) {
|
|
lastMsec := uint64(last / time.Millisecond)
|
|
resp.Header().Set("X-Nomad-LastContact", strconv.FormatUint(lastMsec, 10))
|
|
}
|
|
|
|
// setNextToken is used to set the next token header for pagination
|
|
func setNextToken(resp http.ResponseWriter, nextToken string) {
|
|
if nextToken != "" {
|
|
resp.Header().Set("X-Nomad-NextToken", nextToken)
|
|
}
|
|
}
|
|
|
|
// setMeta is used to set the query response meta data
|
|
func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
|
|
setIndex(resp, m.Index)
|
|
setLastContact(resp, m.LastContact)
|
|
setKnownLeader(resp, m.KnownLeader)
|
|
setNextToken(resp, m.NextToken)
|
|
}
|
|
|
|
// setHeaders is used to set canonical response header fields
|
|
func setHeaders(resp http.ResponseWriter, headers map[string]string) {
|
|
for field, value := range headers {
|
|
resp.Header().Set(field, value)
|
|
}
|
|
}
|
|
|
|
// parseWait is used to parse the ?wait and ?index query params
|
|
// Returns true on error
|
|
func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
|
|
query := req.URL.Query()
|
|
if wait := query.Get("wait"); wait != "" {
|
|
dur, err := time.ParseDuration(wait)
|
|
if err != nil {
|
|
resp.WriteHeader(400)
|
|
resp.Write([]byte("Invalid wait time"))
|
|
return true
|
|
}
|
|
b.MaxQueryTime = dur
|
|
}
|
|
if idx := query.Get("index"); idx != "" {
|
|
index, err := strconv.ParseUint(idx, 10, 64)
|
|
if err != nil {
|
|
resp.WriteHeader(400)
|
|
resp.Write([]byte("Invalid index"))
|
|
return true
|
|
}
|
|
b.MinQueryIndex = index
|
|
}
|
|
return false
|
|
}
|
|
|
|
// parseConsistency is used to parse the ?stale query params.
|
|
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) {
|
|
query := req.URL.Query()
|
|
if staleVal, ok := query["stale"]; ok {
|
|
if len(staleVal) == 0 || staleVal[0] == "" {
|
|
b.AllowStale = true
|
|
return
|
|
}
|
|
staleQuery, err := strconv.ParseBool(staleVal[0])
|
|
if err != nil {
|
|
resp.WriteHeader(400)
|
|
_, _ = resp.Write([]byte(fmt.Sprintf("Expect `true` or `false` for `stale` query string parameter, got %s", staleVal[0])))
|
|
return
|
|
}
|
|
b.AllowStale = staleQuery
|
|
}
|
|
}
|
|
|
|
// parsePrefix is used to parse the ?prefix query param
|
|
func parsePrefix(req *http.Request, b *structs.QueryOptions) {
|
|
query := req.URL.Query()
|
|
if prefix := query.Get("prefix"); prefix != "" {
|
|
b.Prefix = prefix
|
|
}
|
|
}
|
|
|
|
// parseRegion is used to parse the ?region query param
|
|
func (s *HTTPServer) parseRegion(req *http.Request, r *string) {
|
|
if other := req.URL.Query().Get("region"); other != "" {
|
|
*r = other
|
|
} else if *r == "" {
|
|
*r = s.agent.config.Region
|
|
}
|
|
}
|
|
|
|
// parseNamespace is used to parse the ?namespace parameter
|
|
func parseNamespace(req *http.Request, n *string) {
|
|
if other := req.URL.Query().Get("namespace"); other != "" {
|
|
*n = other
|
|
} else if *n == "" {
|
|
*n = structs.DefaultNamespace
|
|
}
|
|
}
|
|
|
|
// parseIdempotencyToken is used to parse the ?idempotency_token parameter
|
|
func parseIdempotencyToken(req *http.Request, n *string) {
|
|
if idempotencyToken := req.URL.Query().Get("idempotency_token"); idempotencyToken != "" {
|
|
*n = idempotencyToken
|
|
}
|
|
}
|
|
|
|
// parseBool parses a query parameter to a boolean or returns (nil, nil) if the
|
|
// parameter is not present.
|
|
func parseBool(req *http.Request, field string) (*bool, error) {
|
|
if str := req.URL.Query().Get(field); str != "" {
|
|
param, err := strconv.ParseBool(str)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", field, str, err)
|
|
}
|
|
return ¶m, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// parseInt parses a query parameter to a int or returns (nil, nil) if the
|
|
// parameter is not present.
|
|
func parseInt(req *http.Request, field string) (*int, error) {
|
|
if str := req.URL.Query().Get(field); str != "" {
|
|
param, err := strconv.Atoi(str)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a int: %v", field, str, err)
|
|
}
|
|
return ¶m, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// parseToken is used to parse the X-Nomad-Token param
|
|
func (s *HTTPServer) parseToken(req *http.Request, token *string) {
|
|
if other := req.Header.Get("X-Nomad-Token"); other != "" {
|
|
*token = other
|
|
return
|
|
}
|
|
|
|
if other := req.Header.Get("Authorization"); other != "" {
|
|
// HTTP Authorization headers are in the format: <Scheme>[SPACE]<Value>
|
|
// Ref. https://tools.ietf.org/html/rfc7236#section-3
|
|
parts := strings.Split(other, " ")
|
|
|
|
// Authorization Header is invalid if containing 1 or 0 parts, e.g.:
|
|
// "" || "<Scheme><Value>" || "<Scheme>" || "<Value>"
|
|
if len(parts) > 1 {
|
|
scheme := parts[0]
|
|
// Everything after "<Scheme>" is "<Value>", trimmed
|
|
value := strings.TrimSpace(strings.Join(parts[1:], " "))
|
|
|
|
// <Scheme> must be "Bearer"
|
|
if strings.ToLower(scheme) == "bearer" {
|
|
// Since Bearer tokens shouldn't contain spaces (rfc6750#section-2.1)
|
|
// "value" is tokenized, only the first item is used
|
|
*token = strings.TrimSpace(strings.Split(value, " ")[0])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// parse is a convenience method for endpoints that need to parse multiple flags
|
|
// It sets r to the region and b to the QueryOptions in req
|
|
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, r *string, b *structs.QueryOptions) bool {
|
|
s.parseRegion(req, r)
|
|
s.parseToken(req, &b.AuthToken)
|
|
parseConsistency(resp, req, b)
|
|
parsePrefix(req, b)
|
|
parseNamespace(req, &b.Namespace)
|
|
parsePagination(req, b)
|
|
parseFilter(req, b)
|
|
parseReverse(req, b)
|
|
return parseWait(resp, req, b)
|
|
}
|
|
|
|
// parsePagination parses the pagination fields for QueryOptions
|
|
func parsePagination(req *http.Request, b *structs.QueryOptions) {
|
|
query := req.URL.Query()
|
|
rawPerPage := query.Get("per_page")
|
|
if rawPerPage != "" {
|
|
perPage, err := strconv.ParseInt(rawPerPage, 10, 32)
|
|
if err == nil {
|
|
b.PerPage = int32(perPage)
|
|
}
|
|
}
|
|
|
|
b.NextToken = query.Get("next_token")
|
|
}
|
|
|
|
// parseFilter parses the filter query parameter for QueryOptions
|
|
func parseFilter(req *http.Request, b *structs.QueryOptions) {
|
|
query := req.URL.Query()
|
|
if filter := query.Get("filter"); filter != "" {
|
|
b.Filter = filter
|
|
}
|
|
}
|
|
|
|
// parseReverse parses the reverse query parameter for QueryOptions
|
|
func parseReverse(req *http.Request, b *structs.QueryOptions) {
|
|
query := req.URL.Query()
|
|
b.Reverse = query.Get("reverse") == "true"
|
|
}
|
|
|
|
// parseWriteRequest is a convenience method for endpoints that need to parse a
|
|
// write request.
|
|
func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteRequest) {
|
|
parseNamespace(req, &w.Namespace)
|
|
s.parseToken(req, &w.AuthToken)
|
|
s.parseRegion(req, &w.Region)
|
|
parseIdempotencyToken(req, &w.IdempotencyToken)
|
|
}
|
|
|
|
// wrapUntrustedContent wraps handlers in a http.ResponseWriter that prevents
|
|
// setting Content-Types that a browser may render (eg text/html). Any API that
|
|
// returns service-generated content (eg /v1/client/fs/cat) must be wrapped.
|
|
func (s *HTTPServer) wrapUntrustedContent(handler handlerFn) handlerFn {
|
|
return func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
resp, closeWriter := noxssrw.NewResponseWriter(resp)
|
|
defer func() {
|
|
if _, err := closeWriter(); err != nil {
|
|
// Can't write an error response at this point so just
|
|
// log. s.wrap does not even log when resp.Write fails,
|
|
// so log at low level.
|
|
s.logger.Debug("error writing HTTP response", "error", err,
|
|
"method", req.Method, "path", req.URL.String())
|
|
}
|
|
}()
|
|
|
|
// Call the wrapped handler
|
|
return handler(resp, req)
|
|
}
|
|
}
|
|
|
|
// wrapCORS wraps a HandlerFunc in allowCORS with read ("HEAD", "GET") methods
|
|
// and returns a http.Handler
|
|
func wrapCORS(f func(http.ResponseWriter, *http.Request)) http.Handler {
|
|
return wrapCORSWithAllowedMethods(f, "HEAD", "GET")
|
|
}
|
|
|
|
// wrapCORSWithAllowedMethods wraps a HandlerFunc in an allowCORS with the given
|
|
// method list and returns a http.Handler
|
|
func wrapCORSWithAllowedMethods(f func(http.ResponseWriter, *http.Request), methods ...string) http.Handler {
|
|
return allowCORSWithMethods(methods...).Handler(http.HandlerFunc(f))
|
|
}
|