2014-01-09 23:49:09 +00:00
package consul
import (
2014-02-20 23:16:26 +00:00
"github.com/armon/go-metrics"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/consul/consul/structs"
2014-01-10 20:55:55 +00:00
"github.com/hashicorp/raft"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/serf/serf"
2014-01-10 20:55:55 +00:00
"net"
2014-04-03 22:51:03 +00:00
"strconv"
2014-01-09 23:49:09 +00:00
"time"
)
const (
2014-06-09 23:07:22 +00:00
SerfCheckID = "serfHealth"
SerfCheckName = "Serf Health Status"
SerfCheckAliveOutput = "Agent alive and reachable"
SerfCheckFailedOutput = "Agent not live or unreachable"
ConsulServiceID = "consul"
ConsulServiceName = "consul"
newLeaderEvent = "consul:new-leader"
2014-01-09 23:49:09 +00:00
)
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
leaderCh := s . raft . LeaderCh ( )
var stopCh chan struct { }
for {
select {
case isLeader := <- leaderCh :
if isLeader {
stopCh = make ( chan struct { } )
go s . leaderLoop ( stopCh )
s . logger . Printf ( "[INFO] consul: cluster leadership acquired" )
} else if stopCh != nil {
close ( stopCh )
stopCh = nil
s . logger . Printf ( "[INFO] consul: cluster leadership lost" )
}
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2014-02-19 20:36:27 +00:00
// Fire a user event indicating a new leader
payload := [ ] byte ( s . config . NodeName )
if err := s . serfLAN . UserEvent ( newLeaderEvent , payload , false ) ; err != nil {
s . logger . Printf ( "[WARN] consul: failed to broadcast new leader event: %v" , err )
}
2014-01-10 20:55:55 +00:00
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf . Member
2014-01-09 23:49:09 +00:00
RECONCILE :
2014-01-10 20:55:55 +00:00
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
2014-01-09 23:49:09 +00:00
// Apply a raft barrier to ensure our FSM is caught up
2014-02-20 23:16:26 +00:00
start := time . Now ( )
2014-01-09 23:49:09 +00:00
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to wait for barrier: %v" , err )
goto WAIT
}
2014-02-20 23:16:26 +00:00
metrics . MeasureSince ( [ ] string { "consul" , "leader" , "barrier" } , start )
2014-01-09 23:49:09 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] consul: failed to reconcile: %v" , err )
goto WAIT
}
2014-01-10 20:55:55 +00:00
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
2014-01-09 23:49:09 +00:00
WAIT :
2014-01-10 20:55:55 +00:00
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
s . reconcileMember ( member )
}
2014-01-09 23:49:09 +00:00
}
}
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Mainly we need to ensure all live nodes are registered, all failed
// nodes are marked as such, and all left nodes are de-registered.
func ( s * Server ) reconcile ( ) ( err error ) {
2014-02-20 23:16:26 +00:00
defer metrics . MeasureSince ( [ ] string { "consul" , "leader" , "reconcile" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
members := s . serfLAN . Members ( )
2014-04-03 22:51:03 +00:00
knownMembers := make ( map [ string ] struct { } )
2014-01-09 23:49:09 +00:00
for _ , member := range members {
if err := s . reconcileMember ( member ) ; err != nil {
return err
}
2014-04-03 22:51:03 +00:00
knownMembers [ member . Name ] = struct { } { }
}
// Reconcile any members that have been reaped while we were not the leader
return s . reconcileReaped ( knownMembers )
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
// in a crticial state that does not correspond to a known Serf member. We generate
// a "reap" event to cause the node to be cleaned up.
func ( s * Server ) reconcileReaped ( known map [ string ] struct { } ) error {
state := s . fsm . State ( )
_ , critical := state . ChecksInState ( structs . HealthCritical )
for _ , check := range critical {
// Ignore any non serf checks
if check . CheckID != SerfCheckID {
continue
}
// Check if this node is "known" by serf
if _ , ok := known [ check . Node ] ; ok {
continue
}
// Create a fake member
member := serf . Member {
Name : check . Node ,
Tags : map [ string ] string {
"dc" : s . config . Datacenter ,
"role" : "node" ,
} ,
}
// Get the node services, look for ConsulServiceID
_ , services := state . NodeServices ( check . Node )
serverPort := 0
for _ , service := range services . Services {
if service . ID == ConsulServiceID {
serverPort = service . Port
break
}
}
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member . Tags [ "role" ] = "consul"
member . Tags [ "port" ] = strconv . FormatUint ( uint64 ( serverPort ) , 10 )
}
// Attempt to reap this member
if err := s . handleReapMember ( member ) ; err != nil {
return err
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
if ! s . shouldHandleMember ( member ) {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[WARN] consul: skipping reconcile of node %v" , member )
2014-01-09 23:49:09 +00:00
return nil
}
2014-02-20 23:16:26 +00:00
defer metrics . MeasureSince ( [ ] string { "consul" , "leader" , "reconcileMember" } , time . Now ( ) )
2014-01-09 23:49:09 +00:00
var err error
switch member . Status {
case serf . StatusAlive :
err = s . handleAliveMember ( member )
case serf . StatusFailed :
err = s . handleFailedMember ( member )
case serf . StatusLeft :
err = s . handleLeftMember ( member )
2014-03-20 19:51:49 +00:00
case StatusReap :
err = s . handleReapMember ( member )
2014-01-09 23:49:09 +00:00
}
if err != nil {
2014-01-10 19:06:11 +00:00
s . logger . Printf ( "[ERR] consul: failed to reconcile member: %v: %v" ,
2014-01-09 23:49:09 +00:00
member , err )
return err
}
return nil
}
// shouldHandleMember checks if this is a Consul pool member
func ( s * Server ) shouldHandleMember ( member serf . Member ) bool {
if valid , dc := isConsulNode ( member ) ; valid && dc == s . config . Datacenter {
return true
}
2014-01-20 23:39:07 +00:00
if valid , parts := isConsulServer ( member ) ; valid && parts . Datacenter == s . config . Datacenter {
2014-01-09 23:49:09 +00:00
return true
}
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func ( s * Server ) handleAliveMember ( member serf . Member ) error {
state := s . fsm . State ( )
2014-01-10 01:59:31 +00:00
// Register consul service if a server
var service * structs . NodeService
2014-01-20 23:39:07 +00:00
if valid , parts := isConsulServer ( member ) ; valid {
2014-01-10 01:59:31 +00:00
service = & structs . NodeService {
2014-01-16 03:27:37 +00:00
ID : ConsulServiceID ,
Service : ConsulServiceName ,
2014-01-20 23:39:07 +00:00
Port : parts . Port ,
2014-01-10 01:59:31 +00:00
}
2014-01-10 20:55:55 +00:00
// Attempt to join the consul server
2014-01-20 23:56:29 +00:00
if err := s . joinConsulServer ( member , parts ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
2014-01-10 01:59:31 +00:00
}
2014-01-09 23:49:09 +00:00
// Check if the node exists
2014-02-05 02:33:15 +00:00
_ , found , addr := state . GetNode ( member . Name )
2014-01-10 06:12:08 +00:00
if found && addr == member . Addr . String ( ) {
// Check if the associated service is available
if service != nil {
match := false
2014-02-05 02:33:15 +00:00
_ , services := state . NodeServices ( member . Name )
2014-03-05 23:03:12 +00:00
if services != nil {
for id , _ := range services . Services {
if id == service . ID {
match = true
}
2014-01-10 06:12:08 +00:00
}
}
if ! match {
goto AFTER_CHECK
}
}
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the passing state
2014-02-05 02:33:15 +00:00
_ , checks := state . NodeChecks ( member . Name )
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2014-01-16 03:24:16 +00:00
if check . CheckID == SerfCheckID && check . Status == structs . HealthPassing {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 06:12:08 +00:00
AFTER_CHECK :
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' joined, marking health alive" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
Address : member . Addr . String ( ) ,
2014-01-10 01:57:13 +00:00
Service : service ,
2014-01-09 23:49:09 +00:00
Check : & structs . HealthCheck {
Node : member . Name ,
2014-01-16 03:24:16 +00:00
CheckID : SerfCheckID ,
Name : SerfCheckName ,
2014-01-09 23:49:09 +00:00
Status : structs . HealthPassing ,
2014-06-09 23:07:22 +00:00
Output : SerfCheckAliveOutput ,
2014-01-09 23:49:09 +00:00
} ,
}
var out struct { }
return s . endpoints . Catalog . Register ( & req , & out )
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func ( s * Server ) handleFailedMember ( member serf . Member ) error {
state := s . fsm . State ( )
// Check if the node exists
2014-02-05 02:33:15 +00:00
_ , found , addr := state . GetNode ( member . Name )
2014-01-09 23:49:09 +00:00
if found && addr == member . Addr . String ( ) {
// Check if the serfCheck is in the critical state
2014-02-05 02:33:15 +00:00
_ , checks := state . NodeChecks ( member . Name )
2014-01-09 23:49:09 +00:00
for _ , check := range checks {
2014-01-16 03:24:16 +00:00
if check . CheckID == SerfCheckID && check . Status == structs . HealthCritical {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
2014-01-10 01:46:33 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' failed, marking health critical" , member . Name )
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs . RegisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
Address : member . Addr . String ( ) ,
Check : & structs . HealthCheck {
Node : member . Name ,
2014-01-16 03:24:16 +00:00
CheckID : SerfCheckID ,
Name : SerfCheckName ,
2014-01-09 23:49:09 +00:00
Status : structs . HealthCritical ,
2014-06-09 23:07:22 +00:00
Output : SerfCheckFailedOutput ,
2014-01-09 23:49:09 +00:00
} ,
}
var out struct { }
return s . endpoints . Catalog . Register ( & req , & out )
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func ( s * Server ) handleLeftMember ( member serf . Member ) error {
2014-03-20 19:51:49 +00:00
return s . handleDeregisterMember ( "left" , member )
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func ( s * Server ) handleReapMember ( member serf . Member ) error {
return s . handleDeregisterMember ( "reaped" , member )
}
// handleDeregisterMember is used to deregister a member of a given reason
func ( s * Server ) handleDeregisterMember ( reason string , member serf . Member ) error {
2014-01-09 23:49:09 +00:00
state := s . fsm . State ( )
// Check if the node does not exists
2014-02-05 02:33:15 +00:00
_ , found , _ := state . GetNode ( member . Name )
2014-01-09 23:49:09 +00:00
if ! found {
return nil
}
2014-03-20 19:51:49 +00:00
s . logger . Printf ( "[INFO] consul: member '%s' %s, deregistering" , member . Name , reason )
2014-01-09 23:49:09 +00:00
2014-01-10 20:55:55 +00:00
// Remove from Raft peers if this was a server
2014-01-20 23:39:07 +00:00
if valid , parts := isConsulServer ( member ) ; valid {
if err := s . removeConsulServer ( member , parts . Port ) ; err != nil {
2014-01-10 20:55:55 +00:00
return err
}
}
2014-01-09 23:49:09 +00:00
// Deregister the node
req := structs . DeregisterRequest {
Datacenter : s . config . Datacenter ,
Node : member . Name ,
}
var out struct { }
return s . endpoints . Catalog . Deregister ( & req , & out )
}
2014-01-10 20:55:55 +00:00
// joinConsulServer is used to try to join another consul server
2014-01-20 23:56:29 +00:00
func ( s * Server ) joinConsulServer ( m serf . Member , parts * serverParts ) error {
2014-01-10 20:55:55 +00:00
// Do not join ourself
if m . Name == s . config . NodeName {
return nil
}
2014-01-20 23:56:29 +00:00
// Check for possibility of multiple bootstrap nodes
2014-01-30 21:13:29 +00:00
if parts . Bootstrap {
2014-01-20 23:56:29 +00:00
members := s . serfLAN . Members ( )
for _ , member := range members {
valid , p := isConsulServer ( member )
2014-01-30 21:13:29 +00:00
if valid && member . Name != m . Name && p . Bootstrap {
2014-01-20 23:56:29 +00:00
s . logger . Printf ( "[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
}
}
2014-01-10 20:55:55 +00:00
// Attempt to add as a peer
2014-01-20 23:56:29 +00:00
var addr net . Addr = & net . TCPAddr { IP : m . Addr , Port : parts . Port }
2014-01-10 20:55:55 +00:00
future := s . raft . AddPeer ( addr )
2014-05-02 01:11:30 +00:00
if err := future . Error ( ) ; err != nil && err != raft . ErrKnownPeer {
2014-01-10 20:55:55 +00:00
s . logger . Printf ( "[ERR] consul: failed to add raft peer: %v" , err )
return err
}
return nil
}
2014-01-10 23:05:34 +00:00
2014-01-20 23:39:07 +00:00
// removeConsulServer is used to try to remove a consul server that has left
2014-01-10 23:05:34 +00:00
func ( s * Server ) removeConsulServer ( m serf . Member , port int ) error {
// Do not remove ourself
if m . Name == s . config . NodeName {
return nil
}
// Attempt to remove as peer
peer := & net . TCPAddr { IP : m . Addr , Port : port }
future := s . raft . RemovePeer ( peer )
2014-05-02 01:11:30 +00:00
if err := future . Error ( ) ; err != nil && err != raft . ErrUnknownPeer {
2014-01-10 23:05:34 +00:00
s . logger . Printf ( "[ERR] consul: failed to remove raft peer '%v': %v" ,
peer , err )
return err
}
return nil
}