open-consul/agent/consul/leader.go

783 lines
24 KiB
Go
Raw Normal View History

2014-01-09 23:49:09 +00:00
package consul
import (
"fmt"
"net"
"strconv"
"sync"
"time"
2014-02-20 23:16:26 +00:00
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
2014-01-09 23:49:09 +00:00
"github.com/hashicorp/serf/serf"
)
const (
newLeaderEvent = "consul:new-leader"
barrierWriteTimeout = 2 * time.Minute
2014-01-09 23:49:09 +00:00
)
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func (s *Server) monitorLeadership() {
// We use the notify channel we configured Raft with, NOT Raft's
// leaderCh, which is only notified best-effort. Doing this ensures
// that we get all notifications in order, which is required for
// cleanup and to ensure we never run multiple leader loops.
2017-07-06 14:09:21 +00:00
raftNotifyCh := s.raftNotifyCh
var wg sync.WaitGroup
2014-01-09 23:49:09 +00:00
var stopCh chan struct{}
for {
select {
case isLeader := <-raftNotifyCh:
2014-01-09 23:49:09 +00:00
if isLeader {
stopCh = make(chan struct{})
wg.Add(1)
go func() {
s.leaderLoop(stopCh)
wg.Done()
}()
2014-01-09 23:49:09 +00:00
s.logger.Printf("[INFO] consul: cluster leadership acquired")
} else if stopCh != nil {
close(stopCh)
stopCh = nil
wg.Wait()
2014-01-09 23:49:09 +00:00
s.logger.Printf("[INFO] consul: cluster leadership lost")
}
case <-s.shutdownCh:
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
2015-09-15 12:22:08 +00:00
// maintenance activities
2014-01-09 23:49:09 +00:00
func (s *Server) leaderLoop(stopCh chan struct{}) {
// Fire a user event indicating a new leader
payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
}
// Reconcile channel is only used once initial reconcile
// has succeeded
var reconcileCh chan serf.Member
establishedLeader := false
reassert := func() error {
if !establishedLeader {
return fmt.Errorf("leadership has not been established")
}
if err := s.revokeLeadership(); err != nil {
return err
}
if err := s.establishLeadership(); err != nil {
return err
}
return nil
}
2014-01-09 23:49:09 +00:00
RECONCILE:
// Setup a reconciliation timer
reconcileCh = nil
interval := time.After(s.config.ReconcileInterval)
2014-01-09 23:49:09 +00:00
// Apply a raft barrier to ensure our FSM is caught up
2014-02-20 23:16:26 +00:00
start := time.Now()
barrier := s.raft.Barrier(barrierWriteTimeout)
2014-01-09 23:49:09 +00:00
if err := barrier.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
return
2014-01-09 23:49:09 +00:00
}
2014-02-20 23:16:26 +00:00
metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
2014-01-09 23:49:09 +00:00
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := s.establishLeadership(); err != nil {
2017-05-04 14:15:25 +00:00
s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err)
goto WAIT
}
establishedLeader = true
defer func() {
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
}
}()
}
2014-01-09 23:49:09 +00:00
// Reconcile any missing data
if err := s.reconcile(); err != nil {
s.logger.Printf("[ERR] consul: failed to reconcile: %v", err)
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s.reconcileCh
2014-01-09 23:49:09 +00:00
WAIT:
// Periodically reconcile as long as we are the leader,
// or when Serf events arrive
for {
select {
case <-stopCh:
return
case <-s.shutdownCh:
return
case <-interval:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index)
case errCh := <-s.reassertLeaderCh:
errCh <- reassert()
}
2014-01-09 23:49:09 +00:00
}
}
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2015-09-11 19:24:54 +00:00
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (s *Server) establishLeadership() error {
// This will create the anonymous token and master token (if that is
// configured).
if err := s.initializeACL(); err != nil {
return err
}
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
2015-01-05 22:58:59 +00:00
s.tombstoneGC.SetEnabled(true)
lastIndex := s.raft.LastIndex()
s.tombstoneGC.Hint(lastIndex)
// Setup the session timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node along, effectively this means all the timers are renewed at the
// time of failover. The TTL contract is that the session will not be expired
// before the TTL, so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Sessions
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s.initializeSessionTimers(); err != nil {
return err
}
s.getOrCreateAutopilotConfig()
s.startAutopilot()
s.setConsistentReadReady()
return nil
}
2015-01-05 22:58:59 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
// Disable the tombstone GC, since it is only useful as a leader
s.tombstoneGC.SetEnabled(false)
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s.clearAllSessionTimers(); err != nil {
return err
}
s.resetConsistentReadReady()
s.stopAutopilot()
2015-01-05 22:58:59 +00:00
return nil
}
// initializeACL is used to setup the ACLs if we are the leader
// and need to do this.
func (s *Server) initializeACL() error {
// Bail if not configured or we are not authoritative.
authDC := s.config.ACLDatacenter
if len(authDC) == 0 || authDC != s.config.Datacenter {
return nil
}
// Purge the cache, since it could've changed while we were not the
// leader.
s.aclAuthCache.Purge()
// Create anonymous token if missing.
state := s.fsm.State()
_, acl, err := state.ACLGet(nil, anonymousToken)
if err != nil {
return fmt.Errorf("failed to get anonymous token: %v", err)
}
if acl == nil {
req := structs.ACLRequest{
Datacenter: authDC,
2014-10-09 19:28:07 +00:00
Op: structs.ACLSet,
ACL: structs.ACL{
ID: anonymousToken,
Name: "Anonymous Token",
Type: structs.ACLTypeClient,
},
}
_, err := s.raftApply(structs.ACLRequestType, &req)
if err != nil {
return fmt.Errorf("failed to create anonymous token: %v", err)
}
}
// Check for configured master token.
if master := s.config.ACLMasterToken; len(master) > 0 {
_, acl, err = state.ACLGet(nil, master)
if err != nil {
return fmt.Errorf("failed to get master token: %v", err)
}
if acl == nil {
req := structs.ACLRequest{
Datacenter: authDC,
Op: structs.ACLSet,
ACL: structs.ACL{
ID: master,
Name: "Master Token",
Type: structs.ACLTypeManagement,
},
}
_, err := s.raftApply(structs.ACLRequestType, &req)
if err != nil {
return fmt.Errorf("failed to create master token: %v", err)
}
s.logger.Printf("[INFO] consul: Created ACL master token from configuration")
}
}
// Check to see if we need to initialize the ACL bootstrap info. This
// needs a Consul version check since it introduces a new Raft operation
// that'll produce an error on older servers, and it also makes a piece
// of state in the state store that will cause problems with older
// servers consuming snapshots, so we have to wait to create it.
var minVersion = version.Must(version.NewVersion("0.9.1"))
if ServersMeetMinimumVersion(s.LANMembers(), minVersion) {
bs, err := state.ACLGetBootstrap()
if err != nil {
return fmt.Errorf("failed looking for ACL bootstrap info: %v", err)
}
if bs == nil {
req := structs.ACLRequest{
Datacenter: authDC,
Op: structs.ACLBootstrapInit,
}
resp, err := s.raftApply(structs.ACLRequestType, &req)
if err != nil {
return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
}
switch v := resp.(type) {
case error:
return fmt.Errorf("failed to initialize ACL bootstrap: %v", v)
case bool:
if v {
s.logger.Printf("[INFO] consul: ACL bootstrap enabled")
} else {
s.logger.Printf("[INFO] consul: ACL bootstrap disabled, existing management tokens found")
}
default:
return fmt.Errorf("unexpected response trying to initialize ACL bootstrap: %T", v)
}
}
} else {
s.logger.Printf("[WARN] consul: Can't initialize ACL bootstrap until all servers are >= %s", minVersion.String())
}
return nil
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() (*structs.AutopilotConfig, bool) {
state := s.fsm.State()
_, config, err := state.AutopilotConfig()
if err != nil {
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
return nil, false
}
if config != nil {
return config, true
}
if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
return nil, false
}
config = s.config.AutopilotConfig
req := structs.AutopilotSetConfigRequest{Config: *config}
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
return nil, false
}
return config, true
}
2014-01-09 23:49:09 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Mainly we need to ensure all live nodes are registered, all failed
// nodes are marked as such, and all left nodes are de-registered.
func (s *Server) reconcile() (err error) {
2014-02-20 23:16:26 +00:00
defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now())
2014-01-09 23:49:09 +00:00
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
2014-01-09 23:49:09 +00:00
for _, member := range members {
if err := s.reconcileMember(member); err != nil {
return err
}
knownMembers[member.Name] = struct{}{}
}
// Reconcile any members that have been reaped while we were not the leader
return s.reconcileReaped(knownMembers)
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
2015-09-15 12:22:08 +00:00
// in a critical state that does not correspond to a known Serf member. We generate
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State()
_, checks, err := state.ChecksInState(nil, api.HealthAny)
if err != nil {
return err
}
2014-10-14 18:04:43 +00:00
for _, check := range checks {
// Ignore any non serf checks
if check.CheckID != structs.SerfCheckID {
continue
}
// Check if this node is "known" by serf
if _, ok := known[check.Node]; ok {
continue
}
// Create a fake member
member := serf.Member{
Name: check.Node,
Tags: map[string]string{
"dc": s.config.Datacenter,
"role": "node",
},
}
// Get the node services, look for ConsulServiceID
_, services, err := state.NodeServices(nil, check.Node)
if err != nil {
return err
}
serverPort := 0
for _, service := range services.Services {
if service.ID == structs.ConsulServiceID {
serverPort = service.Port
break
}
}
// Create the appropriate tags if this was a server node
if serverPort > 0 {
member.Tags["role"] = "consul"
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
}
// Attempt to reap this member
if err := s.handleReapMember(member); err != nil {
return err
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func (s *Server) reconcileMember(member serf.Member) error {
// Check if this is a member we should handle
if !s.shouldHandleMember(member) {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[WARN] consul: skipping reconcile of node %v", member)
2014-01-09 23:49:09 +00:00
return nil
}
2014-02-20 23:16:26 +00:00
defer metrics.MeasureSince([]string{"consul", "leader", "reconcileMember"}, time.Now())
2014-01-09 23:49:09 +00:00
var err error
switch member.Status {
case serf.StatusAlive:
err = s.handleAliveMember(member)
case serf.StatusFailed:
err = s.handleFailedMember(member)
case serf.StatusLeft:
err = s.handleLeftMember(member)
2014-03-20 19:51:49 +00:00
case StatusReap:
err = s.handleReapMember(member)
2014-01-09 23:49:09 +00:00
}
if err != nil {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v",
2014-01-09 23:49:09 +00:00
member, err)
// Permission denied should not bubble up
if acl.IsErrPermissionDenied(err) {
return nil
}
2014-01-09 23:49:09 +00:00
}
return nil
}
// shouldHandleMember checks if this is a Consul pool member
func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true
}
if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
2014-01-09 23:49:09 +00:00
return true
}
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error {
// Register consul service if a server
var service *structs.NodeService
if valid, parts := metadata.IsConsulServer(member); valid {
service = &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
2014-01-20 23:39:07 +00:00
Port: parts.Port,
}
// Attempt to join the consul server
if err := s.joinConsulServer(member, parts); err != nil {
return err
}
}
2014-01-09 23:49:09 +00:00
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
// Check if the associated service is available
if service != nil {
match := false
_, services, err := state.NodeServices(nil, member.Name)
if err != nil {
return err
}
if services != nil {
for id := range services.Services {
if id == service.ID {
match = true
}
}
}
if !match {
goto AFTER_CHECK
}
}
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the passing state
_, checks, err := state.NodeChecks(nil, member.Name)
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
AFTER_CHECK:
s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name)
2014-01-09 23:49:09 +00:00
// Register with the catalog.
2014-01-09 23:49:09 +00:00
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
2014-01-09 23:49:09 +00:00
Address: member.Addr.String(),
2014-01-10 01:57:13 +00:00
Service: service,
2014-01-09 23:49:09 +00:00
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthPassing,
Output: structs.SerfCheckAliveOutput,
2014-01-09 23:49:09 +00:00
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
2014-01-09 23:49:09 +00:00
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
2014-01-09 23:49:09 +00:00
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
2014-01-09 23:49:09 +00:00
// Check if the serfCheck is in the critical state
_, checks, err := state.NodeChecks(nil, member.Name)
if err != nil {
return err
}
2014-01-09 23:49:09 +00:00
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
2014-01-09 23:49:09 +00:00
return nil
}
}
}
s.logger.Printf("[INFO] consul: member '%s' failed, marking health critical", member.Name)
2014-01-09 23:49:09 +00:00
// Register with the catalog
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
2014-01-09 23:49:09 +00:00
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: structs.SerfCheckFailedOutput,
2014-01-09 23:49:09 +00:00
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
2014-01-09 23:49:09 +00:00
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
2014-01-09 23:49:09 +00:00
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member) error {
2014-03-20 19:51:49 +00:00
return s.handleDeregisterMember("left", member)
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member) error {
return s.handleDeregisterMember("reaped", member)
}
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if member.Name == s.config.NodeName {
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
2014-01-09 23:49:09 +00:00
return nil
}
// Remove from Raft peers if this was a server
if valid, parts := metadata.IsConsulServer(member); valid {
2014-01-20 23:39:07 +00:00
if err := s.removeConsulServer(member, parts.Port); err != nil {
return err
}
}
// Check if the node does not exist
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
if err != nil {
return err
}
if node == nil {
return nil
}
2014-01-09 23:49:09 +00:00
// Deregister the node
s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason)
2014-01-09 23:49:09 +00:00
req := structs.DeregisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
2014-01-09 23:49:09 +00:00
}
_, err = s.raftApply(structs.DeregisterRequestType, &req)
return err
2014-01-09 23:49:09 +00:00
}
// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
// Do not join ourself
if m.Name == s.config.NodeName {
return nil
}
// Check for possibility of multiple bootstrap nodes
2014-01-30 21:13:29 +00:00
if parts.Bootstrap {
members := s.serfLAN.Members()
for _, member := range members {
valid, p := metadata.IsConsulServer(member)
2014-01-30 21:13:29 +00:00
if valid && member.Name != m.Name && p.Bootstrap {
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
return nil
}
}
}
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries. If the address is the same but the ID changed, remove the
// old server before adding the new one.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
for _, server := range configFuture.Configuration().Servers {
// No-op if the raft version is too low
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
return nil
}
// If the address or ID matches an existing server, see if we need to remove the old one first
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
// Exit with no-op if this is being called on an existing server
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
}
future := s.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
s.logger.Printf("[INFO] consul: removed server with duplicate address: %s", server.Address)
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID)
}
}
}
// Attempt to add as a peer
switch {
case minRaftProtocol >= 3:
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
case minRaftProtocol == 2 && parts.RaftVersion >= 3:
2017-02-22 20:53:32 +00:00
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
default:
2017-02-22 20:53:32 +00:00
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
return err
}
}
// Trigger a check to remove dead servers
2017-03-07 21:58:06 +00:00
select {
case s.autopilotRemoveDeadCh <- struct{}{}:
default:
}
return nil
}
2014-01-20 23:39:07 +00:00
// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member, port int) error {
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
return err
}
2017-02-22 20:53:32 +00:00
minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
if err != nil {
return err
}
_, parts := metadata.IsConsulServer(m)
2017-02-22 20:53:32 +00:00
// Pick which remove API to use based on how the server was added.
for _, server := range configFuture.Configuration().Servers {
2017-02-22 20:53:32 +00:00
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID)
2017-02-22 20:53:32 +00:00
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
server.ID, err)
2017-02-22 20:53:32 +00:00
return err
}
break
} else if server.Address == raft.ServerAddress(addr) {
// If not, use the old remove API
s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address)
2017-02-22 20:53:32 +00:00
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
addr, err)
return err
}
break
}
}
return nil
}
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func (s *Server) reapTombstones(index uint64) {
2014-12-19 00:09:02 +00:00
defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now())
req := structs.TombstoneRequest{
Datacenter: s.config.Datacenter,
Op: structs.TombstoneReap,
ReapIndex: index,
}
_, err := s.raftApply(structs.TombstoneRequestType, &req)
if err != nil {
s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v",
index, err)
}
}