Allow the Agent its its child Client/Server to share a connection pool

This is needed so that we can make an AutoConfig RPC at the Agent level prior to creating the Client/Server.
This commit is contained in:
Matt Keeler 2020-06-10 16:15:32 -04:00
parent 8c601ad8db
commit f5d57ccd48
No known key found for this signature in database
GPG Key ID: 04DBAE1857E0081B
4 changed files with 105 additions and 24 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
@ -313,6 +314,9 @@ type Agent struct {
// IP.
httpConnLimiter connlimit.Limiter
// Connection Pool
connPool *pool.ConnPool
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
@ -327,6 +331,11 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error)
return nil, fmt.Errorf("Must configure a DataDir")
}
tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
if err != nil {
return nil, err
}
a := Agent{
config: c,
checkReapAfter: make(map[structs.CheckID]time.Duration),
@ -347,7 +356,13 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error)
endpoints: make(map[string]string),
tokens: new(token.Store),
logger: logger,
tlsConfigurator: tlsConfigurator,
}
err = a.initializeConnectionPool()
if err != nil {
return nil, fmt.Errorf("Failed to initialize the connection pool: %w", err)
}
a.serviceManager = NewServiceManager(&a)
if err := a.initializeACLs(); err != nil {
@ -363,6 +378,37 @@ func New(c *config.RuntimeConfig, logger hclog.InterceptLogger) (*Agent, error)
return &a, nil
}
func (a *Agent) initializeConnectionPool() error {
var rpcSrcAddr *net.TCPAddr
if !ipaddr.IsAny(a.config.RPCBindAddr) {
rpcSrcAddr = &net.TCPAddr{IP: a.config.RPCBindAddr.IP}
}
// Ensure we have a log output for the connection pool.
logOutput := a.LogOutput
if logOutput == nil {
logOutput = os.Stderr
}
pool := &pool.ConnPool{
Server: a.config.ServerMode,
SrcAddr: rpcSrcAddr,
LogOutput: logOutput,
TLSConfigurator: a.tlsConfigurator,
Datacenter: a.config.Datacenter,
}
if a.config.ServerMode {
pool.MaxTime = 2 * time.Minute
pool.MaxStreams = 64
} else {
pool.MaxTime = 127 * time.Second
pool.MaxStreams = 32
}
a.connPool = pool
return nil
}
// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
lc := local.Config{
@ -438,21 +484,22 @@ func (a *Agent) Start() error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}
tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), a.logger)
if err != nil {
return err
options := []consul.ConsulOption{
consul.WithLogger(a.logger),
consul.WithTokenStore(a.tokens),
consul.WithTLSConfigurator(a.tlsConfigurator),
consul.WithConnectionPool(a.connPool),
}
a.tlsConfigurator = tlsConfigurator
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens, a.tlsConfigurator)
server, err := consul.NewServerWithOptions(consulCfg, options...)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.delegate = server
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger, a.tlsConfigurator)
client, err := consul.NewClientWithOptions(consulCfg, options...)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}

View File

@ -101,7 +101,13 @@ func NewClient(config *Config) (*Client, error) {
return NewClientLogger(config, nil, c)
}
func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) {
func NewClientWithOptions(config *Config, options ...ConsulOption) (*Client, error) {
flat := flattenConsulOptions(options)
logger := flat.logger
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
// Check the protocol version
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
@ -130,14 +136,16 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
})
}
connPool := &pool.ConnPool{
Server: false,
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
if connPool == nil {
connPool = &pool.ConnPool{
Server: false,
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
// Create client
@ -202,6 +210,10 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
return c, nil
}
func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurator *tlsutil.Configurator) (*Client, error) {
return NewClientWithOptions(config, WithLogger(logger), WithTLSConfigurator(tlsConfigurator))
}
// Shutdown is used to shutdown the client
func (c *Client) Shutdown() error {
c.logger.Info("shutting down client")

View File

@ -322,6 +322,22 @@ func NewServer(config *Config) (*Server, error) {
// NewServerLogger is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token.Store, tlsConfigurator *tlsutil.Configurator) (*Server, error) {
return NewServerWithOptions(config,
WithLogger(logger),
WithTokenStore(tokens),
WithTLSConfigurator(tlsConfigurator))
}
// NewServerWithOptions is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error
func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, error) {
flat := flattenConsulOptions(options)
logger := flat.logger
tokens := flat.tokens
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
// Check the protocol version.
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
@ -376,14 +392,16 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
connPool := &pool.ConnPool{
Server: true,
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
if connPool == nil {
connPool = &pool.ConnPool{
Server: true,
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
serverLogger := logger.NamedIntercept(logging.ConsulServer)

View File

@ -466,7 +466,11 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr) (*Conn,
conf.LogOutput = p.LogOutput
// Create a multiplexed session
session, _ := yamux.Client(conn, conf)
session, err := yamux.Client(conn, conf)
if err != nil {
conn.Close()
return nil, fmt.Errorf("Failed to create yamux client: %w", err)
}
// Wrap the connection
c := &Conn{