2013-12-06 23:43:07 +00:00
package consul
import (
2020-07-28 19:31:48 +00:00
"context"
2014-06-11 02:12:26 +00:00
"errors"
2013-12-06 23:43:07 +00:00
"fmt"
2016-10-26 02:20:24 +00:00
"io"
2016-07-30 07:54:08 +00:00
"io/ioutil"
2013-12-07 00:35:13 +00:00
"net"
"net/rpc"
2013-12-06 23:43:07 +00:00
"os"
"path/filepath"
2014-06-11 02:12:26 +00:00
"reflect"
2014-02-24 00:37:33 +00:00
"strconv"
2019-06-28 17:40:07 +00:00
"strings"
2013-12-06 23:43:07 +00:00
"sync"
2017-06-16 03:41:30 +00:00
"sync/atomic"
2017-06-16 15:49:54 +00:00
"time"
2017-06-16 03:41:30 +00:00
2020-11-13 02:12:12 +00:00
"github.com/armon/go-metrics"
2020-09-28 22:52:31 +00:00
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/raft"
2020-09-25 17:46:38 +00:00
autopilot "github.com/hashicorp/raft-autopilot"
2020-09-28 22:52:31 +00:00
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
"google.golang.org/grpc"
2019-10-15 20:58:50 +00:00
"github.com/hashicorp/consul/acl"
2019-10-24 18:38:09 +00:00
"github.com/hashicorp/consul/agent/consul/authmethod"
2020-06-05 19:56:19 +00:00
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
2017-11-29 02:01:17 +00:00
"github.com/hashicorp/consul/agent/consul/fsm"
pkg refactor
command/agent/* -> agent/*
command/consul/* -> agent/consul/*
command/agent/command{,_test}.go -> command/agent{,_test}.go
command/base/command.go -> command/base.go
command/base/* -> command/*
commands.go -> command/commands.go
The script which did the refactor is:
(
cd $GOPATH/src/github.com/hashicorp/consul
git mv command/agent/command.go command/agent.go
git mv command/agent/command_test.go command/agent_test.go
git mv command/agent/flag_slice_value{,_test}.go command/
git mv command/agent .
git mv command/base/command.go command/base.go
git mv command/base/config_util{,_test}.go command/
git mv commands.go command/
git mv consul agent
rmdir command/base/
gsed -i -e 's|package agent|package command|' command/agent{,_test}.go
gsed -i -e 's|package agent|package command|' command/flag_slice_value{,_test}.go
gsed -i -e 's|package base|package command|' command/base.go command/config_util{,_test}.go
gsed -i -e 's|package main|package command|' command/commands.go
gsed -i -e 's|base.Command|BaseCommand|' command/commands.go
gsed -i -e 's|agent.Command|AgentCommand|' command/commands.go
gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/commands.go
gsed -i -e 's|base\.||' command/commands.go
gsed -i -e 's|command\.||' command/commands.go
gsed -i -e 's|command|c|' main.go
gsed -i -e 's|range Commands|range command.Commands|' main.go
gsed -i -e 's|Commands: Commands|Commands: command.Commands|' main.go
gsed -i -e 's|base\.BoolValue|BoolValue|' command/operator_autopilot_set.go
gsed -i -e 's|base\.DurationValue|DurationValue|' command/operator_autopilot_set.go
gsed -i -e 's|base\.StringValue|StringValue|' command/operator_autopilot_set.go
gsed -i -e 's|base\.UintValue|UintValue|' command/operator_autopilot_set.go
gsed -i -e 's|\bCommand\b|BaseCommand|' command/base.go
gsed -i -e 's|BaseCommand Options|Command Options|' command/base.go
gsed -i -e 's|base.Command|BaseCommand|' command/*.go
gsed -i -e 's|c\.Command|c.BaseCommand|g' command/*.go
gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/*_test.go
gsed -i -e 's|base\.||' command/*_test.go
gsed -i -e 's|\bCommand\b|AgentCommand|' command/agent{,_test}.go
gsed -i -e 's|cmd.AgentCommand|cmd.BaseCommand|' command/agent.go
gsed -i -e 's|cli.AgentCommand = new(Command)|cli.Command = new(AgentCommand)|' command/agent_test.go
gsed -i -e 's|exec.AgentCommand|exec.Command|' command/agent_test.go
gsed -i -e 's|exec.BaseCommand|exec.Command|' command/agent_test.go
gsed -i -e 's|NewTestAgent|agent.NewTestAgent|' command/agent_test.go
gsed -i -e 's|= TestConfig|= agent.TestConfig|' command/agent_test.go
gsed -i -e 's|: RetryJoin|: agent.RetryJoin|' command/agent_test.go
gsed -i -e 's|\.\./\.\./|../|' command/config_util_test.go
gsed -i -e 's|\bverifyUniqueListeners|VerifyUniqueListeners|' agent/config{,_test}.go command/agent.go
gsed -i -e 's|\bserfLANKeyring\b|SerfLANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go
gsed -i -e 's|\bserfWANKeyring\b|SerfWANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go
gsed -i -e 's|\bNewAgent\b|agent.New|g' command/agent{,_test}.go
gsed -i -e 's|\bNewAgent|New|' agent/{acl_test,agent,testagent}.go
gsed -i -e 's|\bAgent\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bBool\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bConfig\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bDefaultConfig\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bDevConfig\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bMergeConfig\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bReadConfigPaths\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bParseMetaPair\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bSerfLANKeyring\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|\bSerfWANKeyring\b|agent.&|g' command/agent{,_test}.go
gsed -i -e 's|circonus\.agent|circonus|g' command/agent{,_test}.go
gsed -i -e 's|logger\.agent|logger|g' command/agent{,_test}.go
gsed -i -e 's|metrics\.agent|metrics|g' command/agent{,_test}.go
gsed -i -e 's|// agent.Agent|// agent|' command/agent{,_test}.go
gsed -i -e 's|a\.agent\.Config|a.Config|' command/agent{,_test}.go
gsed -i -e 's|agent\.AppendSliceValue|AppendSliceValue|' command/{configtest,validate}.go
gsed -i -e 's|consul/consul|agent/consul|' GNUmakefile
gsed -i -e 's|\.\./test|../../test|' agent/consul/server_test.go
# fix imports
f=$(grep -rl 'github.com/hashicorp/consul/command/agent' * | grep '\.go')
gsed -i -e 's|github.com/hashicorp/consul/command/agent|github.com/hashicorp/consul/agent|' $f
goimports -w $f
f=$(grep -rl 'github.com/hashicorp/consul/consul' * | grep '\.go')
gsed -i -e 's|github.com/hashicorp/consul/consul|github.com/hashicorp/consul/agent/consul|' $f
goimports -w $f
goimports -w command/*.go main.go
)
2017-06-09 22:28:28 +00:00
"github.com/hashicorp/consul/agent/consul/state"
2020-09-02 15:24:17 +00:00
"github.com/hashicorp/consul/agent/consul/usagemetrics"
2021-04-30 21:55:22 +00:00
"github.com/hashicorp/consul/agent/consul/wanfed"
2020-09-08 21:31:47 +00:00
agentgrpc "github.com/hashicorp/consul/agent/grpc"
2017-07-06 10:48:37 +00:00
"github.com/hashicorp/consul/agent/metadata"
2017-06-15 13:16:16 +00:00
"github.com/hashicorp/consul/agent/pool"
2017-07-06 10:40:54 +00:00
"github.com/hashicorp/consul/agent/router"
2020-10-02 15:58:18 +00:00
"github.com/hashicorp/consul/agent/rpc/subscribe"
2017-07-06 10:34:00 +00:00
"github.com/hashicorp/consul/agent/structs"
2017-08-03 22:39:31 +00:00
"github.com/hashicorp/consul/agent/token"
2017-01-18 06:20:11 +00:00
"github.com/hashicorp/consul/lib"
2021-05-20 14:07:23 +00:00
"github.com/hashicorp/consul/lib/routine"
2020-01-28 23:50:41 +00:00
"github.com/hashicorp/consul/logging"
2020-09-08 21:31:47 +00:00
"github.com/hashicorp/consul/proto/pbsubscribe"
2015-05-08 22:57:37 +00:00
"github.com/hashicorp/consul/tlsutil"
2017-03-14 01:54:34 +00:00
"github.com/hashicorp/consul/types"
2013-12-06 23:43:07 +00:00
)
2020-11-13 02:12:12 +00:00
// NOTE The "consul.client.rpc" and "consul.client.rpc.exceeded" counters are defined in consul/client.go
2014-03-09 22:18:36 +00:00
// These are the protocol versions that Consul can _understand_. These are
// Consul-level protocol versions, that are used to configure the Serf
// protocol versions.
const (
2020-05-29 21:16:03 +00:00
// DefaultRPCProtocol is the default RPC protocol version.
// NOTE(review): where this default is applied is not visible here; confirm
// against the config defaults.
DefaultRPCProtocol = 2
2016-08-10 01:10:04 +00:00
// ProtocolVersionMin is the lowest protocol version in the understood range.
ProtocolVersionMin uint8 = 2
2015-10-16 02:28:31 +00:00
// Version 3 added support for network coordinates but we kept the
// default protocol version at 2 to ease the transition to this new
// feature. A Consul agent speaking version 2 of the protocol will
// attempt to send its coordinates to a server who understands version
// 3 or greater.
ProtocolVersion2Compatible = 2
// ProtocolVersionMax is the highest protocol version in the understood range.
ProtocolVersionMax = 3
2014-03-09 22:18:36 +00:00
)
2013-12-06 23:43:07 +00:00
const (
2015-01-30 05:55:11 +00:00
// serfLANSnapshot and serfWANSnapshot are the snapshot paths that
// NewServer passes to setupSerf for the LAN and WAN Serf pools.
serfLANSnapshot = "serf/local.snapshot"
serfWANSnapshot = "serf/remote.snapshot"
// NOTE(review): raftState and snapshotsRetained are not referenced in the
// visible part of this file; presumably the Raft data sub-directory and
// the number of Raft snapshots kept. Confirm in setupRaft.
raftState = "raft/"
snapshotsRetained = 2
2014-05-27 21:33:09 +00:00
2015-01-14 23:49:58 +00:00
// raftLogCacheSize is the maximum number of logs to cache in-memory.
2015-09-11 19:24:54 +00:00
// This is used to reduce disk I/O for the recently committed entries.
2015-01-14 23:49:58 +00:00
raftLogCacheSize = 512
2015-01-21 00:19:54 +00:00
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time . Second
2018-08-08 17:26:58 +00:00
// serfEventChSize is the size of the buffered channel to get Serf
// events. If this is exhausted we will block Serf and Memberlist.
serfEventChSize = 2048
// reconcileChSize is the size of the buffered channel used to reconcile
// updates from Serf with the Catalog. If this is exhausted we will drop
// updates, and wait for a periodic reconcile.
reconcileChSize = 256
2013-12-06 23:43:07 +00:00
)
2019-10-04 17:08:45 +00:00
// Human-readable names of background routines.
// NOTE(review): presumably these are the names under which routines are
// started and stopped on the Server's leaderRoutineManager; the call sites
// are not visible here, so confirm before relying on that.
const (
2020-03-09 20:59:02 +00:00
aclPolicyReplicationRoutineName = "ACL policy replication"
aclRoleReplicationRoutineName = "ACL role replication"
aclTokenReplicationRoutineName = "ACL token replication"
aclTokenReapingRoutineName = "acl token reaping"
aclUpgradeRoutineName = "legacy ACL token upgrade"
caRootPruningRoutineName = "CA root pruning"
2021-03-24 21:40:10 +00:00
caRootMetricRoutineName = "CA root expiration metric"
2021-07-07 13:41:01 +00:00
caSigningMetricRoutineName = "CA signing expiration metric"
2020-03-09 20:59:02 +00:00
configReplicationRoutineName = "config entry replication"
federationStateReplicationRoutineName = "federation state replication"
federationStateAntiEntropyRoutineName = "federation state anti-entropy"
federationStatePruningRoutineName = "federation state pruning"
2020-10-06 18:24:05 +00:00
intentionMigrationRoutineName = "intention config entry migration"
2020-03-09 20:59:02 +00:00
secondaryCARootWatchRoutineName = "secondary CA roots watch"
2020-09-30 19:31:21 +00:00
intermediateCertRenewWatchRoutineName = "intermediate cert renew watch"
2021-01-15 18:20:27 +00:00
backgroundCAInitializationRoutineName = "CA initialization"
2019-10-04 17:08:45 +00:00
)
2018-03-26 19:21:06 +00:00
// ErrWANFederationDisabled is a package-level error value whose message
// reports that WAN federation is disabled.
// NOTE(review): presumably returned by WAN-only operations when the server
// was started without a WAN Serf configuration; confirm at the call sites.
var ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
2013-12-06 23:43:07 +00:00
// Server is a Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
2020-02-10 18:01:15 +00:00
// queriesBlocking is a counter that we incr and decr atomically in
// rpc calls to provide telemetry on how many blocking queries are running.
// We interact with queriesBlocking atomically, do not move without ensuring it is
// correctly 64-bit aligned in the struct layout (sync/atomic requires 64-bit
// alignment for 64-bit atomic operations on 32-bit platforms).
queriesBlocking uint64
2020-01-13 20:51:40 +00:00
// aclConfig is the configuration for the ACL system
aclConfig * acl . Config
2017-09-14 19:31:01 +00:00
2018-10-19 16:04:07 +00:00
// acls is used to resolve tokens to effective policies
acls * ACLResolver
2014-08-08 22:32:43 +00:00
2019-10-24 18:38:09 +00:00
// aclAuthMethodValidators is the authmethod.Cache created with
// authmethod.NewCache() in NewServer.
aclAuthMethodValidators authmethod . Cache
2019-04-26 17:49:28 +00:00
2017-12-12 00:38:52 +00:00
// autopilot is the Autopilot instance for this server.
autopilot * autopilot . Autopilot
2017-03-08 19:31:32 +00:00
2020-11-12 01:05:04 +00:00
// caManager is used to synchronize CA operations across the leader and RPC functions.
caManager * CAManager
2018-04-09 04:58:31 +00:00
2020-06-29 19:52:47 +00:00
// caLeafLimiter is the rate limiter to use when signing leaf certificates
caLeafLimiter connectSignRateLimiter
2014-08-08 22:32:43 +00:00
// config is the Consul configuration
2013-12-06 23:43:07 +00:00
config * Config
2019-04-26 17:38:39 +00:00
// configReplicator is used to manage the leaders replication routines for
// centralized config
configReplicator * Replicator
2020-03-09 20:59:02 +00:00
// federationStateReplicator is used to manage the leaders replication routines for
// federation states
federationStateReplicator * Replicator
2020-06-04 21:05:27 +00:00
// dcSupportsFederationStates is used to determine whether we can
// replicate federation states or not. All servers in the local
// DC must be on a version of Consul supporting federation states
// before this will get enabled.
dcSupportsFederationStates int32
2017-08-03 22:39:31 +00:00
// tokens holds ACL tokens initially from the configuration, but can
// be updated at runtime, so should always be used instead of going to
// the configuration directly.
tokens * token . Store
2013-12-09 20:09:57 +00:00
// connPool is the connection pool to other consul servers
2017-06-15 13:16:16 +00:00
connPool * pool . ConnPool
2013-12-09 20:09:57 +00:00
2021-08-24 21:28:44 +00:00
// grpcConnPool is the connection pool to other consul servers using gRPC
grpcConnPool GRPCClientConner
2013-12-07 00:05:26 +00:00
// eventChLAN is used to receive events from the
// serf cluster in the datacenter
eventChLAN chan serf . Event
2013-12-06 23:43:07 +00:00
2013-12-07 00:05:26 +00:00
// eventChWAN is used to receive events from the
// serf cluster that spans datacenters
eventChWAN chan serf . Event
2013-12-06 23:43:07 +00:00
// fsm is the state machine used with Raft to provide
// strong consistency.
2017-11-29 02:01:17 +00:00
fsm * fsm . FSM
2013-12-06 23:43:07 +00:00
// logger is the server's named intercept logger; loggers is the logger
// store that NewServer builds from it with newLoggerStore.
2020-01-28 23:50:41 +00:00
logger hclog . InterceptLogger
loggers * loggerStore
2013-12-06 23:43:07 +00:00
2016-07-29 16:21:30 +00:00
// The raft instance is used among Consul nodes within the DC to protect
2016-08-01 21:32:53 +00:00
// operations that require strong consistency.
2016-07-29 16:32:49 +00:00
2013-12-09 22:20:56 +00:00
raft * raft . Raft
raftLayer * RaftLayer
2015-01-30 05:55:11 +00:00
raftStore * raftboltdb . BoltStore
2013-12-09 22:20:56 +00:00
raftTransport * raft . NetworkTransport
2015-11-29 04:40:05 +00:00
raftInmem * raft . InmemStore
2013-12-06 23:43:07 +00:00
2017-07-06 14:09:21 +00:00
// raftNotifyCh is set up by setupRaft() and ensures that we get reliable leader
2017-04-13 21:17:32 +00:00
// transition notifications from the Raft layer.
2017-07-06 14:09:21 +00:00
raftNotifyCh <- chan bool
2017-04-13 21:17:32 +00:00
2014-01-10 20:55:55 +00:00
// reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be
// updated
reconcileCh chan serf . Member
2017-10-10 22:19:50 +00:00
// readyForConsistentReads is used to track when the leader server is
// ready to serve consistent reads, after it has applied its initial
// barrier. This is updated atomically.
2017-06-16 15:49:54 +00:00
readyForConsistentReads int32
2017-06-16 03:41:30 +00:00
2017-10-10 22:19:50 +00:00
// leaveCh is used to signal that the server is leaving the cluster
// and trying to shed its RPC traffic onto other Consul servers. This
// is only ever closed.
leaveCh chan struct { }
2017-03-14 01:54:34 +00:00
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
2017-07-06 10:40:54 +00:00
router * router . Router
2013-12-12 00:24:34 +00:00
2019-06-13 09:26:27 +00:00
// rpcLimiter is used to rate limit the total number of RPCs initiated
// from an agent. NewServer stores a *rate.Limiter in it.
rpcLimiter atomic . Value
2020-01-31 16:19:37 +00:00
// rpcConnLimiter limits the number of RPC connections from a single source IP
rpcConnLimiter connlimit . Limiter
2017-06-25 19:36:03 +00:00
// Listener is used to listen for incoming connections.
// grpcHandler is built by newGRPCHandlerFromConfig and run in its own
// goroutine by NewServer; rpcServer is the net/rpc server created there.
2020-07-22 23:57:29 +00:00
Listener net . Listener
grpcHandler connHandler
rpcServer * rpc . Server
2013-12-07 00:35:13 +00:00
2019-06-27 20:22:07 +00:00
// insecureRPCServer is a RPC server that is configured with
// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
// to request client certificates. At this point a client doesn't have
// a client cert and thus cannot present it. This is the only RPC
// Endpoint that is available at the time of writing.
insecureRPCServer * rpc . Server
// tlsConfigurator holds the agent configuration relevant to TLS and
// configures everything related to it.
tlsConfigurator * tlsutil . Configurator
2014-04-04 23:34:23 +00:00
2013-12-07 00:05:26 +00:00
// serfLAN is the Serf cluster maintained inside the DC
2013-12-06 23:43:07 +00:00
// which contains all the DC nodes
2013-12-07 00:05:26 +00:00
serfLAN * serf . Serf
2013-12-06 23:43:07 +00:00
2017-08-14 14:36:07 +00:00
// segmentLAN maps segment names to their Serf cluster
2017-09-01 00:39:46 +00:00
segmentLAN map [ string ] * serf . Serf
2017-08-29 00:58:22 +00:00
2013-12-07 00:05:26 +00:00
// serfWAN is the Serf cluster maintained between DC's
2013-12-06 23:43:07 +00:00
// which SHOULD only consist of Consul servers.
// memberlistTransportWAN is the WAN memberlist transport that NewServer
// captures from SerfWANConfig once the WAN Serf has been set up.
// gatewayLocator is only set when ConnectMeshGatewayWANFederationEnabled
// is true; it is nil otherwise.
2020-03-09 20:59:02 +00:00
serfWAN * serf . Serf
2021-04-30 21:55:22 +00:00
memberlistTransportWAN wanfed . IngestionAwareTransport
2020-03-09 20:59:02 +00:00
gatewayLocator * GatewayLocator
2013-12-06 23:43:07 +00:00
2017-08-30 15:02:10 +00:00
// serverLookup tracks server consuls in the local datacenter.
// Used to do leader forwarding and provide fast lookup by server id and address
2017-08-30 03:35:22 +00:00
serverLookup * ServerLookup
2017-08-30 00:37:48 +00:00
2021-07-22 18:58:08 +00:00
// grpcLeaderForwarder is notified on leader change in order to keep the grpc
// resolver up to date.
grpcLeaderForwarder LeaderForwarder
2017-03-15 23:11:19 +00:00
// floodLock controls access to floodCh.
floodLock sync . RWMutex
floodCh [ ] chan struct { }
2017-03-15 19:26:54 +00:00
2014-11-25 16:06:14 +00:00
// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
2015-09-11 19:24:54 +00:00
// destroy the session via standard session destroy processing
2017-06-27 13:25:25 +00:00
sessionTimers * SessionTimers
2014-11-25 16:06:14 +00:00
2017-03-18 01:42:28 +00:00
// statsFetcher is used by autopilot to check the status of the other
2017-07-06 10:40:54 +00:00
// Consul servers.
2017-03-18 01:42:28 +00:00
statsFetcher * StatsFetcher
2017-05-04 03:31:14 +00:00
// reassertLeaderCh is used to signal the leader loop should re-run
// leadership actions after a snapshot restore.
2017-05-04 18:52:22 +00:00
reassertLeaderCh chan chan error
2017-05-04 03:31:14 +00:00
2014-12-11 07:49:44 +00:00
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
2015-10-12 07:42:09 +00:00
tombstoneGC * state . TombstoneGC
2014-12-11 07:49:44 +00:00
2016-08-05 04:32:36 +00:00
// aclReplicationStatus (and its associated lock) provide information
// about the health of the ACL replication goroutine.
aclReplicationStatus structs . ACLReplicationStatus
aclReplicationStatusLock sync . RWMutex
2016-08-03 05:04:11 +00:00
// shutdown and the associated members here are used in orchestrating
// a clean shutdown. The shutdownCh is never written to, only closed to
2016-08-04 00:01:32 +00:00
// indicate a shutdown has been initiated.
2013-12-06 23:43:07 +00:00
shutdown bool
shutdownCh chan struct { }
shutdownLock sync . Mutex
2018-05-24 14:36:42 +00:00
2020-10-06 18:24:05 +00:00
// dcSupportsIntentionsAsConfigEntries is used to determine whether we can
// migrate old intentions into service-intentions config entries. All
// servers in the local DC must be on a version of Consul supporting
// service-intentions before this will get enabled.
dcSupportsIntentionsAsConfigEntries int32
2019-10-04 17:08:45 +00:00
// leaderRoutineManager handles starting/stopping go routines when
// establishing/revoking raft leadership
2021-05-20 14:07:23 +00:00
leaderRoutineManager * routine . Manager
2019-10-04 17:08:45 +00:00
2018-05-24 14:36:42 +00:00
// embedded struct to hold all the enterprise specific data
EnterpriseServer
2013-12-06 23:43:07 +00:00
}
2020-07-22 23:57:29 +00:00
// connHandler is the interface the Server uses for its gRPC handler (the
// grpcHandler field). NewServer obtains one from newGRPCHandlerFromConfig.
type connHandler interface {
// Run is started by NewServer in its own goroutine; a non-nil error is
// logged as "gRPC server failed".
Run ( ) error
// Handle is given a single network connection.
// NOTE(review): the caller is not visible here; presumably the RPC listener
// hands gRPC connections to it. Confirm in the connection dispatch code.
Handle ( conn net . Conn )
// Shutdown stops the handler.
// NOTE(review): presumably invoked from Server.Shutdown; confirm.
Shutdown ( ) error
}
2020-08-05 17:20:12 +00:00
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
2020-09-14 22:31:07 +00:00
func NewServer ( config * Config , flat Deps ) ( * Server , error ) {
logger := flat . Logger
2017-05-03 19:02:01 +00:00
if err := config . CheckProtocolVersion ( ) ; err != nil {
2014-03-09 22:18:36 +00:00
return nil , err
}
2015-11-29 04:40:05 +00:00
if config . DataDir == "" && ! config . DevMode {
2013-12-06 23:43:07 +00:00
return nil , fmt . Errorf ( "Config must provide a DataDir" )
}
2014-08-05 22:20:35 +00:00
if err := config . CheckACL ( ) ; err != nil {
return nil , err
}
2013-12-06 23:43:07 +00:00
2016-08-03 05:04:11 +00:00
// Create the tombstone GC.
2015-10-12 07:42:09 +00:00
gc , err := state . NewTombstoneGC ( config . TombstoneTTL , config . TombstoneTTLGranularity )
2014-12-11 07:49:44 +00:00
if err != nil {
return nil , err
}
2017-03-14 01:54:34 +00:00
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make ( chan struct { } )
2020-09-14 22:31:07 +00:00
serverLogger := flat . Logger . NamedIntercept ( logging . ConsulServer )
2020-01-28 23:50:41 +00:00
loggers := newLoggerStore ( serverLogger )
2020-08-27 15:23:52 +00:00
2016-08-03 05:04:11 +00:00
// Create server.
2013-12-06 23:43:07 +00:00
s := & Server {
2019-10-24 18:38:09 +00:00
config : config ,
2020-09-14 22:31:07 +00:00
tokens : flat . Tokens ,
connPool : flat . ConnPool ,
2021-08-24 21:28:44 +00:00
grpcConnPool : flat . GRPCConnPool ,
2019-10-24 18:38:09 +00:00
eventChLAN : make ( chan serf . Event , serfEventChSize ) ,
eventChWAN : make ( chan serf . Event , serfEventChSize ) ,
2020-01-28 23:50:41 +00:00
logger : serverLogger ,
loggers : loggers ,
2019-10-24 18:38:09 +00:00
leaveCh : make ( chan struct { } ) ,
reconcileCh : make ( chan serf . Member , reconcileChSize ) ,
2020-09-14 22:31:07 +00:00
router : flat . Router ,
2019-10-24 18:38:09 +00:00
rpcServer : rpc . NewServer ( ) ,
insecureRPCServer : rpc . NewServer ( ) ,
2020-09-14 22:31:07 +00:00
tlsConfigurator : flat . TLSConfigurator ,
2019-10-24 18:38:09 +00:00
reassertLeaderCh : make ( chan chan error ) ,
segmentLAN : make ( map [ string ] * serf . Serf , len ( config . Segments ) ) ,
sessionTimers : NewSessionTimers ( ) ,
tombstoneGC : gc ,
serverLookup : NewServerLookup ( ) ,
shutdownCh : shutdownCh ,
2021-05-20 14:07:23 +00:00
leaderRoutineManager : routine . NewManager ( logger . Named ( logging . Leader ) ) ,
2019-10-24 18:38:09 +00:00
aclAuthMethodValidators : authmethod . NewCache ( ) ,
2020-10-23 19:21:37 +00:00
fsm : newFSMFromConfig ( flat . Logger , gc , config ) ,
2013-12-06 23:43:07 +00:00
}
2017-03-21 23:36:44 +00:00
2020-03-09 20:59:02 +00:00
if s . config . ConnectMeshGatewayWANFederationEnabled {
s . gatewayLocator = NewGatewayLocator (
s . logger ,
s ,
s . config . Datacenter ,
s . config . PrimaryDatacenter ,
)
s . connPool . GatewayResolver = s . gatewayLocator . PickGateway
2021-08-24 21:28:44 +00:00
s . grpcConnPool . SetGatewayResolver ( s . gatewayLocator . PickGateway )
2020-03-09 20:59:02 +00:00
}
2018-05-24 14:36:42 +00:00
// Initialize enterprise specific server functionality
2021-05-11 14:50:03 +00:00
if err := s . initEnterprise ( flat ) ; err != nil {
2018-05-24 14:36:42 +00:00
s . Shutdown ( )
return nil , err
}
2020-09-16 17:29:59 +00:00
s . rpcLimiter . Store ( rate . NewLimiter ( config . RPCRateLimit , config . RPCMaxBurst ) )
2019-06-13 09:26:27 +00:00
2019-04-26 17:38:39 +00:00
configReplicatorConfig := ReplicatorConfig {
2020-01-28 23:50:41 +00:00
Name : logging . ConfigEntry ,
2021-04-22 15:20:53 +00:00
Delegate : & FunctionReplicator { ReplicateFn : s . replicateConfig , Name : "config-entries" } ,
2019-10-28 17:49:57 +00:00
Rate : s . config . ConfigReplicationRate ,
Burst : s . config . ConfigReplicationBurst ,
2020-01-29 17:16:08 +00:00
Logger : s . logger ,
2019-04-26 17:38:39 +00:00
}
s . configReplicator , err = NewReplicator ( & configReplicatorConfig )
if err != nil {
s . Shutdown ( )
return nil , err
}
2020-03-09 20:59:02 +00:00
federationStateReplicatorConfig := ReplicatorConfig {
Name : logging . FederationState ,
Delegate : & IndexReplicator {
2020-05-27 16:31:10 +00:00
Delegate : & FederationStateReplicator {
srv : s ,
gatewayLocator : s . gatewayLocator ,
} ,
2020-09-24 20:49:38 +00:00
Logger : s . loggers . Named ( logging . Replication ) . Named ( logging . FederationState ) ,
2020-03-09 20:59:02 +00:00
} ,
2020-06-04 21:05:27 +00:00
Rate : s . config . FederationStateReplicationRate ,
Burst : s . config . FederationStateReplicationBurst ,
2020-09-24 20:49:38 +00:00
Logger : s . logger ,
2020-06-04 21:05:27 +00:00
SuppressErrorLog : isErrFederationStatesNotSupported ,
2020-03-09 20:59:02 +00:00
}
s . federationStateReplicator , err = NewReplicator ( & federationStateReplicatorConfig )
if err != nil {
s . Shutdown ( )
return nil , err
}
2017-03-18 01:42:28 +00:00
// Initialize the stats fetcher that autopilot will use.
2017-03-20 03:48:42 +00:00
s . statsFetcher = NewStatsFetcher ( logger , s . connPool , s . config . Datacenter )
2017-03-18 01:42:28 +00:00
2020-01-13 20:51:40 +00:00
s . aclConfig = newACLConfig ( logger )
2018-10-19 16:04:07 +00:00
aclConfig := ACLResolverConfig {
2021-08-06 22:39:39 +00:00
Config : config . ACLResolverSettings ,
2020-01-13 20:51:40 +00:00
Delegate : s ,
CacheConfig : serverACLCacheConfig ,
Logger : logger ,
ACLConfig : s . aclConfig ,
2021-04-14 16:39:35 +00:00
Tokens : flat . Tokens ,
2014-08-08 22:32:43 +00:00
}
2018-10-19 16:04:07 +00:00
// Initialize the ACL resolver.
if s . acls , err = NewACLResolver ( & aclConfig ) ; err != nil {
2014-08-08 22:32:43 +00:00
s . Shutdown ( )
2018-10-19 16:04:07 +00:00
return nil , fmt . Errorf ( "Failed to create ACL resolver: %v" , err )
2014-08-09 00:38:39 +00:00
}
2016-08-03 05:04:11 +00:00
// Initialize the RPC layer.
2019-06-27 20:22:07 +00:00
if err := s . setupRPC ( ) ; err != nil {
2014-06-11 17:17:58 +00:00
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start RPC layer: %v" , err )
}
2017-09-01 00:39:46 +00:00
// Initialize any extra RPC listeners for segments.
segmentListeners , err := s . setupSegmentRPC ( )
if err != nil {
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start segment RPC layer: %v" , err )
}
2016-08-03 05:04:11 +00:00
// Initialize the Raft server.
2013-12-09 23:29:01 +00:00
if err := s . setupRaft ( ) ; err != nil {
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start Raft: %v" , err )
}
2021-06-21 21:51:37 +00:00
s . caManager = NewCAManager ( & caDelegateWithState { s } , s . leaderRoutineManager , s . logger . ResetNamed ( "connect.ca" ) , s . config )
2020-07-28 19:31:48 +00:00
if s . config . ConnectEnabled && ( s . config . AutoEncryptAllowTLS || s . config . AutoConfigAuthzEnabled ) {
go s . connectCARootsMonitor ( & lib . StopChannelContext { StopCh : s . shutdownCh } )
2019-06-27 20:22:07 +00:00
}
2020-03-09 20:59:02 +00:00
if s . gatewayLocator != nil {
2020-06-24 16:36:14 +00:00
go s . gatewayLocator . Run ( & lib . StopChannelContext { StopCh : s . shutdownCh } )
2020-03-09 20:59:02 +00:00
}
2017-06-27 08:55:55 +00:00
// Serf and dynamic bind ports
//
// The LAN serf cluster announces the port of the WAN serf cluster
// which creates a race when the WAN cluster is supposed to bind to
2017-07-05 03:15:50 +00:00
// a dynamic port (port 0). The current memberlist implementation will
// update the bind port in the configuration after the memberlist is
// created, so we can pull it out from there reliably, even though it's
// a little gross to be reading the updated config.
2017-06-27 08:55:55 +00:00
2018-03-26 19:21:06 +00:00
// Initialize the WAN Serf if enabled
serfBindPortWAN := - 1
if config . SerfWANConfig != nil {
2017-07-05 03:15:50 +00:00
serfBindPortWAN = config . SerfWANConfig . MemberlistConfig . BindPort
2018-03-26 19:21:06 +00:00
s . serfWAN , err = s . setupSerf ( config . SerfWANConfig , s . eventChWAN , serfWANSnapshot , true , serfBindPortWAN , "" , s . Listener )
if err != nil {
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start WAN Serf: %v" , err )
}
2020-03-09 20:59:02 +00:00
// This is always a *memberlist.NetTransport or something which wraps
// it which satisfies this interface.
2021-04-30 21:55:22 +00:00
s . memberlistTransportWAN = config . SerfWANConfig . MemberlistConfig . Transport . ( wanfed . IngestionAwareTransport )
2020-03-09 20:59:02 +00:00
2018-03-26 19:21:06 +00:00
// See big comment above why we are doing this.
2017-06-27 08:55:55 +00:00
if serfBindPortWAN == 0 {
2018-03-26 19:21:06 +00:00
serfBindPortWAN = config . SerfWANConfig . MemberlistConfig . BindPort
if serfBindPortWAN == 0 {
return nil , fmt . Errorf ( "Failed to get dynamic bind port for WAN Serf" )
}
2020-01-28 23:50:41 +00:00
s . logger . Info ( "Serf WAN TCP bound" , "port" , serfBindPortWAN )
2017-06-27 08:55:55 +00:00
}
}
2017-08-14 14:36:07 +00:00
// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
2017-09-01 00:56:43 +00:00
if err := s . setupSegments ( config , serfBindPortWAN , segmentListeners ) ; err != nil {
2017-08-14 14:36:07 +00:00
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to setup network segments: %v" , err )
}
// Initialize the LAN Serf for the default network segment.
2017-09-01 00:39:46 +00:00
s . serfLAN , err = s . setupSerf ( config . SerfLANConfig , s . eventChLAN , serfLANSnapshot , false , serfBindPortWAN , "" , s . Listener )
2013-12-06 23:43:07 +00:00
if err != nil {
s . Shutdown ( )
2017-06-27 08:55:55 +00:00
return nil , fmt . Errorf ( "Failed to start LAN Serf: %v" , err )
2017-03-14 01:54:34 +00:00
}
2020-08-27 15:23:52 +00:00
if err := s . router . AddArea ( types . AreaLAN , s . serfLAN , s . connPool ) ; err != nil {
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to add LAN serf route: %w" , err )
}
2017-06-27 08:55:55 +00:00
go s . lanEventHandler ( )
2017-03-14 01:54:34 +00:00
2017-08-14 14:36:07 +00:00
// Start the flooders after the LAN event handler is wired up.
s . floodSegments ( config )
2017-03-14 01:54:34 +00:00
// Add a "static route" to the WAN Serf and hook it up to Serf events.
2018-03-26 19:21:06 +00:00
if s . serfWAN != nil {
2020-04-30 20:12:17 +00:00
if err := s . router . AddArea ( types . AreaWAN , s . serfWAN , s . connPool ) ; err != nil {
2018-03-26 19:21:06 +00:00
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to add WAN serf route: %v" , err )
}
go router . HandleSerfEvents ( s . logger , s . router , types . AreaWAN , s . serfWAN . ShutdownCh ( ) , s . eventChWAN )
2013-12-06 23:43:07 +00:00
2018-03-26 19:21:06 +00:00
// Fire up the LAN <-> WAN join flooder.
2020-04-27 08:21:05 +00:00
addrFn := func ( s * metadata . Server ) ( string , error ) {
if s . WanJoinPort == 0 {
return "" , fmt . Errorf ( "no wan join port for server: %s" , s . Addr . String ( ) )
2018-03-26 19:21:06 +00:00
}
2020-04-27 08:21:05 +00:00
addr , _ , err := net . SplitHostPort ( s . Addr . String ( ) )
if err != nil {
return "" , err
}
return fmt . Sprintf ( "%s:%d" , addr , s . WanJoinPort ) , nil
2017-03-15 19:26:54 +00:00
}
2020-04-27 08:21:05 +00:00
go s . Flood ( addrFn , s . serfWAN )
2017-03-15 23:11:19 +00:00
}
2018-07-09 16:46:10 +00:00
2018-06-29 21:38:29 +00:00
// Start enterprise specific functionality
if err := s . startEnterprise ( ) ; err != nil {
s . Shutdown ( )
return nil , err
}
2019-02-01 18:19:13 +00:00
2020-09-02 15:24:17 +00:00
reporter , err := usagemetrics . NewUsageMetricsReporter (
new ( usagemetrics . Config ) .
WithStateProvider ( s . fsm ) .
WithLogger ( s . logger ) .
WithDatacenter ( s . config . Datacenter ) .
2021-06-03 15:25:53 +00:00
WithReportingInterval ( s . config . MetricsReportingInterval ) .
WithGetMembersFunc ( func ( ) [ ] serf . Member {
members , err := s . LANMembersAllSegments ( )
if err != nil {
return [ ] serf . Member { }
}
return members
} ) ,
2020-09-02 15:24:17 +00:00
)
if err != nil {
s . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start usage metrics reporter: %v" , err )
}
go reporter . Run ( & lib . StopChannelContext { StopCh : s . shutdownCh } )
2020-09-08 21:31:47 +00:00
s . grpcHandler = newGRPCHandlerFromConfig ( flat , config , s )
2021-07-22 18:58:08 +00:00
s . grpcLeaderForwarder = flat . LeaderForwarder
go s . trackLeaderChanges ( )
2020-07-22 23:57:29 +00:00
2019-02-11 16:12:24 +00:00
// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s . initAutopilot ( config )
2013-12-06 23:43:07 +00:00
2017-01-18 23:06:15 +00:00
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
go s . monitorLeadership ( )
2016-08-03 05:04:11 +00:00
// Start listening for RPC requests.
2020-07-22 23:57:29 +00:00
go func ( ) {
if err := s . grpcHandler . Run ( ) ; err != nil {
s . logger . Error ( "gRPC server failed" , "error" , err )
}
} ( )
2017-08-29 00:58:22 +00:00
go s . listen ( s . Listener )
// Start listeners for any segments with separate RPC listeners.
2017-09-01 00:39:46 +00:00
for _ , listener := range segmentListeners {
2017-08-29 00:58:22 +00:00
go s . listen ( listener )
}
2014-12-19 00:57:49 +00:00
2016-08-03 05:04:11 +00:00
// Start the metrics handlers.
2020-02-11 09:50:18 +00:00
go s . updateMetrics ( )
2016-08-03 05:04:11 +00:00
2013-12-06 23:43:07 +00:00
return s , nil
}
2020-10-23 19:21:37 +00:00
// newFSMFromConfig builds the FSM for the server. The state store factory
// handed to the FSM uses state.NewStateStoreWithEventPublisher when RPC
// streaming is enabled in config and state.NewStateStore otherwise; both are
// created with the supplied tombstone GC.
func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config) *fsm.FSM {
	newStore := func() *state.Store {
		return state.NewStateStore(gc)
	}
	if config.RPCConfig.EnableStreaming {
		newStore = func() *state.Store {
			return state.NewStateStoreWithEventPublisher(gc)
		}
	}

	return fsm.NewFromDeps(fsm.Deps{
		Logger:        logger,
		NewStateStore: newStore,
	})
}
2020-09-08 21:31:47 +00:00
func newGRPCHandlerFromConfig ( deps Deps , config * Config , s * Server ) connHandler {
register := func ( srv * grpc . Server ) {
2021-07-22 18:58:08 +00:00
if config . RPCConfig . EnableStreaming {
pbsubscribe . RegisterStateChangeSubscriptionServer ( srv , subscribe . NewServer (
& subscribeBackend { srv : s , connPool : deps . GRPCConnPool } ,
deps . Logger . Named ( "grpc-api.subscription" ) ) )
}
s . registerEnterpriseGRPCServices ( deps , srv )
2020-09-08 21:31:47 +00:00
}
2021-07-22 18:58:08 +00:00
2021-08-22 18:06:26 +00:00
return agentgrpc . NewHandler ( deps . Logger , config . RPCAddr , register )
2020-07-22 23:57:29 +00:00
}
2020-07-28 19:31:48 +00:00
func ( s * Server ) connectCARootsMonitor ( ctx context . Context ) {
2019-06-27 20:22:07 +00:00
for {
ws := memdb . NewWatchSet ( )
state := s . fsm . State ( )
ws . Add ( state . AbandonCh ( ) )
_ , cas , err := state . CARoots ( ws )
if err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "Failed to watch AutoEncrypt CARoot" , "error" , err )
2019-06-27 20:22:07 +00:00
return
}
caPems := [ ] string { }
for _ , ca := range cas {
caPems = append ( caPems , ca . RootCert )
}
2020-07-30 14:15:12 +00:00
if err := s . tlsConfigurator . UpdateAutoTLSCA ( caPems ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "Failed to update AutoEncrypt CAPems" , "error" , err )
2019-06-27 20:22:07 +00:00
}
2020-07-28 19:31:48 +00:00
if err := ws . WatchCtx ( ctx ) ; err == context . Canceled {
s . logger . Info ( "shutting down Connect CA roots monitor" )
return
}
2019-06-27 20:22:07 +00:00
}
}
2013-12-06 23:43:07 +00:00
// setupRaft is used to setup and initialize Raft
func ( s * Server ) setupRaft ( ) error {
2016-07-28 19:11:28 +00:00
// If we have an unclean exit then attempt to close the Raft store.
defer func ( ) {
if s . raft == nil && s . raftStore != nil {
if err := s . raftStore . Close ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "failed to close Raft store" , "error" , err )
2016-07-28 19:11:28 +00:00
}
}
} ( )
2013-12-25 00:48:07 +00:00
2017-08-30 19:36:47 +00:00
var serverAddressProvider raft . ServerAddressProvider = nil
if s . config . RaftConfig . ProtocolVersion >= 3 { //ServerAddressProvider needs server ids to work correctly, which is only supported in protocol version 3 or higher
serverAddressProvider = s . serverLookup
}
2016-07-28 19:11:28 +00:00
// Create a transport layer.
2017-08-30 00:37:48 +00:00
transConfig := & raft . NetworkTransportConfig {
Stream : s . raftLayer ,
MaxPool : 3 ,
Timeout : 10 * time . Second ,
2017-08-30 19:36:47 +00:00
ServerAddressProvider : serverAddressProvider ,
2020-01-28 23:50:41 +00:00
Logger : s . loggers . Named ( logging . Raft ) ,
2017-08-30 00:37:48 +00:00
}
2017-08-23 19:54:10 +00:00
trans := raft . NewNetworkTransportWithConfig ( transConfig )
2015-11-29 04:40:05 +00:00
s . raftTransport = trans
2020-01-28 23:50:41 +00:00
s . config . RaftConfig . Logger = s . loggers . Named ( logging . Raft )
2016-07-30 03:45:59 +00:00
2017-02-22 20:53:32 +00:00
// Versions of the Raft protocol below 3 require the LocalID to match the network
2016-07-30 03:45:59 +00:00
// address of the transport.
s . config . RaftConfig . LocalID = raft . ServerID ( trans . LocalAddr ( ) )
2017-02-22 20:53:32 +00:00
if s . config . RaftConfig . ProtocolVersion >= 3 {
s . config . RaftConfig . LocalID = raft . ServerID ( s . config . NodeID )
}
2016-07-30 03:45:59 +00:00
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
2015-11-29 04:40:05 +00:00
var log raft . LogStore
var stable raft . StableStore
var snap raft . SnapshotStore
if s . config . DevMode {
store := raft . NewInmemStore ( )
s . raftInmem = store
stable = store
log = store
2016-10-31 18:39:47 +00:00
snap = raft . NewInmemSnapshotStore ( )
2015-11-29 04:40:05 +00:00
} else {
2016-07-28 19:11:28 +00:00
// Create the base raft path.
2015-11-29 04:40:05 +00:00
path := filepath . Join ( s . config . DataDir , raftState )
2017-01-18 06:20:11 +00:00
if err := lib . EnsurePath ( path , true ) ; err != nil {
2015-11-29 04:40:05 +00:00
return err
}
2013-12-06 23:43:07 +00:00
2016-07-28 19:11:28 +00:00
// Create the backend raft store for logs and stable storage.
2015-11-29 04:40:05 +00:00
store , err := raftboltdb . NewBoltStore ( filepath . Join ( path , "raft.db" ) )
if err != nil {
return err
}
s . raftStore = store
stable = store
2015-01-14 23:49:58 +00:00
2016-07-28 19:11:28 +00:00
// Wrap the store in a LogCache to improve performance.
2015-11-29 04:40:05 +00:00
cacheStore , err := raft . NewLogCache ( raftLogCacheSize , store )
if err != nil {
return err
}
log = cacheStore
2013-12-06 23:43:07 +00:00
2016-07-28 19:11:28 +00:00
// Create the snapshot store.
2020-07-29 20:05:51 +00:00
snapshots , err := raft . NewFileSnapshotStoreWithLogger ( path , snapshotsRetained , s . logger . Named ( "snapshot" ) )
2015-11-29 04:40:05 +00:00
if err != nil {
return err
}
snap = snapshots
2013-12-06 23:43:07 +00:00
2016-07-30 07:54:08 +00:00
// For an existing cluster being upgraded to the new version of
// Raft, we almost never want to run recovery based on the old
// peers.json file. We create a peers.info file with a helpful
// note about where peers.json went, and use that as a sentinel
// to avoid ingesting the old one that first time (if we have to
// create the peers.info file because it's not there, we also
// blow away any existing peers.json file).
2016-07-30 03:45:59 +00:00
peersFile := filepath . Join ( path , "peers.json" )
2016-07-30 07:54:08 +00:00
peersInfoFile := filepath . Join ( path , "peers.info" )
if _ , err := os . Stat ( peersInfoFile ) ; os . IsNotExist ( err ) {
2016-08-09 18:56:39 +00:00
if err := ioutil . WriteFile ( peersInfoFile , [ ] byte ( peersInfoContent ) , 0755 ) ; err != nil {
2016-07-30 07:54:08 +00:00
return fmt . Errorf ( "failed to write peers.info file: %v" , err )
}
// Blow away the peers.json file if present, since the
// peers.info sentinel wasn't there.
if _ , err := os . Stat ( peersFile ) ; err == nil {
if err := os . Remove ( peersFile ) ; err != nil {
return fmt . Errorf ( "failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
}
2020-01-28 23:50:41 +00:00
s . logger . Info ( "deleted peers.json file (see peers.info for details)" )
2016-07-30 07:54:08 +00:00
}
} else if _ , err := os . Stat ( peersFile ) ; err == nil {
2020-01-28 23:50:41 +00:00
s . logger . Info ( "found peers.json file, recovering Raft configuration..." )
2017-05-04 21:15:59 +00:00
var configuration raft . Configuration
if s . config . RaftConfig . ProtocolVersion < 3 {
configuration , err = raft . ReadPeersJSON ( peersFile )
} else {
configuration , err = raft . ReadConfigJSON ( peersFile )
}
2016-07-30 03:45:59 +00:00
if err != nil {
return fmt . Errorf ( "recovery failed to parse peers.json: %v" , err )
}
2017-05-04 21:15:59 +00:00
2020-10-23 19:21:37 +00:00
tmpFsm := fsm . NewFromDeps ( fsm . Deps {
Logger : s . logger ,
NewStateStore : func ( ) * state . Store {
return state . NewStateStore ( s . tombstoneGC )
} ,
} )
2016-07-28 19:11:28 +00:00
if err := raft . RecoverCluster ( s . config . RaftConfig , tmpFsm ,
2016-07-31 05:33:07 +00:00
log , stable , snap , trans , configuration ) ; err != nil {
2016-07-28 19:11:28 +00:00
return fmt . Errorf ( "recovery failed: %v" , err )
}
2017-05-04 21:15:59 +00:00
2016-07-30 03:45:59 +00:00
if err := os . Remove ( peersFile ) ; err != nil {
2016-07-30 07:54:08 +00:00
return fmt . Errorf ( "recovery failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
2016-07-28 19:11:28 +00:00
}
2020-01-28 23:50:41 +00:00
s . logger . Info ( "deleted peers.json file after successful recovery" )
2016-07-28 19:11:28 +00:00
}
2015-11-29 04:40:05 +00:00
}
2013-12-06 23:43:07 +00:00
2016-07-28 19:11:28 +00:00
// If we are in bootstrap or dev mode and the state is clean then we can
// bootstrap now.
if s . config . Bootstrap || s . config . DevMode {
hasState , err := raft . HasExistingState ( log , stable , snap )
2014-02-21 00:27:14 +00:00
if err != nil {
return err
}
2016-07-28 19:11:28 +00:00
if ! hasState {
configuration := raft . Configuration {
Servers : [ ] raft . Server {
2020-06-16 17:19:31 +00:00
{
2017-02-22 20:53:32 +00:00
ID : s . config . RaftConfig . LocalID ,
2016-07-28 19:11:28 +00:00
Address : trans . LocalAddr ( ) ,
} ,
} ,
}
if err := raft . BootstrapCluster ( s . config . RaftConfig ,
log , stable , snap , trans , configuration ) ; err != nil {
return err
}
2014-02-21 00:27:14 +00:00
}
2013-12-23 19:50:58 +00:00
}
2017-04-13 21:17:32 +00:00
// Set up a channel for reliable leader notifications.
2020-01-22 15:15:59 +00:00
raftNotifyCh := make ( chan bool , 10 )
2017-07-06 14:09:21 +00:00
s . config . RaftConfig . NotifyCh = raftNotifyCh
s . raftNotifyCh = raftNotifyCh
2017-04-13 21:17:32 +00:00
2016-07-28 19:11:28 +00:00
// Setup the Raft store.
2020-10-23 19:21:37 +00:00
var err error
2019-07-24 21:06:39 +00:00
s . raft , err = raft . NewRaft ( s . config . RaftConfig , s . fsm . ChunkingFSM ( ) , log , stable , snap , trans )
2020-10-23 19:21:37 +00:00
return err
2013-12-06 23:43:07 +00:00
}
2017-11-29 00:30:07 +00:00
// factory is a function that returns an RPC endpoint bound to the given
// server.
type factory func(s *Server) interface{}

// endpoints holds every RPC endpoint factory registered so far.
var endpoints []factory

// registerEndpoint appends fn to the list of RPC endpoint factories.
func registerEndpoint(fn factory) {
	endpoints = append(endpoints, fn)
}
2013-12-07 00:35:13 +00:00
// setupRPC is used to setup the RPC listener
2019-06-27 20:22:07 +00:00
func ( s * Server ) setupRPC ( ) error {
2020-01-31 16:19:37 +00:00
s . rpcConnLimiter . SetConfig ( connlimit . Config {
MaxConnsPerClientIP : s . config . RPCMaxConnsPerClient ,
} )
2017-11-29 00:30:07 +00:00
for _ , fn := range endpoints {
s . rpcServer . Register ( fn ( s ) )
}
2013-12-09 22:49:07 +00:00
2019-06-27 20:22:07 +00:00
// Only register AutoEncrypt on the insecure RPC server. Insecure only
// means that verify incoming is turned off even though it might have
// been configured.
s . insecureRPCServer . Register ( & AutoEncrypt { srv : s } )
2020-06-05 19:56:19 +00:00
// Setup the AutoConfig JWT Authorizer
var authz AutoConfigAuthorizer
if s . config . AutoConfigAuthzEnabled {
// create the auto config authorizer from the JWT authmethod
validator , err := ssoauth . NewValidator ( s . logger , & s . config . AutoConfigAuthzAuthMethod )
if err != nil {
return fmt . Errorf ( "Failed to initialize JWT Auto Config Authorizer: %w" , err )
}
authz = & jwtAuthorizer {
validator : validator ,
allowReuse : s . config . AutoConfigAuthzAllowReuse ,
claimAssertions : s . config . AutoConfigAuthzClaimAssertions ,
}
} else {
// This authorizer always returns that the endpoint is disabled
authz = & disabledAuthorizer { }
}
// now register with the insecure RPC server
2021-06-23 20:30:59 +00:00
s . insecureRPCServer . Register ( NewAutoConfig ( s . config , s . tlsConfigurator , autoConfigBackend { Server : s } , authz ) )
2020-06-05 19:56:19 +00:00
2017-05-03 20:59:06 +00:00
ln , err := net . ListenTCP ( "tcp" , s . config . RPCAddr )
2013-12-07 00:35:13 +00:00
if err != nil {
return err
}
2017-06-25 19:36:03 +00:00
s . Listener = ln
2020-03-09 20:59:02 +00:00
2017-06-25 19:36:03 +00:00
if s . config . NotifyListen != nil {
s . config . NotifyListen ( )
}
// todo(fs): we should probably guard this
if s . config . RPCAdvertise == nil {
s . config . RPCAdvertise = ln . Addr ( ) . ( * net . TCPAddr )
}
2013-12-31 22:00:25 +00:00
// Verify that we have a usable advertise address
2017-05-03 20:59:06 +00:00
if s . config . RPCAdvertise . IP . IsUnspecified ( ) {
ln . Close ( )
return fmt . Errorf ( "RPC advertise address is not advertisable: %v" , s . config . RPCAdvertise )
2013-12-31 22:00:25 +00:00
}
2020-04-30 20:12:17 +00:00
// TODO (hans) switch NewRaftLayer to tlsConfigurator
2015-05-11 22:15:36 +00:00
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
2019-06-27 20:22:07 +00:00
wrapper := tlsutil . SpecificDC ( s . config . Datacenter , s . tlsConfigurator . OutgoingRPCWrapper ( ) )
2017-05-10 21:25:48 +00:00
// Define a callback for determining whether to wrap a connection with TLS
tlsFunc := func ( address raft . ServerAddress ) bool {
2020-04-30 20:12:17 +00:00
// raft only talks to its own datacenter
return s . tlsConfigurator . UseTLS ( s . config . Datacenter )
2017-05-10 21:25:48 +00:00
}
s . raftLayer = NewRaftLayer ( s . config . RPCSrcAddr , s . config . RPCAdvertise , wrapper , tlsFunc )
2013-12-07 00:35:13 +00:00
return nil
}
2013-12-06 23:43:07 +00:00
// Shutdown is used to shutdown the server
func ( s * Server ) Shutdown ( ) error {
2020-01-28 23:50:41 +00:00
s . logger . Info ( "shutting down server" )
2013-12-06 23:43:07 +00:00
s . shutdownLock . Lock ( )
defer s . shutdownLock . Unlock ( )
if s . shutdown {
return nil
}
s . shutdown = true
close ( s . shutdownCh )
2019-10-04 17:08:45 +00:00
// ensure that any leader routines still running get canceled
if s . leaderRoutineManager != nil {
s . leaderRoutineManager . StopAll ( )
}
2013-12-07 00:05:26 +00:00
if s . serfLAN != nil {
s . serfLAN . Shutdown ( )
2013-12-06 23:43:07 +00:00
}
2020-09-30 21:23:43 +00:00
for _ , segment := range s . segmentLAN {
segment . Shutdown ( )
}
2013-12-07 00:05:26 +00:00
if s . serfWAN != nil {
s . serfWAN . Shutdown ( )
2017-03-14 01:54:34 +00:00
if err := s . router . RemoveArea ( types . AreaWAN ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Warn ( "error removing WAN area" , "error" , err )
2017-03-14 01:54:34 +00:00
}
2013-12-06 23:43:07 +00:00
}
2017-06-02 23:33:48 +00:00
s . router . Shutdown ( )
2013-12-06 23:43:07 +00:00
if s . raft != nil {
2013-12-09 22:20:56 +00:00
s . raftTransport . Close ( )
2013-12-09 21:13:40 +00:00
s . raftLayer . Close ( )
2013-12-19 00:40:32 +00:00
future := s . raft . Shutdown ( )
if err := future . Error ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Warn ( "error shutting down raft" , "error" , err )
2013-12-19 00:40:32 +00:00
}
2015-11-29 04:40:05 +00:00
if s . raftStore != nil {
s . raftStore . Close ( )
}
2013-12-06 23:43:07 +00:00
}
2013-12-07 00:35:13 +00:00
2017-06-25 19:36:03 +00:00
if s . Listener != nil {
s . Listener . Close ( )
2013-12-07 00:35:13 +00:00
}
2020-07-22 23:57:29 +00:00
if s . grpcHandler != nil {
if err := s . grpcHandler . Shutdown ( ) ; err != nil {
s . logger . Warn ( "failed to stop gRPC server" , "error" , err )
}
}
2013-12-09 20:09:57 +00:00
// Close the connection pool
2020-07-30 17:11:10 +00:00
if s . connPool != nil {
s . connPool . Shutdown ( )
}
2013-12-09 20:09:57 +00:00
2020-07-30 17:11:10 +00:00
if s . acls != nil {
s . acls . Close ( )
}
2019-12-06 19:01:34 +00:00
2020-07-30 17:11:10 +00:00
if s . fsm != nil {
s . fsm . State ( ) . Abandon ( )
}
2020-07-06 22:02:40 +00:00
2013-12-06 23:43:07 +00:00
return nil
}
2013-12-07 01:18:09 +00:00
2013-12-09 20:10:27 +00:00
// Leave is used to prepare for a graceful shutdown of the server
func ( s * Server ) Leave ( ) error {
2020-01-28 23:50:41 +00:00
s . logger . Info ( "server starting leave" )
2013-12-09 20:10:27 +00:00
2015-01-21 00:19:29 +00:00
// Check the number of known peers
2020-09-25 17:46:38 +00:00
numPeers , err := s . autopilot . NumVoters ( )
2015-01-21 00:19:29 +00:00
if err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "failed to check raft peers" , "error" , err )
2015-01-21 00:19:29 +00:00
return err
}
2016-07-28 19:11:28 +00:00
addr := s . raftTransport . LocalAddr ( )
2015-01-21 00:19:29 +00:00
// If we are the current leader, and we have any other peers (cluster has multiple
2017-03-27 19:31:38 +00:00
// servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size.
// If we are not the leader, then we should issue our leave intention and wait to be
2021-07-02 16:18:46 +00:00
// removed for some reasonable period of time.
2015-01-21 00:19:29 +00:00
isLeader := s . IsLeader ( )
2016-07-28 19:11:28 +00:00
if isLeader && numPeers > 1 {
2020-09-25 17:46:38 +00:00
if err := s . autopilot . RemoveServer ( raft . ServerID ( s . config . NodeID ) ) ; err != nil {
s . logger . Error ( "failed to remove ourself as a Raft peer" , "error" , err )
2015-01-21 00:19:29 +00:00
}
}
2013-12-09 20:10:27 +00:00
// Leave the WAN pool
if s . serfWAN != nil {
if err := s . serfWAN . Leave ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "failed to leave WAN Serf cluster" , "error" , err )
2013-12-09 20:10:27 +00:00
}
}
// Leave the LAN pool
if s . serfLAN != nil {
if err := s . serfLAN . Leave ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "failed to leave LAN Serf cluster" , "error" , err )
2013-12-09 20:10:27 +00:00
}
}
2015-01-21 00:19:54 +00:00
2019-09-11 09:01:37 +00:00
// Leave everything enterprise related as well
s . handleEnterpriseLeave ( )
2017-10-10 22:19:50 +00:00
// Start refusing RPCs now that we've left the LAN pool. It's important
// to do this *after* we've left the LAN pool so that clients will know
// to shift onto another server if they perform a retry. We also wake up
// all queries in the RPC retry state.
2020-01-28 23:50:41 +00:00
s . logger . Info ( "Waiting to drain RPC traffic" , "drain_time" , s . config . LeaveDrainTime )
2017-10-10 22:19:50 +00:00
close ( s . leaveCh )
time . Sleep ( s . config . LeaveDrainTime )
2016-07-28 19:11:28 +00:00
// If we were not leader, wait to be safely removed from the cluster. We
// must wait to allow the raft replication to take place, otherwise an
// immediate shutdown could cause a loss of quorum.
2015-01-21 00:19:54 +00:00
if ! isLeader {
2016-07-28 19:11:28 +00:00
left := false
2015-01-21 00:19:54 +00:00
limit := time . Now ( ) . Add ( raftRemoveGracePeriod )
2016-07-28 19:11:28 +00:00
for ! left && time . Now ( ) . Before ( limit ) {
// Sleep a while before we check.
time . Sleep ( 50 * time . Millisecond )
2015-01-21 00:19:54 +00:00
2016-07-28 19:11:28 +00:00
// Get the latest configuration.
future := s . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "failed to get raft configuration" , "error" , err )
2015-01-21 00:19:54 +00:00
break
}
2016-07-28 19:11:28 +00:00
// See if we are no longer included.
left = true
for _ , server := range future . Configuration ( ) . Servers {
if server . Address == addr {
left = false
break
}
}
2015-01-21 00:19:54 +00:00
}
2016-07-28 19:11:28 +00:00
2016-08-01 21:32:53 +00:00
// TODO (slackpad) With the old Raft library we used to force the
// peers set to empty when a graceful leave occurred. This would
// keep voting spam down if the server was restarted, but it was
// dangerous because the peers was inconsistent with the logs and
// snapshots, so it wasn't really safe in all cases for the server
// to become leader. This is now safe, but the log spam is noisy.
// The next new version of the library will have a "you are not a
// peer stop it" behavior that should address this. We will have
// to evaluate during the RC period if this interim situation is
// not too confusing for operators.
2016-07-28 19:11:28 +00:00
// TODO (slackpad) When we take a later new version of the Raft
// library it won't try to complete replication, so this peer
// may not realize that it has been removed. Need to revisit this
// and the warning here.
if ! left {
2020-01-28 23:50:41 +00:00
s . logger . Warn ( "failed to leave raft configuration gracefully, timeout" )
2015-01-21 00:19:54 +00:00
}
}
2013-12-09 20:10:27 +00:00
return nil
}
2013-12-07 01:18:09 +00:00
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
2013-12-30 20:20:17 +00:00
func ( s * Server ) JoinLAN ( addrs [ ] string ) ( int , error ) {
2014-02-21 00:27:03 +00:00
return s . serfLAN . Join ( addrs , true )
2013-12-07 01:18:09 +00:00
}
// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
2013-12-30 20:20:17 +00:00
func ( s * Server ) JoinWAN ( addrs [ ] string ) ( int , error ) {
2018-03-26 19:21:06 +00:00
if s . serfWAN == nil {
return 0 , ErrWANFederationDisabled
}
2014-02-21 00:27:03 +00:00
return s . serfWAN . Join ( addrs , true )
2013-12-07 01:18:09 +00:00
}
2013-12-10 21:17:54 +00:00
2020-03-09 20:59:02 +00:00
// PrimaryMeshGatewayAddressesReadyCh returns a channel that will be closed
// when federation state replication ships back at least one primary mesh
// gateway (not via fallback config). A nil channel is returned when the
// server has no gateway locator.
func (s *Server) PrimaryMeshGatewayAddressesReadyCh() <-chan struct{} {
	if locator := s.gatewayLocator; locator != nil {
		return locator.PrimaryMeshGatewayAddressesReadyCh()
	}
	return nil
}
// PickRandomMeshGatewaySuitableForDialing is a convenience function used for
// writing tests. It asks the gateway locator to pick a gateway for dc and
// returns the empty string when the server has no gateway locator.
func (s *Server) PickRandomMeshGatewaySuitableForDialing(dc string) string {
	if locator := s.gatewayLocator; locator != nil {
		return locator.PickGateway(dc)
	}
	return ""
}
// RefreshPrimaryGatewayFallbackAddresses is used to update the list of current
// fallback addresses for locating mesh gateways in the primary datacenter.
// It does nothing when the server has no gateway locator.
func (s *Server) RefreshPrimaryGatewayFallbackAddresses(addrs []string) {
	if s.gatewayLocator == nil {
		return
	}
	s.gatewayLocator.RefreshPrimaryGatewayFallbackAddresses(addrs)
}
// PrimaryGatewayFallbackAddresses returns the current set of discovered
// fallback addresses for the mesh gateways in the primary datacenter, or nil
// when the server has no gateway locator.
func (s *Server) PrimaryGatewayFallbackAddresses() []string {
	if locator := s.gatewayLocator; locator != nil {
		return locator.PrimaryGatewayFallbackAddresses()
	}
	return nil
}
2014-05-25 23:59:48 +00:00
// LocalMember is used to return the local node
2017-04-20 18:51:27 +00:00
func ( s * Server ) LocalMember ( ) serf . Member {
return s . serfLAN . LocalMember ( )
2014-05-25 23:59:48 +00:00
}
2013-12-10 21:17:54 +00:00
// LANMembers returns the members of the LAN cluster.
func (s *Server) LANMembers() []serf.Member {
	members := s.serfLAN.Members()
	return members
}
2021-06-03 15:25:53 +00:00
// WANMembers is used to return the members of the WAN cluster
2013-12-10 21:17:54 +00:00
func ( s * Server ) WANMembers ( ) [ ] serf . Member {
2018-03-26 19:21:06 +00:00
if s . serfWAN == nil {
return nil
}
2013-12-10 21:17:54 +00:00
return s . serfWAN . Members ( )
}
2013-12-10 22:42:29 +00:00
// RemoveFailedNode is used to remove a failed node from the cluster
2019-10-04 21:10:02 +00:00
func ( s * Server ) RemoveFailedNode ( node string , prune bool ) error {
var removeFn func ( * serf . Serf , string ) error
if prune {
removeFn = ( * serf . Serf ) . RemoveFailedNodePrune
} else {
removeFn = ( * serf . Serf ) . RemoveFailedNode
}
if err := removeFn ( s . serfLAN , node ) ; err != nil {
2013-12-10 22:42:29 +00:00
return err
}
2020-09-25 17:46:38 +00:00
wanNode := node
2019-06-28 17:40:07 +00:00
// The Serf WAN pool stores members as node.datacenter
// so the dc is appended if not present
if ! strings . HasSuffix ( node , "." + s . config . Datacenter ) {
2020-09-25 17:46:38 +00:00
wanNode = node + "." + s . config . Datacenter
2019-06-28 17:40:07 +00:00
}
2018-03-26 19:21:06 +00:00
if s . serfWAN != nil {
2020-09-25 17:46:38 +00:00
if err := removeFn ( s . serfWAN , wanNode ) ; err != nil {
2018-03-26 19:21:06 +00:00
return err
}
2013-12-10 22:42:29 +00:00
}
2020-09-25 17:46:38 +00:00
return s . removeFailedNodeEnterprise ( removeFn , node , wanNode )
2013-12-10 22:42:29 +00:00
}
2013-12-11 22:04:44 +00:00
// IsLeader reports whether this server's Raft state is currently Leader.
func (s *Server) IsLeader() bool {
	current := s.raft.State()
	return current == raft.Leader
}
2013-12-19 23:18:25 +00:00
2020-03-09 20:59:02 +00:00
// LeaderLastContact returns the time of last contact by a leader, as
// reported by Raft. This only makes sense if we are currently a follower.
func (s *Server) LeaderLastContact() time.Time {
	lastContact := s.raft.LastContact()
	return lastContact
}
2014-09-09 01:09:51 +00:00
// KeyManagerLAN returns the keyring manager of the LAN Serf pool.
func (s *Server) KeyManagerLAN() *serf.KeyManager {
	mgr := s.serfLAN.KeyManager()
	return mgr
}
// KeyManagerWAN returns the keyring manager of the WAN Serf pool. It returns
// nil when the server has no WAN Serf instance, matching how the other WAN
// accessors on Server (e.g. WANMembers) handle a nil serfWAN instead of
// dereferencing it.
func (s *Server) KeyManagerWAN() *serf.KeyManager {
	if s.serfWAN == nil {
		return nil
	}
	return s.serfWAN.KeyManager()
}
2017-08-14 14:36:07 +00:00
// LANSegments returns a map of LAN segments by name
func ( s * Server ) LANSegments ( ) map [ string ] * serf . Serf {
2017-08-30 23:44:04 +00:00
segments := make ( map [ string ] * serf . Serf , len ( s . segmentLAN ) + 1 )
segments [ "" ] = s . serfLAN
2017-08-14 14:36:07 +00:00
for name , segment := range s . segmentLAN {
segments [ name ] = segment
}
return segments
}
2014-06-11 02:12:26 +00:00
// inmemCodec is used to do an RPC call without going over a network. It
// carries the method name, the caller's arguments and reply value, and the
// error string reported by the RPC server (if any).
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

// ReadRequestHeader fills in the service method of the request from the codec.
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the caller's arguments into args. Both sides are
// dereferenced up to two pointer levels before the value is assigned.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	src := reflect.ValueOf(i.args)
	src = reflect.Indirect(reflect.Indirect(src))

	target := reflect.ValueOf(args)
	target = reflect.Indirect(reflect.Indirect(target))

	target.Set(src)
	return nil
}

// WriteResponse records a server-side error on the codec, or otherwise
// copies reply into the caller's reply value. It always returns nil; an RPC
// error is surfaced through the err field instead.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if msg := resp.Error; msg != "" {
		i.err = errors.New(msg)
		return nil
	}

	src := reflect.ValueOf(reply)
	src = reflect.Indirect(reflect.Indirect(src))

	target := reflect.ValueOf(i.reply)
	target = reflect.Indirect(reflect.Indirect(target))

	target.Set(src)
	return nil
}

// Close is a no-op; there is no connection to tear down.
func (i *inmemCodec) Close() error {
	return nil
}
2013-12-19 23:18:25 +00:00
// RPC is used to make a local RPC call
func ( s * Server ) RPC ( method string , args interface { } , reply interface { } ) error {
2014-06-11 02:12:26 +00:00
codec := & inmemCodec {
method : method ,
args : args ,
reply : reply ,
}
2019-06-13 09:26:27 +00:00
// Enforce the RPC limit.
//
// "client" metric path because the internal client API is calling to the
// internal server API. It's odd that the same request directed to a server is
// recorded differently. On the other hand this possibly masks the different
// between regular client requests that traverse the network and these which
2021-07-02 16:18:46 +00:00
// don't (unless forwarded). This still seems most reasonable.
2019-06-13 09:26:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" } , 1 )
if ! s . rpcLimiter . Load ( ) . ( * rate . Limiter ) . Allow ( ) {
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "exceeded" } , 1 )
return structs . ErrRPCRateExceeded
}
2014-06-11 02:12:26 +00:00
if err := s . rpcServer . ServeRequest ( codec ) ; err != nil {
return err
}
return codec . err
2013-12-19 23:18:25 +00:00
}
2014-02-24 00:37:33 +00:00
2016-10-26 02:20:24 +00:00
// SnapshotRPC dispatches the given snapshot request, reading from the streaming
// input and writing to the streaming output depending on the operation.
func ( s * Server ) SnapshotRPC ( args * structs . SnapshotRequest , in io . Reader , out io . Writer ,
2017-06-15 09:50:28 +00:00
replyFn structs . SnapshotReplyFn ) error {
2016-10-26 02:20:24 +00:00
2019-06-13 09:26:27 +00:00
// Enforce the RPC limit.
//
// "client" metric path because the internal client API is calling to the
// internal server API. It's odd that the same request directed to a server is
// recorded differently. On the other hand this possibly masks the different
// between regular client requests that traverse the network and these which
2021-07-02 16:18:46 +00:00
// don't (unless forwarded). This still seems most reasonable.
2019-06-13 09:26:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" } , 1 )
if ! s . rpcLimiter . Load ( ) . ( * rate . Limiter ) . Allow ( ) {
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "exceeded" } , 1 )
return structs . ErrRPCRateExceeded
}
2016-10-26 02:20:24 +00:00
// Perform the operation.
var reply structs . SnapshotResponse
snap , err := s . dispatchSnapshotRequest ( args , in , & reply )
if err != nil {
return err
}
defer func ( ) {
if err := snap . Close ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
s . logger . Error ( "Failed to close snapshot" , "error" , err )
2016-10-26 02:20:24 +00:00
}
} ( )
// Let the caller peek at the reply.
if replyFn != nil {
if err := replyFn ( & reply ) ; err != nil {
return nil
}
}
// Stream the snapshot.
if out != nil {
if _ , err := io . Copy ( out , snap ) ; err != nil {
return fmt . Errorf ( "failed to stream snapshot: %v" , err )
}
}
return nil
}
2017-06-16 07:54:09 +00:00
// RegisterEndpoint is used to substitute an endpoint for testing.
func ( s * Server ) RegisterEndpoint ( name string , handler interface { } ) error {
2020-01-28 23:50:41 +00:00
s . logger . Warn ( "endpoint injected; this should only be used for testing" )
2017-06-16 07:54:09 +00:00
return s . rpcServer . RegisterName ( name , handler )
2015-11-12 01:27:25 +00:00
}
2021-05-20 14:07:23 +00:00
// FSM returns the finite state machine held by the server.
func (s *Server) FSM() *fsm.FSM {
	machine := s.fsm
	return machine
}
2014-02-24 00:37:33 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func ( s * Server ) Stats ( ) map [ string ] map [ string ] string {
toString := func ( v uint64 ) string {
return strconv . FormatUint ( v , 10 )
}
2017-03-14 01:54:34 +00:00
numKnownDCs := len ( s . router . GetDatacenters ( ) )
2014-02-24 00:37:33 +00:00
stats := map [ string ] map [ string ] string {
2020-06-16 17:19:31 +00:00
"consul" : {
2014-02-24 02:08:58 +00:00
"server" : "true" ,
"leader" : fmt . Sprintf ( "%v" , s . IsLeader ( ) ) ,
2016-07-28 19:11:28 +00:00
"leader_addr" : string ( s . raft . Leader ( ) ) ,
2014-02-24 02:08:58 +00:00
"bootstrap" : fmt . Sprintf ( "%v" , s . config . Bootstrap ) ,
2016-06-20 20:50:59 +00:00
"known_datacenters" : toString ( uint64 ( numKnownDCs ) ) ,
2014-02-24 00:37:33 +00:00
} ,
2014-02-24 02:08:58 +00:00
"raft" : s . raft . Stats ( ) ,
2014-03-09 22:46:03 +00:00
"serf_lan" : s . serfLAN . Stats ( ) ,
2014-04-29 17:55:42 +00:00
"runtime" : runtimeStats ( ) ,
2014-02-24 00:37:33 +00:00
}
2018-10-31 20:00:46 +00:00
2020-07-03 20:52:08 +00:00
if s . config . ACLsEnabled {
2021-09-22 23:34:14 +00:00
stats [ "consul" ] [ "acl" ] = "enabled"
2018-10-31 20:00:46 +00:00
} else {
stats [ "consul" ] [ "acl" ] = "disabled"
}
2018-03-26 19:21:06 +00:00
if s . serfWAN != nil {
stats [ "serf_wan" ] = s . serfWAN . Stats ( )
}
2018-05-24 14:36:42 +00:00
2014-02-24 00:37:33 +00:00
return stats
}
2015-04-15 23:12:45 +00:00
2015-06-06 03:31:33 +00:00
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
2017-08-14 14:36:07 +00:00
func ( s * Server ) GetLANCoordinate ( ) ( lib . CoordinateSet , error ) {
lan , err := s . serfLAN . GetCoordinate ( )
if err != nil {
return nil , err
}
cs := lib . CoordinateSet { "" : lan }
for name , segment := range s . segmentLAN {
c , err := segment . GetCoordinate ( )
if err != nil {
return nil , err
}
cs [ name ] = c
}
return cs , nil
2015-04-15 23:12:45 +00:00
}
2015-05-08 08:31:34 +00:00
2018-06-11 19:51:17 +00:00
// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
2020-09-16 17:28:03 +00:00
func ( s * Server ) ReloadConfig ( config ReloadableConfig ) error {
2021-05-04 14:36:53 +00:00
// Reload raft config first before updating any other state since it could
// error if the new config is invalid.
raftCfg := computeRaftReloadableConfig ( config )
if err := s . raft . ReloadConfig ( raftCfg ) ; err != nil {
return err
}
2020-09-16 17:28:03 +00:00
s . rpcLimiter . Store ( rate . NewLimiter ( config . RPCRateLimit , config . RPCMaxBurst ) )
2020-01-31 16:19:37 +00:00
s . rpcConnLimiter . SetConfig ( connlimit . Config {
MaxConnsPerClientIP : config . RPCMaxConnsPerClient ,
} )
2019-06-13 09:26:27 +00:00
2019-04-26 18:25:03 +00:00
if s . IsLeader ( ) {
// only bootstrap the config entries if we are the leader
// this will error if we lose leadership while bootstrapping here.
return s . bootstrapConfigEntries ( config . ConfigEntryBootstrap )
}
2020-04-30 20:12:17 +00:00
2018-06-11 19:51:17 +00:00
return nil
}
2021-05-04 14:36:53 +00:00
// computeRaftReloadableConfig builds the raft.ReloadableConfig that
// corresponds to the given ReloadableConfig.
//
// Raft is reloaded even when nothing changed, since that is cheap and simpler
// than diffing against the current raft config. The logic lives in its own
// function so it can be table tested without a full raft instance.
func computeRaftReloadableConfig(config ReloadableConfig) raft.ReloadableConfig {
	// Start from the raw defaults, _not_ the currently running values, so
	// that reloading with a zero value restores the default even if Consul
	// was originally started with a custom value for the field.
	defaults := DefaultConfig().RaftConfig
	reloadable := raft.ReloadableConfig{
		TrailingLogs:      defaults.TrailingLogs,
		SnapshotInterval:  defaults.SnapshotInterval,
		SnapshotThreshold: defaults.SnapshotThreshold,
	}

	// A non-zero field in the incoming config overrides the default.
	if v := config.RaftSnapshotThreshold; v != 0 {
		reloadable.SnapshotThreshold = uint64(v)
	}
	if v := config.RaftSnapshotInterval; v != 0 {
		reloadable.SnapshotInterval = v
	}
	if v := config.RaftTrailingLogs; v != 0 {
		reloadable.TrailingLogs = uint64(v)
	}

	return reloadable
}
2017-06-16 03:41:30 +00:00
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func ( s * Server ) setConsistentReadReady ( ) {
2017-06-16 15:49:54 +00:00
atomic . StoreInt32 ( & s . readyForConsistentReads , 1 )
2017-06-16 03:41:30 +00:00
}
// Atomically reset readiness state flag on leadership revoke
func ( s * Server ) resetConsistentReadReady ( ) {
2017-06-16 15:49:54 +00:00
atomic . StoreInt32 ( & s . readyForConsistentReads , 0 )
2017-06-16 03:41:30 +00:00
}
// Returns true if this server is ready to serve consistent reads
func ( s * Server ) isReadyForConsistentReads ( ) bool {
2017-06-21 00:43:07 +00:00
return atomic . LoadInt32 ( & s . readyForConsistentReads ) == 1
2017-06-16 03:41:30 +00:00
}
2021-07-22 18:58:08 +00:00
// trackLeaderChanges registers an Observer with raft in order to receive updates
// about leader changes, in order to keep the grpc resolver up to date for leader forwarding.
func ( s * Server ) trackLeaderChanges ( ) {
obsCh := make ( chan raft . Observation , 16 )
observer := raft . NewObserver ( obsCh , false , func ( o * raft . Observation ) bool {
_ , ok := o . Data . ( raft . LeaderObservation )
return ok
} )
s . raft . RegisterObserver ( observer )
2020-07-07 19:39:04 +00:00
2021-07-22 18:58:08 +00:00
for {
select {
case obs := <- obsCh :
leaderObs , ok := obs . Data . ( raft . LeaderObservation )
if ! ok {
s . logger . Debug ( "got unknown observation type from raft" , "type" , reflect . TypeOf ( obs . Data ) )
continue
}
2020-07-07 19:39:04 +00:00
2021-08-24 21:28:44 +00:00
s . grpcLeaderForwarder . UpdateLeaderAddr ( s . config . Datacenter , string ( leaderObs . Leader ) )
2021-07-22 18:58:08 +00:00
case <- s . shutdownCh :
s . raft . DeregisterObserver ( observer )
return
2020-07-07 19:39:04 +00:00
}
}
}
2016-08-09 18:56:39 +00:00
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
//
// The text describes the peers.json layout for Raft protocol version 2 and
// earlier (a JSON array of address:port strings) and for version 3 and later
// (a JSON array of objects with "id", "address" and "non_voter" fields), and
// explains when peers.json is ingested and deleted.
//
// NOTE: this is a raw string literal, so every character between the
// backticks, including line breaks, is part of the constant's value.
const peersInfoContent = `
As of Consul 0.7 .0 , the peers . json file is only used for recovery
2017-05-04 21:15:59 +00:00
after an outage . The format of this file depends on what the server has
configured for its Raft protocol version . Please see the agent configuration
page at https : //www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter .
For Raft protocol version 2 and earlier , this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster , like
this :
[
"10.1.0.1:8300" ,
"10.1.0.2:8300" ,
"10.1.0.3:8300"
]
For Raft protocol version 3 and later , this should be formatted as a JSON
array containing the node ID , address : port , and suffrage information of each
Consul server in the cluster , like this :
[
{
"id" : "adf4238a-882b-9ddc-4a9d-5b6758e4159e" ,
"address" : "10.1.0.1:8300" ,
"non_voter" : false
} ,
{
"id" : "8b6dda82-3103-11e7-93ae-92361f002671" ,
"address" : "10.1.0.2:8300" ,
"non_voter" : false
} ,
{
"id" : "97e17742-3103-11e7-93ae-92361f002671" ,
"address" : "10.1.0.3:8300" ,
"non_voter" : false
}
]
The "id" field is the node ID of the server . This can be found in the logs when
the server starts up , or in the "node-id" file inside the server ' s data
directory .
The "address" field is the address and port of the server .
The "non_voter" field controls whether the server is a non - voter , which is used
in some advanced Autopilot configurations , please see
https : //www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false , which is typical for most
clusters .
2016-08-09 18:56:39 +00:00
Under normal operation , the peers . json file will not be present .
When Consul starts for the first time , it will create this peers . info file and
delete any existing peers . json file so that recovery doesn ' t occur on the first
startup .
Once this peers . info file is present , any peers . json file will be ingested at
startup , and will set the Raft peer configuration manually to recover from an
outage . It ' s crucial that all servers in the cluster are shut down before
creating the peers . json file , and that all servers receive the same
configuration . Once the peers . json file is successfully ingested and applied , it
will be deleted .
Please see https : //www.consul.io/docs/guides/outage.html for more information.
`