More autopilot reorganizing

This commit is contained in:
Kyle Havlovitz 2017-12-13 10:57:37 -08:00
parent f347c8a531
commit 77d92bf15c
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
3 changed files with 57 additions and 41 deletions

View File

@ -26,29 +26,30 @@ func (d *AutopilotDelegate) FetchStats(ctx context.Context, servers []serf.Membe
return d.server.statsFetcher.Fetch(ctx, servers)
}
func (d *AutopilotDelegate) IsServer(m serf.Member) (bool, *autopilot.ServerInfo) {
func (d *AutopilotDelegate) IsServer(m serf.Member) (*autopilot.ServerInfo, error) {
if m.Tags["role"] != "consul" {
return false, nil
return nil, nil
}
port_str := m.Tags["port"]
port, err := strconv.Atoi(port_str)
if err != nil {
return false, nil
return nil, err
}
build_version, err := metadata.Build(&m)
if err != nil {
return false, nil
return nil, err
}
return true, &autopilot.ServerInfo{
server := &autopilot.ServerInfo{
Name: m.Name,
ID: m.Tags["id"],
Addr: &net.TCPAddr{IP: m.Addr, Port: port},
Build: *build_version,
Status: m.Status,
}
return server, nil
}
// Heartbeat a metric for monitoring if we're the leader

View File

@ -18,7 +18,7 @@ import (
type Delegate interface {
AutopilotConfig() *Config
FetchStats(context.Context, []serf.Member) map[string]*ServerStats
IsServer(serf.Member) (bool, *ServerInfo)
IsServer(serf.Member) (*ServerInfo, error)
NotifyHealth(OperatorHealthReply)
PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
Raft() *raft.Raft
@ -87,36 +87,14 @@ func (a *Autopilot) run() {
case <-a.shutdownCh:
return
case <-ticker.C:
autopilotConfig := a.delegate.AutopilotConfig()
if autopilotConfig == nil {
continue
}
// Skip the non-voter promotions unless all servers support the new APIs
minRaftProtocol, err := a.MinRaftProtocol()
if err != nil {
a.logger.Printf("[ERR] autopilot: error getting server raft protocol versions: %s", err)
continue
}
if minRaftProtocol >= 3 {
promotions, err := a.delegate.PromoteNonVoters(autopilotConfig, a.GetClusterHealth())
if err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for non-voters to promote: %s", err)
}
if err := a.handlePromotions(promotions); err != nil {
a.logger.Printf("[ERR] autopilot: Error handling promotions: %s", err)
}
if err := a.promoteServers(); err != nil {
a.logger.Printf("[ERR] autopilot: %v", err)
}
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
case <-a.removeDeadCh:
autopilotConfig := a.delegate.AutopilotConfig()
if autopilotConfig == nil {
continue
}
if err := a.pruneDeadServers(); err != nil {
a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
}
@ -124,6 +102,31 @@ func (a *Autopilot) run() {
}
}
// promoteServers asks the delegate for any promotions and carries them out.
func (a *Autopilot) promoteServers() error {
conf := a.delegate.AutopilotConfig()
if conf == nil {
return nil
}
// Skip the non-voter promotions unless all servers support the new APIs
minRaftProtocol, err := a.MinRaftProtocol()
if err != nil {
return fmt.Errorf("error getting server raft protocol versions: %s", err)
}
if minRaftProtocol >= 3 {
promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth())
if err != nil {
return fmt.Errorf("error checking for non-voters to promote: %s", err)
}
if err := a.handlePromotions(promotions); err != nil {
return fmt.Errorf("error handling promotions: %s", err)
}
}
return nil
}
// fmtServer prints info about a server in a standard way for logging.
func fmtServer(server raft.Server) string {
return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
@ -133,7 +136,7 @@ func fmtServer(server raft.Server) string {
func NumPeers(raftConfig raft.Configuration) int {
var numPeers int
for _, server := range raftConfig.Servers {
if isVoter(server.Suffrage) {
if server.Suffrage == raft.Voter {
numPeers++
}
}
@ -159,11 +162,15 @@ func (a *Autopilot) pruneDeadServers() error {
serfLAN := a.delegate.Serf()
for _, member := range serfLAN.Members() {
valid, parts := a.delegate.IsServer(member)
if valid {
server, err := a.delegate.IsServer(member)
if err != nil {
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
continue
}
if server != nil {
// todo(kyhavlov): change this to index by UUID
if _, ok := staleRaftServers[parts.Addr.String()]; ok {
delete(staleRaftServers, parts.Addr.String())
if _, ok := staleRaftServers[server.Addr.String()]; ok {
delete(staleRaftServers, server.Addr.String())
}
if member.Status == serf.StatusFailed {
@ -218,7 +225,11 @@ func (a *Autopilot) MinRaftProtocol() (int, error) {
continue
}
if ok, _ := a.delegate.IsServer(m); !ok {
server, err := a.delegate.IsServer(m)
if err != nil {
return -1, err
}
if server == nil {
continue
}
@ -316,9 +327,13 @@ func (a *Autopilot) updateClusterHealth() error {
continue
}
valid, parts := a.delegate.IsServer(member)
if valid {
serverMap[parts.ID] = parts
server, err := a.delegate.IsServer(member)
if err != nil {
a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
continue
}
if server != nil {
serverMap[server.ID] = server
serverMembers = append(serverMembers, member)
}
}
@ -450,7 +465,7 @@ func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
return a.clusterHealth.ServerHealth(id)
}
func isVoter(suffrage raft.ServerSuffrage) bool {
func isPotentialVoter(suffrage raft.ServerSuffrage) bool {
switch suffrage {
case raft.Voter, raft.Staging:
return true

View File

@ -14,7 +14,7 @@ func PromoteStableServers(autopilotConfig *Config, health OperatorHealthReply, s
now := time.Now()
var promotions []raft.Server
for _, server := range servers {
if !isVoter(server.Suffrage) {
if !isPotentialVoter(server.Suffrage) {
health := health.ServerHealth(string(server.ID))
if health.IsStable(now, autopilotConfig) {
promotions = append(promotions, server)