Allows disabling WAN federation by setting serf WAN port to -1

This commit is contained in:
Preetha Appan 2018-03-26 14:21:06 -05:00
parent 9a91b4eaef
commit 512f9a50fc
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
8 changed files with 91 additions and 49 deletions

View File

@ -703,16 +703,21 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.SerfLANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfLANProbeTimeout
base.SerfLANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfLANSuspicionMult
base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
if a.config.SerfBindAddrWAN != nil {
base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String()
base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port
base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming
base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing
base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval
base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval
base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout
base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult
} else {
// Disable serf WAN federation
base.SerfWANConfig = nil
}
base.RPCAddr = a.config.RPCBindAddr
base.RPCAdvertise = a.config.RPCAdvertiseAddr
@ -1019,6 +1024,7 @@ func (a *Agent) setupNodeID(config *config.RuntimeConfig) error {
func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
// If the keyring file is disabled then just poke the provided key
// into the in-memory keyring.
fedarationEnabled := config.SerfWANConfig != nil
if a.config.DisableKeyringFile {
if a.config.EncryptKey == "" {
return nil
@ -1028,7 +1034,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
if err := loadKeyring(config.SerfLANConfig, keys); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if err := loadKeyring(config.SerfWANConfig, keys); err != nil {
return err
}
@ -1048,7 +1054,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error {
return err
}
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if _, err := os.Stat(fileWAN); err != nil {
if err := initKeyring(fileWAN, a.config.EncryptKey); err != nil {
return err
@ -1063,7 +1069,7 @@ LOAD:
if err := loadKeyringFile(config.SerfLANConfig); err != nil {
return err
}
if a.config.ServerMode {
if a.config.ServerMode && fedarationEnabled {
if _, err := os.Stat(fileWAN); err == nil {
config.SerfWANConfig.KeyringFile = fileWAN
}

View File

@ -369,9 +369,6 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
if ipaddr.IsAny(b.stringVal(c.AdvertiseAddrWAN)) {
return RuntimeConfig{}, fmt.Errorf("Advertise WAN address cannot be 0.0.0.0, :: or [::]")
}
if serfPortWAN < 0 {
return RuntimeConfig{}, fmt.Errorf("ports.serf_wan must be a valid port from 1 to 65535")
}
bindAddr := bindAddrs[0].(*net.IPAddr)
advertiseAddr := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), bindAddr)

View File

@ -1065,7 +1065,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
{
desc: "serf wan port > 0",
desc: "allow disabling serf wan port",
args: []string{`-data-dir=` + dataDir},
json: []string{`{
"ports": {
@ -1079,7 +1079,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
advertise_addr_wan = "1.2.3.4"
`},
err: "ports.serf_wan must be a valid port from 1 to 65535",
err: nil,
},
{
desc: "serf bind address lan template",

View File

@ -170,6 +170,10 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
return err
}
if len(dcs) == 0 { // no WAN federation, so return the local data center name
dcs = []string{c.srv.config.Datacenter}
}
*reply = dcs
return nil
}

View File

@ -129,7 +129,7 @@ func (m *Internal) KeyringOperation(
}
// Only perform WAN keyring querying and RPC forwarding once
if !args.Forwarded {
if !args.Forwarded && m.srv.serfWAN != nil {
args.Forwarded = true
m.executeKeyringOp(args, reply, true)
return m.srv.globalRPC("Internal.KeyringOperation", args, reply)

View File

@ -75,6 +75,10 @@ const (
raftRemoveGracePeriod = 5 * time.Second
)
var (
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
@ -344,25 +348,28 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// created, so we can pull it out from there reliably, even though it's
// a little gross to be reading the updated config.
// Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
// Initialize the WAN Serf if enabled
serfBindPortWAN := -1
if config.SerfWANConfig != nil {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
}
// See big comment above why we are doing this.
if serfBindPortWAN == 0 {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
}
// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
// TODO preetha: why is this passing WAN port to create segments?
if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
@ -380,20 +387,22 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.floodSegments(config)
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
if s.serfWAN != nil {
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
return 0, false
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *metadata.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
}
return 0, false
}
go s.Flood(nil, portFn, s.serfWAN)
}
go s.Flood(nil, portFn, s.serfWAN)
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
@ -831,6 +840,9 @@ func (s *Server) JoinLAN(addrs []string) (int, error) {
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
if s.serfWAN == nil {
return 0, ErrWANFederationDisabled
}
return s.serfWAN.Join(addrs, true)
}
@ -846,6 +858,9 @@ func (s *Server) LANMembers() []serf.Member {
// WANMembers is used to return the members of the LAN cluster
func (s *Server) WANMembers() []serf.Member {
if s.serfWAN == nil {
return nil
}
return s.serfWAN.Members()
}
@ -854,8 +869,10 @@ func (s *Server) RemoveFailedNode(node string) error {
if err := s.serfLAN.RemoveFailedNode(node); err != nil {
return err
}
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
if s.serfWAN != nil {
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
return err
}
}
return nil
}
@ -872,12 +889,19 @@ func (s *Server) KeyManagerLAN() *serf.KeyManager {
// KeyManagerWAN returns the WAN Serf keyring manager
func (s *Server) KeyManagerWAN() *serf.KeyManager {
if s.serfWAN == nil {
return nil
}
return s.serfWAN.KeyManager()
}
// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
LANEncrypted := s.serfLAN.EncryptionEnabled()
if s.serfWAN == nil {
return LANEncrypted
}
return LANEncrypted && s.serfWAN.EncryptionEnabled()
}
// LANSegments returns a map of LAN segments by name
@ -995,9 +1019,11 @@ func (s *Server) Stats() map[string]map[string]string {
},
"raft": s.raft.Stats(),
"serf_lan": s.serfLAN.Stats(),
"serf_wan": s.serfWAN.Stats(),
"runtime": runtimeStats(),
}
if s.serfWAN != nil {
stats["serf_wan"] = s.serfWAN.Stats()
}
return stats
}
@ -1021,6 +1047,10 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
if s.serfWAN == nil {
// Return zero values if WAN federation is disabled
return &coordinate.Coordinate{}, nil
}
return s.serfWAN.GetCoordinate()
}

View File

@ -37,7 +37,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
if wanPort > 0 {
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter

View File

@ -116,6 +116,9 @@ func (r *Router) Shutdown() {
// AddArea registers a new network area with the router.
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger, useTLS bool) error {
if cluster == nil {
return nil
}
r.Lock()
defer r.Unlock()