2014-01-09 23:49:09 +00:00
package consul
import (
2014-08-11 21:54:18 +00:00
"fmt"
2014-06-16 21:36:12 +00:00
"net"
"strconv"
2017-04-13 21:17:32 +00:00
"sync"
2014-06-16 21:36:12 +00:00
"time"
2014-02-20 23:16:26 +00:00
"github.com/armon/go-metrics"
2017-08-23 14:52:48 +00:00
"github.com/hashicorp/consul/acl"
2018-04-24 18:50:31 +00:00
"github.com/hashicorp/consul/agent/connect"
2017-12-12 00:38:52 +00:00
"github.com/hashicorp/consul/agent/consul/autopilot"
2017-07-06 10:48:37 +00:00
"github.com/hashicorp/consul/agent/metadata"
2017-07-06 10:34:00 +00:00
"github.com/hashicorp/consul/agent/structs"
2017-04-19 23:00:11 +00:00
"github.com/hashicorp/consul/api"
2016-06-07 20:24:51 +00:00
"github.com/hashicorp/consul/types"
2018-04-24 18:50:31 +00:00
uuid "github.com/hashicorp/go-uuid"
2017-08-03 00:05:18 +00:00
"github.com/hashicorp/go-version"
2014-01-10 20:55:55 +00:00
"github.com/hashicorp/raft"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/serf/serf"
)
const (
2017-07-14 05:33:47 +00:00
newLeaderEvent = "consul:new-leader"
barrierWriteTimeout = 2 * time . Minute
2014-01-09 23:49:09 +00:00
)
2017-12-12 00:38:52 +00:00
var minAutopilotVersion = version . Must ( version . NewVersion ( "0.8.0" ) )
2014-01-09 23:49:09 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
2017-04-13 21:17:32 +00:00
// We use the notify channel we configured Raft with, NOT Raft's
// leaderCh, which is only notified best-effort. Doing this ensures
// that we get all notifications in order, which is required for
// cleanup and to ensure we never run multiple leader loops.
2017-07-06 14:09:21 +00:00
raftNotifyCh := s . raftNotifyCh
2017-04-13 21:17:32 +00:00
2017-10-06 14:54:49 +00:00
var weAreLeaderCh chan struct { }
var leaderLoop sync . WaitGroup
2014-01-09 23:49:09 +00:00
for {
select {
2017-07-05 22:09:18 +00:00
case isLeader := <- raftNotifyCh :
2017-10-06 14:54:49 +00:00
switch {
case isLeader :
if weAreLeaderCh != nil {
s . logger . Printf ( "[ERR] consul: attempted to start the leader loop while running" )
continue
}
weAreLeaderCh = make ( chan struct { } )
leaderLoop . Add ( 1 )
go func ( ch chan struct { } ) {
defer leaderLoop . Done ( )
s . leaderLoop ( ch )
} ( weAreLeaderCh )
2014-01-09 23:49:09 +00:00
s . logger . Printf ( "[INFO] consul: cluster leadership acquired" )
2017-10-06 14:54:49 +00:00
default :
if weAreLeaderCh == nil {
s . logger . Printf ( "[ERR] consul: attempted to stop the leader loop while not running" )
continue
}
s . logger . Printf ( "[DEBUG] consul: shutting down leader loop" )
close ( weAreLeaderCh )
leaderLoop . Wait ( )
weAreLeaderCh = nil
2014-01-09 23:49:09 +00:00
s . logger . Printf ( "[INFO] consul: cluster leadership lost" )
}
2017-10-06 14:54:49 +00:00
2014-01-09 23:49:09 +00:00
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
2015-09-15 12:22:08 +00:00
// maintenance activities
2014-01-09 23:49:09 +00:00
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2014-02-19 20:36:27 +00:00
// Fire a user event indicating a new leader
payload := [ ] byte ( s . config . NodeName )
2017-08-14 14:36:07 +00:00
for name , segment := range s . LANSegments ( ) {
if err := segment . UserEvent ( newLeaderEvent , payload , false ) ; err != nil {
s . logger . Printf ( "[WARN] consul: failed to broadcast new leader event on segment %q: %v" , name , err )
}
2014-02-19 20:36:27 +00:00
}
2014-01-10 20:55:55 +00:00
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf . Member
2014-12-13 05:42:24 +00:00
establishedLeader := false
2014-01-10 20:55:55 +00:00
2017-05-04 18:52:22 +00:00
reassert := func ( ) error {
if ! establishedLeader {
return fmt . Errorf ( "leadership has not been established" )
}
if err := s . revokeLeadership ( ) ; err != nil {
return err
}
if err := s . establishLeadership ( ) ; err != nil {
return err
}
return nil
}
2014-01-09 23:49:09 +00:00
RECONCILE :
2014-01-10 20:55:55 +00:00
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
2014-01-09 23:49:09 +00:00
// Apply a raft barrier to ensure our FSM is caught up
2014-02-20 23:16:26 +00:00
start := time . Now ( )
2017-07-05 22:09:18 +00:00
barrier := s . raft . Barrier ( barrierWriteTimeout )
2014-01-09 23:49:09 +00:00
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to wait for barrier: %v" , err )
2017-10-06 14:54:49 +00:00
goto WAIT
2014-01-09 23:49:09 +00:00
}
2017-10-04 23:43:27 +00:00
metrics . MeasureSince ( [ ] string { "leader" , "barrier" } , start )
2014-01-09 23:49:09 +00:00
2014-12-13 05:42:24 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
if err := s . establishLeadership ( ) ; err != nil {
2017-05-04 14:15:25 +00:00
s . logger . Printf ( "[ERR] consul: failed to establish leadership: %v" , err )
2018-02-21 18:33:22 +00:00
// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := s . revokeLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to revoke leadership: %v" , err )
}
2014-12-13 05:42:24 +00:00
goto WAIT
}
establishedLeader = true
2017-08-03 00:05:18 +00:00
defer func ( ) {
if err := s . revokeLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to revoke leadership: %v" , err )
}
} ( )
2014-12-13 05:42:24 +00:00
}
2014-01-09 23:49:09 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to reconcile: %v" , err )
goto WAIT
}
2014-01-10 20:55:55 +00:00
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
2014-01-09 23:49:09 +00:00
WAIT :
2017-10-06 14:54:49 +00:00
// Poll the stop channel to give it priority so we don't waste time
// trying to perform the other operations if we have been asked to shut
// down.
select {
case <- stopCh :
return
default :
}
2014-01-10 20:55:55 +00:00
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
s . reconcileMember ( member )
2014-12-15 23:04:21 +00:00
case index := <- s . tombstoneGC . ExpireCh ( ) :
go s . reapTombstones ( index )
2017-05-04 18:52:22 +00:00
case errCh := <- s . reassertLeaderCh :
errCh <- reassert ( )
2014-01-10 20:55:55 +00:00
}
2014-01-09 23:49:09 +00:00
}
}
2014-12-13 05:42:24 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2015-09-11 19:24:54 +00:00
// previously inflight transactions have been committed and that our
2014-12-13 05:42:24 +00:00
// state is up-to-date.
func ( s * Server ) establishLeadership ( ) error {
2017-08-03 00:05:18 +00:00
// This will create the anonymous token and master token (if that is
// configured).
if err := s . initializeACL ( ) ; err != nil {
return err
}
2014-12-15 22:37:49 +00:00
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
2015-01-05 22:58:59 +00:00
s . tombstoneGC . SetEnabled ( true )
2014-12-15 22:37:49 +00:00
lastIndex := s . raft . LastIndex ( )
s . tombstoneGC . Hint ( lastIndex )
2014-12-13 05:42:24 +00:00
// Setup the session timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node along, effectively this means all the timers are renewed at the
// time of failover. The TTL contract is that the session will not be expired
// before the TTL, so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Sessions
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeSessionTimers ( ) ; err != nil {
return err
}
2017-02-24 04:32:13 +00:00
2017-04-13 01:38:36 +00:00
s . getOrCreateAutopilotConfig ( )
2017-12-12 00:38:52 +00:00
s . autopilot . Start ( )
2018-04-09 04:57:32 +00:00
// todo(kyhavlov): start a goroutine here for handling periodic CA rotation
2018-04-24 23:16:37 +00:00
if err := s . initializeCA ( ) ; err != nil {
return err
}
2018-04-09 04:57:32 +00:00
2017-06-16 03:41:30 +00:00
s . setConsistentReadReady ( )
2014-12-13 05:42:24 +00:00
return nil
}
2015-01-05 22:58:59 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
// Disable the tombstone GC, since it is only useful as a leader
s . tombstoneGC . SetEnabled ( false )
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s . clearAllSessionTimers ( ) ; err != nil {
return err
}
2017-03-01 22:04:40 +00:00
2018-04-24 23:16:37 +00:00
s . setCAProvider ( nil )
2017-06-16 03:41:30 +00:00
s . resetConsistentReadReady ( )
2017-12-12 00:38:52 +00:00
s . autopilot . Stop ( )
2015-01-05 22:58:59 +00:00
return nil
}
2014-08-11 21:54:18 +00:00
// initializeACL is used to setup the ACLs if we are the leader
// and need to do this.
func ( s * Server ) initializeACL ( ) error {
2017-08-03 00:05:18 +00:00
// Bail if not configured or we are not authoritative.
2014-08-11 21:54:18 +00:00
authDC := s . config . ACLDatacenter
if len ( authDC ) == 0 || authDC != s . config . Datacenter {
return nil
}
2017-08-03 00:05:18 +00:00
// Purge the cache, since it could've changed while we were not the
// leader.
2014-08-11 21:54:18 +00:00
s . aclAuthCache . Purge ( )
2017-08-03 00:05:18 +00:00
// Create anonymous token if missing.
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2017-01-24 08:00:06 +00:00
_ , acl , err := state . ACLGet ( nil , anonymousToken )
2014-08-11 21:54:18 +00:00
if err != nil {
return fmt . Errorf ( "failed to get anonymous token: %v" , err )
}
if acl == nil {
req := structs . ACLRequest {
Datacenter : authDC ,
2014-10-09 19:28:07 +00:00
Op : structs . ACLSet ,
2014-08-11 21:54:18 +00:00
ACL : structs . ACL {
ID : anonymousToken ,
Name : "Anonymous Token" ,
Type : structs . ACLTypeClient ,
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create anonymous token: %v" , err )
}
}
2017-08-03 00:05:18 +00:00
// Check for configured master token.
if master := s . config . ACLMasterToken ; len ( master ) > 0 {
_ , acl , err = state . ACLGet ( nil , master )
if err != nil {
return fmt . Errorf ( "failed to get master token: %v" , err )
}
if acl == nil {
req := structs . ACLRequest {
Datacenter : authDC ,
Op : structs . ACLSet ,
ACL : structs . ACL {
ID : master ,
Name : "Master Token" ,
Type : structs . ACLTypeManagement ,
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create master token: %v" , err )
}
s . logger . Printf ( "[INFO] consul: Created ACL master token from configuration" )
}
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
// Check to see if we need to initialize the ACL bootstrap info. This
// needs a Consul version check since it introduces a new Raft operation
// that'll produce an error on older servers, and it also makes a piece
// of state in the state store that will cause problems with older
// servers consuming snapshots, so we have to wait to create it.
var minVersion = version . Must ( version . NewVersion ( "0.9.1" ) )
if ServersMeetMinimumVersion ( s . LANMembers ( ) , minVersion ) {
bs , err := state . ACLGetBootstrap ( )
2014-08-11 21:54:18 +00:00
if err != nil {
2017-08-03 00:05:18 +00:00
return fmt . Errorf ( "failed looking for ACL bootstrap info: %v" , err )
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
if bs == nil {
req := structs . ACLRequest {
Datacenter : authDC ,
Op : structs . ACLBootstrapInit ,
}
resp , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to initialize ACL bootstrap: %v" , err )
}
switch v := resp . ( type ) {
case error :
return fmt . Errorf ( "failed to initialize ACL bootstrap: %v" , v )
case bool :
if v {
s . logger . Printf ( "[INFO] consul: ACL bootstrap enabled" )
} else {
s . logger . Printf ( "[INFO] consul: ACL bootstrap disabled, existing management tokens found" )
}
default :
return fmt . Errorf ( "unexpected response trying to initialize ACL bootstrap: %T" , v )
}
}
} else {
s . logger . Printf ( "[WARN] consul: Can't initialize ACL bootstrap until all servers are >= %s" , minVersion . String ( ) )
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
2014-08-11 21:54:18 +00:00
return nil
}
2017-04-13 01:38:36 +00:00
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
2017-12-13 01:45:03 +00:00
func ( s * Server ) getOrCreateAutopilotConfig ( ) * autopilot . Config {
2017-02-24 04:32:13 +00:00
state := s . fsm . State ( )
2017-02-24 21:08:49 +00:00
_ , config , err := state . AutopilotConfig ( )
2017-02-24 04:32:13 +00:00
if err != nil {
2017-04-13 01:38:36 +00:00
s . logger . Printf ( "[ERR] autopilot: failed to get config: %v" , err )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 04:32:13 +00:00
}
if config != nil {
2017-12-13 01:45:03 +00:00
return config
2017-02-24 04:32:13 +00:00
}
2017-04-13 00:09:57 +00:00
if ! ServersMeetMinimumVersion ( s . LANMembers ( ) , minAutopilotVersion ) {
2017-04-13 17:43:07 +00:00
s . logger . Printf ( "[WARN] autopilot: can't initialize until all servers are >= %s" , minAutopilotVersion . String ( ) )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 21:08:49 +00:00
}
2017-04-13 00:09:57 +00:00
config = s . config . AutopilotConfig
req := structs . AutopilotSetConfigRequest { Config : * config }
2017-02-24 21:08:49 +00:00
if _ , err = s . raftApply ( structs . AutopilotRequestType , req ) ; err != nil {
2017-04-13 01:38:36 +00:00
s . logger . Printf ( "[ERR] autopilot: failed to initialize config: %v" , err )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 04:32:13 +00:00
}
2017-12-13 01:45:03 +00:00
return config
2017-02-24 04:32:13 +00:00
}
2018-04-24 18:50:31 +00:00
// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership
func ( s * Server ) initializeCAConfig ( ) ( * structs . CAConfiguration , error ) {
2018-04-09 04:57:32 +00:00
state := s . fsm . State ( )
_ , config , err := state . CAConfig ( )
if err != nil {
return nil , err
}
if config != nil {
return config , nil
}
2018-04-24 18:50:31 +00:00
id , err := uuid . GenerateUUID ( )
2018-04-20 08:30:34 +00:00
if err != nil {
return nil , err
}
2018-04-09 04:57:32 +00:00
config = s . config . CAConfig
2018-04-24 18:50:31 +00:00
config . ClusterID = id
2018-04-09 04:57:32 +00:00
req := structs . CARequest {
Op : structs . CAOpSetConfig ,
Config : config ,
}
if _ , err = s . raftApply ( structs . ConnectCARequestType , req ) ; err != nil {
return nil , err
}
return config , nil
}
2018-04-24 18:50:31 +00:00
// initializeCA sets up the CA provider when gaining leadership, bootstrapping
// the root in the state store if necessary.
func ( s * Server ) initializeCA ( ) error {
2018-04-25 18:34:08 +00:00
// Bail if connect isn't enabled.
if ! s . config . ConnectEnabled {
return nil
}
2018-04-24 18:50:31 +00:00
conf , err := s . initializeCAConfig ( )
2018-04-09 04:57:32 +00:00
if err != nil {
return err
}
// Initialize the right provider based on the config
2018-04-21 01:46:02 +00:00
provider , err := s . createCAProvider ( conf )
if err != nil {
return err
2018-04-09 04:57:32 +00:00
}
2018-04-21 01:46:02 +00:00
s . setCAProvider ( provider )
2018-04-09 04:57:32 +00:00
2018-04-20 08:30:34 +00:00
// Get the active root cert from the CA
2018-04-24 23:16:37 +00:00
rootPEM , err := provider . ActiveRoot ( )
2018-04-09 04:57:32 +00:00
if err != nil {
2018-04-20 08:30:34 +00:00
return fmt . Errorf ( "error getting root cert: %v" , err )
2018-04-09 04:57:32 +00:00
}
2018-04-24 23:16:37 +00:00
id , err := connect . ParseCertFingerprint ( rootPEM )
if err != nil {
return fmt . Errorf ( "error parsing root fingerprint: %v" , err )
}
rootCA := & structs . CARoot {
ID : id ,
Name : fmt . Sprintf ( "%s CA Root Cert" , conf . Provider ) ,
RootCert : rootPEM ,
Active : true ,
}
2018-04-21 01:46:02 +00:00
// Check if the CA root is already initialized and exit if it is.
// Every change to the CA after this initial bootstrapping should
// be done through the rotation process.
2018-04-09 04:57:32 +00:00
state := s . fsm . State ( )
2018-04-24 23:16:37 +00:00
_ , activeRoot , err := state . CARootActive ( nil )
2018-04-09 04:57:32 +00:00
if err != nil {
return err
}
2018-04-24 23:16:37 +00:00
if activeRoot != nil {
if activeRoot . ID != rootCA . ID {
s . logger . Printf ( "[WARN] connect: CA root %q is not the active root (%q)" , rootCA . ID , activeRoot . ID )
2018-04-24 18:50:31 +00:00
}
2018-04-09 04:57:32 +00:00
return nil
}
// Get the highest index
idx , _ , err := state . CARoots ( nil )
if err != nil {
return err
}
2018-04-20 08:30:34 +00:00
// Store the root cert in raft
2018-04-09 04:57:32 +00:00
resp , err := s . raftApply ( structs . ConnectCARequestType , & structs . CARequest {
Op : structs . CAOpSetRoots ,
Index : idx ,
2018-04-24 23:16:37 +00:00
Roots : [ ] * structs . CARoot { rootCA } ,
2018-04-09 04:57:32 +00:00
} )
if err != nil {
s . logger . Printf ( "[ERR] connect: Apply failed %v" , err )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
s . logger . Printf ( "[INFO] connect: initialized CA with provider %q" , conf . Provider )
return nil
}
2018-04-21 01:46:02 +00:00
// createProvider returns a connect CA provider from the given config.
func ( s * Server ) createCAProvider ( conf * structs . CAConfiguration ) ( connect . CAProvider , error ) {
switch conf . Provider {
case structs . ConsulCAProvider :
return NewConsulCAProvider ( conf . Config , s )
default :
return nil , fmt . Errorf ( "unknown CA provider %q" , conf . Provider )
}
}
func ( s * Server ) getCAProvider ( ) connect . CAProvider {
s . caProviderLock . RLock ( )
defer s . caProviderLock . RUnlock ( )
return s . caProvider
}
func ( s * Server ) setCAProvider ( newProvider connect . CAProvider ) {
s . caProviderLock . Lock ( )
defer s . caProviderLock . Unlock ( )
s . caProvider = newProvider
}
2014-04-03 22:51:03 +00:00
// reconcileReaped is used to reconcile nodes that have failed and been reaped
2017-09-27 03:49:41 +00:00
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.
2014-04-03 22:51:03 +00:00
func ( s * Server ) reconcileReaped ( known map [ string ] struct { } ) error {
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2017-04-19 23:00:11 +00:00
_ , checks , err := state . ChecksInState ( nil , api . HealthAny )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-10-14 18:04:43 +00:00
for _ , check := range checks {
2014-04-03 22:51:03 +00:00
// Ignore any non serf checks
2017-07-14 05:33:47 +00:00
if check . CheckID != structs . SerfCheckID {
2014-04-03 22:51:03 +00:00
continue
}
// Check if this node is "known" by serf
if _ , ok := known [ check . Node ] ; ok {
continue
}
// Get the node services, look for ConsulServiceID
2017-01-20 07:36:50 +00:00
_ , services , err := state . NodeServices ( nil , check . Node )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-04-03 22:51:03 +00:00
serverPort := 0
2017-09-27 03:49:41 +00:00
serverAddr := ""
serverID := ""
CHECKS :
2014-04-03 22:51:03 +00:00
for _ , service := range services . Services {
2017-07-14 05:33:47 +00:00
if service . ID == structs . ConsulServiceID {
2017-09-27 03:49:41 +00:00
_ , node , err := state . GetNode ( check . Node )
if err != nil {
s . logger . Printf ( "[ERR] consul: Unable to look up node with name %q: %v" , check . Node , err )
continue CHECKS
}
serverAddr = node . Address
2014-04-03 22:51:03 +00:00
serverPort = service . Port
2017-09-27 03:49:41 +00:00
lookupAddr := net . JoinHostPort ( serverAddr , strconv . Itoa ( serverPort ) )
svr := s . serverLookup . Server ( raft . ServerAddress ( lookupAddr ) )
if svr != nil {
serverID = svr . ID
}
2014-04-03 22:51:03 +00:00
break
}
}
2017-09-27 03:49:41 +00:00
// Create a fake member
member := serf . Member {
Name : check . Node ,
Tags : map [ string ] string {
"dc" : s . config . Datacenter ,
"role" : "node" ,
} ,
}
2014-04-03 22:51:03 +00:00
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member . Tags [ "role" ] = "consul"
member . Tags [ "port" ] = strconv . FormatUint ( uint64 ( serverPort ) , 10 )
2017-09-27 03:49:41 +00:00
member . Tags [ "id" ] = serverID
member . Addr = net . ParseIP ( serverAddr )
2014-04-03 22:51:03 +00:00
}
// Attempt to reap this member
if err := s . handleReapMember ( member ) ; err != nil {
return err
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
if ! s . shouldHandleMember ( member ) {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[WARN] consul: skipping reconcile of node %v" , member )
2014-01-09 23:49:09 +00:00
return nil
}
2017-10-04 23:43:27 +00:00
defer metrics . MeasureSince ( [ ] string { "leader" , "reconcileMember" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
var err error
switch member . Status {
case serf . StatusAlive :
err = s . handleAliveMember ( member )
case serf . StatusFailed :
err = s . handleFailedMember ( member )
case serf . StatusLeft :
err = s . handleLeftMember ( member )
2014-03-20 19:51:49 +00:00
case StatusReap :
err = s . handleReapMember ( member )
2014-01-09 23:49:09 +00:00
}
if err != nil {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[ERR] consul: failed to reconcile member: %v: %v" ,
2014-01-09 23:49:09 +00:00
member , err )
2014-12-01 04:05:15 +00:00
// Permission denied should not bubble up
2017-08-23 14:52:48 +00:00
if acl . IsErrPermissionDenied ( err ) {
2014-12-01 04:05:15 +00:00
return nil
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// shouldHandleMember checks if this is a Consul pool member
func ( s * Server ) shouldHandleMember ( member serf . Member ) bool {
if valid , dc := isConsulNode ( member ) ; valid && dc == s . config . Datacenter {
return true
}
2017-08-14 14:36:07 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid &&
parts . Segment == "" &&
parts . Datacenter == s . config . Datacenter {
2014-01-09 23:49:09 +00:00
return true
}
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func ( s * Server ) handleAliveMember ( member serf . Member ) error {
2014-01-10 01:59:31 +00:00
// Register consul service if a server
var service * structs . NodeService
2017-07-06 10:48:37 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid {
2014-01-10 01:59:31 +00:00
service = & structs . NodeService {
2017-07-14 05:33:47 +00:00
ID : structs . ConsulServiceID ,
Service : structs . ConsulServiceName ,
2014-01-20 23:39:07 +00:00
Port : parts . Port ,
2014-01-10 01:59:31 +00:00
}
2014-01-10 20:55:55 +00:00
// Attempt to join the consul server
2014-01-20 23:56:29 +00:00
if err := s . joinConsulServer ( member , parts ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
2014-01-10 01:59:31 +00:00
}
2014-01-09 23:49:09 +00:00
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-10 06:12:08 +00:00
// Check if the associated service is available
if service != nil {
match := false
2017-01-20 07:36:50 +00:00
_ , services , err := state . NodeServices ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-03-05 23:03:12 +00:00
if services != nil {
2017-04-20 18:42:22 +00:00
for id := range services . Services {
2014-03-05 23:03:12 +00:00
if id == service . ID {
match = true
}
2014-01-10 06:12:08 +00:00
}
}
if ! match {
goto AFTER_CHECK
}
}
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the passing state
2017-01-24 07:37:21 +00:00
_ , checks , err := state . NodeChecks ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2017-07-14 05:33:47 +00:00
if check . CheckID == structs . SerfCheckID && check . Status == api . HealthPassing {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 06:12:08 +00:00
AFTER_CHECK :
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' joined, marking health alive" , member . Name )
2014-01-09 23:49:09 +00:00
2017-03-23 22:01:46 +00:00
// Register with the catalog.
2014-01-09 23:49:09 +00:00
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2017-03-23 22:01:46 +00:00
ID : types . NodeID ( member . Tags [ "id" ] ) ,
2014-01-09 23:49:09 +00:00
Address : member . Addr . String ( ) ,
2014-01-10 01:57:13 +00:00
Service : service ,
2014-01-09 23:49:09 +00:00
Check : & structs . HealthCheck {
Node : member . Name ,
2017-07-14 05:33:47 +00:00
CheckID : structs . SerfCheckID ,
Name : structs . SerfCheckName ,
2017-04-19 23:00:11 +00:00
Status : api . HealthPassing ,
2017-07-14 05:33:47 +00:00
Output : structs . SerfCheckAliveOutput ,
2014-01-09 23:49:09 +00:00
} ,
2017-03-23 22:01:46 +00:00
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate : true ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . RegisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func ( s * Server ) handleFailedMember ( member serf . Member ) error {
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the critical state
2017-01-24 07:37:21 +00:00
_ , checks , err := state . NodeChecks ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2017-07-14 05:33:47 +00:00
if check . CheckID == structs . SerfCheckID && check . Status == api . HealthCritical {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' failed, marking health critical" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2017-03-23 22:01:46 +00:00
ID : types . NodeID ( member . Tags [ "id" ] ) ,
2014-01-09 23:49:09 +00:00
Address : member . Addr . String ( ) ,
Check : & structs . HealthCheck {
Node : member . Name ,
2017-07-14 05:33:47 +00:00
CheckID : structs . SerfCheckID ,
Name : structs . SerfCheckName ,
2017-04-19 23:00:11 +00:00
Status : api . HealthCritical ,
2017-07-14 05:33:47 +00:00
Output : structs . SerfCheckFailedOutput ,
2014-01-09 23:49:09 +00:00
} ,
2017-03-23 22:01:46 +00:00
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate : true ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . RegisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func ( s * Server ) handleLeftMember ( member serf . Member ) error {
2014-03-20 19:51:49 +00:00
return s . handleDeregisterMember ( "left" , member )
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func ( s * Server ) handleReapMember ( member serf . Member ) error {
return s . handleDeregisterMember ( "reaped" , member )
}
// handleDeregisterMember is used to deregister a member of a given reason
func ( s * Server ) handleDeregisterMember ( reason string , member serf . Member ) error {
2015-01-21 00:13:54 +00:00
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member . Name == s . config . NodeName {
s . logger . Printf ( "[WARN] consul: deregistering self (%s) should be done by follower" , s . config . NodeName )
2014-01-09 23:49:09 +00:00
return nil
}
2014-01-10 20:55:55 +00:00
// Remove from Raft peers if this was a server
2017-07-06 10:48:37 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid {
2014-01-20 23:39:07 +00:00
if err := s . removeConsulServer ( member , parts . Port ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
}
2015-10-12 07:42:09 +00:00
// Check if the node does not exist
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node == nil {
2014-10-14 05:14:43 +00:00
return nil
}
2014-01-09 23:49:09 +00:00
// Deregister the node
2014-10-14 05:14:43 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' %s, deregistering" , member . Name , reason )
2014-01-09 23:49:09 +00:00
req := structs . DeregisterRequest {
2017-03-23 20:34:30 +00:00
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . DeregisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
2014-01-10 20:55:55 +00:00
// joinConsulServer is used to try to join another consul server
2017-07-06 10:48:37 +00:00
func ( s * Server ) joinConsulServer ( m serf . Member , parts * metadata . Server ) error {
2014-01-20 23:56:29 +00:00
// Check for possibility of multiple bootstrap nodes
2014-01-30 21:13:29 +00:00
if parts . Bootstrap {
2014-01-20 23:56:29 +00:00
members := s . serfLAN . Members ( )
for _ , member := range members {
2017-07-06 10:48:37 +00:00
valid , p := metadata . IsConsulServer ( member )
2014-01-30 21:13:29 +00:00
if valid && member . Name != m . Name && p . Bootstrap {
2014-01-20 23:56:29 +00:00
s . logger . Printf ( "[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
}
}
2017-09-06 20:05:51 +00:00
// Processing ourselves could result in trying to remove ourselves to
// fix up our address, which would make us step down. This is only
// safe to attempt if there are multiple servers available.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
2017-09-06 20:05:51 +00:00
if m . Name == s . config . NodeName {
if l := len ( configFuture . Configuration ( ) . Servers ) ; l < 3 {
s . logger . Printf ( "[DEBUG] consul: Skipping self join check for %q since the cluster is too small" , m . Name )
return nil
}
}
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
2017-09-06 20:07:42 +00:00
addr := ( & net . TCPAddr { IP : m . Addr , Port : parts . Port } ) . String ( )
2017-12-12 00:38:52 +00:00
minRaftProtocol , err := s . autopilot . MinRaftProtocol ( )
2017-09-06 20:07:42 +00:00
if err != nil {
return err
}
2016-07-28 19:11:28 +00:00
for _ , server := range configFuture . Configuration ( ) . Servers {
2017-03-15 23:09:55 +00:00
// No-op if the raft version is too low
if server . Address == raft . ServerAddress ( addr ) && ( minRaftProtocol < 2 || parts . RaftVersion < 3 ) {
2016-07-28 19:11:28 +00:00
return nil
}
2017-03-15 23:09:55 +00:00
// If the address or ID matches an existing server, see if we need to remove the old one first
if server . Address == raft . ServerAddress ( addr ) || server . ID == raft . ServerID ( parts . ID ) {
// Exit with no-op if this is being called on an existing server
if server . Address == raft . ServerAddress ( addr ) && server . ID == raft . ServerID ( parts . ID ) {
return nil
2017-04-21 01:59:42 +00:00
}
future := s . raft . RemoveServer ( server . ID , 0 , 0 )
if server . Address == raft . ServerAddress ( addr ) {
if err := future . Error ( ) ; err != nil {
return fmt . Errorf ( "error removing server with duplicate address %q: %s" , server . Address , err )
}
s . logger . Printf ( "[INFO] consul: removed server with duplicate address: %s" , server . Address )
2017-03-15 23:09:55 +00:00
} else {
2017-04-21 01:59:42 +00:00
if err := future . Error ( ) ; err != nil {
return fmt . Errorf ( "error removing server with duplicate ID %q: %s" , server . ID , err )
2017-03-15 23:09:55 +00:00
}
2017-04-21 01:59:42 +00:00
s . logger . Printf ( "[INFO] consul: removed server with duplicate ID: %s" , server . ID )
2017-03-15 23:09:55 +00:00
}
}
2014-01-10 20:55:55 +00:00
}
2017-02-17 18:49:16 +00:00
2017-03-15 23:09:55 +00:00
// Attempt to add as a peer
2017-03-01 22:04:40 +00:00
switch {
case minRaftProtocol >= 3 :
addFuture := s . raft . AddNonvoter ( raft . ServerID ( parts . ID ) , raft . ServerAddress ( addr ) , 0 , 0 )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
case minRaftProtocol == 2 && parts . RaftVersion >= 3 :
2017-02-22 20:53:32 +00:00
addFuture := s . raft . AddVoter ( raft . ServerID ( parts . ID ) , raft . ServerAddress ( addr ) , 0 , 0 )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
2017-03-01 22:04:40 +00:00
default :
2017-02-22 20:53:32 +00:00
addFuture := s . raft . AddPeer ( raft . ServerAddress ( addr ) )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
}
2017-03-01 22:04:40 +00:00
// Trigger a check to remove dead servers
2017-12-18 20:26:35 +00:00
s . autopilot . RemoveDeadServers ( )
2017-02-17 18:49:16 +00:00
2014-01-10 20:55:55 +00:00
return nil
}
2014-01-10 23:05:34 +00:00
2014-01-20 23:39:07 +00:00
// removeConsulServer is used to try to remove a consul server that has left
2014-01-10 23:05:34 +00:00
func ( s * Server ) removeConsulServer ( m serf . Member , port int ) error {
2016-07-28 19:11:28 +00:00
addr := ( & net . TCPAddr { IP : m . Addr , Port : port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-remove it
2016-07-29 16:27:15 +00:00
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
2017-02-22 20:53:32 +00:00
2017-12-12 00:38:52 +00:00
minRaftProtocol , err := s . autopilot . MinRaftProtocol ( )
2017-02-22 20:53:32 +00:00
if err != nil {
return err
}
2017-07-06 10:48:37 +00:00
_ , parts := metadata . IsConsulServer ( m )
2017-02-22 20:53:32 +00:00
// Pick which remove API to use based on how the server was added.
2016-07-28 19:11:28 +00:00
for _ , server := range configFuture . Configuration ( ) . Servers {
2017-02-22 20:53:32 +00:00
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server . ID == raft . ServerID ( parts . ID ) {
2017-02-23 21:08:40 +00:00
s . logger . Printf ( "[INFO] consul: removing server by ID: %q" , server . ID )
2017-02-22 20:53:32 +00:00
future := s . raft . RemoveServer ( raft . ServerID ( parts . ID ) , 0 , 0 )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
2017-02-23 21:08:40 +00:00
server . ID , err )
2017-02-22 20:53:32 +00:00
return err
}
break
} else if server . Address == raft . ServerAddress ( addr ) {
// If not, use the old remove API
2017-02-23 21:08:40 +00:00
s . logger . Printf ( "[INFO] consul: removing server by address: %q" , server . Address )
2017-02-22 20:53:32 +00:00
future := s . raft . RemovePeer ( raft . ServerAddress ( addr ) )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
addr , err )
return err
}
break
2016-07-28 19:11:28 +00:00
}
}
2014-01-10 23:05:34 +00:00
return nil
}
2014-12-15 23:04:21 +00:00
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func ( s * Server ) reapTombstones ( index uint64 ) {
2017-10-04 23:43:27 +00:00
defer metrics . MeasureSince ( [ ] string { "leader" , "reapTombstones" } , time . Now ( ) )
2014-12-15 23:28:56 +00:00
req := structs . TombstoneRequest {
2017-03-23 20:34:30 +00:00
Datacenter : s . config . Datacenter ,
Op : structs . TombstoneReap ,
ReapIndex : index ,
2014-12-15 23:04:21 +00:00
}
2014-12-15 23:28:56 +00:00
_ , err := s . raftApply ( structs . TombstoneRequestType , & req )
2014-12-15 23:04:21 +00:00
if err != nil {
s . logger . Printf ( "[ERR] consul: failed to reap tombstones up to %d: %v" ,
index , err )
}
}