package consul

import (
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"reflect"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/sentinel"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/consul/types"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"
)

// These are the protocol versions that Consul can _understand_. These are
// Consul-level protocol versions, that are used to configure the Serf
// protocol versions.
const (
	ProtocolVersionMin uint8 = 2

	// Version 3 added support for network coordinates but we kept the
	// default protocol version at 2 to ease the transition to this new
	// feature. A Consul agent speaking version 2 of the protocol will
	// attempt to send its coordinates to a server who understands version
	// 3 or greater.
	ProtocolVersion2Compatible = 2

	ProtocolVersionMax = 3
)

const (
	serfLANSnapshot   = "serf/local.snapshot"
	serfWANSnapshot   = "serf/remote.snapshot"
	raftState         = "raft/"
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection
	// open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep
	// open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second
)

// Server is a Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
	// sentinel is the Sentinel code engine (can be nil).
	sentinel sentinel.Evaluator

	// aclAuthCache is the authoritative ACL cache.
	aclAuthCache *acl.Cache

	// aclCache is the non-authoritative ACL cache.
	aclCache *aclCache

	// autopilotPolicy controls the behavior of Autopilot for certain tasks.
	autopilotPolicy AutopilotPolicy

	// autopilotRemoveDeadCh is used to trigger a check for dead server removals.
	autopilotRemoveDeadCh chan struct{}

	// autopilotShutdownCh is used to stop the Autopilot loop.
	autopilotShutdownCh chan struct{}

	// autopilotWaitGroup is used to block until Autopilot shuts down.
	autopilotWaitGroup sync.WaitGroup

	// clusterHealth stores the current view of the cluster's health.
	clusterHealth     structs.OperatorHealthReply
	clusterHealthLock sync.RWMutex

	// Consul configuration
	config *Config

	// tokens holds ACL tokens initially from the configuration, but can
	// be updated at runtime, so should always be used instead of going to
	// the configuration directly.
	tokens *token.Store

	// Connection pool to other consul servers
	connPool *pool.ConnPool

	// Endpoints holds our RPC endpoints
	endpoints endpoints

	// eventChLAN is used to receive events from the
	// serf cluster in the datacenter
	eventChLAN chan serf.Event

	// eventChWAN is used to receive events from the
	// serf cluster that spans datacenters
	eventChWAN chan serf.Event

	// fsm is the state machine used with Raft to provide
	// strong consistency.
	fsm *consulFSM

	// Logger uses the provided LogOutput
	logger *log.Logger

	// The raft instance is used among Consul nodes within the DC to protect
	// operations that require strong consistency, rather than reading from
	// the state directly.
	raft          *raft.Raft
	raftLayer     *RaftLayer
	raftStore     *raftboltdb.BoltStore
	raftTransport *raft.NetworkTransport
	raftInmem     *raft.InmemStore

	// raftNotifyCh is set up by setupRaft() and ensures that we get reliable leader
	// transition notifications from the Raft layer.
	raftNotifyCh <-chan bool

	// reconcileCh is used to pass events from the serf handler
	// into the leader manager, so that the strong state can be
	// updated
	reconcileCh chan serf.Member

	// readyForConsistentReads is used to track when the leader server is
	// ready to serve consistent reads, after it has applied its initial
	// barrier. This is updated atomically.
	readyForConsistentReads int32

	// leaveCh is used to signal that the server is leaving the cluster
	// and trying to shed its RPC traffic onto other Consul servers. This
	// is only ever closed.
	leaveCh chan struct{}

	// router is used to map out Consul servers in the WAN and in Consul
	// Enterprise user-defined areas.
	router *router.Router

	// Listener is used to listen for incoming connections
	Listener  net.Listener
	rpcServer *rpc.Server

	// rpcTLS is the TLS config for incoming TLS requests
	rpcTLS *tls.Config

	// serfLAN is the Serf cluster maintained inside the DC
	// which contains all the DC nodes
	serfLAN *serf.Serf

	// segmentLAN maps segment names to their Serf cluster
	segmentLAN map[string]*serf.Serf

	// serfWAN is the Serf cluster maintained between DC's
	// which SHOULD only consist of Consul servers
	serfWAN *serf.Serf

	// serverLookup tracks the Consul servers in the local datacenter.
	// Used to do leader forwarding and provide fast lookup by server ID and address.
	serverLookup *ServerLookup

	// floodLock controls access to floodCh.
	floodLock sync.RWMutex
	floodCh   []chan struct{}

	// sessionTimers track the expiration time of each Session that has
	// a TTL. On expiration, a SessionDestroy event will occur, and
	// destroy the session via standard session destroy processing
	sessionTimers *SessionTimers

	// statsFetcher is used by autopilot to check the status of the other
	// Consul servers.
	statsFetcher *StatsFetcher

	// reassertLeaderCh is used to signal the leader loop should re-run
	// leadership actions after a snapshot restore.
	reassertLeaderCh chan chan error

	// tombstoneGC is used to track the pending GC invocations
	// for the KV tombstones
	tombstoneGC *state.TombstoneGC

	// aclReplicationStatus (and its associated lock) provide information
	// about the health of the ACL replication goroutine.
	aclReplicationStatus     structs.ACLReplicationStatus
	aclReplicationStatusLock sync.RWMutex

	// shutdown and the associated members here are used in orchestrating
	// a clean shutdown. The shutdownCh is never written to, only closed to
	// indicate a shutdown has been initiated.
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

// Holds the RPC endpoints
type endpoints struct {
	ACL           *ACL
	Catalog       *Catalog
	Coordinate    *Coordinate
	Health        *Health
	Internal      *Internal
	KVS           *KVS
	Operator      *Operator
	PreparedQuery *PreparedQuery
	Session       *Session
	Status        *Status
	Txn           *Txn
}
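
// NewServer constructs a new Consul server from the given configuration using
// the default logger and a fresh token store; it simply delegates to
// NewServerLogger below. A rough usage sketch (assuming a populated *Config,
// e.g. one derived from DefaultConfig with DataDir set; the path below is
// purely illustrative):
//
//	conf := DefaultConfig()
//	conf.DataDir = "/tmp/consul-server"
//	srv, err := NewServer(conf)
//	if err != nil {
//		// handle the error
//	}
//	defer srv.Shutdown()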
func NewServer(config *Config) (*Server, error) {
	return NewServerLogger(config, nil, new(token.Store))
}

// NewServerLogger is used to construct a new Consul server from the
// configuration with the given logger and token store, potentially
// returning an error.
func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
	// Check the protocol version.
	if err := config.CheckProtocolVersion(); err != nil {
		return nil, err
	}

	// Check for a data directory.
	if config.DataDir == "" && !config.DevMode {
		return nil, fmt.Errorf("Config must provide a DataDir")
	}

	// Sanity check the ACLs.
	if err := config.CheckACL(); err != nil {
		return nil, err
	}

	// Ensure we have a log output and create a logger.
	if config.LogOutput == nil {
		config.LogOutput = os.Stderr
	}
	if logger == nil {
		logger = log.New(config.LogOutput, "", log.LstdFlags)
	}

	// Check if TLS is enabled
	if config.CAFile != "" || config.CAPath != "" {
		config.UseTLS = true
	}

	// Create the TLS wrapper for outgoing connections.
	tlsConf := config.tlsConfig()
	tlsWrap, err := tlsConf.OutgoingTLSWrapper()
	if err != nil {
		return nil, err
	}

	// Get the incoming TLS config.
	incomingTLS, err := tlsConf.IncomingTLSConfig()
	if err != nil {
		return nil, err
	}

	// Create the tombstone GC.
	gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
	if err != nil {
		return nil, err
	}

	// Create the shutdown channel - this is closed but never written to.
	shutdownCh := make(chan struct{})

	connPool := &pool.ConnPool{
		SrcAddr:    config.RPCSrcAddr,
		LogOutput:  config.LogOutput,
		MaxTime:    serverRPCCache,
		MaxStreams: serverMaxStreams,
		TLSWrapper: tlsWrap,
		ForceTLS:   config.VerifyOutgoing,
	}

	// Create server.
	s := &Server{
		autopilotRemoveDeadCh: make(chan struct{}),
		autopilotShutdownCh:   make(chan struct{}),
		config:                config,
		tokens:                tokens,
		connPool:              connPool,
		eventChLAN:            make(chan serf.Event, 256),
		eventChWAN:            make(chan serf.Event, 256),
		logger:                logger,
		leaveCh:               make(chan struct{}),
		reconcileCh:           make(chan serf.Member, 32),
		router:                router.NewRouter(logger, config.Datacenter),
		rpcServer:             rpc.NewServer(),
		rpcTLS:                incomingTLS,
		reassertLeaderCh:      make(chan chan error),
		segmentLAN:            make(map[string]*serf.Serf, len(config.Segments)),
		sessionTimers:         NewSessionTimers(),
		tombstoneGC:           gc,
		serverLookup:          NewServerLookup(),
		shutdownCh:            shutdownCh,
	}

	// Set up the autopilot policy
	s.autopilotPolicy = &BasicAutopilot{server: s}

	// Initialize the stats fetcher that autopilot will use.
	s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)

	// Initialize the authoritative ACL cache.
	s.sentinel = sentinel.New(logger)
	s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault, s.sentinel)
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to create authoritative ACL cache: %v", err)
	}

	// Set up the non-authoritative ACL cache. A nil local function is given
	// if ACL replication isn't enabled.
	var local acl.FaultFunc
	if s.IsACLReplicationEnabled() {
		local = s.aclLocalFault
	}
	if s.aclCache, err = newACLCache(config, logger, s.RPC, local, s.sentinel); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to create non-authoritative ACL cache: %v", err)
	}

	// Initialize the RPC layer.
	if err := s.setupRPC(tlsWrap); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}

	// Initialize any extra RPC listeners for segments.
	segmentListeners, err := s.setupSegmentRPC()
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start segment RPC layer: %v", err)
	}

	// Initialize the Raft server.
	if err := s.setupRaft(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start Raft: %v", err)
	}

	// Serf and dynamic bind ports
	//
	// The LAN serf cluster announces the port of the WAN serf cluster
	// which creates a race when the WAN cluster is supposed to bind to
	// a dynamic port (port 0). The current memberlist implementation will
	// update the bind port in the configuration after the memberlist is
	// created, so we can pull it out from there reliably, even though it's
	// a little gross to be reading the updated config.

	// Initialize the WAN Serf.
	serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
	s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
	}

	// See the big comment above for why we are doing this.
	if serfBindPortWAN == 0 {
		serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
		if serfBindPortWAN == 0 {
			return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
		}
		s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
	}

	// Initialize the LAN segments before the default LAN Serf so we have
	// updated port information to publish there.
	if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to setup network segments: %v", err)
	}

	// Initialize the LAN Serf for the default network segment.
	s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
	}
	go s.lanEventHandler()

	// Start the flooders after the LAN event handler is wired up.
	s.floodSegments(config)

	// Add a "static route" to the WAN Serf and hook it up to Serf events.
	if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
	}
	go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

	// Fire up the LAN <-> WAN join flooder.
	portFn := func(s *metadata.Server) (int, bool) {
		if s.WanJoinPort > 0 {
			return s.WanJoinPort, true
		}
		return 0, false
	}
	go s.Flood(nil, portFn, s.serfWAN)

	// Start monitoring leadership. This must happen after Serf is set up
	// since it can fire events when leadership is obtained.
	go s.monitorLeadership()

	// Start ACL replication.
	if s.IsACLReplicationEnabled() {
		go s.runACLReplication()
	}

	// Start listening for RPC requests.
	go s.listen(s.Listener)

	// Start listeners for any segments with separate RPC listeners.
	for _, listener := range segmentListeners {
		go s.listen(listener)
	}

	// Start the metrics handlers.
	go s.sessionStats()

	// Start the server health checking.
	go s.serverHealthLoop()

	return s, nil
}

// setupRaft is used to set up and initialize Raft
func (s *Server) setupRaft() error {
	// If we have an unclean exit then attempt to close the Raft store.
	defer func() {
		if s.raft == nil && s.raftStore != nil {
			if err := s.raftStore.Close(); err != nil {
				s.logger.Printf("[ERR] consul: failed to close Raft store: %v", err)
			}
		}
	}()

	// Create the FSM.
	var err error
	s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
	if err != nil {
		return err
	}

	// The ServerAddressProvider needs server IDs to work correctly, which is
	// only supported in Raft protocol version 3 or higher.
	var serverAddressProvider raft.ServerAddressProvider = nil
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		serverAddressProvider = s.serverLookup
	}

	// Create a transport layer.
	transConfig := &raft.NetworkTransportConfig{
		Stream:                s.raftLayer,
		MaxPool:               3,
		Timeout:               10 * time.Second,
		ServerAddressProvider: serverAddressProvider,
	}

	trans := raft.NewNetworkTransportWithConfig(transConfig)
	s.raftTransport = trans

	// Make sure we set the LogOutput.
	s.config.RaftConfig.LogOutput = s.config.LogOutput
	s.config.RaftConfig.Logger = s.logger

	// Versions of the Raft protocol below 3 require the LocalID to match the network
	// address of the transport.
	s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
	}

	// Build an all in-memory setup for dev mode, otherwise prepare a full
	// disk-based setup.
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	if s.config.DevMode {
		store := raft.NewInmemStore()
		s.raftInmem = store
		stable = store
		log = store
		snap = raft.NewInmemSnapshotStore()
	} else {
		// Create the base raft path.
		path := filepath.Join(s.config.DataDir, raftState)
		if err := lib.EnsurePath(path, true); err != nil {
			return err
		}

		// Create the backend raft store for logs and stable storage.
		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
		if err != nil {
			return err
		}
		s.raftStore = store
		stable = store

		// Wrap the store in a LogCache to improve performance.
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			return err
		}
		log = cacheStore

		// Create the snapshot store.
		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
		if err != nil {
			return err
		}
		snap = snapshots

		// For an existing cluster being upgraded to the new version of
		// Raft, we almost never want to run recovery based on the old
		// peers.json file. We create a peers.info file with a helpful
		// note about where peers.json went, and use that as a sentinel
		// to avoid ingesting the old one that first time (if we have to
		// create the peers.info file because it's not there, we also
		// blow away any existing peers.json file).
		peersFile := filepath.Join(path, "peers.json")
		peersInfoFile := filepath.Join(path, "peers.info")
		if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) {
			if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil {
				return fmt.Errorf("failed to write peers.info file: %v", err)
			}

			// Blow away the peers.json file if present, since the
			// peers.info sentinel wasn't there.
			if _, err := os.Stat(peersFile); err == nil {
				if err := os.Remove(peersFile); err != nil {
					return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
				}
				s.logger.Printf("[INFO] consul: deleted peers.json file (see peers.info for details)")
			}
		} else if _, err := os.Stat(peersFile); err == nil {
			s.logger.Printf("[INFO] consul: found peers.json file, recovering Raft configuration...")

			var configuration raft.Configuration
			if s.config.RaftConfig.ProtocolVersion < 3 {
				configuration, err = raft.ReadPeersJSON(peersFile)
			} else {
				configuration, err = raft.ReadConfigJSON(peersFile)
			}
			if err != nil {
				return fmt.Errorf("recovery failed to parse peers.json: %v", err)
			}

			tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput)
			if err != nil {
				return fmt.Errorf("recovery failed to make temp FSM: %v", err)
			}
			if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
				log, stable, snap, trans, configuration); err != nil {
				return fmt.Errorf("recovery failed: %v", err)
			}

			if err := os.Remove(peersFile); err != nil {
				return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
			}
			s.logger.Printf("[INFO] consul: deleted peers.json file after successful recovery")
		}
	}

	// If we are in bootstrap or dev mode and the state is clean then we can
	// bootstrap now.
	if s.config.Bootstrap || s.config.DevMode {
		hasState, err := raft.HasExistingState(log, stable, snap)
		if err != nil {
			return err
		}
		if !hasState {
			configuration := raft.Configuration{
				Servers: []raft.Server{
					raft.Server{
						ID:      s.config.RaftConfig.LocalID,
						Address: trans.LocalAddr(),
					},
				},
			}
			if err := raft.BootstrapCluster(s.config.RaftConfig,
				log, stable, snap, trans, configuration); err != nil {
				return err
			}
		}
	}

	// Set up a channel for reliable leader notifications.
	raftNotifyCh := make(chan bool, 1)
	s.config.RaftConfig.NotifyCh = raftNotifyCh
	s.raftNotifyCh = raftNotifyCh

	// Setup the Raft store.
	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
	if err != nil {
		return err
	}
	return nil
}

// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
	// Create endpoints
	s.endpoints.ACL = &ACL{s}
	s.endpoints.Catalog = &Catalog{s}
	s.endpoints.Coordinate = NewCoordinate(s)
	s.endpoints.Health = &Health{s}
	s.endpoints.Internal = &Internal{s}
	s.endpoints.KVS = &KVS{s}
	s.endpoints.Operator = &Operator{s}
	s.endpoints.PreparedQuery = &PreparedQuery{s}
	s.endpoints.Session = &Session{s}
	s.endpoints.Status = &Status{s}
	s.endpoints.Txn = &Txn{s}

	// Register the handlers
	s.rpcServer.Register(s.endpoints.ACL)
	s.rpcServer.Register(s.endpoints.Catalog)
	s.rpcServer.Register(s.endpoints.Coordinate)
	s.rpcServer.Register(s.endpoints.Health)
	s.rpcServer.Register(s.endpoints.Internal)
	s.rpcServer.Register(s.endpoints.KVS)
	s.rpcServer.Register(s.endpoints.Operator)
	s.rpcServer.Register(s.endpoints.PreparedQuery)
	s.rpcServer.Register(s.endpoints.Session)
	s.rpcServer.Register(s.endpoints.Status)
	s.rpcServer.Register(s.endpoints.Txn)

	ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
	if err != nil {
		return err
	}
	s.Listener = ln
	if s.config.NotifyListen != nil {
		s.config.NotifyListen()
	}

	// todo(fs): we should probably guard this
	if s.config.RPCAdvertise == nil {
		s.config.RPCAdvertise = ln.Addr().(*net.TCPAddr)
	}

	// Verify that we have a usable advertise address
	if s.config.RPCAdvertise.IP.IsUnspecified() {
		ln.Close()
		return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
	}

	// Provide a DC specific wrapper. Raft replication is only
	// ever done in the same datacenter, so we can provide it as a constant.
	wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)

	// Define a callback for determining whether to wrap a connection with TLS
	tlsFunc := func(address raft.ServerAddress) bool {
		if s.config.VerifyOutgoing {
			return true
		}

		server := s.serverLookup.Server(address)

		if server == nil {
			return false
		}

		return server.UseTLS
	}
	s.raftLayer = NewRaftLayer(s.config.RPCSrcAddr, s.config.RPCAdvertise, wrapper, tlsFunc)
	return nil
}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
	s.logger.Printf("[INFO] consul: shutting down server")
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}
	s.shutdown = true
	close(s.shutdownCh)

	if s.serfLAN != nil {
		s.serfLAN.Shutdown()
	}

	if s.serfWAN != nil {
		s.serfWAN.Shutdown()
		if err := s.router.RemoveArea(types.AreaWAN); err != nil {
			s.logger.Printf("[WARN] consul: error removing WAN area: %v", err)
		}
	}
	s.router.Shutdown()

	if s.raft != nil {
		s.raftTransport.Close()
		s.raftLayer.Close()
		future := s.raft.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Printf("[WARN] consul: error shutting down raft: %s", err)
		}
		if s.raftStore != nil {
			s.raftStore.Close()
		}
	}

	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close the connection pool
	s.connPool.Shutdown()

	return nil
}

// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
	s.logger.Printf("[INFO] consul: server starting leave")

	// Check the number of known peers
	numPeers, err := s.numPeers()
	if err != nil {
		s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
		return err
	}

	addr := s.raftTransport.LocalAddr()

	// If we are the current leader, and we have any other peers (cluster has multiple
	// servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size.
	// If we are not the leader, then we should issue our leave intention and wait to be
	// removed for some sane period of time.
	isLeader := s.IsLeader()
	if isLeader && numPeers > 1 {
		minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
		if err != nil {
			return err
		}

		if minRaftProtocol >= 2 && s.config.RaftConfig.ProtocolVersion >= 3 {
			future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
			if err := future.Error(); err != nil {
				s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
			}
		} else {
			future := s.raft.RemovePeer(addr)
			if err := future.Error(); err != nil {
				s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
			}
		}
	}

	// Leave the WAN pool
	if s.serfWAN != nil {
		if err := s.serfWAN.Leave(); err != nil {
			s.logger.Printf("[ERR] consul: failed to leave WAN Serf cluster: %v", err)
		}
	}

	// Leave the LAN pool
	if s.serfLAN != nil {
		if err := s.serfLAN.Leave(); err != nil {
			s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
		}
	}

	// Start refusing RPCs now that we've left the LAN pool. It's important
	// to do this *after* we've left the LAN pool so that clients will know
	// to shift onto another server if they perform a retry. We also wake up
	// all queries in the RPC retry state.
	s.logger.Printf("[INFO] consul: Waiting %s to drain RPC traffic", s.config.LeaveDrainTime)
	close(s.leaveCh)
	time.Sleep(s.config.LeaveDrainTime)

	// If we were not leader, wait to be safely removed from the cluster. We
	// must wait to allow the raft replication to take place, otherwise an
	// immediate shutdown could cause a loss of quorum.
	if !isLeader {
		left := false
		limit := time.Now().Add(raftRemoveGracePeriod)
		for !left && time.Now().Before(limit) {
			// Sleep a while before we check.
			time.Sleep(50 * time.Millisecond)

			// Get the latest configuration.
			future := s.raft.GetConfiguration()
			if err := future.Error(); err != nil {
				s.logger.Printf("[ERR] consul: failed to get raft configuration: %v", err)
				break
			}

			// See if we are no longer included.
			left = true
			for _, server := range future.Configuration().Servers {
				if server.Address == addr {
					left = false
					break
				}
			}
		}

		// TODO (slackpad) With the old Raft library we used to force the
		// peers set to empty when a graceful leave occurred. This would
		// keep voting spam down if the server was restarted, but it was
		// dangerous because the peers was inconsistent with the logs and
		// snapshots, so it wasn't really safe in all cases for the server
		// to become leader. This is now safe, but the log spam is noisy.
		// The next new version of the library will have a "you are not a
		// peer stop it" behavior that should address this. We will have
		// to evaluate during the RC period if this interim situation is
		// not too confusing for operators.

		// TODO (slackpad) When we take a later new version of the Raft
		// library it won't try to complete replication, so this peer
		// may not realize that it has been removed. Need to revisit this
		// and the warning here.
		if !left {
			s.logger.Printf("[WARN] consul: failed to leave raft configuration gracefully, timeout")
		}
	}

	return nil
}

// numPeers is used to check on the number of known peers, including potentially
// the local node. We count only voters, since others can't actually become
// leader, so aren't considered peers.
func (s *Server) numPeers() (int, error) {
	future := s.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return 0, err
	}

	var numPeers int
	for _, server := range future.Configuration().Servers {
		if server.Suffrage == raft.Voter {
			numPeers++
		}
	}

	return numPeers, nil
}

// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
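//
// For example (the address below is purely illustrative; 8301 is Consul's
// default Serf LAN port):
//
//	joined, err := s.JoinLAN([]string{"10.0.0.10:8301"})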
func (s *Server) JoinLAN(addrs []string) (int, error) {
	return s.serfLAN.Join(addrs, true)
}

// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
	return s.serfWAN.Join(addrs, true)
}

// LocalMember is used to return the local node
func (s *Server) LocalMember() serf.Member {
	return s.serfLAN.LocalMember()
}

// LANMembers is used to return the members of the LAN cluster
func (s *Server) LANMembers() []serf.Member {
	return s.serfLAN.Members()
}

// WANMembers is used to return the members of the WAN cluster
func (s *Server) WANMembers() []serf.Member {
	return s.serfWAN.Members()
}

// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string) error {
	if err := s.serfLAN.RemoveFailedNode(node); err != nil {
		return err
	}
	if err := s.serfWAN.RemoveFailedNode(node); err != nil {
		return err
	}
	return nil
}

// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
	return s.raft.State() == raft.Leader
}

// KeyManagerLAN returns the LAN Serf keyring manager
func (s *Server) KeyManagerLAN() *serf.KeyManager {
	return s.serfLAN.KeyManager()
}

// KeyManagerWAN returns the WAN Serf keyring manager
func (s *Server) KeyManagerWAN() *serf.KeyManager {
	return s.serfWAN.KeyManager()
}

// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
	return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
}

// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
	segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
	segments[""] = s.serfLAN
	for name, segment := range s.segmentLAN {
		segments[name] = segment
	}
	return segments
}

// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

// ReadRequestHeader fills in the service method so the RPC server can route
// the in-memory request.
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the stored args value into the handler's argument.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
	dst.Set(sourceValue)
	return nil
}

// WriteResponse records any RPC error and copies the handler's reply back
// into the caller's reply value.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
	dst.Set(sourceValue)
	return nil
}

// Close is a no-op for the in-memory codec.
func (i *inmemCodec) Close() error {
	return nil
}

// RPC is used to make a local RPC call
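// served by the local rpcServer through an in-memory codec, so no network hop
// is involved. For example, an illustrative call to a registered endpoint
// might look like:
//
//	var out structs.IndexedNodes
//	args := structs.DCSpecificRequest{Datacenter: "dc1"}
//	err := s.RPC("Catalog.ListNodes", &args, &out)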
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{
		method: method,
		args:   args,
		reply:  reply,
	}
	if err := s.rpcServer.ServeRequest(codec); err != nil {
		return err
	}
	return codec.err
}

// SnapshotRPC dispatches the given snapshot request, reading from the streaming
// input and writing to the streaming output depending on the operation.
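//
// A rough usage sketch (field values are illustrative): to take a snapshot and
// stream it to a local file, an in-process caller could do something like:
//
//	args := structs.SnapshotRequest{Datacenter: "dc1", Op: structs.SnapshotSave}
//	f, err := os.Create("backup.snap")
//	if err != nil {
//		// handle the error
//	}
//	defer f.Close()
//	err = s.SnapshotRPC(&args, nil, f, nil)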
func (s *Server) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
	replyFn structs.SnapshotReplyFn) error {

	// Perform the operation.
	var reply structs.SnapshotResponse
	snap, err := s.dispatchSnapshotRequest(args, in, &reply)
	if err != nil {
		return err
	}
	defer func() {
		if err := snap.Close(); err != nil {
			s.logger.Printf("[ERR] consul: Failed to close snapshot: %v", err)
		}
	}()

	// Let the caller peek at the reply.
	if replyFn != nil {
		if err := replyFn(&reply); err != nil {
			return nil
		}
	}

	// Stream the snapshot.
	if out != nil {
		if _, err := io.Copy(out, snap); err != nil {
			return fmt.Errorf("failed to stream snapshot: %v", err)
		}
	}
	return nil
}

// RegisterEndpoint is used to substitute an endpoint for testing.
func (s *Server) RegisterEndpoint(name string, handler interface{}) error {
	s.logger.Printf("[WARN] consul: endpoint injected; this should only be used for testing")
	return s.rpcServer.RegisterName(name, handler)
}

// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	numKnownDCs := len(s.router.GetDatacenters())
	stats := map[string]map[string]string{
		"consul": map[string]string{
			"server":            "true",
			"leader":            fmt.Sprintf("%v", s.IsLeader()),
			"leader_addr":       string(s.raft.Leader()),
			"bootstrap":         fmt.Sprintf("%v", s.config.Bootstrap),
			"known_datacenters": toString(uint64(numKnownDCs)),
		},
		"raft":     s.raft.Stats(),
		"serf_lan": s.serfLAN.Stats(),
		"serf_wan": s.serfWAN.Stats(),
		"runtime":  runtimeStats(),
	}
	return stats
}

// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
	lan, err := s.serfLAN.GetCoordinate()
	if err != nil {
		return nil, err
	}

	cs := lib.CoordinateSet{"": lan}
	for name, segment := range s.segmentLAN {
		c, err := segment.GetCoordinate()
		if err != nil {
			return nil, err
		}
		cs[name] = c
	}
	return cs, nil
}

// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
	return s.serfWAN.GetCoordinate()
}

// setConsistentReadReady atomically sets the readiness state flag when
// leadership is obtained, to indicate that the server is past its initial
// barrier write.
func (s *Server) setConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// resetConsistentReadReady atomically resets the readiness state flag when
// leadership is revoked.
func (s *Server) resetConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// isReadyForConsistentReads returns true if this server is ready to serve
// consistent reads.
func (s *Server) isReadyForConsistentReads() bool {
	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
const peersInfoContent = `
As of Consul 0.7.0, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version. Please see the agent configuration
page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter.

For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster, like
this:

[
  "10.1.0.1:8300",
  "10.1.0.2:8300",
  "10.1.0.3:8300"
]

For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Consul server in the cluster, like this:

[
  {
    "id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
    "address": "10.1.0.1:8300",
    "non_voter": false
  },
  {
    "id": "8b6dda82-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.2:8300",
    "non_voter": false
  },
  {
    "id": "97e17742-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.3:8300",
    "non_voter": false
  }
]

The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.

The "address" field is the address and port of the server.

The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations, please see
https://www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.

Under normal operation, the peers.json file will not be present.

When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.

Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.

Please see https://www.consul.io/docs/guides/outage.html for more information.
`