Clean up raft servers without a corresponding serf entry

This commit is contained in:
Kyle Havlovitz 2017-03-29 12:52:00 -07:00
parent 75b8ff44e3
commit 18c95b504a
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
2 changed files with 93 additions and 6 deletions

View File

@ -78,17 +78,37 @@ func (s *Server) pruneDeadServers() error {
// Find any failed servers
var failed []string
staleRaftServers := make(map[string]raft.Server)
if autopilotConf.CleanupDeadServers {
future := s.raft.GetConfiguration()
if future.Error() != nil {
return err
}
for _, server := range future.Configuration().Servers {
staleRaftServers[string(server.Address)] = server
}
for _, member := range s.serfLAN.Members() {
valid, _ := agent.IsConsulServer(member)
if valid && member.Status == serf.StatusFailed {
failed = append(failed, member.Name)
valid, parts := agent.IsConsulServer(member)
if valid {
// Remove this server from the stale list; it has a serf entry
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
delete(staleRaftServers, parts.Addr.String())
}
if member.Status == serf.StatusFailed {
failed = append(failed, member.Name)
}
}
}
}
removalCount := len(failed) + len(staleRaftServers)
// Nothing to remove, return early
if len(failed) == 0 {
if removalCount == 0 {
return nil
}
@ -98,13 +118,17 @@ func (s *Server) pruneDeadServers() error {
}
// Only do removals if a minority of servers will be affected
if len(failed) < peers/2 {
if removalCount < peers/2 {
for _, server := range failed {
s.logger.Printf("[INFO] consul: Attempting removal of failed server: %v", server)
go s.serfLAN.RemoveFailedNode(server)
}
for _, raftServer := range staleRaftServers {
s.logger.Printf("[INFO] consul: Attempting removal of stale raft server : %v", raftServer.ID)
s.raft.RemoveServer(raftServer.ID, 0, 0)
}
} else {
s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", len(failed), peers)
s.logger.Printf("[DEBUG] consul: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
}
return nil

View File

@ -153,6 +153,69 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
}
}
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
dir1, s1 := testServerDCBootstrap(t, "dc1", true)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
dir4, s4 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir4)
defer s4.Shutdown()
servers := []*Server{s1, s2, s3}
// Join the servers to s1
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
for _, s := range servers[1:] {
if _, err := s.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
}
for _, s := range servers {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal(err)
}
}
// Add s4 to peers directly
s4addr := fmt.Sprintf("127.0.0.1:%d",
s4.config.SerfLANConfig.MemberlistConfig.BindPort)
s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr),0, 0)
// Verify we have 4 peers
peers, err := s1.numPeers()
if err != nil {
t.Fatal(err)
}
if peers != 4 {
t.Fatalf("bad: %v", peers)
}
// Wait for s4 to be removed
for _, s := range []*Server{s1, s2, s3} {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal(err)
}
}
}
func TestAutopilot_PromoteNonVoter(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"