consul: Add+test JoinLAN/JoinWAN

This commit is contained in:
Armon Dadgar 2013-12-06 17:18:09 -08:00
parent 72e93a7432
commit af9176bca0
4 changed files with 100 additions and 18 deletions

View File

@ -36,11 +36,11 @@ type Config struct {
// by the WAN and LAN
RPCAddr string
// SerfLocalConfig is the configuration for the local serf
SerfLocalConfig *serf.Config
// SerfLANConfig is the configuration for the intra-dc serf
SerfLANConfig *serf.Config
// SerfRemoteConfig is the configuration for the remtoe serf
SerfRemoteConfig *serf.Config
// SerfWANConfig is the configuration for the cross-dc serf
SerfWANConfig *serf.Config
// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
@ -55,22 +55,22 @@ func DefaultConfig() *Config {
}
conf := &Config{
Datacenter: "dc1",
NodeName: hostname,
RaftBindAddr: DefaultRaftAddr,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLocalConfig: serf.DefaultConfig(),
SerfRemoteConfig: serf.DefaultConfig(),
Datacenter: "dc1",
NodeName: hostname,
RaftBindAddr: DefaultRaftAddr,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(),
}
// Remote Serf should use the WAN timing, since we are using it
// WAN Serf should use the WAN timing, since we are using it
// to communicate between DC's
conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig()
conf.SerfWANConfig.MemberlistConfig = memberlist.DefaultWANConfig()
// Ensure we don't have port conflicts
conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort
conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort
conf.SerfLANConfig.MemberlistConfig.Port = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.Port = DefaultWANSerfPort
return conf
}

View File

@ -1,4 +1,4 @@
package connsul
package consul
// lanEventHandler is used to handle events from the lan Serf cluster
func (s *Server) lanEventHandler() {

View File

@ -99,7 +99,7 @@ func NewServer(config *Config) (*Server, error) {
// Initialize the lan Serf
var err error
s.serfLAN, err = s.setupSerf(config.SerfLocalConfig,
s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
s.eventChLAN, serfLANSnapshot)
if err != nil {
s.Shutdown()
@ -107,7 +107,7 @@ func NewServer(config *Config) (*Server, error) {
}
// Initialize the wan Serf
s.serfWAN, err = s.setupSerf(config.SerfRemoteConfig,
s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
s.eventChWAN, serfWANSnapshot)
if err != nil {
s.Shutdown()
@ -251,3 +251,19 @@ func (s *Server) Shutdown() error {
return nil
}
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (s *Server) JoinLAN(addr string) error {
_, err := s.serfLAN.Join([]string{addr}, false)
return err
}
// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addr string) error {
_, err := s.serfWAN.Join([]string{addr}, false)
return err
}

View File

@ -1,11 +1,20 @@
package consul
import (
"fmt"
"io/ioutil"
"os"
"testing"
)
var nextPort = 15000
func getPort() int {
p := nextPort
nextPort++
return p
}
func tmpDir(t *testing.T) string {
dir, err := ioutil.TempDir("", "consul")
if err != nil {
@ -14,6 +23,28 @@ func tmpDir(t *testing.T) string {
return dir
}
func testServer(t *testing.T) (string, *Server) {
dir := tmpDir(t)
config := DefaultConfig()
config.DataDir = dir
// Adjust the ports
p := getPort()
config.NodeName = fmt.Sprintf("Node %d", p)
config.RaftBindAddr = fmt.Sprintf("127.0.0.1:%d", p)
config.RPCAddr = fmt.Sprintf("127.0.0.1:%d", getPort())
config.SerfLANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfLANConfig.MemberlistConfig.Port = getPort()
config.SerfWANConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfWANConfig.MemberlistConfig.Port = getPort()
server, err := NewServer(config)
if err != nil {
t.Fatalf("err: %v", err)
}
return dir, server
}
func TestServer_StartStop(t *testing.T) {
dir := tmpDir(t)
defer os.RemoveAll(dir)
@ -35,3 +66,38 @@ func TestServer_StartStop(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestServer_JoinLAN(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.Port)
if err := s2.JoinLAN(addr); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestServer_JoinWAN(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.Port)
if err := s2.JoinWAN(addr); err != nil {
t.Fatalf("err: %v", err)
}
t.Fatalf("fail")
}