Add address selector methods to the agent

This commit is contained in:
Evan Gilman 2016-09-02 16:23:45 -07:00 committed by Alex Dadgar
parent 06909d2465
commit de33949df8
5 changed files with 172 additions and 109 deletions

View File

@ -152,74 +152,37 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
}
// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}
if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}
// Resolve the Server's HTTP Address
if a.config.AdvertiseAddrs.HTTP != "" {
a.serverHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.serverHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.serverHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
}
addr, err := net.ResolveTCPAddr("tcp", a.serverHTTPAddr)
rpcAddr, err := a.getRPCAddr(true)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.serverHTTPAddr, err)
return nil, err
}
a.serverHTTPAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
// Resolve the Server's RPC Address
if a.config.AdvertiseAddrs.RPC != "" {
a.serverRPCAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.Addresses.RPC, strconv.Itoa(a.config.Ports.RPC))
} else if a.config.BindAddr != "" {
a.serverRPCAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.RPC))
} else {
a.serverRPCAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.RPC))
}
addr, err = net.ResolveTCPAddr("tcp", a.serverRPCAddr)
serfAddr, err := a.getSerfAddr(true)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %+q: %v", a.serverRPCAddr, err)
return nil, err
}
a.serverRPCAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
conf.RPCAddr.Port = rpcAddr.Port
conf.RPCAddr.IP = rpcAddr.IP
conf.SerfConfig.MemberlistConfig.BindPort = serfAddr.Port
conf.SerfConfig.MemberlistConfig.BindAddr = serfAddr.IP.String()
// Resolve the Server's Serf Address
if a.config.AdvertiseAddrs.Serf != "" {
a.serverSerfAddr = a.config.AdvertiseAddrs.Serf
} else if a.config.Addresses.Serf != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.Addresses.Serf, strconv.Itoa(a.config.Ports.Serf))
} else if a.config.BindAddr != "" {
a.serverSerfAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.Serf))
} else {
a.serverSerfAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.Serf))
}
addr, err = net.ResolveTCPAddr("tcp", a.serverSerfAddr)
// Set up the advertise addresses
httpAddr, err := a.getHTTPAddr(false)
if err != nil {
return nil, fmt.Errorf("error resolving Serf addr %+q: %v", a.serverSerfAddr, err)
return nil, err
}
a.serverSerfAddr = net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
rpcAddr, err = a.getRPCAddr(false)
if err != nil {
return nil, err
}
serfAddr, err = a.getSerfAddr(false)
if err != nil {
return nil, err
}
a.serverHTTPAddr = net.JoinHostPort(httpAddr.IP.String(), strconv.Itoa(httpAddr.Port))
a.serverRPCAddr = net.JoinHostPort(rpcAddr.IP.String(), strconv.Itoa(rpcAddr.Port))
a.serverSerfAddr = net.JoinHostPort(serfAddr.IP.String(), strconv.Itoa(serfAddr.Port))
// Set up gc threshold and heartbeat grace period
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
@ -317,22 +280,11 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass
// Resolve the Client's HTTP address
if a.config.AdvertiseAddrs.HTTP != "" {
a.clientHTTPAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
a.clientHTTPAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
a.clientHTTPAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
}
addr, err := net.ResolveTCPAddr("tcp", a.clientHTTPAddr)
// Set up the HTTP advertise address
httpAddr, err := a.selectAddr(a.getHTTPAddr, false)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", a.clientHTTPAddr, err)
return nil, err
}
httpAddr := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
conf.Node.HTTPAddr = httpAddr
a.clientHTTPAddr = httpAddr
@ -392,6 +344,20 @@ func (a *Agent) setupServer() error {
}
a.server = server
// Resolve consul check addresses. Always use advertise address for services
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.ChecksUseAdvertise)
if err != nil {
return err
}
rpcCheckAddr, err := a.selectAddr(a.getRPCAddr, !a.config.ChecksUseAdvertise)
if err != nil {
return err
}
serfCheckAddr, err := a.selectAddr(a.getSerfAddr, !a.config.ChecksUseAdvertise)
if err != nil {
return err
}
// Create the Nomad Server services for Consul
// TODO re-introduce HTTP/S checks when Consul 0.7.1 comes out
if a.config.Consul.AutoAdvertise {
@ -401,10 +367,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
Name: "Nomad Server HTTP Check",
Type: "tcp",
Interval: serverHttpCheckInterval,
Timeout: serverHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
@ -414,10 +381,11 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagRPC},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
PortLabel: rpcCheckAddr,
},
},
}
@ -427,23 +395,15 @@ func (a *Agent) setupServer() error {
Tags: []string{consul.ServiceTagSerf},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
PortLabel: serfCheckAddr,
},
},
}
// Resolve ServiceCheck addresses
if a.config.Addresses.HTTP != "" && a.config.Addresses.HTTP != "0.0.0.0" {
httpServ.Checks[0].PortLabel = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
}
if a.config.Addresses.RPC != "" && a.config.Addresses.RPC != "0.0.0.0" {
rpcServ.Checks[0].PortLabel = net.JoinHostPort(a.config.Addresses.RPC, strconv.Itoa(a.config.Ports.RPC))
}
if a.config.Addresses.Serf != "" && a.config.Addresses.Serf != "0.0.0.0" {
serfServ.Checks[0].PortLabel = net.JoinHostPort(a.config.Addresses.Serf, strconv.Itoa(a.config.Ports.Serf))
}
a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(httpServ): httpServ,
consul.GenerateServiceKey(rpcServ): rpcServ,
@ -504,6 +464,12 @@ func (a *Agent) setupClient() error {
}
a.client = client
// Resolve the http check address
httpCheckAddr, err := a.selectAddr(a.getHTTPAddr, !a.config.ChecksUseAdvertise)
if err != nil {
return err
}
// Create the Nomad Client services for Consul
// TODO think how we can re-introduce HTTP/S checks when Consul 0.7.1 comes
// out
@ -514,16 +480,14 @@ func (a *Agent) setupClient() error {
Tags: []string{consul.ServiceTagHTTP},
Checks: []*structs.ServiceCheck{
&structs.ServiceCheck{
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
Name: "Nomad Client HTTP Check",
Type: "tcp",
Interval: clientHttpCheckInterval,
Timeout: clientHttpCheckTimeout,
PortLabel: httpCheckAddr,
},
},
}
if a.config.Addresses.HTTP != "" && a.config.Addresses.HTTP != "0.0.0.0" {
httpServ.Checks[0].PortLabel = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
}
a.consulSyncer.SetServices(consul.ClientDomain, map[consul.ServiceKey]*structs.Service{
consul.GenerateServiceKey(httpServ): httpServ,
})
@ -532,6 +496,87 @@ func (a *Agent) setupClient() error {
return nil
}
// Defines the selector interface
type addrSelector func(bool) (*net.TCPAddr, error)
// Choose the right address given a selector, and return it as a PortLabel
func (a *Agent) selectAddr(selector addrSelector, preferBind bool) (string, error) {
addr, err := selector(preferBind)
if err != nil {
return "", err
}
if preferBind && strings.HasPrefix(addr.IP.String(), "0.0.0.0:") {
addr, err = selector(false)
if err != nil {
return "", err
}
}
address := net.JoinHostPort(addr.IP.String(), strconv.Itoa(addr.Port))
return address, nil
}
// HTTP address selector
func (a *Agent) getHTTPAddr(returnBind bool) (*net.TCPAddr, error) {
var serverAddr string
if a.config.AdvertiseAddrs.HTTP != "" && !returnBind {
serverAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
serverAddr = net.JoinHostPort(a.config.Addresses.HTTP, strconv.Itoa(a.config.Ports.HTTP))
} else if a.config.BindAddr != "" {
serverAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.HTTP))
} else {
serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.HTTP))
}
addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %+q: %v", serverAddr, err)
}
return addr, nil
}
// RPC address selector
func (a *Agent) getRPCAddr(returnBind bool) (*net.TCPAddr, error) {
var serverAddr string
if a.config.AdvertiseAddrs.RPC != "" && !returnBind {
serverAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
serverAddr = net.JoinHostPort(a.config.Addresses.RPC, strconv.Itoa(a.config.Ports.RPC))
} else if a.config.BindAddr != "" {
serverAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.RPC))
} else {
serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.RPC))
}
addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %+q: %v", serverAddr, err)
}
return addr, nil
}
// Serf address selector
func (a *Agent) getSerfAddr(returnBind bool) (*net.TCPAddr, error) {
var serverAddr string
if a.config.AdvertiseAddrs.Serf != "" && !returnBind {
serverAddr = a.config.AdvertiseAddrs.Serf
} else if a.config.Addresses.Serf != "" {
serverAddr = net.JoinHostPort(a.config.Addresses.Serf, strconv.Itoa(a.config.Ports.Serf))
} else if a.config.BindAddr != "" {
serverAddr = net.JoinHostPort(a.config.BindAddr, strconv.Itoa(a.config.Ports.Serf))
} else {
serverAddr = net.JoinHostPort("127.0.0.1", strconv.Itoa(a.config.Ports.Serf))
}
addr, err := net.ResolveTCPAddr("tcp", serverAddr)
if err != nil {
return nil, fmt.Errorf("error resolving Serf addr %+q: %v", serverAddr, err)
}
return addr, nil
}
// reservePortsForClient reserves a range of ports for the client to use when
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {

View File

@ -155,7 +155,9 @@ func TestAgent_ServerConfig(t *testing.T) {
conf.Addresses.RPC = "127.0.0.2"
conf.Addresses.Serf = "127.0.0.2"
conf.Addresses.HTTP = "127.0.0.2"
conf.AdvertiseAddrs.HTTP = ""
conf.AdvertiseAddrs.HTTP = "10.0.0.1"
conf.AdvertiseAddrs.RPC = ""
conf.AdvertiseAddrs.Serf = "10.0.0.2"
out, err = a.serverConfig()
if err != nil {
@ -167,15 +169,20 @@ func TestAgent_ServerConfig(t *testing.T) {
if addr := out.SerfConfig.MemberlistConfig.BindAddr; addr != "127.0.0.2" {
t.Fatalf("expect 127.0.0.2, got: %s", addr)
}
if addr := a.serverHTTPAddr; addr != "127.0.0.2:4646" {
t.Fatalf("expect 127.0.0.2:4646, got: %s", addr)
if addr := a.serverHTTPAddr; addr != "10.0.0.1:4646" {
t.Fatalf("expect 10.0.0.1:4646, got: %s", addr)
}
// NOTE: AdvertiseAddr > Addresses > BindAddr > Defaults
if addr := a.serverRPCAddr; addr != "127.0.0.1:4001" {
t.Fatalf("expect 127.0.0.1:4001, got: %s", addr)
}
if addr := a.serverSerfAddr; addr != "127.0.0.1:4000" {
t.Fatalf("expect 127.0.0.1:4000, got: %s", addr)
if addr := a.serverSerfAddr; addr != "10.0.0.2:4000" {
t.Fatalf("expect 10.0.0.2:4000, got: %s", addr)
}
// It correctly identifies the bind address when requested
if addr := a.selectAddr(a.getHTTPAddr, true); addr != "127.0.0.2:4646" {
t.Fatalf("expect 127.0.0.2:4646, got: %s", addr)
}
conf.Server.NodeGCThreshold = "42g"

View File

@ -51,6 +51,9 @@ type Config struct {
// AdvertiseAddrs is used to control the addresses we advertise.
AdvertiseAddrs *AdvertiseAddrs `mapstructure:"advertise"`
// Specifies that consul checks should use Advertise address instead of bind
ChecksUseAdvertise bool `mapstructure:"checks_use_advertise"`
// Client has our client related settings
Client *ClientConfig `mapstructure:"client"`

View File

@ -49,7 +49,11 @@ type HTTPServer struct {
// NewHTTPServer starts new HTTP server over the agent
func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServer, error) {
// Start the listener
ln, err := config.Listener("tcp", config.Addresses.HTTP, config.Ports.HTTP)
lnAddr, err := agent.getHTTPAddr(true)
if err != nil {
return nil, err
}
ln, err := config.Listener("tcp", lnAddr.IP.String(), lnAddr.Port)
if err != nil {
return nil, fmt.Errorf("failed to start HTTP listener: %v", err)
}

View File

@ -176,6 +176,10 @@ nodes, unless otherwise specified:
reachable from all server nodes. It is not required that clients can reach
this address.
* `checks_use_advertise`: By default, Nomad will use the configured bind address
as the target for its consul checks. This boolean option allows you to request
that the advertise address be used instead.
* `consul`: The `consul` configuration block changes how Nomad interacts with
Consul. Nomad can automatically advertise Nomad services via Consul, and can
automatically bootstrap itself using Consul. For more details see the [`consul`