Add rpc_listener option to segment config

This commit is contained in:
Kyle Havlovitz 2017-08-28 17:58:22 -07:00
parent e582d02079
commit 107d7f6c5a
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
9 changed files with 126 additions and 56 deletions

View File

@ -648,30 +648,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
}
base.Segment = a.config.Segment
for _, segment := range a.config.Segments {
config := consul.DefaultConfig().SerfLANConfig
config.MemberlistConfig.AdvertiseAddr = segment.Advertise
config.MemberlistConfig.AdvertisePort = segment.Port
config.MemberlistConfig.BindAddr = segment.Bind
config.MemberlistConfig.BindPort = segment.Port
if a.config.ReconnectTimeoutLan != 0 {
config.ReconnectTimeout = a.config.ReconnectTimeoutLan
if len(a.config.Segments) > 0 {
segments, err := a.segmentConfig()
if err != nil {
return nil, err
}
if a.config.EncryptVerifyIncoming != nil {
config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming
}
if a.config.EncryptVerifyOutgoing != nil {
config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing
}
base.Segments = append(base.Segments, consul.NetworkSegment{
Name: segment.Name,
Bind: segment.Bind,
Port: segment.Port,
Advertise: segment.Advertise,
SerfConfig: config,
})
base.Segments = segments
}
if a.config.Bootstrap {
base.Bootstrap = true
@ -789,6 +771,49 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
return base, nil
}
// Setup the serf and memberlist config for any defined network segments.
func (a *Agent) segmentConfig() ([]consul.NetworkSegment, error) {
var segments []consul.NetworkSegment
config := a.config
for _, segment := range config.Segments {
serfConf := consul.DefaultConfig().SerfLANConfig
serfConf.MemberlistConfig.AdvertiseAddr = segment.Advertise
serfConf.MemberlistConfig.AdvertisePort = segment.Port
serfConf.MemberlistConfig.BindAddr = segment.Bind
serfConf.MemberlistConfig.BindPort = segment.Port
if config.ReconnectTimeoutLan != 0 {
serfConf.ReconnectTimeout = config.ReconnectTimeoutLan
}
if config.EncryptVerifyIncoming != nil {
serfConf.MemberlistConfig.GossipVerifyIncoming = *config.EncryptVerifyIncoming
}
if config.EncryptVerifyOutgoing != nil {
serfConf.MemberlistConfig.GossipVerifyOutgoing = *config.EncryptVerifyOutgoing
}
var rpcAddr *net.TCPAddr
if segment.RPCListener {
rpcAddr = &net.TCPAddr{
IP: net.ParseIP(segment.Bind),
Port: a.config.Ports.Server,
}
}
segments = append(segments, consul.NetworkSegment{
Name: segment.Name,
Bind: segment.Bind,
Port: segment.Port,
Advertise: segment.Advertise,
RPCAddr: rpcAddr,
SerfConfig: serfConf,
})
}
return segments, nil
}
// makeRandomID will generate a random UUID for a node.
func (a *Agent) makeRandomID() (string, error) {
id, err := uuid.GenerateUUID()

View File

@ -357,11 +357,16 @@ type NetworkSegment struct {
Name string `mapstructure:"name"`
// Bind is the bind address for this segment.
Bind string `mapstructure:"bind"`
Bind string `mapstructure:"bind"`
BindAddrs []string `mapstructure:"-"`
// Port is the port for this segment.
Port int `mapstructure:"port"`
// RPCListener is whether to bind a separate RPC listener on the bind address
// for this segment.
RPCListener bool `mapstructure:"rpc_listener"`
// Advertise is the advertise address of this segment.
Advertise string `mapstructure:"advertise"`
}
@ -1408,6 +1413,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.AdvertiseAddrs.RPC = addr
}
// Validate segment config.
if err := ValidateSegments(&result); err != nil {
return nil, err
}
// Enforce the max Raft multiplier.
if result.Performance.RaftMultiplier > consul.MaxRaftMultiplier {
return nil, fmt.Errorf("Performance.RaftMultiplier must be <= %d", consul.MaxRaftMultiplier)
@ -1461,31 +1471,25 @@ func DecodeConfig(r io.Reader) (*Config, error) {
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
}
// Validate segment config
if err := ValidateSegments(&result); err != nil {
return nil, err
}
return &result, nil
}
func ValidateSegments(conf *Config) error {
if conf.Server && conf.Segment != "" {
return fmt.Errorf("Segment option can only be set on clients")
}
if !conf.Server && len(conf.Segments) > 0 {
return fmt.Errorf("Cannot define segments on clients")
}
if len(conf.Segments) > SegmentLimit {
return fmt.Errorf("Cannot exceed network segment limit of %d", SegmentLimit)
}
takenPorts := make(map[int]string, len(conf.Segments))
for _, segment := range conf.Segments {
if len(segment.Name) > SegmentNameLimit {
return fmt.Errorf("Segment name %q exceeds maximum length of %d", segment.Name, SegmentNameLimit)
}
previous, ok := takenPorts[segment.Port]
if ok {
return fmt.Errorf("Segment %q port %d overlaps with segment %q", segment.Name, segment.Port, previous)
}
takenPorts[segment.Port] = segment.Name
}
return nil
@ -2275,7 +2279,6 @@ func (c *Config) ResolveTmplAddrs() (err error) {
for i, segment := range c.Segments {
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))
}
return

View File

@ -597,8 +597,15 @@ func TestDecodeConfig(t *testing.T) {
c: &Config{Segment: "thing"},
},
{
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`,
c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}},
in: `{"server": true, "segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "rpc_listener": true, "advertise": "1.1.1.1"}]}`,
c: &Config{Server: true, Segments: []NetworkSegment{{
Name: "alpha",
Bind: "127.0.0.1",
BindAddrs: []string{"127.0.0.1"},
Port: 1234,
RPCListener: true,
Advertise: "1.1.1.1",
}}},
},
{
in: `{"serf_lan_bind":"1.2.3.4"}`,
@ -1311,18 +1318,6 @@ func TestDecodeConfig_VerifyUniqueListeners(t *testing.T) {
func TestDecodeConfig_ValidateSegments(t *testing.T) {
t.Parallel()
serverWithSegment := &Config{Segment: "asfd", Server: true}
if err := ValidateSegments(serverWithSegment); !strings.Contains(err.Error(), "can only be set on clients") {
t.Fatalf("bad: %v", err)
}
clientWithSegments := &Config{
Segments: []NetworkSegment{{Name: "asdf"}},
}
if err := ValidateSegments(clientWithSegments); !strings.Contains(err.Error(), "Cannot define segments on clients") {
t.Fatalf("bad: %v", err)
}
tooManySegments := &Config{Server: true}
for i := 0; i < SegmentLimit+1; i++ {
tooManySegments.Segments = append(tooManySegments.Segments, NetworkSegment{})
@ -1338,6 +1333,17 @@ func TestDecodeConfig_ValidateSegments(t *testing.T) {
if err := ValidateSegments(segmentNameTooLong); !strings.Contains(err.Error(), "exceeds maximum length") {
t.Fatalf("bad: %v", err)
}
duplicatePorts := &Config{
Segments: []NetworkSegment{
{Name: "asdf", Port: 1234},
{Name: "qwer", Port: 1234},
},
Server: true,
}
if err := ValidateSegments(duplicatePorts); !strings.Contains(err.Error(), "port 1234 overlaps with segment \"asdf\"") {
t.Fatalf("bad: %v", err)
}
}
func TestDefaultConfig(t *testing.T) {

View File

@ -55,6 +55,7 @@ type NetworkSegment struct {
Bind string
Port int
Advertise string
RPCAddr *net.TCPAddr
SerfConfig *serf.Config
}

View File

@ -46,10 +46,10 @@ const (
)
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
func (s *Server) listen(listener net.Listener) {
for {
// Accept a connection
conn, err := s.Listener.Accept()
conn, err := listener.Accept()
if err != nil {
if s.shutdown {
return

View File

@ -167,6 +167,9 @@ type Server struct {
segmentLAN map[string]*serf.Serf
segmentLock sync.RWMutex
// segmentListeners holds the RPC listener for any segment with a separate listener.
segmentListeners map[string]net.Listener
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
@ -306,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
segmentListeners: make(map[string]net.Listener),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
@ -418,7 +422,12 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
}
// Start listening for RPC requests.
go s.listen()
go s.listen(s.Listener)
// Start listeners for any segments with separate RPC listeners.
for _, listener := range s.segmentListeners {
go s.listen(listener)
}
// Start the metrics handlers.
go s.sessionStats()
@ -645,6 +654,19 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
}
for _, segment := range s.config.Segments {
if segment.RPCAddr == nil {
continue
}
segmentListener, err := net.ListenTCP("tcp", segment.RPCAddr)
if err != nil {
return err
}
s.segmentListeners[segment.Name] = segmentListener
}
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)

View File

@ -53,6 +53,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
addr := s.Listener.Addr().(*net.TCPAddr)
if listener, ok := s.segmentListeners[segment]; ok {
addr = listener.Addr().(*net.TCPAddr)
}
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"

View File

@ -465,7 +465,7 @@ func NewClient(config *Config) (*Client, error) {
if config.Token == "" {
config.Token = defConfig.Token
}
client := &Client{
config: *config,
}

View File

@ -394,6 +394,16 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
return nil
}
if cfg.Server && cfg.Segment != "" {
cmd.UI.Error("Segment option can only be set on clients")
return nil
}
if !cfg.Server && len(cfg.Segments) > 0 {
cmd.UI.Error("Cannot define segments on clients")
return nil
}
// patch deprecated retry-join-{gce,azure,ec2)-* parameters
// into -retry-join and issue warning.
// todo(fs): this should really be in DecodeConfig where it can be tested