agent: support multiple http address in addresses.http (#11582)

This commit is contained in:
Kevin Schoonover 2022-01-03 06:33:53 -08:00 committed by GitHub
parent 395628efe1
commit 5d9a506bc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 275 additions and 90 deletions

View File

@ -935,7 +935,7 @@ func (a *Agent) setupClient() error {
// If no HTTP health check can be supported nil is returned.
func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck {
// Resolve the http check address
httpCheckAddr := a.config.normalizedAddrs.HTTP
httpCheckAddr := a.config.normalizedAddrs.HTTP[0]
if *a.config.Consul.ChecksUseAdvertise {
httpCheckAddr = a.config.AdvertiseAddrs.HTTP
}

View File

@ -105,7 +105,7 @@ func TestAgent_ServerConfig(t *testing.T) {
require.Equal(t, "127.0.0.2", conf.Addresses.HTTP)
require.Equal(t, "127.0.0.2", conf.Addresses.RPC)
require.Equal(t, "127.0.0.2", conf.Addresses.Serf)
require.Equal(t, "127.0.0.2:4646", conf.normalizedAddrs.HTTP)
require.Equal(t, []string{"127.0.0.2:4646"}, conf.normalizedAddrs.HTTP)
require.Equal(t, "127.0.0.2:4003", conf.normalizedAddrs.RPC)
require.Equal(t, "127.0.0.2:4004", conf.normalizedAddrs.Serf)
require.Equal(t, "10.0.0.10:4646", conf.AdvertiseAddrs.HTTP)
@ -166,7 +166,7 @@ func TestAgent_ServerConfig(t *testing.T) {
require.Equal(t, "127.0.0.3", conf.Addresses.HTTP)
require.Equal(t, "127.0.0.3", conf.Addresses.RPC)
require.Equal(t, "127.0.0.3", conf.Addresses.Serf)
require.Equal(t, "127.0.0.3:4646", conf.normalizedAddrs.HTTP)
require.Equal(t, []string{"127.0.0.3:4646"}, conf.normalizedAddrs.HTTP)
require.Equal(t, "127.0.0.3:4647", conf.normalizedAddrs.RPC)
require.Equal(t, "127.0.0.3:4648", conf.normalizedAddrs.Serf)
@ -563,7 +563,7 @@ func TestAgent_HTTPCheck(t *testing.T) {
logger: logger,
config: &Config{
AdvertiseAddrs: &AdvertiseAddrs{HTTP: "advertise:4646"},
normalizedAddrs: &Addresses{HTTP: "normalized:4646"},
normalizedAddrs: &NormalizedAddrs{HTTP: []string{"normalized:4646"}},
Consul: &config.ConsulConfig{
ChecksUseAdvertise: helper.BoolToPtr(false),
},
@ -587,7 +587,7 @@ func TestAgent_HTTPCheck(t *testing.T) {
if check.Protocol != "http" {
t.Errorf("expected http proto not: %q", check.Protocol)
}
if expected := a.config.normalizedAddrs.HTTP; check.PortLabel != expected {
if expected := a.config.normalizedAddrs.HTTP[0]; check.PortLabel != expected {
t.Errorf("expected normalized addr not %q", check.PortLabel)
}
})

View File

@ -51,7 +51,7 @@ type Command struct {
args []string
agent *Agent
httpServer *HTTPServer
httpServers []*HTTPServer
logFilter *logutils.LevelFilter
logOutput io.Writer
retryJoinErrCh chan struct{}
@ -500,13 +500,13 @@ func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOu
c.agent = agent
// Setup the HTTP server
http, err := NewHTTPServer(agent, config)
httpServers, err := NewHTTPServers(agent, config)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err))
return err
}
c.httpServer = http
c.httpServers = httpServers
// If DisableUpdateCheck is not enabled, set up update checking
// (DisableUpdateCheck is false by default)
@ -700,8 +700,10 @@ func (c *Command) Run(args []string) int {
// Shutdown the http server at the end, to ease debugging if
// the agent takes long to shutdown
if c.httpServer != nil {
c.httpServer.Shutdown()
if len(c.httpServers) > 0 {
for _, srv := range c.httpServers {
srv.Shutdown()
}
}
}()
@ -902,13 +904,15 @@ WAIT:
func (c *Command) reloadHTTPServer() error {
c.agent.logger.Info("reloading HTTP server with new TLS configuration")
c.httpServer.Shutdown()
for _, srv := range c.httpServers {
srv.Shutdown()
}
http, err := NewHTTPServer(c.agent, c.agent.config)
httpServers, err := NewHTTPServers(c.agent, c.agent.config)
if err != nil {
return err
}
c.httpServer = http
c.httpServers = httpServers
return nil
}

View File

@ -85,7 +85,7 @@ type Config struct {
Addresses *Addresses `hcl:"addresses"`
// normalizedAddr is set to the Address+Port by normalizeAddrs()
normalizedAddrs *Addresses
normalizedAddrs *NormalizedAddrs
// AdvertiseAddrs is used to control the addresses we advertise.
AdvertiseAddrs *AdvertiseAddrs `hcl:"advertise"`
@ -774,6 +774,15 @@ type Addresses struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
type NormalizedAddrs struct {
HTTP []string
RPC string
Serf string
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
@ -1228,13 +1237,13 @@ func (c *Config) normalizeAddrs() error {
c.BindAddr = ipStr
}
addr, err := normalizeBind(c.Addresses.HTTP, c.BindAddr)
httpAddrs, err := normalizeMultipleBind(c.Addresses.HTTP, c.BindAddr)
if err != nil {
return fmt.Errorf("Failed to parse HTTP address: %v", err)
}
c.Addresses.HTTP = addr
c.Addresses.HTTP = strings.Join(httpAddrs, " ")
addr, err = normalizeBind(c.Addresses.RPC, c.BindAddr)
addr, err := normalizeBind(c.Addresses.RPC, c.BindAddr)
if err != nil {
return fmt.Errorf("Failed to parse RPC address: %v", err)
}
@ -1246,13 +1255,13 @@ func (c *Config) normalizeAddrs() error {
}
c.Addresses.Serf = addr
c.normalizedAddrs = &Addresses{
HTTP: net.JoinHostPort(c.Addresses.HTTP, strconv.Itoa(c.Ports.HTTP)),
c.normalizedAddrs = &NormalizedAddrs{
HTTP: joinHostPorts(httpAddrs, strconv.Itoa(c.Ports.HTTP)),
RPC: net.JoinHostPort(c.Addresses.RPC, strconv.Itoa(c.Ports.RPC)),
Serf: net.JoinHostPort(c.Addresses.Serf, strconv.Itoa(c.Ports.Serf)),
}
addr, err = normalizeAdvertise(c.AdvertiseAddrs.HTTP, c.Addresses.HTTP, c.Ports.HTTP, c.DevMode)
addr, err = normalizeAdvertise(c.AdvertiseAddrs.HTTP, httpAddrs[0], c.Ports.HTTP, c.DevMode)
if err != nil {
return fmt.Errorf("Failed to parse HTTP advertise address (%v, %v, %v, %v): %v", c.AdvertiseAddrs.HTTP, c.Addresses.HTTP, c.Ports.HTTP, c.DevMode, err)
}
@ -1335,6 +1344,22 @@ func parseSingleIPTemplate(ipTmpl string) (string, error) {
}
}
// parseMultipleIPTemplate is used as a helper function to parse out a multiple IP
// addresses from a config parameter.
func parseMultipleIPTemplate(ipTmpl string) ([]string, error) {
out, err := template.Parse(ipTmpl)
if err != nil {
return []string{}, fmt.Errorf("Unable to parse address template %q: %v", ipTmpl, err)
}
ips := strings.Split(out, " ")
if len(ips) == 0 {
return []string{}, errors.New("No addresses found, please configure one.")
}
return deduplicateAddrs(ips), nil
}
// normalizeBind returns a normalized bind address.
//
// If addr is set it is used, if not the default bind address is used.
@ -1345,6 +1370,16 @@ func normalizeBind(addr, bind string) (string, error) {
return parseSingleIPTemplate(addr)
}
// normalizeMultipleBind returns normalized bind addresses.
//
// If addr is set it is used, if not the default bind address is used.
func normalizeMultipleBind(addr, bind string) ([]string, error) {
if addr == "" {
return []string{bind}, nil
}
return parseMultipleIPTemplate(addr)
}
// normalizeAdvertise returns a normalized advertise address.
//
// If addr is set, it is used and the default port is appended if no port is
@ -1996,6 +2031,17 @@ func LoadConfigDir(dir string) (*Config, error) {
return result, nil
}
// joinHostPorts joins every addr in addrs with the specified port
func joinHostPorts(addrs []string, port string) []string {
localAddrs := make([]string, len(addrs))
for i, k := range addrs {
localAddrs[i] = net.JoinHostPort(k, port)
}
return localAddrs
}
// isTemporaryFile returns true or false depending on whether the
// provided file name is a temporary file for the following editors:
// emacs or vim.
@ -2004,3 +2050,16 @@ func isTemporaryFile(name string) bool {
strings.HasPrefix(name, ".#") || // emacs
(strings.HasPrefix(name, "#") && strings.HasSuffix(name, "#")) // emacs
}
func deduplicateAddrs(addrs []string) []string {
keys := make(map[string]bool)
list := []string{}
for _, entry := range addrs {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}

View File

@ -747,7 +747,7 @@ func TestConfig_normalizeAddrs_DevMode(t *testing.T) {
t.Fatalf("expected BindAddr 127.0.0.1, got %s", c.BindAddr)
}
if c.normalizedAddrs.HTTP != "127.0.0.1:4646" {
if c.normalizedAddrs.HTTP[0] != "127.0.0.1:4646" {
t.Fatalf("expected HTTP address 127.0.0.1:4646, got %s", c.normalizedAddrs.HTTP)
}
@ -880,6 +880,55 @@ func TestConfig_normalizeAddrs_IPv6Loopback(t *testing.T) {
}
}
// TestConfig_normalizeAddrs_MultipleInterface asserts that normalizeAddrs will
// handle normalizing multiple interfaces in a single protocol.
func TestConfig_normalizeAddrs_MultipleInterfaces(t *testing.T) {
testCases := []struct {
name string
addressConfig *Addresses
expectedNormalizedAddrs *NormalizedAddrs
expectErr bool
}{
{
name: "multiple http addresses",
addressConfig: &Addresses{
HTTP: "127.0.0.1 127.0.0.2",
},
expectedNormalizedAddrs: &NormalizedAddrs{
HTTP: []string{"127.0.0.1:4646", "127.0.0.2:4646"},
RPC: "127.0.0.1:4647",
Serf: "127.0.0.1:4648",
},
expectErr: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := &Config{
BindAddr: "127.0.0.1",
Ports: &Ports{
HTTP: 4646,
RPC: 4647,
Serf: 4648,
},
Addresses: tc.addressConfig,
AdvertiseAddrs: &AdvertiseAddrs{
HTTP: "127.0.0.1",
RPC: "127.0.0.1",
Serf: "127.0.0.1",
},
}
err := c.normalizeAddrs()
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedNormalizedAddrs, c.normalizedAddrs)
})
}
}
func TestConfig_normalizeAddrs(t *testing.T) {
c := &Config{
BindAddr: "169.254.1.5",
@ -1315,3 +1364,26 @@ func TestEventBroker_Parse(t *testing.T) {
require.Equal(20000, *result.EventBufferSize)
}
}
func TestParseMultipleIPTemplates(t *testing.T) {
testCases := []struct {
name string
tmpl string
expectedOut []string
expectErr bool
}{
{
name: "deduplicates same ip",
tmpl: "127.0.0.1 127.0.0.1",
expectedOut: []string{"127.0.0.1"},
expectErr: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
out, err := parseMultipleIPTemplate(tc.tmpl)
require.NoError(t, err)
require.Equal(t, tc.expectedOut, out)
})
}
}

View File

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/go-connlimit"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
"github.com/rs/cors"
"github.com/hashicorp/nomad/helper/noxssrw"
@ -75,64 +76,24 @@ type HTTPServer struct {
wsUpgrader *websocket.Upgrader
}
// NewHTTPServer starts new HTTP server over the agent
func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
// Start the listener
lnAddr, err := net.ResolveTCPAddr("tcp", config.normalizedAddrs.HTTP)
if err != nil {
return nil, err
}
ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port)
if err != nil {
return nil, fmt.Errorf("failed to start HTTP listener: %v", err)
}
// If TLS is enabled, wrap the listener with a TLS listener
if config.TLSConfig.EnableHTTP {
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, config.TLSConfig.VerifyHTTPSClient, true)
if err != nil {
return nil, err
}
tlsConfig, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
ln = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
}
// Create the mux
mux := http.NewServeMux()
wsUpgrader := &websocket.Upgrader{
ReadBufferSize: 2048,
WriteBufferSize: 2048,
}
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: ln,
listenerCh: make(chan struct{}),
logger: agent.httpLogger,
Addr: ln.Addr().String(),
wsUpgrader: wsUpgrader,
}
srv.registerHandlers(config.EnableDebug)
// NewHTTPServers starts an HTTP server for every address.http configured in
// the agent.
func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
var srvs []*HTTPServer
var serverInitializationErrors error
// Handle requests with gzip compression
gzip, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(0))
if err != nil {
return nil, err
return srvs, err
}
// Get connection handshake timeout limit
handshakeTimeout, err := time.ParseDuration(config.Limits.HTTPSHandshakeTimeout)
if err != nil {
return nil, fmt.Errorf("error parsing https_handshake_timeout: %v", err)
return srvs, fmt.Errorf("error parsing https_handshake_timeout: %v", err)
} else if handshakeTimeout < 0 {
return nil, fmt.Errorf("https_handshake_timeout must be >= 0")
return srvs, fmt.Errorf("https_handshake_timeout must be >= 0")
}
// Get max connection limit
@ -141,23 +102,80 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
maxConns = *mc
}
if maxConns < 0 {
return nil, fmt.Errorf("http_max_conns_per_client must be >= 0")
return srvs, fmt.Errorf("http_max_conns_per_client must be >= 0")
}
// Create HTTP server with timeouts
httpServer := http.Server{
Addr: srv.Addr,
Handler: gzip(mux),
ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns),
ErrorLog: newHTTPServerLogger(srv.logger),
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, config.TLSConfig.VerifyHTTPSClient, true)
if err != nil && config.TLSConfig.EnableHTTP {
return srvs, fmt.Errorf("failed to initialize HTTP server TLS configuration: %s", err)
}
go func() {
defer close(srv.listenerCh)
httpServer.Serve(ln)
}()
wsUpgrader := &websocket.Upgrader{
ReadBufferSize: 2048,
WriteBufferSize: 2048,
}
return srv, nil
// Start the listener
for _, addr := range config.normalizedAddrs.HTTP {
// Create the mux
mux := http.NewServeMux()
lnAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
serverInitializationErrors = multierror.Append(serverInitializationErrors, err)
continue
}
ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port)
if err != nil {
serverInitializationErrors = multierror.Append(serverInitializationErrors, fmt.Errorf("failed to start HTTP listener: %v", err))
continue
}
// If TLS is enabled, wrap the listener with a TLS listener
if config.TLSConfig.EnableHTTP {
tlsConfig, err := tlsConf.IncomingTLSConfig()
if err != nil {
serverInitializationErrors = multierror.Append(serverInitializationErrors, err)
continue
}
ln = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
}
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: ln,
listenerCh: make(chan struct{}),
logger: agent.httpLogger,
Addr: ln.Addr().String(),
wsUpgrader: wsUpgrader,
}
srv.registerHandlers(config.EnableDebug)
// Create HTTP server with timeouts
httpServer := http.Server{
Addr: srv.Addr,
Handler: gzip(mux),
ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns),
ErrorLog: newHTTPServerLogger(srv.logger),
}
go func() {
defer close(srv.listenerCh)
httpServer.Serve(ln)
}()
srvs = append(srvs, srv)
}
if serverInitializationErrors != nil {
for _, srv := range srvs {
srv.Shutdown()
}
}
return srvs, serverInitializationErrors
}
// makeConnState returns a ConnState func for use in an http.Server. If
@ -249,7 +267,7 @@ func (s *HTTPServer) Shutdown() {
}
// registerHandlers is used to attach our handlers to the mux
func (s *HTTPServer) registerHandlers(enableDebug bool) {
func (s HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/jobs", s.wrap(s.JobsRequest))
s.mux.HandleFunc("/v1/jobs/parse", s.wrap(s.JobsParseRequest))
s.mux.HandleFunc("/v1/job/", s.wrap(s.JobSpecificRequest))

View File

@ -69,6 +69,24 @@ func BenchmarkHTTPRequests(b *testing.B) {
})
}
func TestMultipleInterfaces(t *testing.T) {
httpIps := []string{"127.0.0.1", "127.0.0.2"}
s := makeHTTPServer(t, func(c *Config) {
c.Addresses.HTTP = strings.Join(httpIps, " ")
c.ACL.Enabled = true
})
defer s.Shutdown()
httpPort := s.ports[0]
for _, ip := range httpIps {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/", ip, httpPort))
assert.Nil(t, err)
assert.Equal(t, resp.StatusCode, 200)
}
}
// TestRootFallthrough tests rootFallthrough handler to
// verify redirect and 404 behavior
func TestRootFallthrough(t *testing.T) {
@ -945,8 +963,8 @@ func TestHTTPServer_Limits_Error(t *testing.T) {
t.Parallel()
conf := &Config{
normalizedAddrs: &Addresses{
HTTP: "localhost:0", // port is never used
normalizedAddrs: &NormalizedAddrs{
HTTP: []string{"localhost:0"}, // port is never used
},
TLSConfig: &config.TLSConfig{
EnableHTTP: tc.tls,
@ -964,7 +982,7 @@ func TestHTTPServer_Limits_Error(t *testing.T) {
config: conf,
}
srv, err := NewHTTPServer(agent, conf)
srv, err := NewHTTPServers(agent, conf)
require.Error(t, err)
require.Nil(t, srv)
require.Contains(t, err.Error(), tc.expectedErr)

View File

@ -66,7 +66,11 @@ type TestAgent struct {
// Key is the optional encryption key for the keyring.
Key string
// Server is a reference to the started HTTP endpoint.
// All HTTP servers started. Used to prevent server leaks and preserve
// backwards compability.
Servers []*HTTPServer
// Server is a reference to the primary, started HTTP endpoint.
// It is valid after Start().
Server *HTTPServer
@ -255,12 +259,15 @@ func (a *TestAgent) start() (*Agent, error) {
}
// Setup the HTTP server
http, err := NewHTTPServer(agent, a.Config)
httpServers, err := NewHTTPServers(agent, a.Config)
if err != nil {
return agent, err
}
a.Server = http
// TODO: investigate if there is a way to remove the requirement by updating test.
// Initial pass at implementing this is https://github.com/kevinschoonover/nomad/tree/tests.
a.Servers = httpServers
a.Server = httpServers[0]
return agent, nil
}
@ -284,7 +291,10 @@ func (a *TestAgent) Shutdown() error {
ch := make(chan error, 1)
go func() {
defer close(ch)
a.Server.Shutdown()
for _, srv := range a.Servers {
srv.Shutdown()
}
ch <- a.Agent.Shutdown()
}()

View File

@ -68,7 +68,11 @@ configuring Nomad to talk to Consul via DNS such as consul.service.consul
Consul communication. If this is set then you need to also set `key_file`.
- `checks_use_advertise` `(bool: false)` - Specifies if Consul health checks
should bind to the advertise address. By default, this is the bind address.
should bind to the advertise address. By default, this is the first [HTTP
address](https://www.nomadproject.io/docs/configuration#http). If no
[HTTP address](https://www.nomadproject.io/docs/configuration#http) is
specified, it will fall back to the
[bind_addr](https://www.nomadproject.io/docs/configuration#bind_addr).
- `client_auto_join` `(bool: true)` - Specifies if the Nomad clients should
automatically discover servers in the same region by searching for the Consul