diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index 8757e3a45..2c721d627 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -26,29 +26,30 @@ func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Membe return d.server.statsFetcher.Fetch(ctx, servers) } -func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) { +func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) { if m.Tags["role"] != "consul" { - return false, nil + return nil, nil } port_str := m.Tags["port"] port, err := strconv.Atoi(port_str) if err != nil { - return false, nil + return nil, err } build_version, err := metadata.Build(&m) if err != nil { - return false, nil + return nil, err } - return true, &autopilot.ServerInfo{ + server := &autopilot.ServerInfo{ Name: m.Name, ID: m.Tags["id"], Addr: &net.TCPAddr{IP: m.Addr, Port: port}, Build: *build_version, Status: m.Status, } + return server, nil } // Heartbeat a metric for monitoring if we're the leader diff --git a/agent/consul/autopilot/autopilot.go b/agent/consul/autopilot/autopilot.go index f5eef1ee6..f8b9d9c51 100644 --- a/agent/consul/autopilot/autopilot.go +++ b/agent/consul/autopilot/autopilot.go @@ -18,7 +18,7 @@ import ( type Delegate interface { AutopilotConfig() *Config FetchStats(context.Context, []serf.Member) map[string]*ServerStats - IsServer(serf.Member) (bool, *ServerInfo) + IsServer(serf.Member) (*ServerInfo, error) NotifyHealth(OperatorHealthReply) PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error) Raft() *raft.Raft @@ -87,36 +87,14 @@ func (a *Autopilot) run() { case <-a.shutdownCh: return case <-ticker.C: - autopilotConfig := a.delegate.AutopilotConfig() - if autopilotConfig == nil { - continue - } - - // Skip the non-voter promotions unless all servers support the new APIs - minRaftProtocol, err := a.MinRaftProtocol() - if err != nil { - a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err) - continue - } - if minRaftProtocol >= 3 { - promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth()) - if err != nil { - a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err) - } - if err := a.handlePromotions(promotions); err != nil { - a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err) - } + if err := a.promoteServers(); err != nil { + a.logger.Printf("[ERR] autopilot: %v", err) } if err := a.pruneDeadServers(); err != nil { a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) } case <-a.removeDeadCh: - autopilotConfig := a.delegate.AutopilotConfig() - if autopilotConfig == nil { - continue - } - if err := a.pruneDeadServers(); err != nil { a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err) } @@ -124,6 +102,31 @@ func (a *Autopilot) run() { } } +// promoteServers asks the delegate for any promotions and carries them out. +func (a *Autopilot) promoteServers() error { + conf := a.delegate.AutopilotConfig() + if conf == nil { + return nil + } + + // Skip the non-voter promotions unless all servers support the new APIs + minRaftProtocol, err := a.MinRaftProtocol() + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) + } + if minRaftProtocol >= 3 { + promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth()) + if err != nil { + return fmt.Errorf("error checking for non-voters to promote: %s", err) + } + if err := a.handlePromotions(promotions); err != nil { + return fmt.Errorf("error handling promotions: %s", err) + } + } + + return nil +} + // fmtServer prints info about a server in a standard way for logging. func fmtServer(server raft.Server) string { return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address) @@ -133,7 +136,7 @@ func fmtServer(server raft.Server) string { func NumPeers(raftConfig raft.Configuration) int { var numPeers int for _, server := range raftConfig.Servers { - if isVoter(server.Suffrage) { + if server.Suffrage == raft.Voter { numPeers++ } } @@ -159,11 +162,15 @@ func (a *Autopilot) pruneDeadServers() error { serfLAN := a.delegate.Serf() for _, member := range serfLAN.Members() { - valid, parts := a.delegate.IsServer(member) - if valid { + server, err := a.delegate.IsServer(member) + if err != nil { + a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err) + continue + } + if server != nil { // todo(kyhavlov): change this to index by UUID - if _, ok := staleRaftServers[parts.Addr.String()]; ok { - delete(staleRaftServers, parts.Addr.String()) + if _, ok := staleRaftServers[server.Addr.String()]; ok { + delete(staleRaftServers, server.Addr.String()) } if member.Status == serf.StatusFailed { @@ -218,7 +225,11 @@ func (a *Autopilot) MinRaftProtocol() (int, error) { continue } - if ok, _ := a.delegate.IsServer(m); !ok { + server, err := a.delegate.IsServer(m) + if err != nil { + return -1, err + } + if server == nil { continue } @@ -316,9 +327,13 @@ func (a *Autopilot) updateClusterHealth() error { continue } - valid, parts := a.delegate.IsServer(member) - if valid { - serverMap[parts.ID] = parts + server, err := a.delegate.IsServer(member) + if err != nil { + a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err) + continue + } + if server != nil { + serverMap[server.ID] = server serverMembers = append(serverMembers, member) } } @@ -450,7 +465,7 @@ func (a *Autopilot) GetServerHealth(id string) *ServerHealth { return a.clusterHealth.ServerHealth(id) } -func isVoter(suffrage raft.ServerSuffrage) bool { +func isPotentialVoter(suffrage raft.ServerSuffrage) bool { switch suffrage { case raft.Voter, raft.Staging: return true diff --git a/agent/consul/autopilot/promotion.go b/agent/consul/autopilot/promotion.go index ff008445f..bd10e70ed 100644 --- a/agent/consul/autopilot/promotion.go +++ b/agent/consul/autopilot/promotion.go @@ -14,7 +14,7 @@ func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, s now := time.Now() var promotions []raft.Server for _, server := range servers { - if !isVoter(server.Suffrage) { + if !isPotentialVoter(server.Suffrage) { health := health.ServerHealth(string(server.ID)) if health.IsStable(now, autopilotConfig) { promotions = append(promotions, server)