2014-01-09 23:49:09 +00:00
package consul
import (
2014-08-11 21:54:18 +00:00
"fmt"
2014-06-16 21:36:12 +00:00
"net"
"strconv"
2014-12-01 04:05:15 +00:00
"strings"
2014-06-16 21:36:12 +00:00
"time"
2014-02-20 23:16:26 +00:00
"github.com/armon/go-metrics"
2016-03-30 00:39:19 +00:00
"github.com/hashicorp/consul/consul/agent"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/consul/consul/structs"
2016-06-07 20:24:51 +00:00
"github.com/hashicorp/consul/types"
2014-01-10 20:55:55 +00:00
"github.com/hashicorp/raft"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/serf/serf"
)
const (
2016-06-07 20:24:51 +00:00
SerfCheckID types . CheckID = "serfHealth"
SerfCheckName = "Serf Health Status"
SerfCheckAliveOutput = "Agent alive and reachable"
SerfCheckFailedOutput = "Agent not live or unreachable"
ConsulServiceID = "consul"
ConsulServiceName = "consul"
newLeaderEvent = "consul:new-leader"
2014-01-09 23:49:09 +00:00
)
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
leaderCh := s . raft . LeaderCh ( )
var stopCh chan struct { }
for {
select {
case isLeader := <- leaderCh :
if isLeader {
stopCh = make ( chan struct { } )
go s . leaderLoop ( stopCh )
s . logger . Printf ( "[INFO] consul: cluster leadership acquired" )
} else if stopCh != nil {
close ( stopCh )
stopCh = nil
s . logger . Printf ( "[INFO] consul: cluster leadership lost" )
}
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
2015-09-15 12:22:08 +00:00
// maintenance activities
2014-01-09 23:49:09 +00:00
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2015-01-05 22:58:59 +00:00
// Ensure we revoke leadership on stepdown
defer s . revokeLeadership ( )
2014-02-19 20:36:27 +00:00
// Fire a user event indicating a new leader
payload := [ ] byte ( s . config . NodeName )
if err := s . serfLAN . UserEvent ( newLeaderEvent , payload , false ) ; err != nil {
s . logger . Printf ( "[WARN] consul: failed to broadcast new leader event: %v" , err )
}
2014-01-10 20:55:55 +00:00
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf . Member
2014-12-13 05:42:24 +00:00
establishedLeader := false
2014-01-10 20:55:55 +00:00
2014-01-09 23:49:09 +00:00
RECONCILE :
2014-01-10 20:55:55 +00:00
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
2014-01-09 23:49:09 +00:00
// Apply a raft barrier to ensure our FSM is caught up
2014-02-20 23:16:26 +00:00
start := time . Now ( )
2014-01-09 23:49:09 +00:00
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to wait for barrier: %v" , err )
goto WAIT
}
2014-02-20 23:16:26 +00:00
metrics . MeasureSince ( [ ] string { "consul" , "leader" , "barrier" } , start )
2014-01-09 23:49:09 +00:00
2014-12-13 05:42:24 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
if err := s . establishLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to establish leadership: %v" ,
err )
goto WAIT
}
establishedLeader = true
}
2014-01-09 23:49:09 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to reconcile: %v" , err )
goto WAIT
}
2014-01-10 20:55:55 +00:00
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
2014-01-09 23:49:09 +00:00
WAIT :
2014-01-10 20:55:55 +00:00
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
s . reconcileMember ( member )
2014-12-15 23:04:21 +00:00
case index := <- s . tombstoneGC . ExpireCh ( ) :
go s . reapTombstones ( index )
2014-01-10 20:55:55 +00:00
}
2014-01-09 23:49:09 +00:00
}
}
2014-12-13 05:42:24 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2015-09-11 19:24:54 +00:00
// previously inflight transactions have been committed and that our
2014-12-13 05:42:24 +00:00
// state is up-to-date.
func ( s * Server ) establishLeadership ( ) error {
2014-12-15 22:37:49 +00:00
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
2015-01-05 22:58:59 +00:00
s . tombstoneGC . SetEnabled ( true )
2014-12-15 22:37:49 +00:00
lastIndex := s . raft . LastIndex ( )
s . tombstoneGC . Hint ( lastIndex )
s . logger . Printf ( "[DEBUG] consul: reset tombstone GC to index %d" , lastIndex )
2014-12-13 05:42:24 +00:00
// Setup ACLs if we are the leader and need to
if err := s . initializeACL ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: ACL initialization failed: %v" , err )
return err
}
// Setup the session timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node along, effectively this means all the timers are renewed at the
// time of failover. The TTL contract is that the session will not be expired
// before the TTL, so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Sessions
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeSessionTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: Session Timers initialization failed: %v" ,
err )
return err
}
return nil
}
2015-01-05 22:58:59 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
// Disable the tombstone GC, since it is only useful as a leader
s . tombstoneGC . SetEnabled ( false )
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s . clearAllSessionTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: Clearing session timers failed: %v" , err )
return err
}
return nil
}
2014-08-11 21:54:18 +00:00
// initializeACL is used to setup the ACLs if we are the leader
// and need to do this.
func ( s * Server ) initializeACL ( ) error {
// Bail if not configured or we are not authoritative
authDC := s . config . ACLDatacenter
if len ( authDC ) == 0 || authDC != s . config . Datacenter {
return nil
}
// Purge the cache, since it could've changed while we
// were not the leader
s . aclAuthCache . Purge ( )
// Look for the anonymous token
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , acl , err := state . ACLGet ( anonymousToken )
2014-08-11 21:54:18 +00:00
if err != nil {
return fmt . Errorf ( "failed to get anonymous token: %v" , err )
}
// Create anonymous token if missing
if acl == nil {
req := structs . ACLRequest {
Datacenter : authDC ,
2014-10-09 19:28:07 +00:00
Op : structs . ACLSet ,
2014-08-11 21:54:18 +00:00
ACL : structs . ACL {
ID : anonymousToken ,
Name : "Anonymous Token" ,
Type : structs . ACLTypeClient ,
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create anonymous token: %v" , err )
}
}
// Check for configured master token
master := s . config . ACLMasterToken
if len ( master ) == 0 {
return nil
}
// Look for the master token
2015-10-14 02:18:43 +00:00
_ , acl , err = state . ACLGet ( master )
2014-08-11 21:54:18 +00:00
if err != nil {
return fmt . Errorf ( "failed to get master token: %v" , err )
}
if acl == nil {
req := structs . ACLRequest {
Datacenter : authDC ,
2014-10-09 19:28:07 +00:00
Op : structs . ACLSet ,
2014-08-11 21:54:18 +00:00
ACL : structs . ACL {
ID : master ,
Name : "Master Token" ,
Type : structs . ACLTypeManagement ,
} ,
}
_ , err := s . raftApply ( structs . ACLRequestType , & req )
if err != nil {
return fmt . Errorf ( "failed to create master token: %v" , err )
}
}
return nil
}
2014-01-09 23:49:09 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Mainly we need to ensure all live nodes are registered, all failed
// nodes are marked as such, and all left nodes are de-registered.
func ( s * Server ) reconcile ( ) ( err error ) {
2014-02-20 23:16:26 +00:00
defer metrics . MeasureSince ( [ ] string { "consul" , "leader" , "reconcile" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
members := s . serfLAN . Members ( )
2014-04-03 22:51:03 +00:00
knownMembers := make ( map [ string ] struct { } )
2014-01-09 23:49:09 +00:00
for _ , member := range members {
if err := s . reconcileMember ( member ) ; err != nil {
return err
}
2014-04-03 22:51:03 +00:00
knownMembers [ member . Name ] = struct { } { }
}
// Reconcile any members that have been reaped while we were not the leader
return s . reconcileReaped ( knownMembers )
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
2015-09-15 12:22:08 +00:00
// in a critical state that does not correspond to a known Serf member. We generate
2014-04-03 22:51:03 +00:00
// a "reap" event to cause the node to be cleaned up.
func ( s * Server ) reconcileReaped ( known map [ string ] struct { } ) error {
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-12 07:42:09 +00:00
_ , checks , err := state . ChecksInState ( structs . HealthAny )
if err != nil {
return err
}
2014-10-14 18:04:43 +00:00
for _ , check := range checks {
2014-04-03 22:51:03 +00:00
// Ignore any non serf checks
if check . CheckID != SerfCheckID {
continue
}
// Check if this node is "known" by serf
if _ , ok := known [ check . Node ] ; ok {
continue
}
// Create a fake member
member := serf . Member {
Name : check . Node ,
Tags : map [ string ] string {
"dc" : s . config . Datacenter ,
"role" : "node" ,
} ,
}
// Get the node services, look for ConsulServiceID
2015-10-12 07:42:09 +00:00
_ , services , err := state . NodeServices ( check . Node )
if err != nil {
return err
}
2014-04-03 22:51:03 +00:00
serverPort := 0
for _ , service := range services . Services {
if service . ID == ConsulServiceID {
serverPort = service . Port
break
}
}
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member . Tags [ "role" ] = "consul"
member . Tags [ "port" ] = strconv . FormatUint ( uint64 ( serverPort ) , 10 )
}
// Attempt to reap this member
if err := s . handleReapMember ( member ) ; err != nil {
return err
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
if ! s . shouldHandleMember ( member ) {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[WARN] consul: skipping reconcile of node %v" , member )
2014-01-09 23:49:09 +00:00
return nil
}
2014-02-20 23:16:26 +00:00
defer metrics . MeasureSince ( [ ] string { "consul" , "leader" , "reconcileMember" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
var err error
switch member . Status {
case serf . StatusAlive :
err = s . handleAliveMember ( member )
case serf . StatusFailed :
err = s . handleFailedMember ( member )
case serf . StatusLeft :
err = s . handleLeftMember ( member )
2014-03-20 19:51:49 +00:00
case StatusReap :
err = s . handleReapMember ( member )
2014-01-09 23:49:09 +00:00
}
if err != nil {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[ERR] consul: failed to reconcile member: %v: %v" ,
2014-01-09 23:49:09 +00:00
member , err )
2014-12-01 04:05:15 +00:00
// Permission denied should not bubble up
if strings . Contains ( err . Error ( ) , permissionDenied ) {
return nil
}
2014-01-09 23:49:09 +00:00
return err
}
return nil
}
// shouldHandleMember checks if this is a Consul pool member
func ( s * Server ) shouldHandleMember ( member serf . Member ) bool {
if valid , dc := isConsulNode ( member ) ; valid && dc == s . config . Datacenter {
return true
}
2016-03-30 00:39:19 +00:00
if valid , parts := agent . IsConsulServer ( member ) ; valid && parts . Datacenter == s . config . Datacenter {
2014-01-09 23:49:09 +00:00
return true
}
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func ( s * Server ) handleAliveMember ( member serf . Member ) error {
2014-01-10 01:59:31 +00:00
// Register consul service if a server
var service * structs . NodeService
2016-03-30 00:39:19 +00:00
if valid , parts := agent . IsConsulServer ( member ) ; valid {
2014-01-10 01:59:31 +00:00
service = & structs . NodeService {
2014-01-16 03:27:37 +00:00
ID : ConsulServiceID ,
Service : ConsulServiceName ,
2014-01-20 23:39:07 +00:00
Port : parts . Port ,
2014-01-10 01:59:31 +00:00
}
2014-01-10 20:55:55 +00:00
// Attempt to join the consul server
2014-01-20 23:56:29 +00:00
if err := s . joinConsulServer ( member , parts ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
2014-01-10 01:59:31 +00:00
}
2014-01-09 23:49:09 +00:00
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-10 06:12:08 +00:00
// Check if the associated service is available
if service != nil {
match := false
2015-10-12 07:42:09 +00:00
_ , services , err := state . NodeServices ( member . Name )
if err != nil {
return err
}
2014-03-05 23:03:12 +00:00
if services != nil {
for id , _ := range services . Services {
if id == service . ID {
match = true
}
2014-01-10 06:12:08 +00:00
}
}
if ! match {
goto AFTER_CHECK
}
}
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the passing state
2015-10-12 07:42:09 +00:00
_ , checks , err := state . NodeChecks ( member . Name )
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2014-01-16 03:24:16 +00:00
if check . CheckID == SerfCheckID && check . Status == structs . HealthPassing {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 06:12:08 +00:00
AFTER_CHECK :
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' joined, marking health alive" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
Address : member . Addr . String ( ) ,
2014-01-10 01:57:13 +00:00
Service : service ,
2014-01-09 23:49:09 +00:00
Check : & structs . HealthCheck {
Node : member . Name ,
2014-01-16 03:24:16 +00:00
CheckID : SerfCheckID ,
Name : SerfCheckName ,
2014-01-09 23:49:09 +00:00
Status : structs . HealthPassing ,
2014-06-09 23:07:22 +00:00
Output : SerfCheckAliveOutput ,
2014-01-09 23:49:09 +00:00
} ,
2016-12-11 19:24:44 +00:00
WriteRequest : structs . WriteRequest { Token : s . config . GetTokenForAgent ( ) } ,
2014-01-09 23:49:09 +00:00
}
var out struct { }
return s . endpoints . Catalog . Register ( & req , & out )
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func ( s * Server ) handleFailedMember ( member serf . Member ) error {
// Check if the node exists
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node != nil && node . Address == member . Addr . String ( ) {
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the critical state
2015-10-12 07:42:09 +00:00
_ , checks , err := state . NodeChecks ( member . Name )
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2014-01-16 03:24:16 +00:00
if check . CheckID == SerfCheckID && check . Status == structs . HealthCritical {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' failed, marking health critical" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
Address : member . Addr . String ( ) ,
Check : & structs . HealthCheck {
Node : member . Name ,
2014-01-16 03:24:16 +00:00
CheckID : SerfCheckID ,
Name : SerfCheckName ,
2014-01-09 23:49:09 +00:00
Status : structs . HealthCritical ,
2014-06-09 23:07:22 +00:00
Output : SerfCheckFailedOutput ,
2014-01-09 23:49:09 +00:00
} ,
2016-12-11 19:24:44 +00:00
WriteRequest : structs . WriteRequest { Token : s . config . GetTokenForAgent ( ) } ,
2014-01-09 23:49:09 +00:00
}
var out struct { }
return s . endpoints . Catalog . Register ( & req , & out )
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func ( s * Server ) handleLeftMember ( member serf . Member ) error {
2014-03-20 19:51:49 +00:00
return s . handleDeregisterMember ( "left" , member )
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func ( s * Server ) handleReapMember ( member serf . Member ) error {
return s . handleDeregisterMember ( "reaped" , member )
}
// handleDeregisterMember is used to deregister a member of a given reason
func ( s * Server ) handleDeregisterMember ( reason string , member serf . Member ) error {
2015-01-21 00:13:54 +00:00
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member . Name == s . config . NodeName {
s . logger . Printf ( "[WARN] consul: deregistering self (%s) should be done by follower" , s . config . NodeName )
2014-01-09 23:49:09 +00:00
return nil
}
2014-01-10 20:55:55 +00:00
// Remove from Raft peers if this was a server
2016-03-30 00:39:19 +00:00
if valid , parts := agent . IsConsulServer ( member ) ; valid {
2014-01-20 23:39:07 +00:00
if err := s . removeConsulServer ( member , parts . Port ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
}
2015-10-12 07:42:09 +00:00
// Check if the node does not exist
2015-10-13 05:21:39 +00:00
state := s . fsm . State ( )
2015-10-14 02:18:43 +00:00
_ , node , err := state . GetNode ( member . Name )
2015-10-12 07:42:09 +00:00
if err != nil {
2015-10-20 20:37:11 +00:00
return err
2015-10-12 07:42:09 +00:00
}
if node == nil {
2014-10-14 05:14:43 +00:00
return nil
}
2014-01-09 23:49:09 +00:00
// Deregister the node
2014-10-14 05:14:43 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' %s, deregistering" , member . Name , reason )
2014-01-09 23:49:09 +00:00
req := structs . DeregisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
}
var out struct { }
return s . endpoints . Catalog . Deregister ( & req , & out )
}
2014-01-10 20:55:55 +00:00
// joinConsulServer is used to try to join another consul server
2016-03-30 00:39:19 +00:00
func ( s * Server ) joinConsulServer ( m serf . Member , parts * agent . Server ) error {
2014-01-10 20:55:55 +00:00
// Do not join ourself
if m . Name == s . config . NodeName {
return nil
}
2014-01-20 23:56:29 +00:00
// Check for possibility of multiple bootstrap nodes
2014-01-30 21:13:29 +00:00
if parts . Bootstrap {
2014-01-20 23:56:29 +00:00
members := s . serfLAN . Members ( )
for _ , member := range members {
2016-03-30 00:39:19 +00:00
valid , p := agent . IsConsulServer ( member )
2014-01-30 21:13:29 +00:00
if valid && member . Name != m . Name && p . Bootstrap {
2014-01-20 23:56:29 +00:00
s . logger . Printf ( "[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
}
}
2016-07-28 19:11:28 +00:00
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := ( & net . TCPAddr { IP : m . Addr , Port : parts . Port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-add it
2016-07-29 16:27:15 +00:00
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
for _ , server := range configFuture . Configuration ( ) . Servers {
if server . Address == raft . ServerAddress ( addr ) {
return nil
}
}
2014-01-10 20:55:55 +00:00
// Attempt to add as a peer
2016-07-28 19:11:28 +00:00
addFuture := s . raft . AddPeer ( raft . ServerAddress ( addr ) )
if err := addFuture . Error ( ) ; err != nil {
2014-01-10 20:55:55 +00:00
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
return nil
}
2014-01-10 23:05:34 +00:00
2014-01-20 23:39:07 +00:00
// removeConsulServer is used to try to remove a consul server that has left
2014-01-10 23:05:34 +00:00
func ( s * Server ) removeConsulServer ( m serf . Member , port int ) error {
2016-07-28 19:11:28 +00:00
// TODO (slackpad) - This will need to be changed once we support node IDs.
addr := ( & net . TCPAddr { IP : m . Addr , Port : port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-remove it
2016-07-29 16:27:15 +00:00
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
2016-07-28 19:11:28 +00:00
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to get raft configuration: %v" , err )
return err
}
for _ , server := range configFuture . Configuration ( ) . Servers {
if server . Address == raft . ServerAddress ( addr ) {
goto REMOVE
}
}
return nil
REMOVE :
// Attempt to remove as a peer.
future := s . raft . RemovePeer ( raft . ServerAddress ( addr ) )
if err := future . Error ( ) ; err != nil {
2014-01-10 23:05:34 +00:00
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
2016-07-28 19:11:28 +00:00
addr , err )
2014-01-10 23:05:34 +00:00
return err
}
return nil
}
2014-12-15 23:04:21 +00:00
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func ( s * Server ) reapTombstones ( index uint64 ) {
2014-12-19 00:09:02 +00:00
defer metrics . MeasureSince ( [ ] string { "consul" , "leader" , "reapTombstones" } , time . Now ( ) )
2014-12-15 23:28:56 +00:00
req := structs . TombstoneRequest {
2014-12-15 23:04:21 +00:00
Datacenter : s . config . Datacenter ,
2014-12-15 23:28:56 +00:00
Op : structs . TombstoneReap ,
2014-12-15 23:04:21 +00:00
ReapIndex : index ,
2016-12-11 19:24:44 +00:00
WriteRequest : structs . WriteRequest { Token : s . config . GetTokenForAgent ( ) } ,
2014-12-15 23:04:21 +00:00
}
2014-12-15 23:28:56 +00:00
_ , err := s . raftApply ( structs . TombstoneRequestType , & req )
2014-12-15 23:04:21 +00:00
if err != nil {
s . logger . Printf ( "[ERR] consul: failed to reap tombstones up to %d: %v" ,
index , err )
}
}