open-consul/agent/consul/autopilot/autopilot.go

package autopilot

import (
	"context"
	"fmt"
	"log"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/hashicorp/go-version"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)
// Delegate is the interface for the Autopilot mechanism
type Delegate interface {
	AutopilotConfig() *Config
	FetchStats(context.Context, []serf.Member) map[string]*ServerStats
	IsServer(serf.Member) (*ServerInfo, error)
	NotifyHealth(OperatorHealthReply)
	PromoteNonVoters(*Config, OperatorHealthReply) ([]raft.Server, error)
	Raft() *raft.Raft
	Serf() *serf.Serf
}

// Autopilot is a mechanism for automatically managing the Raft
// quorum using server health information along with updates from Serf gossip.
// For more information, see https://www.consul.io/docs/guides/autopilot.html
type Autopilot struct {
	logger   *log.Logger
	delegate Delegate

	interval       time.Duration
	healthInterval time.Duration

	clusterHealth     OperatorHealthReply
	clusterHealthLock sync.RWMutex

	enabled      bool
	removeDeadCh chan struct{}
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
	waitGroup    sync.WaitGroup
}
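
// ServerInfo holds the identifying details of a Consul server as discovered
// through Serf gossip.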
type ServerInfo struct {
	Name   string
	ID     string
	Addr   net.Addr
	Build  version.Version
	Status serf.MemberStatus
}
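
// NewAutopilot creates an Autopilot, which remains idle until Start is
// called. interval controls how often promotions and dead-server pruning
// run; healthInterval controls how often cluster health is recomputed.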
func NewAutopilot(logger *log.Logger, delegate Delegate, interval, healthInterval time.Duration) *Autopilot {
	return &Autopilot{
		logger:         logger,
		delegate:       delegate,
		interval:       interval,
		healthInterval: healthInterval,
		removeDeadCh:   make(chan struct{}),
	}
}
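
// Start launches the run and serverHealthLoop goroutines. Calling Start on
// an Autopilot that is already running is a no-op.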
func (a *Autopilot) Start() {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	// Nothing to do
	if a.enabled {
		return
	}

	a.shutdownCh = make(chan struct{})
	a.waitGroup = sync.WaitGroup{}
	a.clusterHealth = OperatorHealthReply{}

	a.waitGroup.Add(2)
	go a.run()
	go a.serverHealthLoop()

	a.enabled = true
}
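
// Stop signals the background goroutines to exit and waits for them.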
func (a *Autopilot) Stop() {
	a.shutdownLock.Lock()
	defer a.shutdownLock.Unlock()

	// Nothing to do
	if !a.enabled {
		return
	}

	close(a.shutdownCh)
	a.waitGroup.Wait()
	a.enabled = false
}

// run periodically looks for nonvoting servers to promote and dead servers to remove.
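// It wakes on its ticker every interval, and also whenever RemoveDeadServers
// signals removeDeadCh to request an immediate pruning pass.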
func (a *Autopilot) run() {
	defer a.waitGroup.Done()

	// Monitor server health until shutdown
	ticker := time.NewTicker(a.interval)
	defer ticker.Stop()

	for {
		select {
		case <-a.shutdownCh:
			return
		case <-ticker.C:
			if err := a.promoteServers(); err != nil {
				a.logger.Printf("[ERR] autopilot: Error promoting servers: %v", err)
			}

			if err := a.pruneDeadServers(); err != nil {
				a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
			}
		case <-a.removeDeadCh:
			if err := a.pruneDeadServers(); err != nil {
				a.logger.Printf("[ERR] autopilot: Error checking for dead servers to remove: %s", err)
			}
		}
	}
}

// promoteServers asks the delegate for any promotions and carries them out.
func (a *Autopilot) promoteServers() error {
	conf := a.delegate.AutopilotConfig()
	if conf == nil {
		return nil
	}

	// Skip the non-voter promotions unless all servers support the new APIs
	minRaftProtocol, err := a.MinRaftProtocol()
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol >= 3 {
		promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth())
		if err != nil {
			return fmt.Errorf("error checking for non-voters to promote: %s", err)
		}
		if err := a.handlePromotions(promotions); err != nil {
			return fmt.Errorf("error handling promotions: %s", err)
		}
	}

	return nil
}

// fmtServer prints info about a server in a standard way for logging.
func fmtServer(server raft.Server) string {
	return fmt.Sprintf("Server (ID: %q Address: %q)", server.ID, server.Address)
}

// NumPeers counts the number of voting peers in the given raft config.
func NumPeers(raftConfig raft.Configuration) int {
	var numPeers int
	for _, server := range raftConfig.Servers {
		if server.Suffrage == raft.Voter {
			numPeers++
		}
	}
	return numPeers
}

// RemoveDeadServers triggers a pruning of dead servers in a non-blocking way.
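// If a pruning pass is already pending, the signal is dropped rather than
// queued, since a single pass handles all dead servers.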
func (a *Autopilot) RemoveDeadServers() {
	select {
	case a.removeDeadCh <- struct{}{}:
	default:
	}
}

// pruneDeadServers removes failed Serf members and stale Raft servers, but
// only when the removals affect fewer than half of the voting peers.
func (a *Autopilot) pruneDeadServers() error {
	conf := a.delegate.AutopilotConfig()
	if conf == nil || !conf.CleanupDeadServers {
		return nil
	}

	// Failed servers are known to Serf and marked failed, and stale servers
	// are known to Raft but not Serf.
	var failed []string
	staleRaftServers := make(map[string]raft.Server)

	raftNode := a.delegate.Raft()
	future := raftNode.GetConfiguration()
	if err := future.Error(); err != nil {
		return err
	}

	raftConfig := future.Configuration()
	for _, server := range raftConfig.Servers {
		staleRaftServers[string(server.Address)] = server
	}

	serfLAN := a.delegate.Serf()
	for _, member := range serfLAN.Members() {
		server, err := a.delegate.IsServer(member)
		if err != nil {
			a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
			continue
		}
		if server != nil {
			// todo(kyhavlov): change this to index by UUID
			s, found := staleRaftServers[server.Addr.String()]
			if found {
				delete(staleRaftServers, server.Addr.String())
			}

			if member.Status == serf.StatusFailed {
				// If the node is a nonvoter, we can remove it immediately.
				if found && s.Suffrage == raft.Nonvoter {
					a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", member.Name)
					go serfLAN.RemoveFailedNode(member.Name)
				} else {
					failed = append(failed, member.Name)
				}
			}
		}
	}

	// We can bail early if there's nothing to do.
	removalCount := len(failed) + len(staleRaftServers)
	if removalCount == 0 {
		return nil
	}

	// Only do removals if a minority of servers will be affected.
	peers := NumPeers(raftConfig)
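	// For example, with five voting peers, peers/2 is 2, so at most one
	// server is removed in a single pass.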
	if removalCount < peers/2 {
		for _, node := range failed {
			a.logger.Printf("[INFO] autopilot: Attempting removal of failed server node %q", node)
			go serfLAN.RemoveFailedNode(node)
		}

		minRaftProtocol, err := a.MinRaftProtocol()
		if err != nil {
			return err
		}
		for _, raftServer := range staleRaftServers {
			a.logger.Printf("[INFO] autopilot: Attempting removal of stale %s", fmtServer(raftServer))
			var future raft.Future
			if minRaftProtocol >= 2 {
				future = raftNode.RemoveServer(raftServer.ID, 0, 0)
			} else {
				future = raftNode.RemovePeer(raftServer.Address)
			}
			if err := future.Error(); err != nil {
				return err
			}
		}
	} else {
		a.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
	}

	return nil
}

// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
func (a *Autopilot) MinRaftProtocol() (int, error) {
	return minRaftProtocol(a.delegate.Serf().Members(), a.delegate.IsServer)
}
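
// minRaftProtocol scans the alive Serf members that are servers and returns
// the lowest advertised raft_vsn tag value; members without the tag are
// assumed to be running protocol version 1.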
func minRaftProtocol(members []serf.Member, serverFunc func(serf.Member) (*ServerInfo, error)) (int, error) {
	minVersion := -1
	for _, m := range members {
		if m.Status != serf.StatusAlive {
			continue
		}

		server, err := serverFunc(m)
		if err != nil {
			return -1, err
		}
		if server == nil {
			continue
		}

		vsn, ok := m.Tags["raft_vsn"]
		if !ok {
			vsn = "1"
		}
		raftVsn, err := strconv.Atoi(vsn)
		if err != nil {
			return -1, err
		}

		if minVersion == -1 || raftVsn < minVersion {
			minVersion = raftVsn
		}
	}

	if minVersion == -1 {
		return minVersion, fmt.Errorf("No servers found")
	}

	return minVersion, nil
}

// handlePromotions is a helper shared with Consul Enterprise that attempts to
// apply desired server promotions to the Raft configuration.
func (a *Autopilot) handlePromotions(promotions []raft.Server) error {
	// This used to wait to only promote to maintain an odd quorum of
	// servers, but this was at odds with the dead server cleanup when doing
	// rolling updates (add one new server, wait, and then kill an old
	// server). The dead server cleanup would still count the old server as
	// a peer, which is conservative and the right thing to do, and this
	// would wait to promote, so you could get into a stalemate. It is safer
	// to promote early than remove early, so by promoting as soon as
	// possible we have chosen that as the solution here.
	for _, server := range promotions {
		a.logger.Printf("[INFO] autopilot: Promoting %s to voter", fmtServer(server))
		addFuture := a.delegate.Raft().AddVoter(server.ID, server.Address, 0, 0)
		if err := addFuture.Error(); err != nil {
			return fmt.Errorf("failed to add raft peer: %v", err)
		}
	}

	// If we promoted a server, trigger a check to remove dead servers.
	if len(promotions) > 0 {
		select {
		case a.removeDeadCh <- struct{}{}:
		default:
		}
	}

	return nil
}

// serverHealthLoop monitors the health of the servers in the cluster
func (a *Autopilot) serverHealthLoop() {
	defer a.waitGroup.Done()

	// Monitor server health until shutdown
	ticker := time.NewTicker(a.healthInterval)
	defer ticker.Stop()

	for {
		select {
		case <-a.shutdownCh:
			return
		case <-ticker.C:
			if err := a.updateClusterHealth(); err != nil {
				a.logger.Printf("[ERR] autopilot: Error updating cluster health: %s", err)
			}
		}
	}
}

// updateClusterHealth fetches the Raft stats of the other servers and updates
// a.clusterHealth based on the configured Autopilot thresholds
func (a *Autopilot) updateClusterHealth() error {
	// Don't do anything if the min Raft version is too low
	minRaftProtocol, err := a.MinRaftProtocol()
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol < 3 {
		return nil
	}

	autopilotConf := a.delegate.AutopilotConfig()
	// Bail early if autopilot config hasn't been initialized yet
	if autopilotConf == nil {
		return nil
	}

	// Get the serf members which are Consul servers
	var serverMembers []serf.Member
	serverMap := make(map[string]*ServerInfo)
	for _, member := range a.delegate.Serf().Members() {
		if member.Status == serf.StatusLeft {
			continue
		}

		server, err := a.delegate.IsServer(member)
		if err != nil {
			a.logger.Printf("[INFO] autopilot: Error parsing server info for %q: %s", member.Name, err)
			continue
		}
		if server != nil {
			serverMap[server.ID] = server
			serverMembers = append(serverMembers, member)
		}
	}

	raftNode := a.delegate.Raft()
	future := raftNode.GetConfiguration()
	if err := future.Error(); err != nil {
		return fmt.Errorf("error getting Raft configuration: %s", err)
	}
	servers := future.Configuration().Servers

	// Fetch the health for each of the servers in parallel so we get as
	// consistent of a sample as possible. We capture the leader's index
	// here as well so it roughly lines up with the same point in time.
	targetLastIndex := raftNode.LastIndex()
	var fetchList []*ServerInfo
	for _, server := range servers {
		if parts, ok := serverMap[string(server.ID)]; ok {
			fetchList = append(fetchList, parts)
		}
	}
	d := time.Now().Add(a.healthInterval / 2)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	defer cancel()
	fetchedStats := a.delegate.FetchStats(ctx, serverMembers)

	// Build a current list of server healths
	leader := raftNode.Leader()
	var clusterHealth OperatorHealthReply
	voterCount := 0
	healthyCount := 0
	healthyVoterCount := 0
	for _, server := range servers {
		health := ServerHealth{
			ID:          string(server.ID),
			Address:     string(server.Address),
			Leader:      server.Address == leader,
			LastContact: -1,
			Voter:       server.Suffrage == raft.Voter,
		}

		parts, ok := serverMap[string(server.ID)]
		if ok {
			health.Name = parts.Name
			health.SerfStatus = parts.Status
			health.Version = parts.Build.String()
			if stats, ok := fetchedStats[string(server.ID)]; ok {
				if err := a.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
					a.logger.Printf("[WARN] autopilot: Error updating server %s health: %s", fmtServer(server), err)
				}
			}
		} else {
			health.SerfStatus = serf.StatusNone
		}

		if health.Voter {
			voterCount++
		}
		if health.Healthy {
			healthyCount++
			if health.Voter {
				healthyVoterCount++
			}
		}

		clusterHealth.Servers = append(clusterHealth.Servers, health)
	}
	clusterHealth.Healthy = healthyCount == len(servers)

	// If we have extra healthy voters, update FailureTolerance
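	// For example, with voterCount=5 the required quorum is 5/2+1 = 3, so if
	// all five voters are healthy the cluster can tolerate two voter failures.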
	requiredQuorum := voterCount/2 + 1
	if healthyVoterCount > requiredQuorum {
		clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
	}

	a.delegate.NotifyHealth(clusterHealth)

	a.clusterHealthLock.Lock()
	a.clusterHealth = clusterHealth
	a.clusterHealthLock.Unlock()

	return nil
}

// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (a *Autopilot) updateServerHealth(health *ServerHealth,
	server *ServerInfo, stats *ServerStats,
	autopilotConf *Config, targetLastIndex uint64) error {

	health.LastTerm = stats.LastTerm
	health.LastIndex = stats.LastIndex

	if stats.LastContact != "never" {
		var err error
		health.LastContact, err = time.ParseDuration(stats.LastContact)
		if err != nil {
			return fmt.Errorf("error parsing last_contact duration: %s", err)
		}
	}

	raftNode := a.delegate.Raft()
	lastTerm, err := strconv.ParseUint(raftNode.Stats()["last_log_term"], 10, 64)
	if err != nil {
		return fmt.Errorf("error parsing last_log_term: %s", err)
	}
	health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)

	// If this is a new server or the health changed, reset StableSince
	lastHealth := a.GetServerHealth(server.ID)
	if lastHealth == nil || lastHealth.Healthy != health.Healthy {
		health.StableSince = time.Now()
	} else {
		health.StableSince = lastHealth.StableSince
	}

	return nil
}
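
// GetClusterHealth returns the most recently computed cluster health.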
func (a *Autopilot) GetClusterHealth() OperatorHealthReply {
	a.clusterHealthLock.RLock()
	defer a.clusterHealthLock.RUnlock()
	return a.clusterHealth
}
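
// GetServerHealth returns the most recently computed health for the server
// with the given Raft ID.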
func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
	a.clusterHealthLock.RLock()
	defer a.clusterHealthLock.RUnlock()
	return a.clusterHealth.ServerHealth(id)
}
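
// IsPotentialVoter returns whether the given suffrage makes the server a
// current (Voter) or incoming (Staging) voter in the Raft configuration.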
func IsPotentialVoter(suffrage raft.ServerSuffrage) bool {
	switch suffrage {
	case raft.Voter, raft.Staging:
		return true
	default:
		return false
	}
}