Remove failed nodes from serfWAN (#6028)
* Prune Servers from WAN and LAN * cleaned up and fixed LAN to WAN * moving things around * force-leave remove from serfWAN, create pruneSerfWAN * removed serfWAN remove, reduced complexity, fixed comments * add another place to remove from serfWAN * add nil check * Update agent/consul/server.go Co-Authored-By: Paul Banks <banks@banksco.de>
This commit is contained in:
parent
7400ce2594
commit
8a930f7d3a
|
@ -77,6 +77,10 @@ func (d *AutopilotDelegate) Raft() *raft.Raft {
|
|||
return d.server.raft
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) Serf() *serf.Serf {
|
||||
func (d *AutopilotDelegate) SerfLAN() *serf.Serf {
|
||||
return d.server.serfLAN
|
||||
}
|
||||
|
||||
func (d *AutopilotDelegate) SerfWAN() *serf.Serf {
|
||||
return d.server.serfWAN
|
||||
}
|
||||
|
|
|
@ -22,7 +22,8 @@ type Delegate interface {
|
|||
NotifyHealth(OperatorHealthReply)
|
||||
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
|
||||
Raft() *raft.Raft
|
||||
Serf() *serf.Serf
|
||||
SerfLAN() *serf.Serf
|
||||
SerfWAN() *serf.Serf
|
||||
}
|
||||
|
||||
// Autopilot is a mechanism for automatically managing the Raft
|
||||
|
@ -182,7 +183,7 @@ func (a *Autopilot) pruneDeadServers() error {
|
|||
|
||||
// Failed servers are known to Serf and marked failed, and stale servers
|
||||
// are known to Raft but not Serf.
|
||||
var failed []string
|
||||
var failed []serf.Member
|
||||
staleRaftServers := make(map[string]raft.Server)
|
||||
raftNode := a.delegate.Raft()
|
||||
future := raftNode.GetConfiguration()
|
||||
|
@ -194,8 +195,8 @@ func (a *Autopilot) pruneDeadServers() error {
|
|||
for _, server := range raftConfig.Servers {
|
||||
staleRaftServers[string(server.Address)] = server
|
||||
}
|
||||
|
||||
serfLAN := a.delegate.Serf()
|
||||
serfWAN := a.delegate.SerfWAN()
|
||||
serfLAN := a.delegate.SerfLAN()
|
||||
for _, member := range serfLAN.Members() {
|
||||
server, err := a.delegate.IsServer(member)
|
||||
if err != nil {
|
||||
|
@ -214,8 +215,12 @@ func (a *Autopilot) pruneDeadServers() error {
|
|||
if found && s.Suffrage == raft.Nonvoter {
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", member.Name)
|
||||
go serfLAN.RemoveFailedNode(member.Name)
|
||||
if serfWAN != nil {
|
||||
go serfWAN.RemoveFailedNode(member.Name)
|
||||
}
|
||||
} else {
|
||||
failed = append(failed, member.Name)
|
||||
failed = append(failed, member)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -231,8 +236,12 @@ func (a *Autopilot) pruneDeadServers() error {
|
|||
peers := NumPeers(raftConfig)
|
||||
if removalCount < peers/2 {
|
||||
for _, node := range failed {
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
|
||||
go serfLAN.RemoveFailedNode(node)
|
||||
a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node.Name)
|
||||
go serfLAN.RemoveFailedNode(node.Name)
|
||||
if serfWAN != nil {
|
||||
go serfWAN.RemoveFailedNode(fmt.Sprintf("%s.%s", node.Name, node.Tags["dc"]))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
minRaftProtocol, err := a.MinRaftProtocol()
|
||||
|
@ -260,7 +269,7 @@ func (a *Autopilot) pruneDeadServers() error {
|
|||
|
||||
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
|
||||
func (a *Autopilot) MinRaftProtocol() (int, error) {
|
||||
return minRaftProtocol(a.delegate.Serf().Members(), a.delegate.IsServer)
|
||||
return minRaftProtocol(a.delegate.SerfLAN().Members(), a.delegate.IsServer)
|
||||
}
|
||||
|
||||
func minRaftProtocol(members []serf.Member, serverFunc func(serf.Member) (*ServerInfo, error)) (int, error) {
|
||||
|
@ -369,7 +378,7 @@ func (a *Autopilot) updateClusterHealth() error {
|
|||
// Get the the serf members which are Consul servers
|
||||
var serverMembers []serf.Member
|
||||
serverMap := make(map[string]*ServerInfo)
|
||||
for _, member := range a.delegate.Serf().Members() {
|
||||
for _, member := range a.delegate.SerfLAN().Members() {
|
||||
if member.Status == serf.StatusLeft {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -1003,6 +1004,11 @@ func (s *Server) RemoveFailedNode(node string) error {
|
|||
if err := s.serfLAN.RemoveFailedNode(node); err != nil {
|
||||
return err
|
||||
}
|
||||
// The Serf WAN pool stores members as node.datacenter
|
||||
// so the dc is appended if not present
|
||||
if !strings.HasSuffix(node, "."+s.config.Datacenter) {
|
||||
node = node + "." + s.config.Datacenter
|
||||
}
|
||||
if s.serfWAN != nil {
|
||||
if err := s.serfWAN.RemoveFailedNode(node); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue