diff --git a/consul/agent/server.go b/consul/agent/server.go
index 749465224..c19a25e86 100644
--- a/consul/agent/server.go
+++ b/consul/agent/server.go
@@ -29,6 +29,7 @@ type Server struct {
 	ID          string
 	Datacenter  string
 	Port        int
+	WanJoinPort int
 	Bootstrap   bool
 	Expect      int
 	Version     int
@@ -80,6 +81,15 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
 		return false, nil
 	}
 
+	wan_join_port := 0
+	wan_join_port_str, ok := m.Tags["wan_join_port"]
+	if ok {
+		wan_join_port, err = strconv.Atoi(wan_join_port_str)
+		if err != nil {
+			return false, nil
+		}
+	}
+
 	vsn_str := m.Tags["vsn"]
 	vsn, err := strconv.Atoi(vsn_str)
 	if err != nil {
@@ -102,6 +112,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
 		ID:          m.Tags["id"],
 		Datacenter:  datacenter,
 		Port:        port,
+		WanJoinPort: wan_join_port,
 		Bootstrap:   bootstrap,
 		Expect:      expect,
 		Addr:        addr,
diff --git a/consul/agent/server_test.go b/consul/agent/server_test.go
index f94c29cb0..f04ea6b38 100644
--- a/consul/agent/server_test.go
+++ b/consul/agent/server_test.go
@@ -55,13 +55,14 @@ func TestIsConsulServer(t *testing.T) {
 		Name: "foo",
 		Addr: net.IP([]byte{127, 0, 0, 1}),
 		Tags: map[string]string{
-			"role":     "consul",
-			"id":       "asdf",
-			"dc":       "east-aws",
-			"port":     "10000",
-			"vsn":      "1",
-			"expect":   "3",
-			"raft_vsn": "3",
+			"role":          "consul",
+			"id":            "asdf",
+			"dc":            "east-aws",
+			"port":          "10000",
+			"wan_join_port": "1234",
+			"vsn":           "1",
+			"expect":        "3",
+			"raft_vsn":      "3",
 		},
 	}
 	ok, parts := agent.IsConsulServer(m)
@@ -80,6 +81,12 @@ func TestIsConsulServer(t *testing.T) {
 	if parts.Expect != 3 {
 		t.Fatalf("bad: %v", parts.Expect)
 	}
+	if parts.Port != 10000 {
+		t.Fatalf("bad: %v", parts.Port)
+	}
+	if parts.WanJoinPort != 1234 {
+		t.Fatalf("bad: %v", parts.WanJoinPort)
+	}
 	if parts.RaftVersion != 3 {
 		t.Fatalf("bad: %v", parts.RaftVersion)
 	}
@@ -126,8 +133,8 @@ func TestIsConsulServer_Optional(t *testing.T) {
 			"dc":   "east-aws",
 			"port": "10000",
 			"vsn":  "1",
-			// raft_vsn and expect are optional and should default
-			// to zero.
+			// wan_join_port, raft_vsn, and expect are optional and
+			// should default to zero.
 		},
 	}
 	ok, parts := agent.IsConsulServer(m)
@@ -146,6 +153,12 @@ func TestIsConsulServer_Optional(t *testing.T) {
 	if parts.Expect != 0 {
 		t.Fatalf("bad: %v", parts.Expect)
 	}
+	if parts.Port != 10000 {
+		t.Fatalf("bad: %v", parts.Port)
+	}
+	if parts.WanJoinPort != 0 {
+		t.Fatalf("bad: %v", parts.WanJoinPort)
+	}
 	if parts.RaftVersion != 0 {
 		t.Fatalf("bad: %v", parts.RaftVersion)
 	}
diff --git a/consul/config.go b/consul/config.go
index 12142cd2c..8e543654f 100644
--- a/consul/config.go
+++ b/consul/config.go
@@ -96,6 +96,11 @@ type Config struct {
 	// SerfWANConfig is the configuration for the cross-dc serf
 	SerfWANConfig *serf.Config
 
+	// SerfFloodInterval controls how often we attempt to flood local Serf
+	// Consul servers into the global areas (WAN and user-defined areas in
+	// Consul Enterprise).
+	SerfFloodInterval time.Duration
+
 	// ReconcileInterval controls how often we reconcile the strongly
 	// consistent store with the Serf info. This is used to handle nodes
 	// that are force removed, as well as intermittent unavailability during
@@ -331,6 +336,7 @@ func DefaultConfig() *Config {
 		RaftConfig:        raft.DefaultConfig(),
 		SerfLANConfig:     serf.DefaultConfig(),
 		SerfWANConfig:     serf.DefaultConfig(),
+		SerfFloodInterval: 60 * time.Second,
 		ReconcileInterval: 60 * time.Second,
 		ProtocolVersion:   ProtocolVersion2Compatible,
 		ACLTTL:            30 * time.Second,
diff --git a/consul/serf.go b/consul/serf.go
index eee7f7e0a..4f00d564a 100644
--- a/consul/serf.go
+++ b/consul/serf.go
@@ -142,6 +142,13 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
 		if s.config.BootstrapExpect != 0 {
 			s.maybeBootstrap()
 		}
+
+		// Kick the WAN flooder. The send is non-blocking, so if the
+		// flooder is not currently waiting the kick is simply dropped.
+		select {
+		case s.floodCh <- struct{}{}:
+		default:
+		}
 	}
 }
 
diff --git a/consul/server.go b/consul/server.go
index 8178a0a1d..691def634 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -156,6 +156,10 @@ type Server struct {
 	// which SHOULD only consist of Consul servers
 	serfWAN *serf.Serf
 
+	// floodCh is kicked whenever we should try to flood LAN servers into
+	// the WAN.
+	floodCh chan struct{}
+
 	// sessionTimers track the expiration time of each Session that has
 	// a TTL. On expiration, a SessionDestroy event will occur, and
 	// destroy the session via standard session destroy processing
@@ -254,6 +258,7 @@ func NewServer(config *Config) (*Server, error) {
 		router:      servers.NewRouter(logger, shutdownCh, config.Datacenter),
 		rpcServer:   rpc.NewServer(),
 		rpcTLS:      incomingTLS,
+		floodCh:     make(chan struct{}),
 		tombstoneGC: gc,
 		shutdownCh:  make(chan struct{}),
 	}
@@ -313,6 +318,34 @@ func NewServer(config *Config) (*Server, error) {
 	}
 	go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
 
+	// Fire up the LAN <-> WAN Serf join flooder.
+	go func() {
+		ticker := time.NewTicker(config.SerfFloodInterval)
+		defer ticker.Stop()
+
+		// Servers without a wan_join_port tag parse to WanJoinPort == 0,
+		// so only a positive value counts as a known port.
+		portFn := func(srv *agent.Server) (int, bool) {
+			if srv.WanJoinPort > 0 {
+				return srv.WanJoinPort, true
+			}
+			return 0, false
+		}
+
+		for {
+			// Wait for shutdown, the periodic tick, or an explicit kick;
+			// the latter two both fall out of the select and flood.
+			select {
+			case <-s.shutdownCh:
+				return
+			case <-ticker.C:
+			case <-s.floodCh:
+			}
+
+			servers.FloodJoins(s.logger, portFn, config.Datacenter, s.serfLAN, s.serfWAN)
+		}
+	}()
+
 	// Start monitoring leadership. This must happen after Serf is set up
 	// since it can fire events when leadership is obtained.
 	go s.monitorLeadership()
@@ -342,6 +375,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
 		conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
 	} else {
 		conf.NodeName = s.config.NodeName
+		conf.Tags["wan_join_port"] = fmt.Sprintf("%d", s.config.SerfWANConfig.MemberlistConfig.BindPort)
 	}
 	conf.Tags["role"] = "consul"
 	conf.Tags["dc"] = s.config.Datacenter
diff --git a/consul/server_test.go b/consul/server_test.go
index 1eb97252d..5395de0cd 100644
--- a/consul/server_test.go
+++ b/consul/server_test.go
@@ -225,6 +225,65 @@ func TestServer_JoinWAN(t *testing.T) {
 	})
 }
 
+func TestServer_JoinWAN_Flood(t *testing.T) {
+	// Set up two servers in a WAN.
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	dir2, s2 := testServerDC(t, "dc2")
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfWANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinWAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		return len(s1.WANMembers()) == 2, nil
+	}, func(err error) {
+		t.Fatalf("bad len")
+	})
+
+	testutil.WaitForResult(func() (bool, error) {
+		return len(s2.WANMembers()) == 2, nil
+	}, func(err error) {
+		t.Fatalf("bad len")
+	})
+
+	dir3, s3 := testServer(t)
+	defer os.RemoveAll(dir3)
+	defer s3.Shutdown()
+
+	// Do just a LAN join for the new server and make sure it
+	// shows up in the WAN.
+	addr = fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
+	if _, err := s3.JoinLAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		return len(s1.WANMembers()) == 3, nil
+	}, func(err error) {
+		t.Fatalf("bad len")
+	})
+
+	testutil.WaitForResult(func() (bool, error) {
+		return len(s2.WANMembers()) == 3, nil
+	}, func(err error) {
+		t.Fatalf("bad len")
+	})
+
+	testutil.WaitForResult(func() (bool, error) {
+		return len(s3.WANMembers()) == 3, nil
+	}, func(err error) {
+		t.Fatalf("bad len")
+	})
+}
+
 func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)
diff --git a/consul/servers/serf_flooder.go b/consul/servers/serf_flooder.go
new file mode 100644
index 000000000..5134ec677
--- /dev/null
+++ b/consul/servers/serf_flooder.go
@@ -0,0 +1,87 @@
+package servers
+
+import (
+	"fmt"
+	"log"
+	"net"
+	"strings"
+
+	"github.com/hashicorp/consul/consul/agent"
+	"github.com/hashicorp/serf/serf"
+)
+
+// FloodPortFn gets the port to use for a given server when flood-joining. This
+// will return false if it doesn't have one.
+type FloodPortFn func(*agent.Server) (int, bool)
+
+// FloodJoins attempts to make sure all Consul servers in the local Serf
+// instance are joined in the global Serf instance. It assumes names in the
+// local area are of the form "node" and those in the global area are of the
+// form "node.datacenter", as is done for WAN and general network areas in
+// Consul Enterprise.
+//
+// Servers that are not alive in the local Serf, or that already appear in the
+// global Serf for localDatacenter, are skipped. Failures are logged at DEBUG
+// level and do not stop the remaining servers from being tried.
+func FloodJoins(logger *log.Logger, portFn FloodPortFn,
+	localDatacenter string, localSerf *serf.Serf, globalSerf *serf.Serf) {
+
+	// Names in the global Serf have the datacenter suffixed.
+	suffix := fmt.Sprintf(".%s", localDatacenter)
+
+	// Index the global side so we can do one pass through the local side
+	// with cheap lookups.
+	index := make(map[string]*agent.Server)
+	for _, m := range globalSerf.Members() {
+		ok, server := agent.IsConsulServer(m)
+		if !ok {
+			continue
+		}
+
+		if server.Datacenter != localDatacenter {
+			continue
+		}
+
+		localName := strings.TrimSuffix(server.Name, suffix)
+		index[localName] = server
+	}
+
+	// Now run through the local side and look for joins.
+	for _, m := range localSerf.Members() {
+		if m.Status != serf.StatusAlive {
+			continue
+		}
+
+		ok, server := agent.IsConsulServer(m)
+		if !ok {
+			continue
+		}
+
+		if _, ok := index[server.Name]; ok {
+			continue
+		}
+
+		// We can't use the port number from the local Serf, so we just
+		// get the host part.
+		addr, _, err := net.SplitHostPort(server.Addr.String())
+		if err != nil {
+			logger.Printf("[DEBUG] consul: Failed to flood-join %q (bad address %q): %v",
+				server.Name, server.Addr.String(), err)
+			// Without a usable host there is nothing to join; move on
+			// rather than attempting a join against an empty address.
+			continue
+		}
+
+		// Let the callback see if it can get the port number, otherwise
+		// leave it blank to behave as if we just supplied an address.
+		if port, ok := portFn(server); ok {
+			addr = net.JoinHostPort(addr, fmt.Sprintf("%d", port))
+		}
+
+		// Do the join!
+		if _, err := globalSerf.Join([]string{addr}, true); err != nil {
+			logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v",
+				server.Name, addr, err)
+		}
+	}
+}