2014-01-09 23:49:09 +00:00
package consul
import (
2018-10-19 16:04:07 +00:00
"context"
2014-08-11 21:54:18 +00:00
"fmt"
2014-06-16 21:36:12 +00:00
"net"
"strconv"
2018-05-04 23:01:54 +00:00
"strings"
2017-04-13 21:17:32 +00:00
"sync"
2018-10-19 16:04:07 +00:00
"sync/atomic"
2014-06-16 21:36:12 +00:00
"time"
2014-02-20 23:16:26 +00:00
"github.com/armon/go-metrics"
2017-08-23 14:52:48 +00:00
"github.com/hashicorp/consul/acl"
2018-04-24 18:50:31 +00:00
"github.com/hashicorp/consul/agent/connect"
2018-05-09 22:12:31 +00:00
ca "github.com/hashicorp/consul/agent/connect/ca"
2017-12-12 00:38:52 +00:00
"github.com/hashicorp/consul/agent/consul/autopilot"
2017-07-06 10:48:37 +00:00
"github.com/hashicorp/consul/agent/metadata"
2017-07-06 10:34:00 +00:00
"github.com/hashicorp/consul/agent/structs"
2017-04-19 23:00:11 +00:00
"github.com/hashicorp/consul/api"
2018-10-19 16:04:07 +00:00
"github.com/hashicorp/consul/lib"
2016-06-07 20:24:51 +00:00
"github.com/hashicorp/consul/types"
2018-10-19 16:04:07 +00:00
memdb "github.com/hashicorp/go-memdb"
2018-04-24 18:50:31 +00:00
uuid "github.com/hashicorp/go-uuid"
2017-08-03 00:05:18 +00:00
"github.com/hashicorp/go-version"
2014-01-10 20:55:55 +00:00
"github.com/hashicorp/raft"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/serf/serf"
2018-10-19 16:04:07 +00:00
"golang.org/x/time/rate"
2014-01-09 23:49:09 +00:00
)
// newLeaderEvent is the name of the user event broadcast on every LAN
// segment when a server starts running its leader loop.
const newLeaderEvent = "consul:new-leader"

// barrierWriteTimeout is the timeout handed to the Raft barrier that the
// leader loop issues before each reconcile pass.
const barrierWriteTimeout = 2 * time.Minute
2018-06-21 22:42:28 +00:00
// caRootPruneInterval is how often we check for stale CARoots to remove.
var caRootPruneInterval = time.Hour

// minAutopilotVersion is the minimum Consul version in which Autopilot
// features are supported.
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
2017-12-12 00:38:52 +00:00
2014-01-09 23:49:09 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
2017-04-13 21:17:32 +00:00
// We use the notify channel we configured Raft with, NOT Raft's
// leaderCh, which is only notified best-effort. Doing this ensures
// that we get all notifications in order, which is required for
// cleanup and to ensure we never run multiple leader loops.
2017-07-06 14:09:21 +00:00
raftNotifyCh := s . raftNotifyCh
2017-04-13 21:17:32 +00:00
2018-10-19 16:04:07 +00:00
aclModeCheckWait := aclModeCheckMinInterval
var aclUpgradeCh <- chan time . Time
if s . ACLsEnabled ( ) {
aclUpgradeCh = time . After ( aclModeCheckWait )
}
2017-10-06 14:54:49 +00:00
var weAreLeaderCh chan struct { }
var leaderLoop sync . WaitGroup
2014-01-09 23:49:09 +00:00
for {
select {
2017-07-05 22:09:18 +00:00
case isLeader := <- raftNotifyCh :
2017-10-06 14:54:49 +00:00
switch {
case isLeader :
if weAreLeaderCh != nil {
s . logger . Printf ( "[ERR] consul: attempted to start the leader loop while running" )
continue
}
weAreLeaderCh = make ( chan struct { } )
leaderLoop . Add ( 1 )
go func ( ch chan struct { } ) {
defer leaderLoop . Done ( )
s . leaderLoop ( ch )
} ( weAreLeaderCh )
2014-01-09 23:49:09 +00:00
s . logger . Printf ( "[INFO] consul: cluster leadership acquired" )
2017-10-06 14:54:49 +00:00
default :
if weAreLeaderCh == nil {
s . logger . Printf ( "[ERR] consul: attempted to stop the leader loop while not running" )
continue
}
s . logger . Printf ( "[DEBUG] consul: shutting down leader loop" )
close ( weAreLeaderCh )
leaderLoop . Wait ( )
weAreLeaderCh = nil
2014-01-09 23:49:09 +00:00
s . logger . Printf ( "[INFO] consul: cluster leadership lost" )
}
2018-10-19 16:04:07 +00:00
case <- aclUpgradeCh :
if atomic . LoadInt32 ( & s . useNewACLs ) == 0 {
aclModeCheckWait = aclModeCheckWait * 2
if aclModeCheckWait > aclModeCheckMaxInterval {
aclModeCheckWait = aclModeCheckMaxInterval
}
aclUpgradeCh = time . After ( aclModeCheckWait )
if canUpgrade := s . canUpgradeToNewACLs ( weAreLeaderCh != nil ) ; canUpgrade {
if weAreLeaderCh != nil {
if err := s . initializeACLs ( true ) ; err != nil {
s . logger . Printf ( "[ERR] consul: error transitioning to using new ACLs: %v" , err )
continue
}
}
s . logger . Printf ( "[DEBUG] acl: transitioning out of legacy ACL mode" )
atomic . StoreInt32 ( & s . useNewACLs , 1 )
s . updateACLAdvertisement ( )
// setting this to nil ensures that we will never hit this case again
aclUpgradeCh = nil
}
} else {
// establishLeadership probably transitioned us
aclUpgradeCh = nil
}
2014-01-09 23:49:09 +00:00
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
2015-09-15 12:22:08 +00:00
// maintenance activities
2014-01-09 23:49:09 +00:00
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2014-02-19 20:36:27 +00:00
// Fire a user event indicating a new leader
payload := [ ] byte ( s . config . NodeName )
2017-08-14 14:36:07 +00:00
for name , segment := range s . LANSegments ( ) {
if err := segment . UserEvent ( newLeaderEvent , payload , false ) ; err != nil {
s . logger . Printf ( "[WARN] consul: failed to broadcast new leader event on segment %q: %v" , name , err )
}
2014-02-19 20:36:27 +00:00
}
2014-01-10 20:55:55 +00:00
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf . Member
2014-12-13 05:42:24 +00:00
establishedLeader := false
2014-01-10 20:55:55 +00:00
2017-05-04 18:52:22 +00:00
reassert := func ( ) error {
if ! establishedLeader {
return fmt . Errorf ( "leadership has not been established" )
}
if err := s . revokeLeadership ( ) ; err != nil {
return err
}
if err := s . establishLeadership ( ) ; err != nil {
return err
}
return nil
}
2014-01-09 23:49:09 +00:00
RECONCILE :
2014-01-10 20:55:55 +00:00
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
2014-01-09 23:49:09 +00:00
// Apply a raft barrier to ensure our FSM is caught up
2014-02-20 23:16:26 +00:00
start := time . Now ( )
2017-07-05 22:09:18 +00:00
barrier := s . raft . Barrier ( barrierWriteTimeout )
2014-01-09 23:49:09 +00:00
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to wait for barrier: %v" , err )
2017-10-06 14:54:49 +00:00
goto WAIT
2014-01-09 23:49:09 +00:00
}
2017-10-04 23:43:27 +00:00
metrics . MeasureSince ( [ ] string { "leader" , "barrier" } , start )
2014-01-09 23:49:09 +00:00
2014-12-13 05:42:24 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
if err := s . establishLeadership ( ) ; err != nil {
2017-05-04 14:15:25 +00:00
s . logger . Printf ( "[ERR] consul: failed to establish leadership: %v" , err )
2018-02-21 18:33:22 +00:00
// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := s . revokeLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to revoke leadership: %v" , err )
}
2014-12-13 05:42:24 +00:00
goto WAIT
}
establishedLeader = true
2017-08-03 00:05:18 +00:00
defer func ( ) {
if err := s . revokeLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to revoke leadership: %v" , err )
}
} ( )
2014-12-13 05:42:24 +00:00
}
2014-01-09 23:49:09 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to reconcile: %v" , err )
goto WAIT
}
2014-01-10 20:55:55 +00:00
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
2014-01-09 23:49:09 +00:00
WAIT :
2017-10-06 14:54:49 +00:00
// Poll the stop channel to give it priority so we don't waste time
// trying to perform the other operations if we have been asked to shut
// down.
select {
case <- stopCh :
return
default :
}
2014-01-10 20:55:55 +00:00
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
s . reconcileMember ( member )
2014-12-15 23:04:21 +00:00
case index := <- s . tombstoneGC . ExpireCh ( ) :
go s . reapTombstones ( index )
2017-05-04 18:52:22 +00:00
case errCh := <- s . reassertLeaderCh :
errCh <- reassert ( )
2014-01-10 20:55:55 +00:00
}
2014-01-09 23:49:09 +00:00
}
}
2014-12-13 05:42:24 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2015-09-11 19:24:54 +00:00
// previously inflight transactions have been committed and that our
2014-12-13 05:42:24 +00:00
// state is up-to-date.
func ( s * Server ) establishLeadership ( ) error {
2018-10-19 16:04:07 +00:00
// check for the upgrade here - this helps us transition to new ACLs much
// quicker if this is a new cluster or this is a test agent
if canUpgrade := s . canUpgradeToNewACLs ( true ) ; canUpgrade {
if err := s . initializeACLs ( true ) ; err != nil {
return err
}
atomic . StoreInt32 ( & s . useNewACLs , 1 )
s . updateACLAdvertisement ( )
} else if err := s . initializeACLs ( false ) ; err != nil {
2017-08-03 00:05:18 +00:00
return err
}
2014-12-15 22:37:49 +00:00
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
2015-01-05 22:58:59 +00:00
s . tombstoneGC . SetEnabled ( true )
2014-12-15 22:37:49 +00:00
lastIndex := s . raft . LastIndex ( )
s . tombstoneGC . Hint ( lastIndex )
2014-12-13 05:42:24 +00:00
// Setup the session timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node along, effectively this means all the timers are renewed at the
// time of failover. The TTL contract is that the session will not be expired
// before the TTL, so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Sessions
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeSessionTimers ( ) ; err != nil {
return err
}
2017-02-24 04:32:13 +00:00
2017-04-13 01:38:36 +00:00
s . getOrCreateAutopilotConfig ( )
2017-12-12 00:38:52 +00:00
s . autopilot . Start ( )
2018-04-09 04:57:32 +00:00
// todo(kyhavlov): start a goroutine here for handling periodic CA rotation
2018-04-24 23:16:37 +00:00
if err := s . initializeCA ( ) ; err != nil {
return err
}
2018-04-09 04:57:32 +00:00
2018-10-15 16:17:48 +00:00
s . startEnterpriseLeader ( )
2018-06-21 22:42:28 +00:00
s . startCARootPruning ( )
2017-06-16 03:41:30 +00:00
s . setConsistentReadReady ( )
2014-12-13 05:42:24 +00:00
return nil
}
2015-01-05 22:58:59 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
// Disable the tombstone GC, since it is only useful as a leader
s . tombstoneGC . SetEnabled ( false )
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s . clearAllSessionTimers ( ) ; err != nil {
return err
}
2017-03-01 22:04:40 +00:00
2018-10-15 16:17:48 +00:00
s . stopEnterpriseLeader ( )
2018-06-21 22:42:28 +00:00
s . stopCARootPruning ( )
2018-06-20 11:37:36 +00:00
s . setCAProvider ( nil , nil )
2018-04-24 23:16:37 +00:00
2018-10-19 16:04:07 +00:00
s . stopACLUpgrade ( )
2017-06-16 03:41:30 +00:00
s . resetConsistentReadReady ( )
2017-12-12 00:38:52 +00:00
s . autopilot . Stop ( )
2015-01-05 22:58:59 +00:00
return nil
}
2018-10-19 16:04:07 +00:00
// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed
func ( s * Server ) initializeLegacyACL ( ) error {
if ! s . ACLsEnabled ( ) {
2014-08-11 21:54:18 +00:00
return nil
}
2018-10-19 16:04:07 +00:00
authDC := s . config . ACLDatacenter
2014-08-11 21:54:18 +00:00
2017-08-03 00:05:18 +00:00
// Create anonymous token if missing.
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2018-10-19 16:04:07 +00:00
_ , token , err := state . ACLTokenGetBySecret ( nil , anonymousToken )
2014-08-11 21:54:18 +00:00
if err != nil {
return fmt . Errorf ( "failed to get anonymous token: %v" , err )
}
2018-10-19 16:04:07 +00:00
if token == nil {
2014-08-11 21:54:18 +00:00
req := structs . ACLRequest {
Datacenter : authDC ,
2014-10-09 19:28:07 +00:00
Op : structs . ACLSet ,
2014-08-11 21:54:18 +00:00
ACL : structs . ACL {
ID : anonymousToken ,
Name : "Anonymous Token" ,
2018-10-19 16:04:07 +00:00
Type : structs . ACLTokenTypeClient ,
2014-08-11 21:54:18 +00:00
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create anonymous token: %v" , err )
}
2018-10-19 16:04:07 +00:00
s . logger . Printf ( "[INFO] acl: Created the anonymous token" )
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
// Check for configured master token.
if master := s . config . ACLMasterToken ; len ( master ) > 0 {
2018-10-19 16:04:07 +00:00
_ , token , err = state . ACLTokenGetBySecret ( nil , master )
2017-08-03 00:05:18 +00:00
if err != nil {
return fmt . Errorf ( "failed to get master token: %v" , err )
}
2018-10-19 16:04:07 +00:00
if token == nil {
2017-08-03 00:05:18 +00:00
req := structs . ACLRequest {
Datacenter : authDC ,
Op : structs . ACLSet ,
ACL : structs . ACL {
ID : master ,
Name : "Master Token" ,
2018-10-19 16:04:07 +00:00
Type : structs . ACLTokenTypeManagement ,
2017-08-03 00:05:18 +00:00
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create master token: %v" , err )
}
s . logger . Printf ( "[INFO] consul: Created ACL master token from configuration" )
}
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
// Check to see if we need to initialize the ACL bootstrap info. This
// needs a Consul version check since it introduces a new Raft operation
// that'll produce an error on older servers, and it also makes a piece
// of state in the state store that will cause problems with older
// servers consuming snapshots, so we have to wait to create it.
var minVersion = version . Must ( version . NewVersion ( "0.9.1" ) )
if ServersMeetMinimumVersion ( s . LANMembers ( ) , minVersion ) {
2018-10-19 16:04:07 +00:00
canBootstrap , _ , err := state . CanBootstrapACLToken ( )
2014-08-11 21:54:18 +00:00
if err != nil {
2017-08-03 00:05:18 +00:00
return fmt . Errorf ( "failed looking for ACL bootstrap info: %v" , err )
2014-08-11 21:54:18 +00:00
}
2018-10-19 16:04:07 +00:00
if canBootstrap {
2017-08-03 00:05:18 +00:00
req := structs . ACLRequest {
Datacenter : authDC ,
Op : structs . ACLBootstrapInit ,
}
resp , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to initialize ACL bootstrap: %v" , err )
}
switch v := resp . ( type ) {
case error :
return fmt . Errorf ( "failed to initialize ACL bootstrap: %v" , v )
case bool :
if v {
s . logger . Printf ( "[INFO] consul: ACL bootstrap enabled" )
} else {
s . logger . Printf ( "[INFO] consul: ACL bootstrap disabled, existing management tokens found" )
}
default :
return fmt . Errorf ( "unexpected response trying to initialize ACL bootstrap: %T" , v )
}
}
} else {
s . logger . Printf ( "[WARN] consul: Can't initialize ACL bootstrap until all servers are >= %s" , minVersion . String ( ) )
2014-08-11 21:54:18 +00:00
}
2017-08-03 00:05:18 +00:00
2014-08-11 21:54:18 +00:00
return nil
}
2018-10-19 16:04:07 +00:00
// initializeACLs is used to setup the ACLs if we are the leader
// and need to do this.
func ( s * Server ) initializeACLs ( upgrade bool ) error {
if ! s . ACLsEnabled ( ) {
return nil
}
// Purge the cache, since it could've changed while we were not the
// leader.
s . acls . cache . Purge ( )
if s . InACLDatacenter ( ) {
if s . UseLegacyACLs ( ) && ! upgrade {
s . logger . Printf ( "[INFO] acl: initializing legacy acls" )
return s . initializeLegacyACL ( )
}
s . logger . Printf ( "[INFO] acl: initializing acls" )
// Create the builtin global-management policy
_ , policy , err := s . fsm . State ( ) . ACLPolicyGetByID ( nil , structs . ACLPolicyGlobalManagementID )
if err != nil {
return fmt . Errorf ( "failed to get the builtin global-management policy" )
}
if policy == nil {
policy := structs . ACLPolicy {
ID : structs . ACLPolicyGlobalManagementID ,
Name : "global-management" ,
Description : "Builtin Policy that grants unlimited access" ,
Rules : structs . ACLPolicyGlobalManagement ,
Syntax : acl . SyntaxCurrent ,
}
policy . SetHash ( true )
2018-10-31 20:00:46 +00:00
req := structs . ACLPolicyBatchSetRequest {
2018-10-19 16:04:07 +00:00
Policies : structs . ACLPolicies { & policy } ,
}
2018-10-31 20:00:46 +00:00
_ , err := s . raftApply ( structs . ACLPolicySetRequestType , & req )
2018-10-19 16:04:07 +00:00
if err != nil {
return fmt . Errorf ( "failed to create global-management policy: %v" , err )
}
s . logger . Printf ( "[INFO] consul: Created ACL 'global-management' policy" )
}
// Check for configured master token.
if master := s . config . ACLMasterToken ; len ( master ) > 0 {
state := s . fsm . State ( )
if _ , err := uuid . ParseUUID ( master ) ; err != nil {
s . logger . Printf ( "[WARN] consul: Configuring a non-UUID master token is deprecated" )
}
_ , token , err := state . ACLTokenGetBySecret ( nil , master )
if err != nil {
return fmt . Errorf ( "failed to get master token: %v" , err )
}
if token == nil {
accessor , err := lib . GenerateUUID ( s . checkTokenUUID )
if err != nil {
return fmt . Errorf ( "failed to generate the accessor ID for the master token: %v" , err )
}
token := structs . ACLToken {
AccessorID : accessor ,
SecretID : master ,
Description : "Master Token" ,
Policies : [ ] structs . ACLTokenPolicyLink {
{
ID : structs . ACLPolicyGlobalManagementID ,
} ,
} ,
CreateTime : time . Now ( ) ,
Local : false ,
// DEPRECATED (ACL-Legacy-Compat) - only needed for compatibility
Type : structs . ACLTokenTypeManagement ,
}
token . SetHash ( true )
done := false
if canBootstrap , _ , err := state . CanBootstrapACLToken ( ) ; err == nil && canBootstrap {
req := structs . ACLTokenBootstrapRequest {
Token : token ,
ResetIndex : 0 ,
}
if _ , err := s . raftApply ( structs . ACLBootstrapRequestType , & req ) ; err == nil {
s . logger . Printf ( "[INFO] consul: Bootstrapped ACL master token from configuration" )
done = true
} else {
if err . Error ( ) != structs . ACLBootstrapNotAllowedErr . Error ( ) &&
err . Error ( ) != structs . ACLBootstrapInvalidResetIndexErr . Error ( ) {
return fmt . Errorf ( "failed to bootstrap master token: %v" , err )
}
}
}
if ! done {
// either we didn't attempt to or setting the token with a bootstrap request failed.
2018-10-31 20:00:46 +00:00
req := structs . ACLTokenBatchSetRequest {
2018-10-19 16:04:07 +00:00
Tokens : structs . ACLTokens { & token } ,
2018-10-31 20:00:46 +00:00
CAS : false ,
2018-10-19 16:04:07 +00:00
}
2018-10-31 20:00:46 +00:00
if _ , err := s . raftApply ( structs . ACLTokenSetRequestType , & req ) ; err != nil {
2018-10-19 16:04:07 +00:00
return fmt . Errorf ( "failed to create master token: %v" , err )
}
s . logger . Printf ( "[INFO] consul: Created ACL master token from configuration" )
}
}
}
state := s . fsm . State ( )
_ , token , err := state . ACLTokenGetBySecret ( nil , structs . ACLTokenAnonymousID )
if err != nil {
return fmt . Errorf ( "failed to get anonymous token: %v" , err )
}
if token == nil {
// DEPRECATED (ACL-Legacy-Compat) - Don't need to query for previous "anonymous" token
// check for legacy token that needs an upgrade
_ , legacyToken , err := state . ACLTokenGetBySecret ( nil , anonymousToken )
if err != nil {
return fmt . Errorf ( "failed to get anonymous token: %v" , err )
}
// the token upgrade routine will take care of upgrading the token if a legacy version exists
if legacyToken == nil {
token = & structs . ACLToken {
AccessorID : structs . ACLTokenAnonymousID ,
SecretID : anonymousToken ,
Description : "Anonymous Token" ,
2018-10-31 22:11:51 +00:00
CreateTime : time . Now ( ) ,
2018-10-19 16:04:07 +00:00
}
token . SetHash ( true )
2018-10-31 20:00:46 +00:00
req := structs . ACLTokenBatchSetRequest {
Tokens : structs . ACLTokens { token } ,
CAS : false ,
2018-10-19 16:04:07 +00:00
}
2018-10-31 20:00:46 +00:00
_ , err := s . raftApply ( structs . ACLTokenSetRequestType , & req )
2018-10-19 16:04:07 +00:00
if err != nil {
return fmt . Errorf ( "failed to create anonymous token: %v" , err )
}
s . logger . Printf ( "[INFO] consul: Created ACL anonymous token from configuration" )
}
}
s . startACLUpgrade ( )
} else {
if s . UseLegacyACLs ( ) && ! upgrade {
if s . IsACLReplicationEnabled ( ) {
s . startLegacyACLReplication ( )
}
}
if upgrade {
s . stopACLReplication ( )
}
// ACL replication is now mandatory
s . startACLReplication ( )
}
// launch the upgrade go routine to generate accessors for everything
return nil
}
func ( s * Server ) startACLUpgrade ( ) {
s . aclUpgradeLock . Lock ( )
defer s . aclUpgradeLock . Unlock ( )
if s . aclUpgradeEnabled {
return
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
s . aclUpgradeCancel = cancel
go func ( ) {
limiter := rate . NewLimiter ( aclUpgradeRateLimit , int ( aclUpgradeRateLimit ) )
for {
if err := limiter . Wait ( ctx ) ; err != nil {
return
}
// actually run the upgrade here
state := s . fsm . State ( )
tokens , waitCh , err := state . ACLTokenListUpgradeable ( aclUpgradeBatchSize )
if err != nil {
s . logger . Printf ( "[WARN] acl: encountered an error while searching for tokens without accessor ids: %v" , err )
}
if len ( tokens ) == 0 {
ws := memdb . NewWatchSet ( )
ws . Add ( state . AbandonCh ( ) )
ws . Add ( waitCh )
ws . Add ( ctx . Done ( ) )
// wait for more tokens to need upgrading or the aclUpgradeCh to be closed
ws . Watch ( nil )
continue
}
var newTokens structs . ACLTokens
for _ , token := range tokens {
2018-11-02 17:00:39 +00:00
// This should be entirely unnecessary but is just a small safeguard against changing accessor IDs
2018-10-19 16:04:07 +00:00
if token . AccessorID != "" {
continue
}
newToken := * token
if token . SecretID == anonymousToken {
newToken . AccessorID = structs . ACLTokenAnonymousID
} else {
accessor , err := lib . GenerateUUID ( s . checkTokenUUID )
if err != nil {
s . logger . Printf ( "[WARN] acl: failed to generate accessor during token auto-upgrade: %v" , err )
continue
}
newToken . AccessorID = accessor
}
// Assign the global-management policy to legacy management tokens
if len ( newToken . Policies ) == 0 && newToken . Type == structs . ACLTokenTypeManagement {
newToken . Policies = append ( newToken . Policies , structs . ACLTokenPolicyLink { ID : structs . ACLPolicyGlobalManagementID } )
}
2018-10-31 20:00:46 +00:00
// need to copy these as we are going to do a CAS operation.
newToken . CreateIndex = token . CreateIndex
newToken . ModifyIndex = token . ModifyIndex
2018-11-07 15:59:44 +00:00
newToken . SetHash ( true )
2018-10-19 16:04:07 +00:00
newTokens = append ( newTokens , & newToken )
}
2018-10-31 20:00:46 +00:00
req := & structs . ACLTokenBatchSetRequest { Tokens : newTokens , CAS : true }
2018-10-19 16:04:07 +00:00
2018-10-31 20:00:46 +00:00
resp , err := s . raftApply ( structs . ACLTokenSetRequestType , req )
2018-10-19 16:04:07 +00:00
if err != nil {
s . logger . Printf ( "[ERR] acl: failed to apply acl token upgrade batch: %v" , err )
}
if err , ok := resp . ( error ) ; ok {
s . logger . Printf ( "[ERR] acl: failed to apply acl token upgrade batch: %v" , err )
}
}
} ( )
s . aclUpgradeEnabled = true
}
// stopACLUpgrade cancels the token upgrade routine if it is enabled and
// clears the stored cancel function. It is a no-op otherwise.
func (s *Server) stopACLUpgrade() {
	s.aclUpgradeLock.Lock()
	defer s.aclUpgradeLock.Unlock()

	if s.aclUpgradeEnabled {
		s.aclUpgradeCancel()
		s.aclUpgradeCancel = nil
		s.aclUpgradeEnabled = false
	}
}
func ( s * Server ) startLegacyACLReplication ( ) {
s . aclReplicationLock . Lock ( )
defer s . aclReplicationLock . Unlock ( )
if s . aclReplicationEnabled {
return
}
s . initReplicationStatus ( )
ctx , cancel := context . WithCancel ( context . Background ( ) )
s . aclReplicationCancel = cancel
go func ( ) {
var lastRemoteIndex uint64
limiter := rate . NewLimiter ( rate . Limit ( s . config . ACLReplicationRate ) , s . config . ACLReplicationBurst )
for {
if err := limiter . Wait ( ctx ) ; err != nil {
return
}
2019-02-27 19:28:31 +00:00
if s . tokens . ReplicationToken ( ) == "" {
2018-10-19 16:04:07 +00:00
continue
}
index , exit , err := s . replicateLegacyACLs ( lastRemoteIndex , ctx )
if exit {
return
}
if err != nil {
lastRemoteIndex = 0
s . updateACLReplicationStatusError ( )
s . logger . Printf ( "[WARN] consul: Legacy ACL replication error (will retry if still leader): %v" , err )
} else {
lastRemoteIndex = index
s . updateACLReplicationStatusIndex ( index )
s . logger . Printf ( "[DEBUG] consul: Legacy ACL replication completed through remote index %d" , index )
}
}
} ( )
s . updateACLReplicationStatusRunning ( structs . ACLReplicateLegacy )
s . aclReplicationEnabled = true
}
func ( s * Server ) startACLReplication ( ) {
s . aclReplicationLock . Lock ( )
defer s . aclReplicationLock . Unlock ( )
if s . aclReplicationEnabled {
return
}
s . initReplicationStatus ( )
ctx , cancel := context . WithCancel ( context . Background ( ) )
s . aclReplicationCancel = cancel
replicationType := structs . ACLReplicatePolicies
go func ( ) {
var failedAttempts uint
limiter := rate . NewLimiter ( rate . Limit ( s . config . ACLReplicationRate ) , s . config . ACLReplicationBurst )
var lastRemoteIndex uint64
for {
if err := limiter . Wait ( ctx ) ; err != nil {
return
}
2019-02-27 19:28:31 +00:00
if s . tokens . ReplicationToken ( ) == "" {
2018-10-19 16:04:07 +00:00
continue
}
index , exit , err := s . replicateACLPolicies ( lastRemoteIndex , ctx )
if exit {
return
}
if err != nil {
lastRemoteIndex = 0
s . updateACLReplicationStatusError ( )
s . logger . Printf ( "[WARN] consul: ACL policy replication error (will retry if still leader): %v" , err )
if ( 1 << failedAttempts ) < aclReplicationMaxRetryBackoff {
failedAttempts ++
}
select {
case <- ctx . Done ( ) :
return
case <- time . After ( ( 1 << failedAttempts ) * time . Second ) :
// do nothing
}
} else {
lastRemoteIndex = index
s . updateACLReplicationStatusIndex ( index )
s . logger . Printf ( "[DEBUG] consul: ACL policy replication completed through remote index %d" , index )
failedAttempts = 0
}
}
} ( )
s . logger . Printf ( "[INFO] acl: started ACL Policy replication" )
if s . config . ACLTokenReplication {
replicationType = structs . ACLReplicateTokens
go func ( ) {
var failedAttempts uint
limiter := rate . NewLimiter ( rate . Limit ( s . config . ACLReplicationRate ) , s . config . ACLReplicationBurst )
var lastRemoteIndex uint64
for {
if err := limiter . Wait ( ctx ) ; err != nil {
return
}
2019-02-27 19:28:31 +00:00
if s . tokens . ReplicationToken ( ) == "" {
2018-10-19 16:04:07 +00:00
continue
}
index , exit , err := s . replicateACLTokens ( lastRemoteIndex , ctx )
if exit {
return
}
if err != nil {
lastRemoteIndex = 0
s . updateACLReplicationStatusError ( )
s . logger . Printf ( "[WARN] consul: ACL token replication error (will retry if still leader): %v" , err )
if ( 1 << failedAttempts ) < aclReplicationMaxRetryBackoff {
failedAttempts ++
}
select {
case <- ctx . Done ( ) :
return
case <- time . After ( ( 1 << failedAttempts ) * time . Second ) :
// do nothing
}
} else {
lastRemoteIndex = index
s . updateACLReplicationStatusTokenIndex ( index )
s . logger . Printf ( "[DEBUG] consul: ACL token replication completed through remote index %d" , index )
failedAttempts = 0
}
}
} ( )
s . logger . Printf ( "[INFO] acl: started ACL Token replication" )
}
s . updateACLReplicationStatusRunning ( replicationType )
s . aclReplicationEnabled = true
}
// stopACLReplication cancels the running ACL replication routines, if any,
// clears the stored cancel function and marks the replication status as
// stopped. It is a no-op when replication is not enabled.
func (s *Server) stopACLReplication() {
	s.aclReplicationLock.Lock()
	defer s.aclReplicationLock.Unlock()

	if s.aclReplicationEnabled {
		s.aclReplicationCancel()
		s.aclReplicationCancel = nil
		s.updateACLReplicationStatusStopped()
		s.aclReplicationEnabled = false
	}
}
2017-04-13 01:38:36 +00:00
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
2017-12-13 01:45:03 +00:00
func ( s * Server ) getOrCreateAutopilotConfig ( ) * autopilot . Config {
2017-02-24 04:32:13 +00:00
state := s . fsm . State ( )
2017-02-24 21:08:49 +00:00
_ , config , err := state . AutopilotConfig ( )
2017-02-24 04:32:13 +00:00
if err != nil {
2017-04-13 01:38:36 +00:00
s . logger . Printf ( "[ERR] autopilot: failed to get config: %v" , err )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 04:32:13 +00:00
}
if config != nil {
2017-12-13 01:45:03 +00:00
return config
2017-02-24 04:32:13 +00:00
}
2017-04-13 00:09:57 +00:00
if ! ServersMeetMinimumVersion ( s . LANMembers ( ) , minAutopilotVersion ) {
2017-04-13 17:43:07 +00:00
s . logger . Printf ( "[WARN] autopilot: can't initialize until all servers are >= %s" , minAutopilotVersion . String ( ) )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 21:08:49 +00:00
}
2017-04-13 00:09:57 +00:00
config = s . config . AutopilotConfig
req := structs . AutopilotSetConfigRequest { Config : * config }
2017-02-24 21:08:49 +00:00
if _ , err = s . raftApply ( structs . AutopilotRequestType , req ) ; err != nil {
2017-04-13 01:38:36 +00:00
s . logger . Printf ( "[ERR] autopilot: failed to initialize config: %v" , err )
2017-12-13 01:45:03 +00:00
return nil
2017-02-24 04:32:13 +00:00
}
2017-12-13 01:45:03 +00:00
return config
2017-02-24 04:32:13 +00:00
}
2018-04-24 18:50:31 +00:00
// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership
func ( s * Server ) initializeCAConfig ( ) ( * structs . CAConfiguration , error ) {
2018-04-09 04:57:32 +00:00
state := s . fsm . State ( )
_ , config , err := state . CAConfig ( )
if err != nil {
return nil , err
}
if config != nil {
return config , nil
}
2018-05-10 16:04:33 +00:00
config = s . config . CAConfig
if config . ClusterID == "" {
id , err := uuid . GenerateUUID ( )
if err != nil {
return nil , err
}
config . ClusterID = id
2018-04-20 08:30:34 +00:00
}
2018-04-09 04:57:32 +00:00
req := structs . CARequest {
Op : structs . CAOpSetConfig ,
Config : config ,
}
if _ , err = s . raftApply ( structs . ConnectCARequestType , req ) ; err != nil {
return nil , err
}
return config , nil
}
2018-10-15 16:17:48 +00:00
// initializeRootCA runs the initialization logic for a root CA.
func ( s * Server ) initializeRootCA ( provider ca . Provider , conf * structs . CAConfiguration ) error {
2018-09-07 02:18:54 +00:00
if err := provider . Configure ( conf . ClusterID , true , conf . Config ) ; err != nil {
return fmt . Errorf ( "error configuring provider: %v" , err )
}
if err := provider . GenerateRoot ( ) ; err != nil {
return fmt . Errorf ( "error generating CA root certificate: %v" , err )
}
2018-04-09 04:57:32 +00:00
2018-04-20 08:30:34 +00:00
// Get the active root cert from the CA
2018-04-24 23:16:37 +00:00
rootPEM , err := provider . ActiveRoot ( )
2018-04-09 04:57:32 +00:00
if err != nil {
2018-04-20 08:30:34 +00:00
return fmt . Errorf ( "error getting root cert: %v" , err )
2018-04-09 04:57:32 +00:00
}
2018-10-15 16:17:48 +00:00
rootCA , err := parseCARoot ( rootPEM , conf . Provider , conf . ClusterID )
2018-04-24 23:16:37 +00:00
if err != nil {
2018-05-04 23:01:54 +00:00
return err
2018-04-24 23:16:37 +00:00
}
2018-07-11 17:00:42 +00:00
// Check if the CA root is already initialized and exit if it is,
// adding on any existing intermediate certs since they aren't directly
// tied to the provider.
2018-04-21 01:46:02 +00:00
// Every change to the CA after this initial bootstrapping should
// be done through the rotation process.
2018-04-09 04:57:32 +00:00
state := s . fsm . State ( )
2018-04-24 23:16:37 +00:00
_ , activeRoot , err := state . CARootActive ( nil )
2018-04-09 04:57:32 +00:00
if err != nil {
return err
}
2018-04-24 23:16:37 +00:00
if activeRoot != nil {
2018-07-11 16:44:30 +00:00
// This state shouldn't be possible to get into because we update the root and
// CA config in the same FSM operation.
2018-04-24 23:16:37 +00:00
if activeRoot . ID != rootCA . ID {
2018-07-11 16:44:30 +00:00
return fmt . Errorf ( "stored CA root %q is not the active root (%s)" , rootCA . ID , activeRoot . ID )
2018-04-24 18:50:31 +00:00
}
2018-07-11 16:44:30 +00:00
rootCA . IntermediateCerts = activeRoot . IntermediateCerts
s . setCAProvider ( provider , rootCA )
2018-04-09 04:57:32 +00:00
return nil
}
// Get the highest index
idx , _ , err := state . CARoots ( nil )
if err != nil {
return err
}
2018-04-20 08:30:34 +00:00
// Store the root cert in raft
2018-04-09 04:57:32 +00:00
resp , err := s . raftApply ( structs . ConnectCARequestType , & structs . CARequest {
Op : structs . CAOpSetRoots ,
Index : idx ,
2018-04-24 23:16:37 +00:00
Roots : [ ] * structs . CARoot { rootCA } ,
2018-04-09 04:57:32 +00:00
} )
if err != nil {
s . logger . Printf ( "[ERR] connect: Apply failed %v" , err )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
2018-07-11 16:44:30 +00:00
s . setCAProvider ( provider , rootCA )
2018-10-15 16:17:48 +00:00
s . logger . Printf ( "[INFO] connect: initialized primary datacenter CA with provider %q" , conf . Provider )
2018-04-09 04:57:32 +00:00
return nil
}
2018-05-04 23:01:54 +00:00
// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
2018-10-15 16:17:48 +00:00
func parseCARoot ( pemValue , provider , clusterID string ) ( * structs . CARoot , error ) {
2018-05-04 23:01:54 +00:00
id , err := connect . CalculateCertFingerprint ( pemValue )
if err != nil {
return nil , fmt . Errorf ( "error parsing root fingerprint: %v" , err )
}
rootCert , err := connect . ParseCert ( pemValue )
if err != nil {
return nil , fmt . Errorf ( "error parsing root cert: %v" , err )
}
return & structs . CARoot {
2018-10-15 16:17:48 +00:00
ID : id ,
Name : fmt . Sprintf ( "%s CA Root Cert" , strings . Title ( provider ) ) ,
SerialNumber : rootCert . SerialNumber . Uint64 ( ) ,
SigningKeyID : connect . HexString ( rootCert . AuthorityKeyId ) ,
ExternalTrustDomain : clusterID ,
NotBefore : rootCert . NotBefore ,
NotAfter : rootCert . NotAfter ,
RootCert : pemValue ,
Active : true ,
2018-05-04 23:01:54 +00:00
} , nil
}
2018-04-21 01:46:02 +00:00
// createProvider returns a connect CA provider from the given config.
2018-05-09 22:12:31 +00:00
func ( s * Server ) createCAProvider ( conf * structs . CAConfiguration ) ( ca . Provider , error ) {
2018-04-21 01:46:02 +00:00
switch conf . Provider {
case structs . ConsulCAProvider :
2018-09-11 23:43:04 +00:00
return & ca . ConsulProvider { Delegate : & consulCADelegate { s } } , nil
2018-06-13 08:40:03 +00:00
case structs . VaultCAProvider :
2018-09-11 23:43:04 +00:00
return & ca . VaultProvider { } , nil
2018-04-21 01:46:02 +00:00
default :
return nil , fmt . Errorf ( "unknown CA provider %q" , conf . Provider )
}
}
2018-06-20 11:37:36 +00:00
func ( s * Server ) getCAProvider ( ) ( ca . Provider , * structs . CARoot ) {
2018-05-09 04:27:23 +00:00
retries := 0
2018-05-09 22:12:31 +00:00
var result ca . Provider
2018-06-20 11:37:36 +00:00
var resultRoot * structs . CARoot
2018-05-09 04:32:47 +00:00
for result == nil {
s . caProviderLock . RLock ( )
result = s . caProvider
2018-06-20 11:37:36 +00:00
resultRoot = s . caProviderRoot
2018-05-09 04:32:47 +00:00
s . caProviderLock . RUnlock ( )
// In cases where an agent is started with managed proxies, we may ask
// for the provider before establishLeadership completes. If we're the
// leader, then wait and get the provider again
if result == nil && s . IsLeader ( ) && retries < 10 {
retries ++
time . Sleep ( 50 * time . Millisecond )
continue
}
2018-05-09 04:27:23 +00:00
2018-05-09 04:32:47 +00:00
break
2018-05-09 04:27:23 +00:00
}
2018-06-20 11:37:36 +00:00
return result , resultRoot
2018-04-21 01:46:02 +00:00
}
2018-06-20 11:37:36 +00:00
func ( s * Server ) setCAProvider ( newProvider ca . Provider , root * structs . CARoot ) {
2018-04-21 01:46:02 +00:00
s . caProviderLock . Lock ( )
defer s . caProviderLock . Unlock ( )
s . caProvider = newProvider
2018-06-20 11:37:36 +00:00
s . caProviderRoot = root
2018-04-21 01:46:02 +00:00
}
2018-06-21 22:42:28 +00:00
// startCARootPruning starts a goroutine that looks for stale CARoots
// and removes them from the state store.
//
// The goroutine calls pruneCARoots every caRootPruneInterval and logs any
// error it returns. It exits when the stop channel created here is closed,
// which stopCARootPruning does via s.caPruningCh. Calling this while pruning
// is already enabled is a no-op.
func (s *Server) startCARootPruning() {
	s.caPruningLock.Lock()
	defer s.caPruningLock.Unlock()

	if s.caPruningEnabled {
		return
	}

	// Hand the goroutine its own reference to the stop channel instead of
	// letting it re-read s.caPruningCh on every loop iteration. The field is
	// only protected by caPruningLock, which the goroutine does not hold, and
	// it is replaced on the next start. A goroutine that was busy pruning
	// during a stop/start cycle would otherwise begin watching the new
	// channel, miss its own stop signal and keep running next to the new one.
	stopCh := make(chan struct{})
	s.caPruningCh = stopCh

	go func() {
		ticker := time.NewTicker(caRootPruneInterval)
		defer ticker.Stop()

		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				if err := s.pruneCARoots(); err != nil {
					s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err)
				}
			}
		}
	}()

	s.caPruningEnabled = true
}
// pruneCARoots looks for any CARoots that have been rotated out and expired.
func ( s * Server ) pruneCARoots ( ) error {
2018-07-06 23:05:25 +00:00
if ! s . config . ConnectEnabled {
return nil
}
2018-07-20 23:04:04 +00:00
state := s . fsm . State ( )
idx , roots , err := state . CARoots ( nil )
if err != nil {
return err
}
_ , caConf , err := state . CAConfig ( )
if err != nil {
return err
}
common , err := caConf . GetCommonConfig ( )
2018-06-21 22:42:28 +00:00
if err != nil {
return err
}
var newRoots structs . CARoots
for _ , r := range roots {
2018-07-20 23:04:04 +00:00
if ! r . Active && ! r . RotatedOutAt . IsZero ( ) && time . Now ( ) . Sub ( r . RotatedOutAt ) > common . LeafCertTTL * 2 {
2018-06-21 22:42:28 +00:00
s . logger . Printf ( "[INFO] connect: pruning old unused root CA (ID: %s)" , r . ID )
continue
}
newRoot := * r
newRoots = append ( newRoots , & newRoot )
}
// Return early if there's nothing to remove.
if len ( newRoots ) == len ( roots ) {
return nil
}
// Commit the new root state.
var args structs . CARequest
args . Op = structs . CAOpSetRoots
args . Index = idx
args . Roots = newRoots
resp , err := s . raftApply ( structs . ConnectCARequestType , args )
if err != nil {
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
return nil
}
// stopCARootPruning stops the CARoot pruning process by closing the pruning
// channel and clearing the enabled flag. It does nothing when pruning is not
// currently enabled.
func (s *Server) stopCARootPruning() {
	s.caPruningLock.Lock()
	defer s.caPruningLock.Unlock()

	if s.caPruningEnabled {
		close(s.caPruningCh)
		s.caPruningEnabled = false
	}
}
2014-04-03 22:51:03 +00:00
// reconcileReaped is used to reconcile nodes that have failed and been reaped
2017-09-27 03:49:41 +00:00
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.
2014-04-03 22:51:03 +00:00
func ( s * Server ) reconcileReaped ( known map [ string ] struct { } ) error {
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2017-04-19 23:00:11 +00:00
_ , checks , err := state . ChecksInState ( nil , api . HealthAny )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-10-14 18:04:43 +00:00
for _ , check := range checks {
2014-04-03 22:51:03 +00:00
// Ignore any non serf checks
2017-07-14 05:33:47 +00:00
if check . CheckID != structs . SerfCheckID {
2014-04-03 22:51:03 +00:00
continue
}
// Check if this node is "known" by serf
if _ , ok := known [ check . Node ] ; ok {
continue
}
// Get the node services, look for ConsulServiceID
2017-01-20 07:36:50 +00:00
_ , services , err := state . NodeServices ( nil , check . Node )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-04-03 22:51:03 +00:00
serverPort := 0
2017-09-27 03:49:41 +00:00
serverAddr := ""
serverID := ""
CHECKS :
2014-04-03 22:51:03 +00:00
for _ , service := range services . Services {
2017-07-14 05:33:47 +00:00
if service . ID == structs . ConsulServiceID {
2017-09-27 03:49:41 +00:00
_ , node , err := state . GetNode ( check . Node )
if err != nil {
s . logger . Printf ( "[ERR] consul: Unable to look up node with name %q: %v" , check . Node , err )
continue CHECKS
}
serverAddr = node . Address
2014-04-03 22:51:03 +00:00
serverPort = service . Port
2017-09-27 03:49:41 +00:00
lookupAddr := net . JoinHostPort ( serverAddr , strconv . Itoa ( serverPort ) )
svr := s . serverLookup . Server ( raft . ServerAddress ( lookupAddr ) )
if svr != nil {
serverID = svr . ID
}
2014-04-03 22:51:03 +00:00
break
}
}
2017-09-27 03:49:41 +00:00
// Create a fake member
member := serf . Member {
Name : check . Node ,
Tags : map [ string ] string {
"dc" : s . config . Datacenter ,
"role" : "node" ,
} ,
}
2014-04-03 22:51:03 +00:00
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member . Tags [ "role" ] = "consul"
member . Tags [ "port" ] = strconv . FormatUint ( uint64 ( serverPort ) , 10 )
2017-09-27 03:49:41 +00:00
member . Tags [ "id" ] = serverID
member . Addr = net . ParseIP ( serverAddr )
2014-04-03 22:51:03 +00:00
}
// Attempt to reap this member
if err := s . handleReapMember ( member ) ; err != nil {
return err
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
if ! s . shouldHandleMember ( member ) {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[WARN] consul: skipping reconcile of node %v" , member )
2014-01-09 23:49:09 +00:00
return nil
}
2017-10-04 23:43:27 +00:00
defer metrics . MeasureSince ( [ ] string { "leader" , "reconcileMember" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
var err error
switch member . Status {
case serf . StatusAlive :
err = s . handleAliveMember ( member )
case serf . StatusFailed :
err = s . handleFailedMember ( member )
case serf . StatusLeft :
err = s . handleLeftMember ( member )
2014-03-20 19:51:49 +00:00
case StatusReap :
err = s . handleReapMember ( member )
2014-01-09 23:49:09 +00:00
}
if err != nil {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[ERR] consul: failed to reconcile member: %v: %v" ,
2014-01-09 23:49:09 +00:00
member , err )
2014-12-01 04:05:15 +00:00
// Permission denied should not bubble up
2017-08-23 14:52:48 +00:00
if acl . IsErrPermissionDenied ( err ) {
2014-12-01 04:05:15 +00:00
return nil
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// shouldHandleMember checks if this is a Consul pool member
func ( s * Server ) shouldHandleMember ( member serf . Member ) bool {
if valid , dc := isConsulNode ( member ) ; valid && dc == s . config . Datacenter {
return true
}
2017-08-14 14:36:07 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid &&
parts . Segment == "" &&
parts . Datacenter == s . config . Datacenter {
2014-01-09 23:49:09 +00:00
return true
}
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func ( s * Server ) handleAliveMember ( member serf . Member ) error {
2014-01-10 01:59:31 +00:00
// Register consul service if a server
var service * structs . NodeService
2017-07-06 10:48:37 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid {
2014-01-10 01:59:31 +00:00
service = & structs . NodeService {
2017-07-14 05:33:47 +00:00
ID : structs . ConsulServiceID ,
Service : structs . ConsulServiceName ,
2014-01-20 23:39:07 +00:00
Port : parts . Port ,
2014-01-10 01:59:31 +00:00
}
2014-01-10 20:55:55 +00:00
// Attempt to join the consul server
2014-01-20 23:56:29 +00:00
if err := s . joinConsulServer ( member , parts ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
2014-01-10 01:59:31 +00:00
}
2014-01-09 23:49:09 +00:00
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-10 06:12:08 +00:00
// Check if the associated service is available
if service != nil {
match := false
2017-01-20 07:36:50 +00:00
_ , services , err := state . NodeServices ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-03-05 23:03:12 +00:00
if services != nil {
2017-04-20 18:42:22 +00:00
for id := range services . Services {
2014-03-05 23:03:12 +00:00
if id == service . ID {
match = true
}
2014-01-10 06:12:08 +00:00
}
}
if ! match {
goto AFTER_CHECK
}
}
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the passing state
2017-01-24 07:37:21 +00:00
_ , checks , err := state . NodeChecks ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2017-07-14 05:33:47 +00:00
if check . CheckID == structs . SerfCheckID && check . Status == api . HealthPassing {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 06:12:08 +00:00
AFTER_CHECK :
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' joined, marking health alive" , member . Name )
2014-01-09 23:49:09 +00:00
2017-03-23 22:01:46 +00:00
// Register with the catalog.
2014-01-09 23:49:09 +00:00
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2017-03-23 22:01:46 +00:00
ID : types . NodeID ( member . Tags [ "id" ] ) ,
2014-01-09 23:49:09 +00:00
Address : member . Addr . String ( ) ,
2014-01-10 01:57:13 +00:00
Service : service ,
2014-01-09 23:49:09 +00:00
Check : & structs . HealthCheck {
Node : member . Name ,
2017-07-14 05:33:47 +00:00
CheckID : structs . SerfCheckID ,
Name : structs . SerfCheckName ,
2017-04-19 23:00:11 +00:00
Status : api . HealthPassing ,
2017-07-14 05:33:47 +00:00
Output : structs . SerfCheckAliveOutput ,
2014-01-09 23:49:09 +00:00
} ,
2017-03-23 22:01:46 +00:00
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate : true ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . RegisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func ( s * Server ) handleFailedMember ( member serf . Member ) error {
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the critical state
2017-01-24 07:37:21 +00:00
_ , checks , err := state . NodeChecks ( nil , member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2017-07-14 05:33:47 +00:00
if check . CheckID == structs . SerfCheckID && check . Status == api . HealthCritical {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' failed, marking health critical" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2017-03-23 22:01:46 +00:00
ID : types . NodeID ( member . Tags [ "id" ] ) ,
2014-01-09 23:49:09 +00:00
Address : member . Addr . String ( ) ,
Check : & structs . HealthCheck {
Node : member . Name ,
2017-07-14 05:33:47 +00:00
CheckID : structs . SerfCheckID ,
Name : structs . SerfCheckName ,
2017-04-19 23:00:11 +00:00
Status : api . HealthCritical ,
2017-07-14 05:33:47 +00:00
Output : structs . SerfCheckFailedOutput ,
2014-01-09 23:49:09 +00:00
} ,
2017-03-23 22:01:46 +00:00
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate : true ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . RegisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func ( s * Server ) handleLeftMember ( member serf . Member ) error {
2014-03-20 19:51:49 +00:00
return s . handleDeregisterMember ( "left" , member )
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. It delegates to handleDeregisterMember
// with the reason "reaped".
func (s *Server) handleReapMember(member serf.Member) error {
	const reason = "reaped"
	return s.handleDeregisterMember(reason, member)
}
// handleDeregisterMember is used to deregister a member of a given reason
func ( s * Server ) handleDeregisterMember ( reason string , member serf . Member ) error {
2015-01-21 00:13:54 +00:00
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member . Name == s . config . NodeName {
s . logger . Printf ( "[WARN] consul: deregistering self (%s) should be done by follower" , s . config . NodeName )
2014-01-09 23:49:09 +00:00
return nil
}
2014-01-10 20:55:55 +00:00
// Remove from Raft peers if this was a server
2017-07-06 10:48:37 +00:00
if valid , parts := metadata . IsConsulServer ( member ) ; valid {
2014-01-20 23:39:07 +00:00
if err := s . removeConsulServer ( member , parts . Port ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
}
2015-10-12 07:42:09 +00:00
// Check if the node does not exist
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node == nil {
2014-10-14 05:14:43 +00:00
return nil
}
2014-01-09 23:49:09 +00:00
// Deregister the node
2014-10-14 05:14:43 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' %s, deregistering" , member . Name , reason )
2014-01-09 23:49:09 +00:00
req := structs . DeregisterRequest {
2017-03-23 20:34:30 +00:00
Datacenter : s . config . Datacenter ,
Node : member . Name ,
2014-01-09 23:49:09 +00:00
}
2017-03-23 20:34:30 +00:00
_ , err = s . raftApply ( structs . DeregisterRequestType , & req )
return err
2014-01-09 23:49:09 +00:00
}
2014-01-10 20:55:55 +00:00
// joinConsulServer is used to try to join another consul server
2017-07-06 10:48:37 +00:00
func ( s * Server ) joinConsulServer ( m serf . Member , parts * metadata . Server ) error {
2014-01-20 23:56:29 +00:00
// Check for possibility of multiple bootstrap nodes
2014-01-30 21:13:29 +00:00
if parts . Bootstrap {
2014-01-20 23:56:29 +00:00
members := s . serfLAN . Members ( )
for _ , member := range members {
2017-07-06 10:48:37 +00:00
valid , p := metadata . IsConsulServer ( member )
2014-01-30 21:13:29 +00:00
if valid && member . Name != m . Name && p . Bootstrap {
2014-01-20 23:56:29 +00:00
s . logger . Printf ( "[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
}
}
2017-09-06 20:05:51 +00:00
// Processing ourselves could result in trying to remove ourselves to
// fix up our address, which would make us step down. This is only
// safe to attempt if there are multiple servers available.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
2017-09-06 20:05:51 +00:00
if m . Name == s . config . NodeName {
if l := len ( configFuture . Configuration ( ) . Servers ) ; l < 3 {
s . logger . Printf ( "[DEBUG] consul: Skipping self join check for %q since the cluster is too small" , m . Name )
return nil
}
}
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
2017-09-06 20:07:42 +00:00
addr := ( & net . TCPAddr { IP : m . Addr , Port : parts . Port } ) . String ( )
2017-12-12 00:38:52 +00:00
minRaftProtocol , err := s . autopilot . MinRaftProtocol ( )
2017-09-06 20:07:42 +00:00
if err != nil {
return err
}
2016-07-28 19:11:28 +00:00
for _ , server := range configFuture . Configuration ( ) . Servers {
2017-03-15 23:09:55 +00:00
// No-op if the raft version is too low
if server . Address == raft . ServerAddress ( addr ) && ( minRaftProtocol < 2 || parts . RaftVersion < 3 ) {
2016-07-28 19:11:28 +00:00
return nil
}
2017-03-15 23:09:55 +00:00
// If the address or ID matches an existing server, see if we need to remove the old one first
if server . Address == raft . ServerAddress ( addr ) || server . ID == raft . ServerID ( parts . ID ) {
// Exit with no-op if this is being called on an existing server
if server . Address == raft . ServerAddress ( addr ) && server . ID == raft . ServerID ( parts . ID ) {
return nil
2017-04-21 01:59:42 +00:00
}
future := s . raft . RemoveServer ( server . ID , 0 , 0 )
if server . Address == raft . ServerAddress ( addr ) {
if err := future . Error ( ) ; err != nil {
return fmt . Errorf ( "error removing server with duplicate address %q: %s" , server . Address , err )
}
s . logger . Printf ( "[INFO] consul: removed server with duplicate address: %s" , server . Address )
2017-03-15 23:09:55 +00:00
} else {
2017-04-21 01:59:42 +00:00
if err := future . Error ( ) ; err != nil {
return fmt . Errorf ( "error removing server with duplicate ID %q: %s" , server . ID , err )
2017-03-15 23:09:55 +00:00
}
2017-04-21 01:59:42 +00:00
s . logger . Printf ( "[INFO] consul: removed server with duplicate ID: %s" , server . ID )
2017-03-15 23:09:55 +00:00
}
}
2014-01-10 20:55:55 +00:00
}
2017-02-17 18:49:16 +00:00
2017-03-15 23:09:55 +00:00
// Attempt to add as a peer
2017-03-01 22:04:40 +00:00
switch {
case minRaftProtocol >= 3 :
addFuture := s . raft . AddNonvoter ( raft . ServerID ( parts . ID ) , raft . ServerAddress ( addr ) , 0 , 0 )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
case minRaftProtocol == 2 && parts . RaftVersion >= 3 :
2017-02-22 20:53:32 +00:00
addFuture := s . raft . AddVoter ( raft . ServerID ( parts . ID ) , raft . ServerAddress ( addr ) , 0 , 0 )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
2017-03-01 22:04:40 +00:00
default :
2017-02-22 20:53:32 +00:00
addFuture := s . raft . AddPeer ( raft . ServerAddress ( addr ) )
if err := addFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
}
2017-03-01 22:04:40 +00:00
// Trigger a check to remove dead servers
2017-12-18 20:26:35 +00:00
s . autopilot . RemoveDeadServers ( )
2017-02-17 18:49:16 +00:00
2014-01-10 20:55:55 +00:00
return nil
}
2014-01-10 23:05:34 +00:00
2014-01-20 23:39:07 +00:00
// removeConsulServer is used to try to remove a consul server that has left
2014-01-10 23:05:34 +00:00
func ( s * Server ) removeConsulServer ( m serf . Member , port int ) error {
2016-07-28 19:11:28 +00:00
addr := ( & net . TCPAddr { IP : m . Addr , Port : port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-remove it
2016-07-29 16:27:15 +00:00
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
2017-02-22 20:53:32 +00:00
2017-12-12 00:38:52 +00:00
minRaftProtocol , err := s . autopilot . MinRaftProtocol ( )
2017-02-22 20:53:32 +00:00
if err != nil {
return err
}
2017-07-06 10:48:37 +00:00
_ , parts := metadata . IsConsulServer ( m )
2017-02-22 20:53:32 +00:00
// Pick which remove API to use based on how the server was added.
2016-07-28 19:11:28 +00:00
for _ , server := range configFuture . Configuration ( ) . Servers {
2017-02-22 20:53:32 +00:00
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server . ID == raft . ServerID ( parts . ID ) {
2017-02-23 21:08:40 +00:00
s . logger . Printf ( "[INFO] consul: removing server by ID: %q" , server . ID )
2017-02-22 20:53:32 +00:00
future := s . raft . RemoveServer ( raft . ServerID ( parts . ID ) , 0 , 0 )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
2017-02-23 21:08:40 +00:00
server . ID , err )
2017-02-22 20:53:32 +00:00
return err
}
break
} else if server . Address == raft . ServerAddress ( addr ) {
// If not, use the old remove API
2017-02-23 21:08:40 +00:00
s . logger . Printf ( "[INFO] consul: removing server by address: %q" , server . Address )
2017-02-22 20:53:32 +00:00
future := s . raft . RemovePeer ( raft . ServerAddress ( addr ) )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
addr , err )
return err
}
break
2016-07-28 19:11:28 +00:00
}
}
2014-01-10 23:05:34 +00:00
return nil
}
2014-12-15 23:04:21 +00:00
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func ( s * Server ) reapTombstones ( index uint64 ) {
2017-10-04 23:43:27 +00:00
defer metrics . MeasureSince ( [ ] string { "leader" , "reapTombstones" } , time . Now ( ) )
2014-12-15 23:28:56 +00:00
req := structs . TombstoneRequest {
2017-03-23 20:34:30 +00:00
Datacenter : s . config . Datacenter ,
Op : structs . TombstoneReap ,
ReapIndex : index ,
2014-12-15 23:04:21 +00:00
}
2014-12-15 23:28:56 +00:00
_ , err := s . raftApply ( structs . TombstoneRequestType , & req )
2014-12-15 23:04:21 +00:00
if err != nil {
s . logger . Printf ( "[ERR] consul: failed to reap tombstones up to %d: %v" ,
index , err )
}
}