Merge pull request #33 from hashicorp/f-network

Network config
This commit is contained in:
Ryan Uber 2015-09-11 17:43:32 -07:00
commit 987bbe6cd2
6 changed files with 375 additions and 55 deletions

View file

@ -58,13 +58,9 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
return a, nil
}
// setupServer is used to setup the server if enabled
func (a *Agent) setupServer() error {
if !a.config.Server.Enabled {
return nil
}
// Setup the configuration
// serverConfig is used to generate a new server configuration struct
// for initializing a nomad server.
func (a *Agent) serverConfig() (*nomad.Config, error) {
conf := a.config.NomadConfig
if conf == nil {
conf = nomad.DefaultConfig()
@ -102,19 +98,57 @@ func (a *Agent) setupServer() error {
if len(a.config.Server.EnabledSchedulers) != 0 {
conf.EnabledSchedulers = a.config.Server.EnabledSchedulers
}
if addr := a.config.Server.AdvertiseAddr; addr != "" {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
// Set up the advertise addrs
if addr := a.config.AdvertiseAddrs.Serf; addr != "" {
serfAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return fmt.Errorf("failed to resolve advertise address: %v", err)
return nil, fmt.Errorf("error resolving serf advertise address: %s", err)
}
conf.RPCAdvertise = tcpAddr
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
}
if addr := a.config.Server.BindAddr; addr != "" {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if addr := a.config.AdvertiseAddrs.RPC; addr != "" {
rpcAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return fmt.Errorf("failed to resolve bind address: %v", err)
return nil, fmt.Errorf("error resolving rpc advertise address: %s", err)
}
conf.RPCAddr = tcpAddr
conf.RPCAdvertise = rpcAddr
}
// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}
if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}
return conf, nil
}
// setupServer is used to setup the server if enabled
func (a *Agent) setupServer() error {
if !a.config.Server.Enabled {
return nil
}
// Setup the configuration
conf, err := a.serverConfig()
if err != nil {
return fmt.Errorf("server config setup failed: %s", err)
}
// Create the server
@ -122,6 +156,7 @@ func (a *Agent) setupServer() error {
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}
a.server = server
return nil
}

View file

@ -5,6 +5,7 @@ import (
"io/ioutil"
"net"
"os"
"strings"
"sync/atomic"
"testing"
"time"
@ -79,3 +80,86 @@ func TestAgent_RPCPing(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestAgent_ServerConfig(t *testing.T) {
conf := DefaultConfig()
a := &Agent{config: conf}
// Returns error on bad serf addr
conf.AdvertiseAddrs.Serf = "nope"
_, err := a.serverConfig()
if err == nil || !strings.Contains(err.Error(), "serf advertise") {
t.Fatalf("expected serf address error, got: %#v", err)
}
conf.AdvertiseAddrs.Serf = "127.0.0.1:4000"
// Returns error on bad rpc addr
conf.AdvertiseAddrs.RPC = "nope"
_, err = a.serverConfig()
if err == nil || !strings.Contains(err.Error(), "rpc advertise") {
t.Fatalf("expected rpc address error, got: %#v", err)
}
conf.AdvertiseAddrs.RPC = "127.0.0.1:4001"
// Parses the advertise addrs correctly
out, err := a.serverConfig()
if err != nil {
t.Fatalf("err: %s", err)
}
serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
if serfAddr != "127.0.0.1" {
t.Fatalf("expect 127.0.0.1, got: %s", serfAddr)
}
serfPort := out.SerfConfig.MemberlistConfig.AdvertisePort
if serfPort != 4000 {
t.Fatalf("expected 4000, got: %d", serfPort)
}
if addr := out.RPCAdvertise; addr.IP.String() != "127.0.0.1" || addr.Port != 4001 {
t.Fatalf("bad rpc advertise addr: %#v", addr)
}
// Sets up the ports properly
conf.Ports.RPC = 4003
conf.Ports.Serf = 4004
out, err = a.serverConfig()
if err != nil {
t.Fatalf("err: %s", err)
}
if addr := out.RPCAddr.Port; addr != 4003 {
t.Fatalf("expect 4003, got: %d", out.RPCAddr.Port)
}
if port := out.SerfConfig.MemberlistConfig.BindPort; port != 4004 {
t.Fatalf("expect 4004, got: %d", port)
}
// Prefers the most specific bind addrs
conf.BindAddr = "127.0.0.3"
conf.Addresses.RPC = "127.0.0.2"
conf.Addresses.Serf = "127.0.0.2"
out, err = a.serverConfig()
if err != nil {
t.Fatalf("err: %s", err)
}
if addr := out.RPCAddr.IP.String(); addr != "127.0.0.2" {
t.Fatalf("expect 127.0.0.2, got: %s", addr)
}
if addr := out.SerfConfig.MemberlistConfig.BindAddr; addr != "127.0.0.2" {
t.Fatalf("expect 127.0.0.2, got: %s", addr)
}
// Defaults to the global bind addr
conf.Addresses.RPC = ""
conf.Addresses.Serf = ""
out, err = a.serverConfig()
if err != nil {
t.Fatalf("err: %s", err)
}
if addr := out.RPCAddr.IP.String(); addr != "127.0.0.3" {
t.Fatalf("expect 127.0.0.3, got: %s", addr)
}
if addr := out.SerfConfig.MemberlistConfig.BindAddr; addr != "127.0.0.3" {
t.Fatalf("expect 127.0.0.3, got: %s", addr)
}
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
@ -31,13 +32,22 @@ type Config struct {
// LogLevel is the level of the logs to putout
LogLevel string `hcl:"log_level"`
// HttpAddr is used to control the address and port we bind to.
// If not specified, 127.0.0.1:4646 is used.
HttpAddr string `hcl:"http_addr"`
// BindAddr is the address on which all of nomad's services will
// be bound. If not specified, this defaults to 127.0.0.1.
BindAddr string `hcl:"bind_addr"`
// EnableDebug is used to enable debugging HTTP endpoints
EnableDebug bool `hcl:"enable_debug"`
// Ports is used to control the network ports we bind to.
Ports *Ports `hcl:"ports"`
// Addresses is used to override the network addresses we bind to.
Addresses *Addresses `hcl:"addresses"`
// AdvertiseAddrs is used to control the addresses we advertise.
AdvertiseAddrs *AdvertiseAddrs `hcl:"advertise"`
// Client has our client related settings
Client *ClientConfig `hcl:"client"`
@ -112,16 +122,6 @@ type ServerConfig struct {
// ProtocolVersionMin and ProtocolVersionMax.
ProtocolVersion int `hcl:"protocol_version"`
// AdvertiseAddr is the address we use for advertising our Serf,
// and Consul RPC IP. If not specified, bind address is used.
AdvertiseAddr string `mapstructure:"advertise_addr"`
// BindAddr is used to control the address we bind to.
// If not specified, the first private IP we find is used.
// This controls the address we use for cluster facing
// services (Gossip, Server RPC)
BindAddr string `hcl:"bind_addr"`
// NumSchedulers is the number of scheduler thread that are run.
// This can be as many as one per core, or zero to disable this server
// from doing any scheduling work.
@ -140,6 +140,30 @@ type Telemetry struct {
DisableHostname bool `hcl:"disable_hostname"`
}
// Ports is used to encapsulate the various ports we bind to for network
// services. If any are not specified then the defaults are used instead.
type Ports struct {
HTTP int `hcl:"http"`
RPC int `hcl:"rpc"`
Serf int `hcl:"serf"`
}
// Addresses encapsulates all of the addresses we bind to for various
// network services. Everything is optional and defaults to BindAddr.
type Addresses struct {
HTTP string `hcl:"http"`
RPC string `hcl:"rpc"`
Serf string `hcl:"serf"`
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. Not all network services support an
// advertise address. All are optional and default to BindAddr.
type AdvertiseAddrs struct {
RPC string `hcl:"rpc"`
Serf string `hcl:"serf"`
}
// DevConfig is a Config that is used for dev mode of Nomad.
func DevConfig() *Config {
conf := DefaultConfig()
@ -164,7 +188,14 @@ func DefaultConfig() *Config {
LogLevel: "INFO",
Region: "region1",
Datacenter: "dc1",
HttpAddr: "127.0.0.1:4646",
BindAddr: "127.0.0.1",
Ports: &Ports{
HTTP: 4646,
RPC: 4647,
Serf: 4648,
},
Addresses: &Addresses{},
AdvertiseAddrs: &AdvertiseAddrs{},
Client: &ClientConfig{
Enabled: false,
},
@ -174,6 +205,15 @@ func DefaultConfig() *Config {
}
}
// GetListener can be used to get a new listener using a custom bind address.
// If the bind provided address is empty, the BindAddr is used instead.
func (c *Config) Listener(proto, addr string, port int) (net.Listener, error) {
if addr == "" {
addr = c.BindAddr
}
return net.Listen(proto, fmt.Sprintf("%s:%d", addr, port))
}
// Merge merges two configurations.
func (a *Config) Merge(b *Config) *Config {
var result Config = *a
@ -193,8 +233,8 @@ func (a *Config) Merge(b *Config) *Config {
if b.LogLevel != "" {
result.LogLevel = b.LogLevel
}
if b.HttpAddr != "" {
result.HttpAddr = b.HttpAddr
if b.BindAddr != "" {
result.BindAddr = b.BindAddr
}
if b.EnableDebug {
result.EnableDebug = true
@ -242,6 +282,30 @@ func (a *Config) Merge(b *Config) *Config {
result.Server = result.Server.Merge(b.Server)
}
// Apply the ports config
if result.Ports == nil && b.Ports != nil {
ports := *b.Ports
result.Ports = &ports
} else if b.Ports != nil {
result.Ports = result.Ports.Merge(b.Ports)
}
// Apply the address config
if result.Addresses == nil && b.Addresses != nil {
addrs := *b.Addresses
result.Addresses = &addrs
} else if b.Addresses != nil {
result.Addresses = result.Addresses.Merge(b.Addresses)
}
// Apply the advertise addrs config
if result.AdvertiseAddrs == nil && b.AdvertiseAddrs != nil {
advertise := *b.AdvertiseAddrs
result.AdvertiseAddrs = &advertise
} else if b.AdvertiseAddrs != nil {
result.AdvertiseAddrs = result.AdvertiseAddrs.Merge(b.AdvertiseAddrs)
}
return &result
}
@ -264,12 +328,6 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.ProtocolVersion != 0 {
result.ProtocolVersion = b.ProtocolVersion
}
if b.AdvertiseAddr != "" {
result.AdvertiseAddr = b.AdvertiseAddr
}
if b.BindAddr != "" {
result.BindAddr = b.BindAddr
}
if b.NumSchedulers != 0 {
result.NumSchedulers = b.NumSchedulers
}
@ -330,6 +388,51 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
return &result
}
// Merge is used to merge two port configurations.
func (a *Ports) Merge(b *Ports) *Ports {
var result Ports = *a
if b.HTTP != 0 {
result.HTTP = b.HTTP
}
if b.RPC != 0 {
result.RPC = b.RPC
}
if b.Serf != 0 {
result.Serf = b.Serf
}
return &result
}
// Merge is used to merge two address configs together.
func (a *Addresses) Merge(b *Addresses) *Addresses {
var result Addresses = *a
if b.HTTP != "" {
result.HTTP = b.HTTP
}
if b.RPC != "" {
result.RPC = b.RPC
}
if b.Serf != "" {
result.Serf = b.Serf
}
return &result
}
// Merge merges two advertise addrs configs together.
func (a *AdvertiseAddrs) Merge(b *AdvertiseAddrs) *AdvertiseAddrs {
var result AdvertiseAddrs = *a
if b.RPC != "" {
result.RPC = b.RPC
}
if b.Serf != "" {
result.Serf = b.Serf
}
return &result
}
// LoadConfig loads the configuration at the given path, regardless if
// its a file or directory.
func LoadConfig(path string) (*Config, error) {

View file

@ -17,7 +17,6 @@ func TestConfig_Merge(t *testing.T) {
NodeName: "node1",
DataDir: "/tmp/dir1",
LogLevel: "INFO",
HttpAddr: "127.0.0.1:4646",
EnableDebug: false,
LeaveOnInt: false,
LeaveOnTerm: false,
@ -25,6 +24,7 @@ func TestConfig_Merge(t *testing.T) {
SyslogFacility: "local0.info",
DisableUpdateCheck: false,
DisableAnonymousSignature: false,
BindAddr: "127.0.0.1",
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:8125",
StatsdAddr: "127.0.0.1:8125",
@ -43,10 +43,22 @@ func TestConfig_Merge(t *testing.T) {
BootstrapExpect: 1,
DataDir: "/tmp/data1",
ProtocolVersion: 1,
AdvertiseAddr: "127.0.0.1:4647",
BindAddr: "127.0.0.1",
NumSchedulers: 1,
},
Ports: &Ports{
HTTP: 4646,
RPC: 4647,
Serf: 4648,
},
Addresses: &Addresses{
HTTP: "127.0.0.1",
RPC: "127.0.0.1",
Serf: "127.0.0.1",
},
AdvertiseAddrs: &AdvertiseAddrs{
RPC: "127.0.0.1",
Serf: "127.0.0.1",
},
}
c2 := &Config{
@ -55,7 +67,6 @@ func TestConfig_Merge(t *testing.T) {
NodeName: "node2",
DataDir: "/tmp/dir2",
LogLevel: "DEBUG",
HttpAddr: "0.0.0.0:80",
EnableDebug: true,
LeaveOnInt: true,
LeaveOnTerm: true,
@ -63,6 +74,7 @@ func TestConfig_Merge(t *testing.T) {
SyslogFacility: "local0.debug",
DisableUpdateCheck: true,
DisableAnonymousSignature: true,
BindAddr: "127.0.0.2",
Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.2:8125",
StatsdAddr: "127.0.0.2:8125",
@ -83,11 +95,23 @@ func TestConfig_Merge(t *testing.T) {
BootstrapExpect: 2,
DataDir: "/tmp/data2",
ProtocolVersion: 2,
AdvertiseAddr: "127.0.0.2:4647",
BindAddr: "127.0.0.2",
NumSchedulers: 2,
EnabledSchedulers: []string{structs.JobTypeBatch},
},
Ports: &Ports{
HTTP: 20000,
RPC: 21000,
Serf: 22000,
},
Addresses: &Addresses{
HTTP: "127.0.0.2",
RPC: "127.0.0.2",
Serf: "127.0.0.2",
},
AdvertiseAddrs: &AdvertiseAddrs{
RPC: "127.0.0.2",
Serf: "127.0.0.2",
},
}
result := c1.Merge(c2)
@ -231,3 +255,44 @@ func TestConfig_LoadConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
}
func TestConfig_Listener(t *testing.T) {
config := DefaultConfig()
// Fails on invalid input
if _, err := config.Listener("tcp", "nope", 8080); err == nil {
t.Fatalf("expected addr error")
}
if _, err := config.Listener("nope", "127.0.0.1", 8080); err == nil {
t.Fatalf("expected protocol err")
}
if _, err := config.Listener("tcp", "127.0.0.1", -1); err == nil {
t.Fatalf("expected port error")
}
// Works with valid inputs
ln, err := config.Listener("tcp", "127.0.0.1", 24000)
if err != nil {
t.Fatalf("err: %s", err)
}
ln.Close()
if net := ln.Addr().Network(); net != "tcp" {
t.Fatalf("expected tcp, got: %q", net)
}
if addr := ln.Addr().String(); addr != "127.0.0.1:24000" {
t.Fatalf("expected 127.0.0.1:4646, got: %q", addr)
}
// Falls back to default bind address if non provided
config.BindAddr = "0.0.0.0"
ln, err = config.Listener("tcp4", "", 24000)
if err != nil {
t.Fatalf("err: %s", err)
}
ln.Close()
if addr := ln.Addr().String(); addr != "0.0.0.0:24000" {
t.Fatalf("expected 0.0.0.0:24000, got: %q", addr)
}
}

View file

@ -31,9 +31,9 @@ type HTTPServer struct {
// NewHTTPServer starts new HTTP server over the agent
func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServer, error) {
// Start the listener
ln, err := net.Listen("tcp", config.HttpAddr)
ln, err := config.Listener("tcp", config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
return nil, fmt.Errorf("failed to start HTTP listener on %s: %v", config.HttpAddr, err)
return nil, fmt.Errorf("failed to start HTTP listener: %v", err)
}
// Create the mux

View file

@ -29,13 +29,33 @@ var offset uint64
// TestServerConfig is the main server configuration struct.
type TestServerConfig struct {
HTTPAddr string `json:"http_addr,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Region string `json:"region,omitempty"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Stdout, Stderr io.Writer `json:"-"`
Bootstrap bool `json:"bootstrap,omitempty"`
DataDir string `json:"data_dir,omitempty"`
Region string `json:"region,omitempty"`
DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"`
Ports *PortsConfig `json:"ports,omitempty"`
Server *ServerConfig `json:"server,omitempty"`
Client *ClientConfig `json:"client,omitempty"`
Stdout, Stderr io.Writer `json:"-"`
}
// Ports is used to configure the network ports we use.
type PortsConfig struct {
HTTP int `json:"http,omitempty"`
RPC int `json:"rpc,omitempty"`
Serf int `json:"serf,omitempty"`
}
// ServerConfig is used to configure the nomad server.
type ServerConfig struct {
Enabled bool `json:"enabled"`
Bootstrap bool `json:"bootstrap"`
}
// ClientConfig is used to configure the client
type ClientConfig struct {
Enabled bool `json:"enabled"`
}
// ServerConfigCallback is a function interface which can be
@ -51,7 +71,18 @@ func defaultServerConfig() *TestServerConfig {
DisableCheckpoint: true,
Bootstrap: true,
LogLevel: "DEBUG",
HTTPAddr: fmt.Sprintf("127.0.0.1:%d", 20000+idx),
Ports: &PortsConfig{
HTTP: 20000 + idx,
RPC: 21000 + idx,
Serf: 22000 + idx,
},
Server: &ServerConfig{
Enabled: true,
Bootstrap: true,
},
Client: &ClientConfig{
Enabled: false,
},
}
}
@ -62,6 +93,7 @@ type TestServer struct {
t *testing.T
HTTPAddr string
SerfAddr string
HttpClient *http.Client
}
@ -111,7 +143,7 @@ func NewTestServer(t *testing.T, cb ServerConfigCallback) *TestServer {
}
// Start the server
cmd := exec.Command("nomad", "agent", "-dev", "-config", configFile.Name())
cmd := exec.Command("nomad", "agent", "-config", configFile.Name())
cmd.Stdout = stdout
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {
@ -126,7 +158,8 @@ func NewTestServer(t *testing.T, cb ServerConfigCallback) *TestServer {
PID: cmd.Process.Pid,
t: t,
HTTPAddr: nomadConfig.HTTPAddr,
HTTPAddr: fmt.Sprintf("127.0.0.1:%d", nomadConfig.Ports.HTTP),
SerfAddr: fmt.Sprintf("127.0.0.1:%d", nomadConfig.Ports.Serf),
HttpClient: client,
}