993 lines
29 KiB
Go
993 lines
29 KiB
Go
package consul
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
ca "github.com/hashicorp/consul/agent/connect/ca"
|
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
|
"github.com/hashicorp/consul/agent/metadata"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/types"
|
|
uuid "github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/go-version"
|
|
"github.com/hashicorp/raft"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
const (
|
|
newLeaderEvent = "consul:new-leader"
|
|
barrierWriteTimeout = 2 * time.Minute
|
|
)
|
|
|
|
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))
|
|
|
|
// monitorLeadership is used to monitor if we acquire or lose our role
|
|
// as the leader in the Raft cluster. There is some work the leader is
|
|
// expected to do, so we must react to changes
|
|
func (s *Server) monitorLeadership() {
|
|
// We use the notify channel we configured Raft with, NOT Raft's
|
|
// leaderCh, which is only notified best-effort. Doing this ensures
|
|
// that we get all notifications in order, which is required for
|
|
// cleanup and to ensure we never run multiple leader loops.
|
|
raftNotifyCh := s.raftNotifyCh
|
|
|
|
var weAreLeaderCh chan struct{}
|
|
var leaderLoop sync.WaitGroup
|
|
for {
|
|
select {
|
|
case isLeader := <-raftNotifyCh:
|
|
switch {
|
|
case isLeader:
|
|
if weAreLeaderCh != nil {
|
|
s.logger.Printf("[ERR] consul: attempted to start the leader loop while running")
|
|
continue
|
|
}
|
|
|
|
weAreLeaderCh = make(chan struct{})
|
|
leaderLoop.Add(1)
|
|
go func(ch chan struct{}) {
|
|
defer leaderLoop.Done()
|
|
s.leaderLoop(ch)
|
|
}(weAreLeaderCh)
|
|
s.logger.Printf("[INFO] consul: cluster leadership acquired")
|
|
|
|
default:
|
|
if weAreLeaderCh == nil {
|
|
s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running")
|
|
continue
|
|
}
|
|
|
|
s.logger.Printf("[DEBUG] consul: shutting down leader loop")
|
|
close(weAreLeaderCh)
|
|
leaderLoop.Wait()
|
|
weAreLeaderCh = nil
|
|
s.logger.Printf("[INFO] consul: cluster leadership lost")
|
|
}
|
|
|
|
case <-s.shutdownCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// leaderLoop runs as long as we are the leader to run various
|
|
// maintenance activities
|
|
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
|
// Fire a user event indicating a new leader
|
|
payload := []byte(s.config.NodeName)
|
|
for name, segment := range s.LANSegments() {
|
|
if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
|
|
s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err)
|
|
}
|
|
}
|
|
|
|
// Reconcile channel is only used once initial reconcile
|
|
// has succeeded
|
|
var reconcileCh chan serf.Member
|
|
establishedLeader := false
|
|
|
|
reassert := func() error {
|
|
if !establishedLeader {
|
|
return fmt.Errorf("leadership has not been established")
|
|
}
|
|
if err := s.revokeLeadership(); err != nil {
|
|
return err
|
|
}
|
|
if err := s.establishLeadership(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
RECONCILE:
|
|
// Setup a reconciliation timer
|
|
reconcileCh = nil
|
|
interval := time.After(s.config.ReconcileInterval)
|
|
|
|
// Apply a raft barrier to ensure our FSM is caught up
|
|
start := time.Now()
|
|
barrier := s.raft.Barrier(barrierWriteTimeout)
|
|
if err := barrier.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
|
|
goto WAIT
|
|
}
|
|
metrics.MeasureSince([]string{"leader", "barrier"}, start)
|
|
|
|
// Check if we need to handle initial leadership actions
|
|
if !establishedLeader {
|
|
if err := s.establishLeadership(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to establish leadership: %v", err)
|
|
// Immediately revoke leadership since we didn't successfully
|
|
// establish leadership.
|
|
if err := s.revokeLeadership(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
|
|
}
|
|
goto WAIT
|
|
}
|
|
establishedLeader = true
|
|
defer func() {
|
|
if err := s.revokeLeadership(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Reconcile any missing data
|
|
if err := s.reconcile(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to reconcile: %v", err)
|
|
goto WAIT
|
|
}
|
|
|
|
// Initial reconcile worked, now we can process the channel
|
|
// updates
|
|
reconcileCh = s.reconcileCh
|
|
|
|
WAIT:
|
|
// Poll the stop channel to give it priority so we don't waste time
|
|
// trying to perform the other operations if we have been asked to shut
|
|
// down.
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Periodically reconcile as long as we are the leader,
|
|
// or when Serf events arrive
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-s.shutdownCh:
|
|
return
|
|
case <-interval:
|
|
goto RECONCILE
|
|
case member := <-reconcileCh:
|
|
s.reconcileMember(member)
|
|
case index := <-s.tombstoneGC.ExpireCh():
|
|
go s.reapTombstones(index)
|
|
case errCh := <-s.reassertLeaderCh:
|
|
errCh <- reassert()
|
|
}
|
|
}
|
|
}
|
|
|
|
// establishLeadership is invoked once we become leader and are able
|
|
// to invoke an initial barrier. The barrier is used to ensure any
|
|
// previously inflight transactions have been committed and that our
|
|
// state is up-to-date.
|
|
func (s *Server) establishLeadership() error {
|
|
// This will create the anonymous token and master token (if that is
|
|
// configured).
|
|
if err := s.initializeACL(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Hint the tombstone expiration timer. When we freshly establish leadership
|
|
// we become the authoritative timer, and so we need to start the clock
|
|
// on any pending GC events.
|
|
s.tombstoneGC.SetEnabled(true)
|
|
lastIndex := s.raft.LastIndex()
|
|
s.tombstoneGC.Hint(lastIndex)
|
|
|
|
// Setup the session timers. This is done both when starting up or when
|
|
// a leader fail over happens. Since the timers are maintained by the leader
|
|
// node along, effectively this means all the timers are renewed at the
|
|
// time of failover. The TTL contract is that the session will not be expired
|
|
// before the TTL, so expiring it later is allowable.
|
|
//
|
|
// This MUST be done after the initial barrier to ensure the latest Sessions
|
|
// are available to be initialized. Otherwise initialization may use stale
|
|
// data.
|
|
if err := s.initializeSessionTimers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.getOrCreateAutopilotConfig()
|
|
s.autopilot.Start()
|
|
|
|
// todo(kyhavlov): start a goroutine here for handling periodic CA rotation
|
|
if err := s.initializeCA(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.setConsistentReadReady()
|
|
return nil
|
|
}
|
|
|
|
// revokeLeadership is invoked once we step down as leader.
|
|
// This is used to cleanup any state that may be specific to a leader.
|
|
func (s *Server) revokeLeadership() error {
|
|
// Disable the tombstone GC, since it is only useful as a leader
|
|
s.tombstoneGC.SetEnabled(false)
|
|
|
|
// Clear the session timers on either shutdown or step down, since we
|
|
// are no longer responsible for session expirations.
|
|
if err := s.clearAllSessionTimers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.setCAProvider(nil)
|
|
|
|
s.resetConsistentReadReady()
|
|
s.autopilot.Stop()
|
|
return nil
|
|
}
|
|
|
|
// initializeACL is used to setup the ACLs if we are the leader
|
|
// and need to do this.
|
|
func (s *Server) initializeACL() error {
|
|
// Bail if not configured or we are not authoritative.
|
|
authDC := s.config.ACLDatacenter
|
|
if len(authDC) == 0 || authDC != s.config.Datacenter {
|
|
return nil
|
|
}
|
|
|
|
// Purge the cache, since it could've changed while we were not the
|
|
// leader.
|
|
s.aclAuthCache.Purge()
|
|
|
|
// Create anonymous token if missing.
|
|
state := s.fsm.State()
|
|
_, acl, err := state.ACLGet(nil, anonymousToken)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get anonymous token: %v", err)
|
|
}
|
|
if acl == nil {
|
|
req := structs.ACLRequest{
|
|
Datacenter: authDC,
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
ID: anonymousToken,
|
|
Name: "Anonymous Token",
|
|
Type: structs.ACLTypeClient,
|
|
},
|
|
}
|
|
_, err := s.raftApply(structs.ACLRequestType, &req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create anonymous token: %v", err)
|
|
}
|
|
}
|
|
|
|
// Check for configured master token.
|
|
if master := s.config.ACLMasterToken; len(master) > 0 {
|
|
_, acl, err = state.ACLGet(nil, master)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get master token: %v", err)
|
|
}
|
|
if acl == nil {
|
|
req := structs.ACLRequest{
|
|
Datacenter: authDC,
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
ID: master,
|
|
Name: "Master Token",
|
|
Type: structs.ACLTypeManagement,
|
|
},
|
|
}
|
|
_, err := s.raftApply(structs.ACLRequestType, &req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create master token: %v", err)
|
|
}
|
|
s.logger.Printf("[INFO] consul: Created ACL master token from configuration")
|
|
}
|
|
}
|
|
|
|
// Check to see if we need to initialize the ACL bootstrap info. This
|
|
// needs a Consul version check since it introduces a new Raft operation
|
|
// that'll produce an error on older servers, and it also makes a piece
|
|
// of state in the state store that will cause problems with older
|
|
// servers consuming snapshots, so we have to wait to create it.
|
|
var minVersion = version.Must(version.NewVersion("0.9.1"))
|
|
if ServersMeetMinimumVersion(s.LANMembers(), minVersion) {
|
|
bs, err := state.ACLGetBootstrap()
|
|
if err != nil {
|
|
return fmt.Errorf("failed looking for ACL bootstrap info: %v", err)
|
|
}
|
|
if bs == nil {
|
|
req := structs.ACLRequest{
|
|
Datacenter: authDC,
|
|
Op: structs.ACLBootstrapInit,
|
|
}
|
|
resp, err := s.raftApply(structs.ACLRequestType, &req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
|
|
}
|
|
switch v := resp.(type) {
|
|
case error:
|
|
return fmt.Errorf("failed to initialize ACL bootstrap: %v", v)
|
|
|
|
case bool:
|
|
if v {
|
|
s.logger.Printf("[INFO] consul: ACL bootstrap enabled")
|
|
} else {
|
|
s.logger.Printf("[INFO] consul: ACL bootstrap disabled, existing management tokens found")
|
|
}
|
|
|
|
default:
|
|
return fmt.Errorf("unexpected response trying to initialize ACL bootstrap: %T", v)
|
|
}
|
|
}
|
|
} else {
|
|
s.logger.Printf("[WARN] consul: Can't initialize ACL bootstrap until all servers are >= %s", minVersion.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
|
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
|
|
state := s.fsm.State()
|
|
_, config, err := state.AutopilotConfig()
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] autopilot: failed to get config: %v", err)
|
|
return nil
|
|
}
|
|
if config != nil {
|
|
return config
|
|
}
|
|
|
|
if !ServersMeetMinimumVersion(s.LANMembers(), minAutopilotVersion) {
|
|
s.logger.Printf("[WARN] autopilot: can't initialize until all servers are >= %s", minAutopilotVersion.String())
|
|
return nil
|
|
}
|
|
|
|
config = s.config.AutopilotConfig
|
|
req := structs.AutopilotSetConfigRequest{Config: *config}
|
|
if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
|
|
s.logger.Printf("[ERR] autopilot: failed to initialize config: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return config
|
|
}
|
|
|
|
// initializeCAConfig is used to initialize the CA config if necessary
|
|
// when setting up the CA during establishLeadership
|
|
func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) {
|
|
state := s.fsm.State()
|
|
_, config, err := state.CAConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if config != nil {
|
|
return config, nil
|
|
}
|
|
|
|
config = s.config.CAConfig
|
|
if config.ClusterID == "" {
|
|
id, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config.ClusterID = id
|
|
}
|
|
|
|
req := structs.CARequest{
|
|
Op: structs.CAOpSetConfig,
|
|
Config: config,
|
|
}
|
|
if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// initializeCA sets up the CA provider when gaining leadership, bootstrapping
|
|
// the root in the state store if necessary.
|
|
func (s *Server) initializeCA() error {
|
|
// Bail if connect isn't enabled.
|
|
if !s.config.ConnectEnabled {
|
|
return nil
|
|
}
|
|
|
|
conf, err := s.initializeCAConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Initialize the right provider based on the config
|
|
provider, err := s.createCAProvider(conf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.setCAProvider(provider)
|
|
|
|
// Get the active root cert from the CA
|
|
rootPEM, err := provider.ActiveRoot()
|
|
if err != nil {
|
|
return fmt.Errorf("error getting root cert: %v", err)
|
|
}
|
|
|
|
rootCA, err := parseCARoot(rootPEM, conf.Provider)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if the CA root is already initialized and exit if it is.
|
|
// Every change to the CA after this initial bootstrapping should
|
|
// be done through the rotation process.
|
|
state := s.fsm.State()
|
|
_, activeRoot, err := state.CARootActive(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if activeRoot != nil {
|
|
if activeRoot.ID != rootCA.ID {
|
|
s.logger.Printf("[WARN] connect: CA root %q is not the active root (%q)", rootCA.ID, activeRoot.ID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Get the highest index
|
|
idx, _, err := state.CARoots(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store the root cert in raft
|
|
resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
|
Op: structs.CAOpSetRoots,
|
|
Index: idx,
|
|
Roots: []*structs.CARoot{rootCA},
|
|
})
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] connect: Apply failed %v", err)
|
|
return err
|
|
}
|
|
if respErr, ok := resp.(error); ok {
|
|
return respErr
|
|
}
|
|
|
|
s.logger.Printf("[INFO] connect: initialized CA with provider %q", conf.Provider)
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
|
|
func parseCARoot(pemValue, provider string) (*structs.CARoot, error) {
|
|
id, err := connect.CalculateCertFingerprint(pemValue)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing root fingerprint: %v", err)
|
|
}
|
|
rootCert, err := connect.ParseCert(pemValue)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing root cert: %v", err)
|
|
}
|
|
return &structs.CARoot{
|
|
ID: id,
|
|
Name: fmt.Sprintf("%s CA Root Cert", strings.Title(provider)),
|
|
SerialNumber: rootCert.SerialNumber.Uint64(),
|
|
SigningKeyID: connect.HexString(rootCert.AuthorityKeyId),
|
|
NotBefore: rootCert.NotBefore,
|
|
NotAfter: rootCert.NotAfter,
|
|
RootCert: pemValue,
|
|
Active: true,
|
|
}, nil
|
|
}
|
|
|
|
// createProvider returns a connect CA provider from the given config.
|
|
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
|
|
switch conf.Provider {
|
|
case structs.ConsulCAProvider:
|
|
return ca.NewConsulProvider(conf.Config, &consulCADelegate{s})
|
|
default:
|
|
return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
|
|
}
|
|
}
|
|
|
|
func (s *Server) getCAProvider() ca.Provider {
|
|
retries := 0
|
|
var result ca.Provider
|
|
for result == nil {
|
|
s.caProviderLock.RLock()
|
|
result = s.caProvider
|
|
s.caProviderLock.RUnlock()
|
|
|
|
// In cases where an agent is started with managed proxies, we may ask
|
|
// for the provider before establishLeadership completes. If we're the
|
|
// leader, then wait and get the provider again
|
|
if result == nil && s.IsLeader() && retries < 10 {
|
|
retries++
|
|
time.Sleep(50 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (s *Server) setCAProvider(newProvider ca.Provider) {
|
|
s.caProviderLock.Lock()
|
|
defer s.caProviderLock.Unlock()
|
|
s.caProvider = newProvider
|
|
}
|
|
|
|
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
|
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
|
// We generate a "reap" event to cause the node to be cleaned up.
|
|
func (s *Server) reconcileReaped(known map[string]struct{}) error {
|
|
state := s.fsm.State()
|
|
_, checks, err := state.ChecksInState(nil, api.HealthAny)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, check := range checks {
|
|
// Ignore any non serf checks
|
|
if check.CheckID != structs.SerfCheckID {
|
|
continue
|
|
}
|
|
|
|
// Check if this node is "known" by serf
|
|
if _, ok := known[check.Node]; ok {
|
|
continue
|
|
}
|
|
|
|
// Get the node services, look for ConsulServiceID
|
|
_, services, err := state.NodeServices(nil, check.Node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
serverPort := 0
|
|
serverAddr := ""
|
|
serverID := ""
|
|
|
|
CHECKS:
|
|
for _, service := range services.Services {
|
|
if service.ID == structs.ConsulServiceID {
|
|
_, node, err := state.GetNode(check.Node)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul: Unable to look up node with name %q: %v", check.Node, err)
|
|
continue CHECKS
|
|
}
|
|
|
|
serverAddr = node.Address
|
|
serverPort = service.Port
|
|
lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
|
|
svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
|
|
if svr != nil {
|
|
serverID = svr.ID
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// Create a fake member
|
|
member := serf.Member{
|
|
Name: check.Node,
|
|
Tags: map[string]string{
|
|
"dc": s.config.Datacenter,
|
|
"role": "node",
|
|
},
|
|
}
|
|
|
|
// Create the appropriate tags if this was a server node
|
|
if serverPort > 0 {
|
|
member.Tags["role"] = "consul"
|
|
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
|
|
member.Tags["id"] = serverID
|
|
member.Addr = net.ParseIP(serverAddr)
|
|
}
|
|
|
|
// Attempt to reap this member
|
|
if err := s.handleReapMember(member); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// reconcileMember is used to do an async reconcile of a single
|
|
// serf member
|
|
func (s *Server) reconcileMember(member serf.Member) error {
|
|
// Check if this is a member we should handle
|
|
if !s.shouldHandleMember(member) {
|
|
s.logger.Printf("[WARN] consul: skipping reconcile of node %v", member)
|
|
return nil
|
|
}
|
|
defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
|
|
var err error
|
|
switch member.Status {
|
|
case serf.StatusAlive:
|
|
err = s.handleAliveMember(member)
|
|
case serf.StatusFailed:
|
|
err = s.handleFailedMember(member)
|
|
case serf.StatusLeft:
|
|
err = s.handleLeftMember(member)
|
|
case StatusReap:
|
|
err = s.handleReapMember(member)
|
|
}
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to reconcile member: %v: %v",
|
|
member, err)
|
|
|
|
// Permission denied should not bubble up
|
|
if acl.IsErrPermissionDenied(err) {
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// shouldHandleMember checks if this is a Consul pool member
|
|
func (s *Server) shouldHandleMember(member serf.Member) bool {
|
|
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
|
|
return true
|
|
}
|
|
if valid, parts := metadata.IsConsulServer(member); valid &&
|
|
parts.Segment == "" &&
|
|
parts.Datacenter == s.config.Datacenter {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// handleAliveMember is used to ensure the node
|
|
// is registered, with a passing health check.
|
|
func (s *Server) handleAliveMember(member serf.Member) error {
|
|
// Register consul service if a server
|
|
var service *structs.NodeService
|
|
if valid, parts := metadata.IsConsulServer(member); valid {
|
|
service = &structs.NodeService{
|
|
ID: structs.ConsulServiceID,
|
|
Service: structs.ConsulServiceName,
|
|
Port: parts.Port,
|
|
}
|
|
|
|
// Attempt to join the consul server
|
|
if err := s.joinConsulServer(member, parts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check if the node exists
|
|
state := s.fsm.State()
|
|
_, node, err := state.GetNode(member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if node != nil && node.Address == member.Addr.String() {
|
|
// Check if the associated service is available
|
|
if service != nil {
|
|
match := false
|
|
_, services, err := state.NodeServices(nil, member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if services != nil {
|
|
for id := range services.Services {
|
|
if id == service.ID {
|
|
match = true
|
|
}
|
|
}
|
|
}
|
|
if !match {
|
|
goto AFTER_CHECK
|
|
}
|
|
}
|
|
|
|
// Check if the serfCheck is in the passing state
|
|
_, checks, err := state.NodeChecks(nil, member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, check := range checks {
|
|
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
AFTER_CHECK:
|
|
s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name)
|
|
|
|
// Register with the catalog.
|
|
req := structs.RegisterRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
Node: member.Name,
|
|
ID: types.NodeID(member.Tags["id"]),
|
|
Address: member.Addr.String(),
|
|
Service: service,
|
|
Check: &structs.HealthCheck{
|
|
Node: member.Name,
|
|
CheckID: structs.SerfCheckID,
|
|
Name: structs.SerfCheckName,
|
|
Status: api.HealthPassing,
|
|
Output: structs.SerfCheckAliveOutput,
|
|
},
|
|
|
|
// If there's existing information about the node, do not
|
|
// clobber it.
|
|
SkipNodeUpdate: true,
|
|
}
|
|
_, err = s.raftApply(structs.RegisterRequestType, &req)
|
|
return err
|
|
}
|
|
|
|
// handleFailedMember is used to mark the node's status
|
|
// as being critical, along with all checks as unknown.
|
|
func (s *Server) handleFailedMember(member serf.Member) error {
|
|
// Check if the node exists
|
|
state := s.fsm.State()
|
|
_, node, err := state.GetNode(member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if node != nil && node.Address == member.Addr.String() {
|
|
// Check if the serfCheck is in the critical state
|
|
_, checks, err := state.NodeChecks(nil, member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, check := range checks {
|
|
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
s.logger.Printf("[INFO] consul: member '%s' failed, marking health critical", member.Name)
|
|
|
|
// Register with the catalog
|
|
req := structs.RegisterRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
Node: member.Name,
|
|
ID: types.NodeID(member.Tags["id"]),
|
|
Address: member.Addr.String(),
|
|
Check: &structs.HealthCheck{
|
|
Node: member.Name,
|
|
CheckID: structs.SerfCheckID,
|
|
Name: structs.SerfCheckName,
|
|
Status: api.HealthCritical,
|
|
Output: structs.SerfCheckFailedOutput,
|
|
},
|
|
|
|
// If there's existing information about the node, do not
|
|
// clobber it.
|
|
SkipNodeUpdate: true,
|
|
}
|
|
_, err = s.raftApply(structs.RegisterRequestType, &req)
|
|
return err
|
|
}
|
|
|
|
// handleLeftMember is used to handle members that gracefully
|
|
// left. They are deregistered if necessary.
|
|
func (s *Server) handleLeftMember(member serf.Member) error {
|
|
return s.handleDeregisterMember("left", member)
|
|
}
|
|
|
|
// handleReapMember is used to handle members that have been
|
|
// reaped after a prolonged failure. They are deregistered.
|
|
func (s *Server) handleReapMember(member serf.Member) error {
|
|
return s.handleDeregisterMember("reaped", member)
|
|
}
|
|
|
|
// handleDeregisterMember is used to deregister a member of a given reason
|
|
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
|
|
// Do not deregister ourself. This can only happen if the current leader
|
|
// is leaving. Instead, we should allow a follower to take-over and
|
|
// deregister us later.
|
|
if member.Name == s.config.NodeName {
|
|
s.logger.Printf("[WARN] consul: deregistering self (%s) should be done by follower", s.config.NodeName)
|
|
return nil
|
|
}
|
|
|
|
// Remove from Raft peers if this was a server
|
|
if valid, parts := metadata.IsConsulServer(member); valid {
|
|
if err := s.removeConsulServer(member, parts.Port); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check if the node does not exist
|
|
state := s.fsm.State()
|
|
_, node, err := state.GetNode(member.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if node == nil {
|
|
return nil
|
|
}
|
|
|
|
// Deregister the node
|
|
s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason)
|
|
req := structs.DeregisterRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
Node: member.Name,
|
|
}
|
|
_, err = s.raftApply(structs.DeregisterRequestType, &req)
|
|
return err
|
|
}
|
|
|
|
// joinConsulServer is used to try to join another consul server
|
|
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
|
|
// Check for possibility of multiple bootstrap nodes
|
|
if parts.Bootstrap {
|
|
members := s.serfLAN.Members()
|
|
for _, member := range members {
|
|
valid, p := metadata.IsConsulServer(member)
|
|
if valid && member.Name != m.Name && p.Bootstrap {
|
|
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Processing ourselves could result in trying to remove ourselves to
|
|
// fix up our address, which would make us step down. This is only
|
|
// safe to attempt if there are multiple servers available.
|
|
configFuture := s.raft.GetConfiguration()
|
|
if err := configFuture.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
|
|
return err
|
|
}
|
|
if m.Name == s.config.NodeName {
|
|
if l := len(configFuture.Configuration().Servers); l < 3 {
|
|
s.logger.Printf("[DEBUG] consul: Skipping self join check for %q since the cluster is too small", m.Name)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// See if it's already in the configuration. It's harmless to re-add it
|
|
// but we want to avoid doing that if possible to prevent useless Raft
|
|
// log entries. If the address is the same but the ID changed, remove the
|
|
// old server before adding the new one.
|
|
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
|
|
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, server := range configFuture.Configuration().Servers {
|
|
// No-op if the raft version is too low
|
|
if server.Address == raft.ServerAddress(addr) && (minRaftProtocol < 2 || parts.RaftVersion < 3) {
|
|
return nil
|
|
}
|
|
|
|
// If the address or ID matches an existing server, see if we need to remove the old one first
|
|
if server.Address == raft.ServerAddress(addr) || server.ID == raft.ServerID(parts.ID) {
|
|
// Exit with no-op if this is being called on an existing server
|
|
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
|
|
return nil
|
|
}
|
|
future := s.raft.RemoveServer(server.ID, 0, 0)
|
|
if server.Address == raft.ServerAddress(addr) {
|
|
if err := future.Error(); err != nil {
|
|
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
|
|
}
|
|
s.logger.Printf("[INFO] consul: removed server with duplicate address: %s", server.Address)
|
|
} else {
|
|
if err := future.Error(); err != nil {
|
|
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
|
|
}
|
|
s.logger.Printf("[INFO] consul: removed server with duplicate ID: %s", server.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Attempt to add as a peer
|
|
switch {
|
|
case minRaftProtocol >= 3:
|
|
addFuture := s.raft.AddNonvoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
|
return err
|
|
}
|
|
case minRaftProtocol == 2 && parts.RaftVersion >= 3:
|
|
addFuture := s.raft.AddVoter(raft.ServerID(parts.ID), raft.ServerAddress(addr), 0, 0)
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
|
return err
|
|
}
|
|
default:
|
|
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
|
|
if err := addFuture.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to add raft peer: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Trigger a check to remove dead servers
|
|
s.autopilot.RemoveDeadServers()
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeConsulServer is used to try to remove a consul server that has left
|
|
func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
|
addr := (&net.TCPAddr{IP: m.Addr, Port: port}).String()
|
|
|
|
// See if it's already in the configuration. It's harmless to re-remove it
|
|
// but we want to avoid doing that if possible to prevent useless Raft
|
|
// log entries.
|
|
configFuture := s.raft.GetConfiguration()
|
|
if err := configFuture.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
|
|
return err
|
|
}
|
|
|
|
minRaftProtocol, err := s.autopilot.MinRaftProtocol()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, parts := metadata.IsConsulServer(m)
|
|
|
|
// Pick which remove API to use based on how the server was added.
|
|
for _, server := range configFuture.Configuration().Servers {
|
|
// If we understand the new add/remove APIs and the server was added by ID, use the new remove API
|
|
if minRaftProtocol >= 2 && server.ID == raft.ServerID(parts.ID) {
|
|
s.logger.Printf("[INFO] consul: removing server by ID: %q", server.ID)
|
|
future := s.raft.RemoveServer(raft.ServerID(parts.ID), 0, 0)
|
|
if err := future.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
|
server.ID, err)
|
|
return err
|
|
}
|
|
break
|
|
} else if server.Address == raft.ServerAddress(addr) {
|
|
// If not, use the old remove API
|
|
s.logger.Printf("[INFO] consul: removing server by address: %q", server.Address)
|
|
future := s.raft.RemovePeer(raft.ServerAddress(addr))
|
|
if err := future.Error(); err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to remove raft peer '%v': %v",
|
|
addr, err)
|
|
return err
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// reapTombstones is invoked by the current leader to manage garbage
|
|
// collection of tombstones. When a key is deleted, we trigger a tombstone
|
|
// GC clock. Once the expiration is reached, this routine is invoked
|
|
// to clear all tombstones before this index. This must be replicated
|
|
// through Raft to ensure consistency. We do this outside the leader loop
|
|
// to avoid blocking.
|
|
func (s *Server) reapTombstones(index uint64) {
|
|
defer metrics.MeasureSince([]string{"leader", "reapTombstones"}, time.Now())
|
|
req := structs.TombstoneRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
Op: structs.TombstoneReap,
|
|
ReapIndex: index,
|
|
}
|
|
_, err := s.raftApply(structs.TombstoneRequestType, &req)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v",
|
|
index, err)
|
|
}
|
|
}
|