2013-12-19 22:48:14 +00:00
package consul
import (
2022-12-14 15:24:22 +00:00
"context"
2013-12-19 22:48:14 +00:00
"fmt"
2016-10-26 02:20:24 +00:00
"io"
2014-02-24 00:37:33 +00:00
"strconv"
2013-12-19 22:48:14 +00:00
"sync"
2018-06-11 19:51:17 +00:00
"sync/atomic"
2013-12-19 23:42:17 +00:00
"time"
2015-01-06 23:48:46 +00:00
2017-09-01 22:02:50 +00:00
"github.com/armon/go-metrics"
2020-11-13 02:12:12 +00:00
"github.com/armon/go-metrics/prometheus"
2021-07-30 22:56:11 +00:00
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
2022-04-05 21:10:06 +00:00
"github.com/hashicorp/consul/acl"
2023-02-06 16:31:25 +00:00
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
2017-06-15 13:16:16 +00:00
"github.com/hashicorp/consul/agent/pool"
2017-07-06 10:40:54 +00:00
"github.com/hashicorp/consul/agent/router"
2017-07-06 10:34:00 +00:00
"github.com/hashicorp/consul/agent/structs"
2017-01-18 06:20:11 +00:00
"github.com/hashicorp/consul/lib"
2020-01-28 23:50:41 +00:00
"github.com/hashicorp/consul/logging"
2019-02-26 15:52:07 +00:00
"github.com/hashicorp/consul/tlsutil"
2020-08-27 15:23:52 +00:00
"github.com/hashicorp/consul/types"
2013-12-19 22:48:14 +00:00
)
2020-11-13 02:12:12 +00:00
var ClientCounters = [ ] prometheus . CounterDefinition {
{
2020-11-13 21:18:04 +00:00
Name : [ ] string { "client" , "rpc" } ,
2020-11-13 02:12:12 +00:00
Help : "Increments whenever a Consul agent in client mode makes an RPC request to a Consul server." ,
} ,
{
2020-11-13 21:18:04 +00:00
Name : [ ] string { "client" , "rpc" , "exceeded" } ,
2020-11-13 02:12:12 +00:00
Help : "Increments whenever a Consul agent in client mode makes an RPC request to a Consul server gets rate limited by that agent's limits configuration." ,
} ,
{
2020-11-13 21:18:04 +00:00
Name : [ ] string { "client" , "rpc" , "failed" } ,
2020-11-13 02:12:12 +00:00
Help : "Increments whenever a Consul agent in client mode makes an RPC request to a Consul server and fails." ,
} ,
}
2014-05-27 21:33:09 +00:00
const (
	// serfEventBacklog is the capacity of the Serf event queue: the number
	// of unprocessed Serf events that may be held before new events block.
	// A blocking Serf event queue is a bad thing.
	serfEventBacklog = 256

	// serfEventBacklogWarning is the queue depth at which warnings are
	// logged to indicate a problem processing Serf events.
	serfEventBacklogWarning = 200
)
2013-12-19 22:48:14 +00:00
// Client is Consul client which uses RPC to communicate with the
// services for service discovery, health checking, and DC forwarding.
type Client struct {
config * Config
2018-10-19 16:04:07 +00:00
// acls is used to resolve tokens to effective policies
2022-01-22 19:07:26 +00:00
* ACLResolver
2018-10-19 16:04:07 +00:00
2013-12-19 22:48:14 +00:00
// Connection pool to consul servers
2017-06-15 13:16:16 +00:00
connPool * pool . ConnPool
2013-12-19 22:48:14 +00:00
2020-08-27 15:23:52 +00:00
// router is responsible for the selection and maintenance of
2016-03-25 18:57:54 +00:00
// Consul servers this agent uses for RPC requests
2020-08-27 15:23:52 +00:00
router * router . Router
2013-12-19 22:48:14 +00:00
2017-09-01 22:02:50 +00:00
// rpcLimiter is used to rate limit the total number of RPCs initiated
// from an agent.
2018-06-11 19:51:17 +00:00
rpcLimiter atomic . Value
2017-09-01 22:02:50 +00:00
2020-09-11 16:43:29 +00:00
// eventCh is used to receive events from the serf cluster in the datacenter
2013-12-19 22:48:14 +00:00
eventCh chan serf . Event
// Logger uses the provided LogOutput
2020-01-28 23:50:41 +00:00
logger hclog . InterceptLogger
2013-12-19 22:48:14 +00:00
// serf is the Serf cluster maintained inside the DC
// which contains all the DC nodes
serf * serf . Serf
shutdown bool
shutdownCh chan struct { }
shutdownLock sync . Mutex
2018-05-24 14:36:42 +00:00
// embedded struct to hold all the enterprise specific data
EnterpriseClient
2019-06-27 20:22:07 +00:00
tlsConfigurator * tlsutil . Configurator
2013-12-19 22:48:14 +00:00
}
2020-08-05 17:20:12 +00:00
// NewClient creates and returns a Client
2020-09-14 22:31:07 +00:00
func NewClient ( config * Config , deps Deps ) ( * Client , error ) {
2017-05-03 19:02:01 +00:00
if err := config . CheckProtocolVersion ( ) ; err != nil {
2014-03-09 22:18:36 +00:00
return nil , err
}
2013-12-19 22:48:14 +00:00
if config . DataDir == "" {
return nil , fmt . Errorf ( "Config must provide a DataDir" )
}
2014-08-05 22:20:35 +00:00
if err := config . CheckACL ( ) ; err != nil {
return nil , err
}
2020-08-27 15:23:52 +00:00
2013-12-19 22:48:14 +00:00
c := & Client {
2019-06-27 20:22:07 +00:00
config : config ,
2020-09-14 22:31:07 +00:00
connPool : deps . ConnPool ,
2019-06-27 20:22:07 +00:00
eventCh : make ( chan serf . Event , serfEventBacklog ) ,
2020-09-14 22:31:07 +00:00
logger : deps . Logger . NamedIntercept ( logging . ConsulClient ) ,
2019-06-27 20:22:07 +00:00
shutdownCh : make ( chan struct { } ) ,
2020-09-14 22:31:07 +00:00
tlsConfigurator : deps . TLSConfigurator ,
2013-12-19 22:48:14 +00:00
}
2020-09-16 17:29:59 +00:00
c . rpcLimiter . Store ( rate . NewLimiter ( config . RPCRateLimit , config . RPCMaxBurst ) )
2018-06-11 20:11:36 +00:00
2021-05-11 14:50:03 +00:00
if err := c . initEnterprise ( deps ) ; err != nil {
2018-05-24 14:36:42 +00:00
c . Shutdown ( )
return nil , err
}
2018-10-19 16:04:07 +00:00
aclConfig := ACLResolverConfig {
2021-08-09 20:29:21 +00:00
Config : config . ACLResolverSettings ,
2021-07-30 23:20:02 +00:00
Backend : & clientACLResolverBackend { Client : c } ,
2021-08-09 20:29:21 +00:00
Logger : c . logger ,
DisableDuration : aclClientDisabledTTL ,
CacheConfig : clientACLCacheConfig ,
2021-10-24 22:28:46 +00:00
ACLConfig : newACLConfig ( & partitionInfoNoop { } , c . logger ) ,
2021-08-09 20:29:21 +00:00
Tokens : deps . Tokens ,
2018-10-19 16:04:07 +00:00
}
2019-03-13 09:29:06 +00:00
var err error
2022-01-22 19:07:26 +00:00
if c . ACLResolver , err = NewACLResolver ( & aclConfig ) ; err != nil {
2018-10-19 16:04:07 +00:00
c . Shutdown ( )
return nil , fmt . Errorf ( "Failed to create ACL resolver: %v" , err )
}
2017-11-10 20:26:48 +00:00
// Initialize the LAN Serf
2020-09-11 16:43:29 +00:00
c . serf , err = c . setupSerf ( config . SerfLANConfig , c . eventCh , serfLANSnapshot )
2013-12-19 22:48:14 +00:00
if err != nil {
c . Shutdown ( )
return nil , fmt . Errorf ( "Failed to start lan serf: %v" , err )
}
2016-03-27 04:59:45 +00:00
2020-09-14 22:31:07 +00:00
if err := deps . Router . AddArea ( types . AreaLAN , c . serf , c . connPool ) ; err != nil {
2020-08-27 15:23:52 +00:00
c . Shutdown ( )
return nil , fmt . Errorf ( "Failed to add LAN area to the RPC router: %w" , err )
}
2020-09-14 22:31:07 +00:00
c . router = deps . Router
2016-03-27 04:59:45 +00:00
2017-11-10 20:26:48 +00:00
// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.
go c . lanEventHandler ( )
2013-12-19 22:48:14 +00:00
return c , nil
}
// Shutdown is used to shutdown the client
func ( c * Client ) Shutdown ( ) error {
2020-01-28 23:50:41 +00:00
c . logger . Info ( "shutting down client" )
2013-12-19 22:48:14 +00:00
c . shutdownLock . Lock ( )
defer c . shutdownLock . Unlock ( )
if c . shutdown {
return nil
}
c . shutdown = true
close ( c . shutdownCh )
if c . serf != nil {
c . serf . Shutdown ( )
}
// Close the connection pool
c . connPool . Shutdown ( )
2019-12-06 19:01:34 +00:00
2022-01-22 19:07:26 +00:00
c . ACLResolver . Close ( )
2019-12-06 19:01:34 +00:00
2013-12-19 22:48:14 +00:00
return nil
}
2021-10-26 20:08:55 +00:00
// Leave is used to prepare for a graceful shutdown.
2013-12-19 22:48:14 +00:00
func ( c * Client ) Leave ( ) error {
2020-01-28 23:50:41 +00:00
c . logger . Info ( "client starting leave" )
2013-12-19 22:48:14 +00:00
// Leave the LAN pool
if c . serf != nil {
if err := c . serf . Leave ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
c . logger . Error ( "Failed to leave LAN Serf cluster" , "error" , err )
2013-12-19 22:48:14 +00:00
}
}
return nil
}
2021-10-26 20:08:55 +00:00
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
2022-04-05 21:10:06 +00:00
func ( c * Client ) JoinLAN ( addrs [ ] string , entMeta * acl . EnterpriseMeta ) ( int , error ) {
2021-11-15 15:51:14 +00:00
// Partitions definitely have to match.
if c . config . AgentEnterpriseMeta ( ) . PartitionOrDefault ( ) != entMeta . PartitionOrDefault ( ) {
return 0 , fmt . Errorf ( "target partition %q must match client agent partition %q" ,
entMeta . PartitionOrDefault ( ) ,
c . config . AgentEnterpriseMeta ( ) . PartitionOrDefault ( ) ,
)
}
2014-02-21 00:27:03 +00:00
return c . serf . Join ( addrs , true )
2013-12-19 22:48:14 +00:00
}
2021-10-26 20:08:55 +00:00
// AgentLocalMember is used to retrieve the LAN member for the local node.
func ( c * Client ) AgentLocalMember ( ) serf . Member {
2014-05-29 18:21:56 +00:00
return c . serf . LocalMember ( )
}
2021-10-26 20:08:55 +00:00
// LANMembersInAgentPartition returns the LAN members for this agent's
// canonical serf pool. For clients this is the only pool that exists. For
// servers it's the pool in the default segment and the default partition.
func ( c * Client ) LANMembersInAgentPartition ( ) [ ] serf . Member {
2013-12-19 22:48:14 +00:00
return c . serf . Members ( )
}
2021-10-26 20:08:55 +00:00
// LANMembers returns the LAN members for one of:
//
// - the requested partition
// - the requested segment
// - all segments
//
// This is limited to segments and partitions that the node is a member of.
func ( c * Client ) LANMembers ( filter LANMemberFilter ) ( [ ] serf . Member , error ) {
if err := filter . Validate ( ) ; err != nil {
return nil , err
}
2021-11-15 15:51:14 +00:00
// Partitions definitely have to match.
if c . config . AgentEnterpriseMeta ( ) . PartitionOrDefault ( ) != filter . PartitionOrDefault ( ) {
return nil , fmt . Errorf ( "partition %q not found" , filter . PartitionOrDefault ( ) )
}
2017-09-05 19:22:20 +00:00
2021-10-26 20:08:55 +00:00
if ! filter . AllSegments && filter . Segment != c . config . Segment {
return nil , fmt . Errorf ( "segment %q not found" , filter . Segment )
2017-08-14 14:36:07 +00:00
}
2021-10-26 20:08:55 +00:00
return c . serf . Members ( ) , nil
2017-08-14 14:36:07 +00:00
}
2021-10-26 20:08:55 +00:00
// RemoveFailedNode is used to remove a failed node from the cluster.
2022-04-05 21:10:06 +00:00
func ( c * Client ) RemoveFailedNode ( node string , prune bool , entMeta * acl . EnterpriseMeta ) error {
2021-11-15 15:51:14 +00:00
// Partitions definitely have to match.
if c . config . AgentEnterpriseMeta ( ) . PartitionOrDefault ( ) != entMeta . PartitionOrDefault ( ) {
return fmt . Errorf ( "client agent in partition %q cannot remove node in different partition %q" ,
c . config . AgentEnterpriseMeta ( ) . PartitionOrDefault ( ) , entMeta . PartitionOrDefault ( ) )
}
if ! isSerfMember ( c . serf , node ) {
return fmt . Errorf ( "agent: No node found with name '%s'" , node )
}
2019-10-04 21:10:02 +00:00
if prune {
2019-10-07 21:15:23 +00:00
return c . serf . RemoveFailedNodePrune ( node )
2019-10-04 21:10:02 +00:00
}
2013-12-30 22:42:23 +00:00
return c . serf . RemoveFailedNode ( node )
}
2014-11-20 00:45:49 +00:00
// KeyManagerLAN returns the keyring manager of the LAN Serf instance.
func (c *Client) KeyManagerLAN() *serf.KeyManager {
	keyManager := c.serf.KeyManager()
	return keyManager
}
2013-12-19 23:08:55 +00:00
// RPC is used to forward an RPC call to a consul server, or fail if no servers
2022-12-14 15:24:22 +00:00
func ( c * Client ) RPC ( ctx context . Context , method string , args interface { } , reply interface { } ) error {
2017-10-10 22:19:50 +00:00
// This is subtle but we start measuring the time on the client side
// right at the time of the first request, vs. on the first retry as
// is done on the server side inside forward(). This is because the
// servers may already be applying the RPCHoldTimeout up there, so by
// starting the timer here we won't potentially double up the delay.
// TODO (slackpad) Plumb a deadline here with a context.
firstCheck := time . Now ( )
2023-02-06 19:07:41 +00:00
retryCount := 0
previousJitter := time . Duration ( 0 )
2017-10-10 22:19:50 +00:00
TRY :
2023-02-06 19:07:41 +00:00
retryCount ++
2020-08-27 15:23:52 +00:00
manager , server := c . router . FindLANRoute ( )
2016-02-19 21:17:52 +00:00
if server == nil {
return structs . ErrNoServers
}
2017-09-01 22:02:50 +00:00
// Enforce the RPC limit.
2017-10-04 23:43:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" } , 1 )
2018-06-11 19:51:17 +00:00
if ! c . rpcLimiter . Load ( ) . ( * rate . Limiter ) . Allow ( ) {
2017-10-04 23:43:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "exceeded" } , 1 )
2017-09-01 22:02:50 +00:00
return structs . ErrRPCRateExceeded
}
// Make the request.
2020-05-28 07:48:34 +00:00
rpcErr := c . connPool . RPC ( c . config . Datacenter , server . ShortName , server . Addr , method , args , reply )
2017-10-10 22:19:50 +00:00
if rpcErr == nil {
return nil
2014-02-03 19:53:04 +00:00
}
2017-10-10 22:19:50 +00:00
// Move off to another server, and see if we can retry.
2020-08-27 15:23:52 +00:00
manager . NotifyFailedServer ( server )
2021-04-20 18:23:50 +00:00
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
info , _ := args . ( structs . RPCInfo )
2023-02-06 16:31:25 +00:00
retryableMessages := [ ] error {
// If we are chunking and it doesn't seem to have completed, try again.
ErrChunkingResubmit ,
// These rate limit errors are returned before the handler is called, so are
// safe to retry.
rpcRate . ErrRetryElsewhere ,
}
if retry := canRetry ( info , rpcErr , firstCheck , c . config , retryableMessages ) ; ! retry {
2022-04-21 20:21:35 +00:00
c . logger . Error ( "RPC failed to server" ,
"method" , method ,
"server" , server . Addr ,
"error" , rpcErr ,
)
metrics . IncrCounterWithLabels ( [ ] string { "client" , "rpc" , "failed" } , 1 , [ ] metrics . Label { { Name : "server" , Value : server . Name } } )
2017-10-10 22:19:50 +00:00
return rpcErr
}
2022-04-21 20:21:35 +00:00
c . logger . Warn ( "Retrying RPC to server" ,
"method" , method ,
"server" , server . Addr ,
"error" , rpcErr ,
)
2017-10-10 22:19:50 +00:00
// We can wait a bit and retry!
2023-02-06 19:07:41 +00:00
jitter := lib . RandomStaggerWithRange ( previousJitter , getWaitTime ( c . config . RPCHoldTimeout , retryCount ) )
previousJitter = jitter
2021-05-27 21:29:43 +00:00
select {
case <- time . After ( jitter ) :
goto TRY
case <- c . shutdownCh :
2017-10-10 22:19:50 +00:00
}
return rpcErr
2013-12-19 23:08:55 +00:00
}
2014-02-24 00:37:33 +00:00
2016-10-26 02:20:24 +00:00
// SnapshotRPC sends the snapshot request to one of the servers, reading from
// the streaming input and writing to the streaming output depending on the
// operation.
func ( c * Client ) SnapshotRPC ( args * structs . SnapshotRequest , in io . Reader , out io . Writer ,
2017-06-15 09:50:28 +00:00
replyFn structs . SnapshotReplyFn ) error {
2020-08-27 15:23:52 +00:00
manager , server := c . router . FindLANRoute ( )
2016-10-26 02:20:24 +00:00
if server == nil {
return structs . ErrNoServers
}
2017-09-01 22:02:50 +00:00
// Enforce the RPC limit.
2017-10-04 23:43:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" } , 1 )
2018-06-11 19:51:17 +00:00
if ! c . rpcLimiter . Load ( ) . ( * rate . Limiter ) . Allow ( ) {
2017-10-04 23:43:27 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "exceeded" } , 1 )
2017-09-01 22:02:50 +00:00
return structs . ErrRPCRateExceeded
}
2016-10-26 02:20:24 +00:00
// Request the operation.
var reply structs . SnapshotResponse
2020-05-28 08:18:30 +00:00
snap , err := SnapshotRPC ( c . connPool , c . config . Datacenter , server . ShortName , server . Addr , args , in , & reply )
2016-10-26 02:20:24 +00:00
if err != nil {
2020-08-27 15:23:52 +00:00
manager . NotifyFailedServer ( server )
2016-10-26 02:20:24 +00:00
return err
}
defer func ( ) {
if err := snap . Close ( ) ; err != nil {
2020-01-28 23:50:41 +00:00
c . logger . Error ( "Failed closing snapshot stream" , "error" , err )
2016-10-26 02:20:24 +00:00
}
} ( )
// Let the caller peek at the reply.
if replyFn != nil {
if err := replyFn ( & reply ) ; err != nil {
2021-12-23 21:56:30 +00:00
return err
2016-10-26 02:20:24 +00:00
}
}
// Stream the snapshot.
if out != nil {
if _ , err := io . Copy ( out , snap ) ; err != nil {
return fmt . Errorf ( "failed to stream snapshot: %v" , err )
}
}
return nil
}
2014-02-24 00:37:33 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func ( c * Client ) Stats ( ) map [ string ] map [ string ] string {
2020-08-27 15:23:52 +00:00
numServers := c . router . GetLANManager ( ) . NumServers ( )
2016-02-19 20:13:17 +00:00
2014-02-24 00:37:33 +00:00
toString := func ( v uint64 ) string {
return strconv . FormatUint ( v , 10 )
}
stats := map [ string ] map [ string ] string {
2020-06-16 17:19:31 +00:00
"consul" : {
2014-02-24 02:08:58 +00:00
"server" : "false" ,
2016-02-20 01:32:16 +00:00
"known_servers" : toString ( uint64 ( numServers ) ) ,
2014-02-24 00:37:33 +00:00
} ,
2014-03-09 22:46:03 +00:00
"serf_lan" : c . serf . Stats ( ) ,
2014-04-29 17:55:42 +00:00
"runtime" : runtimeStats ( ) ,
2014-02-24 00:37:33 +00:00
}
2018-05-24 14:36:42 +00:00
2020-07-03 20:52:08 +00:00
if c . config . ACLsEnabled {
2021-09-22 22:55:53 +00:00
stats [ "consul" ] [ "acl" ] = "enabled"
2018-10-31 20:00:46 +00:00
} else {
stats [ "consul" ] [ "acl" ] = "disabled"
}
2014-02-24 00:37:33 +00:00
return stats
}
2015-04-15 23:12:45 +00:00
2021-11-15 15:51:14 +00:00
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
2022-10-18 19:05:09 +00:00
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
2021-11-15 15:51:14 +00:00
//
2022-10-18 19:05:09 +00:00
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
2021-11-15 15:51:14 +00:00
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
//
// NOTE: This assumes coordinates are enabled, so check that before calling.
2017-08-14 14:36:07 +00:00
func ( c * Client ) GetLANCoordinate ( ) ( lib . CoordinateSet , error ) {
lan , err := c . serf . GetCoordinate ( )
if err != nil {
return nil , err
}
cs := lib . CoordinateSet { c . config . Segment : lan }
return cs , nil
2015-04-15 23:12:45 +00:00
}
2018-06-11 19:51:17 +00:00
// ReloadConfig is used to have the Client do an online reload of
// relevant configuration information
2020-09-16 17:28:03 +00:00
func ( c * Client ) ReloadConfig ( config ReloadableConfig ) error {
c . rpcLimiter . Store ( rate . NewLimiter ( config . RPCRateLimit , config . RPCMaxBurst ) )
2022-10-18 19:05:09 +00:00
c . connPool . SetRPCClientTimeout ( config . RPCClientTimeout )
2018-06-11 19:51:17 +00:00
return nil
}
2021-11-15 15:51:14 +00:00
2022-04-05 21:10:06 +00:00
func ( c * Client ) AgentEnterpriseMeta ( ) * acl . EnterpriseMeta {
2021-11-15 15:51:14 +00:00
return c . config . AgentEnterpriseMeta ( )
}
// agentSegmentName returns the segment name from the client's configuration.
func (c *Client) agentSegmentName() string {
	segment := c.config.Segment
	return segment
}