diff --git a/consul/leader.go b/consul/leader.go
index c08711204..a9802d8b2 100644
--- a/consul/leader.go
+++ b/consul/leader.go
@@ -469,9 +469,11 @@ func (s *Server) handleReapMember(member serf.Member) error {
 // handleDeregisterMember is used to deregister a member of a given reason
 func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
 	state := s.fsm.State()
-	// Check if the node does not exists
-	_, found, _ := state.GetNode(member.Name)
-	if !found {
+	// Do not deregister ourself. This can only happen if the current leader
+	// is leaving. Instead, we should allow a follower to take over and
+	// deregister us later.
+	if member.Name == s.config.NodeName {
+		s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
 		return nil
 	}
 
@@ -483,11 +485,9 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
 		}
 	}
 
-	// Do not deregister ourself. This can only happen if the current leader
-	// is leaving. Instead, we should allow a follower to take-over and
-	// deregister us later.
-	if member.Name == s.config.NodeName {
-		s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
+	// Check if the node does not exist
+	_, found, _ := state.GetNode(member.Name)
+	if !found {
 		return nil
 	}
 
diff --git a/consul/server.go b/consul/server.go
index 14b9bd5ad..a95791f0f 100644
--- a/consul/server.go
+++ b/consul/server.go
@@ -53,6 +53,10 @@ const (
 	// raftLogCacheSize is the maximum number of logs to cache in-memory.
 	// This is used to reduce disk I/O for the recently commited entries.
 	raftLogCacheSize = 512
+
+	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
+	// to replicate to gracefully leave the cluster.
+	raftRemoveGracePeriod = 5 * time.Second
 )
 
 // Server is Consul server which manages the service discovery,
@@ -521,6 +525,25 @@ func (s *Server) Leave() error {
 	s.logger.Printf("[INFO] consul: server starting leave")
 	s.left = true
 
+	// Check the number of known peers
+	numPeers, err := s.numOtherPeers()
+	if err != nil {
+		s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
+		return err
+	}
+
+	// If we are the current leader, and we have any other peers (cluster has multiple
+	// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
+	// not the leader, then we should issue our leave intention and wait to be removed
+	// for some sane period of time.
+	isLeader := s.IsLeader()
+	if isLeader && numPeers > 0 {
+		future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
+		if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
+			s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
+		}
+	}
+
 	// Leave the WAN pool
 	if s.serfWAN != nil {
 		if err := s.serfWAN.Leave(); err != nil {
@@ -534,9 +557,47 @@ func (s *Server) Leave() error {
 			s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
 		}
 	}
+
+	// If we were not leader, wait to be safely removed from the cluster.
+	// We must wait to allow the raft replication to take place, otherwise
+	// an immediate shutdown could cause a loss of quorum.
+	if !isLeader {
+		limit := time.Now().Add(raftRemoveGracePeriod)
+		for numPeers > 0 && time.Now().Before(limit) {
+			// Update the number of peers
+			numPeers, err = s.numOtherPeers()
+			if err != nil {
+				s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
+				break
+			}
+
+			// Avoid the sleep if we are done
+			if numPeers == 0 {
+				break
+			}
+
+			// Sleep a while and check again
+			time.Sleep(50 * time.Millisecond)
+		}
+		if numPeers != 0 {
+			s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
+		}
+	}
+
 	return nil
 }
 
+// numOtherPeers is used to check on the number of known peers,
+// excluding the local node
+func (s *Server) numOtherPeers() (int, error) {
+	peers, err := s.raftPeers.Peers()
+	if err != nil {
+		return 0, err
+	}
+	otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
+	return len(otherPeers), nil
+}
+
 // JoinLAN is used to have Consul join the inner-DC pool
 // The target address should be another node inside the DC
 // listening on the Serf LAN address
diff --git a/consul/server_test.go b/consul/server_test.go
index 59c220b19..0932a1f32 100644
--- a/consul/server_test.go
+++ b/consul/server_test.go
@@ -216,6 +216,61 @@ func TestServer_JoinWAN(t *testing.T) {
 	})
 }
 
+func TestServer_LeaveLeader(t *testing.T) {
+	dir1, s1 := testServer(t)
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+
+	// Second server not in bootstrap mode
+	dir2, s2 := testServerDCBootstrap(t, "dc1", false)
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+
+	// Try to join
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfLANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinLAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	var p1 []net.Addr
+	var p2 []net.Addr
+
+	testutil.WaitForResult(func() (bool, error) {
+		p1, _ = s1.raftPeers.Peers()
+		return len(p1) == 2, errors.New(fmt.Sprintf("%v", p1))
+	}, func(err error) {
+		t.Fatalf("should have 2 peers: %v", err)
+	})
+
+	testutil.WaitForResult(func() (bool, error) {
+		p2, _ = s2.raftPeers.Peers()
+		return len(p2) == 2, errors.New(fmt.Sprintf("%v", p2))
+	}, func(err error) {
+		t.Fatalf("should have 2 peers: %v", err)
+	})
+
+	// Issue a leave to the leader
+	for _, s := range []*Server{s1, s2} {
+		if !s.IsLeader() {
+			continue
+		}
+		if err := s.Leave(); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Should lose a peer
+	for _, s := range []*Server{s1, s2} {
+		testutil.WaitForResult(func() (bool, error) {
+			p1, _ = s.raftPeers.Peers()
+			return len(p1) == 1, nil
+		}, func(err error) {
+			t.Fatalf("should have 1 peer: %v", p1)
+		})
+	}
+}
+
 func TestServer_Leave(t *testing.T) {
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
@@ -250,18 +305,25 @@ func TestServer_Leave(t *testing.T) {
 		t.Fatalf("should have 2 peers: %v", err)
 	})
 
-	// Issue a leave
-	if err := s2.Leave(); err != nil {
-		t.Fatalf("err: %v", err)
+	// Issue a leave to the non-leader
+	for _, s := range []*Server{s1, s2} {
+		if s.IsLeader() {
+			continue
+		}
+		if err := s.Leave(); err != nil {
+			t.Fatalf("err: %v", err)
+		}
 	}
 
 	// Should lose a peer
-	testutil.WaitForResult(func() (bool, error) {
-		p1, _ = s1.raftPeers.Peers()
-		return len(p1) == 1, nil
-	}, func(err error) {
-		t.Fatalf("should have 1 peer: %v", p1)
-	})
+	for _, s := range []*Server{s1, s2} {
+		testutil.WaitForResult(func() (bool, error) {
+			p1, _ = s.raftPeers.Peers()
+			return len(p1) == 1, nil
+		}, func(err error) {
+			t.Fatalf("should have 1 peer: %v", p1)
+		})
+	}
 }
 
 func TestServer_RPC(t *testing.T) {
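Note on the RemovePeer step (an illustration only, not part of the patch): raft availability requires a majority of the known peer set, so having the leader shrink the set before shutting down is what keeps the survivors above quorum. A minimal Go sketch of the arithmetic, using a hypothetical quorum helper:

package main

import "fmt"

// quorum is the majority needed for a raft cluster of n known peers.
func quorum(n int) int { return n/2 + 1 }

func main() {
	// Two-server cluster, hard shutdown: one server stays up, but quorum
	// is still computed over 2 known peers, so 2 votes are needed and the
	// cluster stalls.
	fmt.Println("hard shutdown, quorum kept:", 1 >= quorum(2)) // false

	// Graceful leave: RemovePeer shrinks the known set to 1 first, so the
	// single survivor satisfies quorum on its own.
	fmt.Println("graceful leave, quorum kept:", 1 >= quorum(1)) // true
}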
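Note on the grace-period loop in Leave() (a standalone sketch under assumed names, not Consul's API): the follower path is a bounded-polling pattern: issue the leave intention, then re-check a condition every 50ms until a deadline derived from raftRemoveGracePeriod passes, and only warn, rather than block forever, on timeout. The same pattern in isolation:

package main

import (
	"fmt"
	"time"
)

// waitUntil polls check every interval until it returns true or the grace
// period expires, and reports whether the condition was met in time.
func waitUntil(grace, interval time.Duration, check func() bool) bool {
	limit := time.Now().Add(grace)
	for time.Now().Before(limit) {
		if check() {
			return true
		}
		time.Sleep(interval)
	}
	// Final check after the deadline, like the numPeers != 0 test after
	// the loop in Leave().
	return check()
}

func main() {
	start := time.Now()
	// Simulate a peer removal that finishes replicating after ~200ms.
	ok := waitUntil(5*time.Second, 50*time.Millisecond, func() bool {
		return time.Since(start) > 200*time.Millisecond
	})
	fmt.Println("removed from peer set in time:", ok)
}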