Merge pull request #621 from hashicorp/f-leave

Changing interaction between Leave and RemovePeers
This commit is contained in:
Armon Dadgar 2015-01-21 16:28:19 -08:00
commit 7e27d923be
3 changed files with 140 additions and 17 deletions

View File

@ -469,9 +469,11 @@ func (s *Server) handleReapMember(member serf.Member) error {
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
state := s.fsm.State()
// Check if the node does not exists
_, found, _ := state.GetNode(member.Name)
if !found {
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member.Name == s.config.NodeName {
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
return nil
}
@ -483,11 +485,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
}
}
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member.Name == s.config.NodeName {
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
// Check if the node does not exists
_, found, _ := state.GetNode(member.Name)
if !found {
return nil
}

View File

@ -53,6 +53,10 @@ const (
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently commited entries.
raftLogCacheSize = 512
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
)
// Server is Consul server which manages the service discovery,
@ -521,6 +525,25 @@ func (s *Server) Leave() error {
s.logger.Printf("[INFO] consul: server starting leave")
s.left = true
// Check the number of known peers
numPeers, err := s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
return err
}
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 0 {
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
}
}
// Leave the WAN pool
if s.serfWAN != nil {
if err := s.serfWAN.Leave(); err != nil {
@ -534,9 +557,47 @@ func (s *Server) Leave() error {
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if !isLeader {
limit := time.Now().Add(raftRemoveGracePeriod)
for numPeers > 0 && time.Now().Before(limit) {
// Update the number of peers
numPeers, err = s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
break
}
// Avoid the sleep if we are done
if numPeers == 0 {
break
}
// Sleep a while and check again
time.Sleep(50 * time.Millisecond)
}
if numPeers != 0 {
s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
}
}
return nil
}
// numOtherPeers is used to check on the number of known peers
// excluding the local ndoe
func (s *Server) numOtherPeers() (int, error) {
peers, err := s.raftPeers.Peers()
if err != nil {
return 0, err
}
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
return len(otherPeers), nil
}
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address

View File

@ -216,6 +216,61 @@ func TestServer_JoinWAN(t *testing.T) {
})
}
func TestServer_LeaveLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
// Second server not in bootstrap mode
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
var p1 []net.Addr
var p2 []net.Addr
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForResult(func() (bool, error) {
p2, _ = s2.raftPeers.Peers()
return len(p2) == 2, errors.New(fmt.Sprintf("%v", p1))
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
// Issue a leave to the leader
for _, s := range []*Server{s1, s2} {
if !s.IsLeader() {
continue
}
if err := s.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Should lose a peer
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
}
}
func TestServer_Leave(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -250,18 +305,25 @@ func TestServer_Leave(t *testing.T) {
t.Fatalf("should have 2 peers: %v", err)
})
// Issue a leave
if err := s2.Leave(); err != nil {
t.Fatalf("err: %v", err)
// Issue a leave to the non-leader
for _, s := range []*Server{s1, s2} {
if s.IsLeader() {
continue
}
if err := s.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Should lose a peer
testutil.WaitForResult(func() (bool, error) {
p1, _ = s1.raftPeers.Peers()
return len(p1) == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
for _, s := range []*Server{s1, s2} {
testutil.WaitForResult(func() (bool, error) {
p1, _ = s.raftPeers.Peers()
return len(p1) == 1, nil
}, func(err error) {
t.Fatalf("should have 1 peer: %v", p1)
})
}
}
func TestServer_RPC(t *testing.T) {