2017-12-12 00:38:52 +00:00
package autopilot
import (
"context"
"fmt"
2020-01-28 23:50:41 +00:00
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
2017-12-13 01:45:03 +00:00
"net"
2017-12-12 00:38:52 +00:00
"strconv"
"sync"
"time"
)
// Delegate is the interface for the Autopilot mechanism
type Delegate interface {
2017-12-13 01:45:03 +00:00
AutopilotConfig ( ) * Config
FetchStats ( context . Context , [ ] serf . Member ) map [ string ] * ServerStats
2017-12-13 18:57:37 +00:00
IsServer ( serf . Member ) ( * ServerInfo , error )
2017-12-13 01:45:03 +00:00
NotifyHealth ( OperatorHealthReply )
2017-12-12 00:38:52 +00:00
PromoteNonVoters ( * Config , OperatorHealthReply ) ( [ ] raft . Server , error )
Raft ( ) * raft . Raft
2019-06-28 17:40:07 +00:00
SerfLAN ( ) * serf . Serf
SerfWAN ( ) * serf . Serf
2017-12-12 00:38:52 +00:00
}
// Autopilot is a mechanism for automatically managing the Raft
// quorum using server health information along with updates from Serf gossip.
// For more information, see https://www.consul.io/docs/guides/autopilot.html
type Autopilot struct {
2020-01-28 23:50:41 +00:00
logger hclog . Logger
2017-12-13 01:45:03 +00:00
delegate Delegate
2017-12-12 00:38:52 +00:00
interval time . Duration
healthInterval time . Duration
clusterHealth OperatorHealthReply
clusterHealthLock sync . RWMutex
2018-02-20 23:51:59 +00:00
enabled bool
2017-12-12 00:38:52 +00:00
removeDeadCh chan struct { }
shutdownCh chan struct { }
2018-02-20 23:51:59 +00:00
shutdownLock sync . Mutex
2017-12-12 00:38:52 +00:00
waitGroup sync . WaitGroup
}
2017-12-13 01:45:03 +00:00
// ServerInfo is the description of a server that a Delegate derives from a
// Serf member.
type ServerInfo struct {
	// Name is the server name reported in health output.
	Name string
	// ID is matched against the Raft server ID.
	ID string
	// Addr is matched (via its String form) against the Raft server address.
	Addr net.Addr
	// Build is the server's version; its String form is reported in health
	// output.
	Build version.Version
	// Status is the server's Serf member status.
	Status serf.MemberStatus
}
2020-01-28 23:50:41 +00:00
func NewAutopilot ( logger hclog . Logger , delegate Delegate , interval , healthInterval time . Duration ) * Autopilot {
2017-12-12 00:38:52 +00:00
return & Autopilot {
2020-01-28 23:50:41 +00:00
logger : logger . Named ( logging . Autopilot ) ,
2017-12-13 01:45:03 +00:00
delegate : delegate ,
interval : interval ,
healthInterval : healthInterval ,
removeDeadCh : make ( chan struct { } ) ,
2017-12-12 00:38:52 +00:00
}
}
func ( a * Autopilot ) Start ( ) {
2018-02-20 23:51:59 +00:00
a . shutdownLock . Lock ( )
defer a . shutdownLock . Unlock ( )
// Nothing to do
if a . enabled {
return
}
2017-12-12 00:38:52 +00:00
a . shutdownCh = make ( chan struct { } )
a . waitGroup = sync . WaitGroup { }
2018-01-23 20:52:17 +00:00
a . clusterHealth = OperatorHealthReply { }
2017-12-12 00:38:52 +00:00
2018-01-23 19:17:41 +00:00
a . waitGroup . Add ( 2 )
2017-12-12 00:38:52 +00:00
go a . run ( )
2018-01-23 19:17:41 +00:00
go a . serverHealthLoop ( )
2018-02-20 23:51:59 +00:00
a . enabled = true
2017-12-12 00:38:52 +00:00
}
func ( a * Autopilot ) Stop ( ) {
2018-02-20 23:51:59 +00:00
a . shutdownLock . Lock ( )
defer a . shutdownLock . Unlock ( )
// Nothing to do
if ! a . enabled {
return
}
2017-12-12 00:38:52 +00:00
close ( a . shutdownCh )
a . waitGroup . Wait ( )
2018-02-20 23:51:59 +00:00
a . enabled = false
2017-12-12 00:38:52 +00:00
}
2017-12-13 01:45:03 +00:00
// run periodically looks for nonvoting servers to promote and dead servers to remove.
2017-12-12 00:38:52 +00:00
func ( a * Autopilot ) run ( ) {
defer a . waitGroup . Done ( )
// Monitor server health until shutdown
ticker := time . NewTicker ( a . interval )
defer ticker . Stop ( )
for {
select {
case <- a . shutdownCh :
return
case <- ticker . C :
2017-12-13 18:57:37 +00:00
if err := a . promoteServers ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Error ( "Error promoting servers" , "error" , err )
2017-12-12 00:38:52 +00:00
}
2017-12-13 01:45:03 +00:00
if err := a . pruneDeadServers ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Error ( "Error checking for dead servers to remove" , "error" , err )
2017-12-12 00:38:52 +00:00
}
case <- a . removeDeadCh :
2017-12-13 01:45:03 +00:00
if err := a . pruneDeadServers ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Error ( "Error checking for dead servers to remove" , "error" , err )
2017-12-12 00:38:52 +00:00
}
}
}
}
2017-12-13 18:57:37 +00:00
// promoteServers asks the delegate for any promotions and carries them out.
// It does nothing while the Autopilot config is unavailable or while any
// alive server advertises a Raft protocol version below 3.
func (a *Autopilot) promoteServers() error {
	conf := a.delegate.AutopilotConfig()
	if conf == nil {
		return nil
	}

	// Skip the non-voter promotions unless all servers support the new APIs
	minVsn, err := a.MinRaftProtocol()
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minVsn < 3 {
		return nil
	}

	promotions, err := a.delegate.PromoteNonVoters(conf, a.GetClusterHealth())
	if err != nil {
		return fmt.Errorf("error checking for non-voters to promote: %s", err)
	}
	if err = a.handlePromotions(promotions); err != nil {
		return fmt.Errorf("error handling promotions: %s", err)
	}
	return nil
}
2017-12-12 00:38:52 +00:00
// fmtServer renders a Raft server's ID and address, both quoted, in the
// uniform shape used by this file's log messages.
func fmtServer(server raft.Server) string {
	id, addr := server.ID, server.Address
	return fmt.Sprintf("Server (ID: %q Address: %q)", id, addr)
}
2017-12-13 01:45:03 +00:00
// NumPeers counts the number of voting peers in the given raft config.
func NumPeers ( raftConfig raft . Configuration ) int {
var numPeers int
for _ , server := range raftConfig . Servers {
2017-12-13 18:57:37 +00:00
if server . Suffrage == raft . Voter {
2017-12-13 01:45:03 +00:00
numPeers ++
}
2017-12-12 00:38:52 +00:00
}
2017-12-13 01:45:03 +00:00
return numPeers
}
2017-12-12 00:38:52 +00:00
2017-12-18 20:26:35 +00:00
// RemoveDeadServers triggers a pruning of dead servers in a non-blocking way.
// The request is dropped when nothing is currently ready to receive it.
func (a *Autopilot) RemoveDeadServers() {
	var signal struct{}
	select {
	case a.removeDeadCh <- signal:
		// Delivered to the run loop.
	default:
		// No receiver is ready; do not block the caller.
	}
}
2019-12-16 22:35:13 +00:00
// canRemoveServers decides whether deadServers may be removed from a cluster
// of peers voting servers without dropping below minQuorum or touching a
// majority. It returns the verdict together with a human-readable reason.
func canRemoveServers(peers, minQuorum, deadServers int) (bool, string) {
	remaining := peers - deadServers

	// Only do removals if a minority of servers will be affected.
	// For failure tolerance of F we need n = 2F+1 servers.
	// This means we can safely remove up to (n-1)/2 servers.
	maxRemovable := (peers - 1) / 2

	switch {
	case remaining < minQuorum:
		return false, fmt.Sprintf("denied, because removing %d/%d servers would leave less then minimal allowed quorum of %d servers", deadServers, peers, minQuorum)
	case deadServers > maxRemovable:
		return false, fmt.Sprintf("denied, because removing the majority of servers %d/%d is not safe", deadServers, peers)
	default:
		return true, fmt.Sprintf("allowed, because removing %d/%d servers leaves a majority of servers above the minimal allowed quorum %d", deadServers, peers, minQuorum)
	}
}
2017-12-13 01:45:03 +00:00
// pruneDeadServers removes up to numPeers/2 failed servers
func ( a * Autopilot ) pruneDeadServers ( ) error {
2017-12-13 19:10:42 +00:00
conf := a . delegate . AutopilotConfig ( )
if conf == nil || ! conf . CleanupDeadServers {
return nil
}
2017-12-12 00:38:52 +00:00
// Failed servers are known to Serf and marked failed, and stale servers
// are known to Raft but not Serf.
2019-06-28 17:40:07 +00:00
var failed [ ] serf . Member
2017-12-12 00:38:52 +00:00
staleRaftServers := make ( map [ string ] raft . Server )
raftNode := a . delegate . Raft ( )
future := raftNode . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
return err
}
2017-12-13 01:45:03 +00:00
raftConfig := future . Configuration ( )
for _ , server := range raftConfig . Servers {
2017-12-12 00:38:52 +00:00
staleRaftServers [ string ( server . Address ) ] = server
}
2019-06-28 17:40:07 +00:00
serfWAN := a . delegate . SerfWAN ( )
serfLAN := a . delegate . SerfLAN ( )
2017-12-12 00:38:52 +00:00
for _ , member := range serfLAN . Members ( ) {
2017-12-13 18:57:37 +00:00
server , err := a . delegate . IsServer ( member )
if err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Warn ( "Error parsing server info" , "name" , member . Name , "error" , err )
2017-12-13 18:57:37 +00:00
continue
}
if server != nil {
2017-12-13 01:45:03 +00:00
// todo(kyhavlov): change this to index by UUID
2018-08-14 21:24:51 +00:00
s , found := staleRaftServers [ server . Addr . String ( ) ]
if found {
2017-12-13 18:57:37 +00:00
delete ( staleRaftServers , server . Addr . String ( ) )
2017-12-12 00:38:52 +00:00
}
if member . Status == serf . StatusFailed {
2018-08-14 21:24:51 +00:00
// If the node is a nonvoter, we can remove it immediately.
if found && s . Suffrage == raft . Nonvoter {
2020-01-28 23:50:41 +00:00
a . logger . Info ( "Attempting removal of failed server node" , "name" , member . Name )
2018-08-14 21:24:51 +00:00
go serfLAN . RemoveFailedNode ( member . Name )
2019-06-28 17:40:07 +00:00
if serfWAN != nil {
go serfWAN . RemoveFailedNode ( member . Name )
}
2018-08-14 21:24:51 +00:00
} else {
2019-06-28 17:40:07 +00:00
failed = append ( failed , member )
2018-08-14 21:24:51 +00:00
}
2017-12-12 00:38:52 +00:00
}
}
}
2019-12-16 22:35:13 +00:00
deadServers := len ( failed ) + len ( staleRaftServers )
// nothing to do
if deadServers == 0 {
2017-12-12 00:38:52 +00:00
return nil
}
2019-12-16 22:35:13 +00:00
if ok , msg := canRemoveServers ( NumPeers ( raftConfig ) , int ( conf . MinQuorum ) , deadServers ) ; ! ok {
2020-01-28 23:50:41 +00:00
a . logger . Debug ( "Failed to remove dead servers" , "error" , msg )
2019-12-16 22:35:13 +00:00
return nil
}
2019-06-28 17:40:07 +00:00
2019-12-16 22:35:13 +00:00
for _ , node := range failed {
2020-01-28 23:50:41 +00:00
a . logger . Info ( "Attempting removal of failed server node" , "name" , node . Name )
2019-12-16 22:35:13 +00:00
go serfLAN . RemoveFailedNode ( node . Name )
if serfWAN != nil {
go serfWAN . RemoveFailedNode ( fmt . Sprintf ( "%s.%s" , node . Name , node . Tags [ "dc" ] ) )
2017-12-12 00:38:52 +00:00
}
2019-12-16 22:35:13 +00:00
}
minRaftProtocol , err := a . MinRaftProtocol ( )
if err != nil {
return err
}
for _ , raftServer := range staleRaftServers {
2020-01-28 23:50:41 +00:00
a . logger . Info ( "Attempting removal of stale server" , "server" , fmtServer ( raftServer ) )
2019-12-16 22:35:13 +00:00
var future raft . Future
if minRaftProtocol >= 2 {
future = raftNode . RemoveServer ( raftServer . ID , 0 , 0 )
} else {
future = raftNode . RemovePeer ( raftServer . Address )
2017-12-12 00:38:52 +00:00
}
2019-12-16 22:35:13 +00:00
if err := future . Error ( ) ; err != nil {
return err
2017-12-12 00:38:52 +00:00
}
}
return nil
}
// MinRaftProtocol returns the lowest supported Raft protocol among alive servers
func ( a * Autopilot ) MinRaftProtocol ( ) ( int , error ) {
2019-06-28 17:40:07 +00:00
return minRaftProtocol ( a . delegate . SerfLAN ( ) . Members ( ) , a . delegate . IsServer )
2017-12-18 20:26:35 +00:00
}
func minRaftProtocol ( members [ ] serf . Member , serverFunc func ( serf . Member ) ( * ServerInfo , error ) ) ( int , error ) {
2017-12-12 00:38:52 +00:00
minVersion := - 1
for _ , m := range members {
if m . Status != serf . StatusAlive {
continue
}
2017-12-18 20:26:35 +00:00
server , err := serverFunc ( m )
2017-12-13 18:57:37 +00:00
if err != nil {
return - 1 , err
}
if server == nil {
2017-12-12 00:38:52 +00:00
continue
}
vsn , ok := m . Tags [ "raft_vsn" ]
if ! ok {
vsn = "1"
}
raftVsn , err := strconv . Atoi ( vsn )
if err != nil {
return - 1 , err
}
if minVersion == - 1 || raftVsn < minVersion {
minVersion = raftVsn
}
}
if minVersion == - 1 {
return minVersion , fmt . Errorf ( "No servers found" )
}
return minVersion , nil
}
// handlePromotions is a helper shared with Consul Enterprise that attempts to
// apply desired server promotions to the Raft configuration.
func ( a * Autopilot ) handlePromotions ( promotions [ ] raft . Server ) error {
// This used to wait to only promote to maintain an odd quorum of
// servers, but this was at odds with the dead server cleanup when doing
// rolling updates (add one new server, wait, and then kill an old
// server). The dead server cleanup would still count the old server as
// a peer, which is conservative and the right thing to do, and this
// would wait to promote, so you could get into a stalemate. It is safer
// to promote early than remove early, so by promoting as soon as
// possible we have chosen that as the solution here.
for _ , server := range promotions {
2020-01-28 23:50:41 +00:00
a . logger . Info ( "Promoting server to voter" , "server" , fmtServer ( server ) )
2017-12-12 00:38:52 +00:00
addFuture := a . delegate . Raft ( ) . AddVoter ( server . ID , server . Address , 0 , 0 )
if err := addFuture . Error ( ) ; err != nil {
return fmt . Errorf ( "failed to add raft peer: %v" , err )
}
}
// If we promoted a server, trigger a check to remove dead servers.
if len ( promotions ) > 0 {
select {
case a . removeDeadCh <- struct { } { } :
default :
}
}
return nil
}
2018-01-23 19:17:41 +00:00
// serverHealthLoop monitors the health of the servers in the cluster
func ( a * Autopilot ) serverHealthLoop ( ) {
defer a . waitGroup . Done ( )
2017-12-12 00:38:52 +00:00
// Monitor server health until shutdown
ticker := time . NewTicker ( a . healthInterval )
defer ticker . Stop ( )
for {
select {
2018-01-23 19:17:41 +00:00
case <- a . shutdownCh :
2017-12-12 00:38:52 +00:00
return
case <- ticker . C :
if err := a . updateClusterHealth ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Error ( "Error updating cluster health" , "error" , err )
2017-12-12 00:38:52 +00:00
}
}
}
}
// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func ( a * Autopilot ) updateClusterHealth ( ) error {
// Don't do anything if the min Raft version is too low
minRaftProtocol , err := a . MinRaftProtocol ( )
if err != nil {
return fmt . Errorf ( "error getting server raft protocol versions: %s" , err )
}
if minRaftProtocol < 3 {
return nil
}
2017-12-13 01:45:03 +00:00
autopilotConf := a . delegate . AutopilotConfig ( )
2017-12-12 00:38:52 +00:00
// Bail early if autopilot config hasn't been initialized yet
if autopilotConf == nil {
return nil
}
// Get the the serf members which are Consul servers
2017-12-13 01:45:03 +00:00
var serverMembers [ ] serf . Member
serverMap := make ( map [ string ] * ServerInfo )
2019-06-28 17:40:07 +00:00
for _ , member := range a . delegate . SerfLAN ( ) . Members ( ) {
2017-12-12 00:38:52 +00:00
if member . Status == serf . StatusLeft {
continue
}
2017-12-13 18:57:37 +00:00
server , err := a . delegate . IsServer ( member )
if err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Warn ( "Error parsing server info" , "name" , member . Name , "error" , err )
2017-12-13 18:57:37 +00:00
continue
}
if server != nil {
serverMap [ server . ID ] = server
2017-12-13 01:45:03 +00:00
serverMembers = append ( serverMembers , member )
2017-12-12 00:38:52 +00:00
}
}
raftNode := a . delegate . Raft ( )
future := raftNode . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
return fmt . Errorf ( "error getting Raft configuration %s" , err )
}
servers := future . Configuration ( ) . Servers
// Fetch the health for each of the servers in parallel so we get as
// consistent of a sample as possible. We capture the leader's index
// here as well so it roughly lines up with the same point in time.
targetLastIndex := raftNode . LastIndex ( )
2017-12-13 01:45:03 +00:00
var fetchList [ ] * ServerInfo
2017-12-12 00:38:52 +00:00
for _ , server := range servers {
if parts , ok := serverMap [ string ( server . ID ) ] ; ok {
fetchList = append ( fetchList , parts )
}
}
d := time . Now ( ) . Add ( a . healthInterval / 2 )
ctx , cancel := context . WithDeadline ( context . Background ( ) , d )
defer cancel ( )
2017-12-13 01:45:03 +00:00
fetchedStats := a . delegate . FetchStats ( ctx , serverMembers )
2017-12-12 00:38:52 +00:00
// Build a current list of server healths
leader := raftNode . Leader ( )
var clusterHealth OperatorHealthReply
voterCount := 0
healthyCount := 0
healthyVoterCount := 0
for _ , server := range servers {
health := ServerHealth {
ID : string ( server . ID ) ,
Address : string ( server . Address ) ,
Leader : server . Address == leader ,
LastContact : - 1 ,
Voter : server . Suffrage == raft . Voter ,
}
parts , ok := serverMap [ string ( server . ID ) ]
if ok {
health . Name = parts . Name
health . SerfStatus = parts . Status
health . Version = parts . Build . String ( )
if stats , ok := fetchedStats [ string ( server . ID ) ] ; ok {
if err := a . updateServerHealth ( & health , parts , stats , autopilotConf , targetLastIndex ) ; err != nil {
2020-01-28 23:50:41 +00:00
a . logger . Warn ( "Error updating server health" , "server" , fmtServer ( server ) , "error" , err )
2017-12-12 00:38:52 +00:00
}
}
} else {
health . SerfStatus = serf . StatusNone
}
if health . Voter {
voterCount ++
}
if health . Healthy {
healthyCount ++
if health . Voter {
healthyVoterCount ++
}
}
clusterHealth . Servers = append ( clusterHealth . Servers , health )
}
clusterHealth . Healthy = healthyCount == len ( servers )
// If we have extra healthy voters, update FailureTolerance
requiredQuorum := voterCount / 2 + 1
if healthyVoterCount > requiredQuorum {
clusterHealth . FailureTolerance = healthyVoterCount - requiredQuorum
}
2017-12-13 01:45:03 +00:00
a . delegate . NotifyHealth ( clusterHealth )
2017-12-12 00:38:52 +00:00
a . clusterHealthLock . Lock ( )
a . clusterHealth = clusterHealth
a . clusterHealthLock . Unlock ( )
return nil
}
// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func ( a * Autopilot ) updateServerHealth ( health * ServerHealth ,
2017-12-13 01:45:03 +00:00
server * ServerInfo , stats * ServerStats ,
2017-12-12 00:38:52 +00:00
autopilotConf * Config , targetLastIndex uint64 ) error {
health . LastTerm = stats . LastTerm
health . LastIndex = stats . LastIndex
if stats . LastContact != "never" {
var err error
health . LastContact , err = time . ParseDuration ( stats . LastContact )
if err != nil {
return fmt . Errorf ( "error parsing last_contact duration: %s" , err )
}
}
raftNode := a . delegate . Raft ( )
lastTerm , err := strconv . ParseUint ( raftNode . Stats ( ) [ "last_log_term" ] , 10 , 64 )
if err != nil {
return fmt . Errorf ( "error parsing last_log_term: %s" , err )
}
health . Healthy = health . IsHealthy ( lastTerm , targetLastIndex , autopilotConf )
// If this is a new server or the health changed, reset StableSince
lastHealth := a . GetServerHealth ( server . ID )
if lastHealth == nil || lastHealth . Healthy != health . Healthy {
health . StableSince = time . Now ( )
} else {
health . StableSince = lastHealth . StableSince
}
return nil
}
// GetClusterHealth returns the most recently stored cluster health, read
// under the cluster health read lock.
func (a *Autopilot) GetClusterHealth() OperatorHealthReply {
	a.clusterHealthLock.RLock()
	reply := a.clusterHealth
	a.clusterHealthLock.RUnlock()
	return reply
}
// GetServerHealth looks up the health entry for the server with the given ID
// in the most recently stored cluster health, under the read lock. Callers in
// this file treat a nil result as "server not seen before".
func (a *Autopilot) GetServerHealth(id string) *ServerHealth {
	a.clusterHealthLock.RLock()
	defer a.clusterHealthLock.RUnlock()

	entry := a.clusterHealth.ServerHealth(id)
	return entry
}
2017-12-14 01:53:26 +00:00
func IsPotentialVoter ( suffrage raft . ServerSuffrage ) bool {
2017-12-12 00:38:52 +00:00
switch suffrage {
case raft . Voter , raft . Staging :
return true
default :
return false
}
}