Adds LAN -> WAN join flooding.

This commit is contained in:
James Phillips 2017-03-15 12:26:54 -07:00
parent 1e5a442420
commit 91a861337b
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
7 changed files with 222 additions and 9 deletions

View File

@ -29,6 +29,7 @@ type Server struct {
ID string
Datacenter string
Port int
WanJoinPort int
Bootstrap bool
Expect int
Version int
@ -80,6 +81,15 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil
}
wan_join_port := 0
wan_join_port_str, ok := m.Tags["wan_join_port"]
if ok {
wan_join_port, err = strconv.Atoi(wan_join_port_str)
if err != nil {
return false, nil
}
}
vsn_str := m.Tags["vsn"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
@ -102,6 +112,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
ID: m.Tags["id"],
Datacenter: datacenter,
Port: port,
WanJoinPort: wan_join_port,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,

View File

@ -55,13 +55,14 @@ func TestIsConsulServer(t *testing.T) {
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
"role": "consul",
"id": "asdf",
"dc": "east-aws",
"port": "10000",
"wan_join_port": "1234",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
},
}
ok, parts := agent.IsConsulServer(m)
@ -80,6 +81,12 @@ func TestIsConsulServer(t *testing.T) {
if parts.Expect != 3 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.Port != 10000 {
t.Fatalf("bad: %v", parts.Port)
}
if parts.WanJoinPort != 1234 {
t.Fatalf("bad: %v", parts.WanJoinPort)
}
if parts.RaftVersion != 3 {
t.Fatalf("bad: %v", parts.RaftVersion)
}
@ -126,8 +133,8 @@ func TestIsConsulServer_Optional(t *testing.T) {
"dc": "east-aws",
"port": "10000",
"vsn": "1",
// raft_vsn and expect are optional and should default
// to zero.
// wan_join_port, raft_vsn, and expect are optional and
// should default to zero.
},
}
ok, parts := agent.IsConsulServer(m)
@ -146,6 +153,12 @@ func TestIsConsulServer_Optional(t *testing.T) {
if parts.Expect != 0 {
t.Fatalf("bad: %v", parts.Expect)
}
if parts.Port != 10000 {
t.Fatalf("bad: %v", parts.Port)
}
if parts.WanJoinPort != 0 {
t.Fatalf("bad: %v", parts.WanJoinPort)
}
if parts.RaftVersion != 0 {
t.Fatalf("bad: %v", parts.RaftVersion)
}

View File

@ -96,6 +96,11 @@ type Config struct {
// SerfWANConfig is the configuration for the cross-dc serf
SerfWANConfig *serf.Config
// SerfFloodInterval controls how often we attempt to flood local Serf
// Consul servers into the global areas (WAN and user-defined areas in
// Consul Enterprise).
SerfFloodInterval time.Duration
// ReconcileInterval controls how often we reconcile the strongly
// consistent store with the Serf info. This is used to handle nodes
// that are force removed, as well as intermittent unavailability during
@ -331,6 +336,7 @@ func DefaultConfig() *Config {
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(),
SerfFloodInterval: 60 * time.Second,
ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersion2Compatible,
ACLTTL: 30 * time.Second,

View File

@ -142,6 +142,12 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
if s.config.BootstrapExpect != 0 {
s.maybeBootstrap()
}
// Kick the WAN flooder.
select {
case s.floodCh <- struct{}{}:
default:
}
}
}

View File

@ -156,6 +156,10 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
// floodCh is kicked whenever we should try to flood LAN servers into
// the WAN.
floodCh chan struct{}
// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
// destroy the session via standard session destroy processing
@ -254,6 +258,7 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
floodCh: make(chan struct{}),
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}
@ -313,6 +318,38 @@ func NewServer(config *Config) (*Server, error) {
}
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Start the background flooder that pushes local (LAN) Consul servers into
// the WAN Serf pool. It runs a flood on every SerfFloodInterval tick and
// whenever floodCh is kicked, and exits when the server shuts down. floodCh
// is unbuffered, so a kick sent without blocking while a flood is already
// running is dropped; the periodic tick picks up anything missed.
go func() {
	floodTicker := time.NewTicker(config.SerfFloodInterval)
	defer floodTicker.Stop()

	// wanPort reports the WAN join port a server advertised; the boolean
	// is false when the server did not advertise a usable one.
	wanPort := func(srv *agent.Server) (int, bool) {
		if srv.WanJoinPort <= 0 {
			return 0, false
		}
		return srv.WanJoinPort, true
	}

	for {
		// Block until there is a reason to flood, or bail on shutdown.
		select {
		case <-s.shutdownCh:
			return
		case <-floodTicker.C:
		case <-s.floodCh:
		}

		servers.FloodJoins(s.logger, wanPort, config.Datacenter, s.serfLAN, s.serfWAN)
	}
}()
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
go s.monitorLeadership()
@ -342,6 +379,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", s.config.SerfWANConfig.MemberlistConfig.BindPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter

View File

@ -225,6 +225,65 @@ func TestServer_JoinWAN(t *testing.T) {
})
}
func TestServer_JoinWAN_Flood(t *testing.T) {
// Set up two servers in a WAN.
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
return len(s1.WANMembers()) == 2, nil
}, func(err error) {
t.Fatalf("bad len")
})
testutil.WaitForResult(func() (bool, error) {
return len(s2.WANMembers()) == 2, nil
}, func(err error) {
t.Fatalf("bad len")
})
dir3, s3 := testServer(t)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Do just a LAN join for the new server and make sure it
// shows up in the WAN.
addr = fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
return len(s1.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
testutil.WaitForResult(func() (bool, error) {
return len(s2.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
testutil.WaitForResult(func() (bool, error) {
return len(s3.WANMembers()) == 3, nil
}, func(err error) {
t.Fatalf("bad len")
})
}
func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

View File

@ -0,0 +1,80 @@
package servers
import (
"fmt"
"log"
"net"
"strings"
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/serf/serf"
)
// FloodPortFn returns the port to use when flood-joining the given server
// into the global Serf. The boolean result is false when no port is
// available for that server, in which case FloodJoins attempts the join
// using the server's address alone.
type FloodPortFn func(*agent.Server) (int, bool)
// FloodJoins attempts to make sure all Consul servers in the local Serf
// instance are joined in the global Serf instance. It assumes names in the
// local area are of the form <node> and those in the global area are of the
// form <node>.<dc> as is done for WAN and general network areas in Consul
// Enterprise.
//
// logger receives a DEBUG line for every server that could not be joined.
// portFn supplies the global-side port for a server; when it reports false
// the join is attempted with the bare address. localDatacenter is used both
// to filter the global members and to strip the ".<dc>" suffix from their
// names. Failures are logged and never returned, since this is a best-effort
// operation that the caller repeats.
func FloodJoins(logger *log.Logger, portFn FloodPortFn,
	localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf) {

	// Names in the global Serf have the datacenter suffixed.
	suffix := fmt.Sprintf(".%s", localDatacenter)

	// Index the global side so we can do one pass through the local side
	// with cheap lookups.
	index := make(map[string]*agent.Server)
	for _, m := range globalSerf.Members() {
		ok, server := agent.IsConsulServer(m)
		if !ok {
			continue
		}

		if server.Datacenter != localDatacenter {
			continue
		}

		localName := strings.TrimSuffix(server.Name, suffix)
		index[localName] = server
	}

	// Now run through the local side and look for joins.
	for _, m := range localSerf.Members() {
		if m.Status != serf.StatusAlive {
			continue
		}

		ok, server := agent.IsConsulServer(m)
		if !ok {
			continue
		}

		// Already known on the global side, nothing to do.
		if _, ok := index[server.Name]; ok {
			continue
		}

		// We can't use the port number from the local Serf, so we just
		// get the host part.
		addr, _, err := net.SplitHostPort(server.Addr.String())
		if err != nil {
			logger.Printf("[DEBUG] consul: Failed to flood-join %q (bad address %q): %v",
				server.Name, server.Addr.String(), err)

			// Without a host there is nothing sensible to join, so
			// skip this server instead of joining an empty address.
			continue
		}

		// Let the callback see if it can get the port number, otherwise
		// leave it blank to behave as if we just supplied an address.
		if port, ok := portFn(server); ok {
			addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port))
		} else if strings.Contains(addr, ":") {
			// A host containing a colon is an IPv6 literal. It needs
			// brackets so the join does not mistake its final group
			// for a port.
			addr = fmt.Sprintf("[%s]", addr)
		}

		// Do the join!
		if _, err := globalSerf.Join([]string{addr}, true); err != nil {
			logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v",
				server.Name, addr, err)
		}
	}
}