consul: Adding tons of shit, leave test

This commit is contained in:
Armon Dadgar 2013-12-09 16:05:15 -08:00
parent 9c4583c0e3
commit 761197575c
4 changed files with 117 additions and 8 deletions

View File

@ -30,6 +30,6 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error {
r.server.logger.Printf("[ERR] Failed to parse peer: %v", err)
return err
}
future := r.server.raft.AddPeer(peer)
future := r.server.raft.RemovePeer(peer)
return future.Error()
}

View File

@ -1,7 +1,12 @@
package consul
import (
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"net"
"strconv"
"strings"
"time"
)
// lanEventHandler is used to handle events from the lan Serf cluster
@ -54,6 +59,18 @@ func (s *Server) wanEventHandler() {
// localJoin is used to handle join events on the lan serf cluster
func (s *Server) localJoin(me serf.MemberEvent) {
// Check for consul members
for _, m := range me.Members {
ok, dc, port := s.isConsulServer(m)
if ok {
if dc != s.config.Datacenter {
s.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster",
m.Name, dc)
return
}
go s.joinConsulServer(m, port)
}
}
}
// localLeave is used to handle leave events on the lan serf cluster
@ -83,3 +100,59 @@ func (s *Server) remoteFailed(me serf.MemberEvent) {
// remoteEvent is used to handle events on the wan serf cluster
func (s *Server) remoteEvent(ue serf.UserEvent) {
}
// Returns if a member is a consul server. Returns a bool,
// the data center, and the rpc port
func (s *Server) isConsulServer(m serf.Member) (bool, string, int) {
role := m.Role
if !strings.HasPrefix(role, "consul:") {
return false, "", 0
}
parts := strings.SplitN(role, ":", 3)
datacenter := parts[1]
port_str := parts[2]
port, err := strconv.Atoi(port_str)
if err != nil {
s.logger.Printf("[ERR] Failed to parse role: %s", role)
return false, "", 0
}
return true, datacenter, port
}
// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, port int) {
if m.Name == s.config.NodeName {
return
}
var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port}
var future raft.Future
CHECK:
// Get the Raft peers
peers, err := s.raftPeers.Peers()
if err != nil {
s.logger.Printf("[ERR] Failed to get raft peers: %v", err)
goto WAIT
}
// Bail if this node is already a peer
for _, p := range peers {
if p.String() == addr.String() {
return
}
}
// Attempt to add as a peer
future = s.raft.AddPeer(addr)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] Failed to add raft peer: %v", err)
} else {
return
}
WAIT:
time.Sleep(500 * time.Millisecond)
goto CHECK
}

View File

@ -145,8 +145,9 @@ func (s *Server) ensurePath(path string, dir bool) error {
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
addr := s.rpcListener.Addr().(*net.TCPAddr)
conf.NodeName = s.config.NodeName
conf.Role = fmt.Sprintf("consul:%s:%s", s.config.Datacenter, s.rpcListener.Addr().String())
conf.Role = fmt.Sprintf("consul:%s:%d", s.config.Datacenter, addr.Port)
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch
@ -231,12 +232,10 @@ func (s *Server) Shutdown() error {
if s.serfLAN != nil {
s.serfLAN.Shutdown()
s.serfLAN = nil
}
if s.serfWAN != nil {
s.serfWAN.Shutdown()
s.serfWAN = nil
}
if s.raft != nil {
@ -244,14 +243,10 @@ func (s *Server) Shutdown() error {
s.raftLayer.Close()
s.raft.Shutdown()
s.raftStore.Close()
s.raft = nil
s.raftLayer = nil
s.raftStore = nil
}
if s.rpcListener != nil {
s.rpcListener.Close()
s.rpcListener = nil
}
// Close all the RPC connections

View File

@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"
)
var nextPort = 15000
@ -99,3 +100,43 @@ func TestServer_JoinWAN(t *testing.T) {
t.Fatalf("err: %v", err)
}
}
func TestServer_Leave(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.Port)
if err := s2.JoinLAN(addr); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(time.Second)
p1, _ := s1.raftPeers.Peers()
if len(p1) != 2 {
t.Fatalf("should have 2 peers: %v", p1)
}
p2, _ := s2.raftPeers.Peers()
if len(p2) != 2 {
t.Fatalf("should have 2 peers: %v", p2)
}
// Issue a leave
if err := s2.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
// Should lose a peer
p1, _ = s1.raftPeers.Peers()
if len(p1) != 1 {
t.Fatalf("should have 1 peer: %v", p1)
}
}