agent/consul: pass dependencies directly from agent
In an upcoming change we will need to pass a grpc.ClientConnPool from BaseDeps into Server. While looking at that change I noticed all of the existing consulOption fields are already on BaseDeps. Instead of duplicating the fields, we can create a struct used by agent/consul, and use that struct in BaseDeps. This allows us to pass along dependencies without translating them into different representations. I also looked at moving all of BaseDeps in agent/consul, however that created some circular imports. Resolving those cycles wouldn't be too bad (it was only an error in agent/consul being imported from cache-types), however this change seems a little better by starting to introduce some structure to BaseDeps. This change is also a small step in reducing the scope of Agent. Also remove some constants that were only used by tests, and move the relevant comment to where the live configuration is set. Removed some validation from NewServer and NewClient, as these are not really runtime errors. They would be code errors, which will cause a panic anyway, so no reason to handle them specially here.
This commit is contained in:
parent
0536b2047e
commit
c621b4a420
|
@ -18,7 +18,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
@ -29,14 +28,12 @@ import (
|
|||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/ae"
|
||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/systemd"
|
||||
|
@ -156,7 +153,8 @@ type notifier interface {
|
|||
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||
// requests to other Consul servers.
|
||||
type Agent struct {
|
||||
autoConf *autoconf.AutoConfig
|
||||
// TODO: remove fields that are already in BaseDeps
|
||||
baseDeps BaseDeps
|
||||
|
||||
// config is the agent configuration.
|
||||
config *config.RuntimeConfig
|
||||
|
@ -164,9 +162,6 @@ type Agent struct {
|
|||
// Used for writing our logs
|
||||
logger hclog.InterceptLogger
|
||||
|
||||
// In-memory sink used for collecting metrics
|
||||
MemSink MetricsHandler
|
||||
|
||||
// delegate is either a *consul.Server or *consul.Client
|
||||
// depending on the configuration
|
||||
delegate delegate
|
||||
|
@ -295,12 +290,6 @@ type Agent struct {
|
|||
// IP.
|
||||
httpConnLimiter connlimit.Limiter
|
||||
|
||||
// Connection Pool
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// Shared RPC Router
|
||||
router *router.Router
|
||||
|
||||
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
||||
enterpriseAgent
|
||||
}
|
||||
|
@ -337,16 +326,12 @@ func New(bd BaseDeps) (*Agent, error) {
|
|||
shutdownCh: make(chan struct{}),
|
||||
endpoints: make(map[string]string),
|
||||
|
||||
// TODO: store the BaseDeps instead of copying them over to Agent
|
||||
baseDeps: bd,
|
||||
tokens: bd.Tokens,
|
||||
logger: bd.Logger,
|
||||
tlsConfigurator: bd.TLSConfigurator,
|
||||
config: bd.RuntimeConfig,
|
||||
cache: bd.Cache,
|
||||
MemSink: bd.MetricsHandler,
|
||||
connPool: bd.ConnPool,
|
||||
autoConf: bd.AutoConfig,
|
||||
router: bd.Router,
|
||||
}
|
||||
|
||||
a.serviceManager = NewServiceManager(&a)
|
||||
|
@ -407,7 +392,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
|
||||
// This needs to be done early on as it will potentially alter the configuration
|
||||
// and then how other bits are brought up
|
||||
c, err := a.autoConf.InitialConfiguration(ctx)
|
||||
c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -454,23 +439,15 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
|
||||
}
|
||||
|
||||
options := []consul.ConsulOption{
|
||||
consul.WithLogger(a.logger),
|
||||
consul.WithTokenStore(a.tokens),
|
||||
consul.WithTLSConfigurator(a.tlsConfigurator),
|
||||
consul.WithConnectionPool(a.connPool),
|
||||
consul.WithRouter(a.router),
|
||||
}
|
||||
|
||||
// Setup either the client or the server.
|
||||
if c.ServerMode {
|
||||
server, err := consul.NewServer(consulCfg, options...)
|
||||
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul server: %v", err)
|
||||
}
|
||||
a.delegate = server
|
||||
} else {
|
||||
client, err := consul.NewClient(consulCfg, options...)
|
||||
client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul client: %v", err)
|
||||
}
|
||||
|
@ -487,7 +464,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
a.State.Delegate = a.delegate
|
||||
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
||||
|
||||
if err := a.autoConf.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
||||
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
||||
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
|
||||
}
|
||||
a.serviceManager.Start()
|
||||
|
@ -1297,7 +1274,7 @@ func (a *Agent) ShutdownAgent() error {
|
|||
|
||||
// this would be cancelled anyways (by the closing of the shutdown ch) but
|
||||
// this should help them to be stopped more quickly
|
||||
a.autoConf.Stop()
|
||||
a.baseDeps.AutoConfig.Stop()
|
||||
|
||||
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
|
||||
if a.serviceManager != nil {
|
||||
|
@ -3472,7 +3449,7 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
|
|||
// all services, checks, tokens, metadata, dnsServer configs, etc.
|
||||
// It will also reload all ongoing watches.
|
||||
func (a *Agent) ReloadConfig() error {
|
||||
newCfg, err := a.autoConf.ReadConfig()
|
||||
newCfg, err := a.baseDeps.AutoConfig.ReadConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) (
|
|||
handler.ServeHTTP(resp, req)
|
||||
return nil, nil
|
||||
}
|
||||
return s.agent.MemSink.DisplayMetrics(resp, req)
|
||||
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
|
||||
}
|
||||
|
||||
func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
|
|
@ -4607,7 +4607,7 @@ func TestSharedRPCRouter(t *testing.T) {
|
|||
|
||||
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
|
||||
|
||||
mgr, server := srv.Agent.router.FindLANRoute()
|
||||
mgr, server := srv.Agent.baseDeps.Router.FindLANRoute()
|
||||
require.NotNil(t, mgr)
|
||||
require.NotNil(t, server)
|
||||
|
||||
|
@ -4619,7 +4619,7 @@ func TestSharedRPCRouter(t *testing.T) {
|
|||
|
||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
||||
|
||||
mgr, server = client.Agent.router.FindLANRoute()
|
||||
mgr, server = client.Agent.baseDeps.Router.FindLANRoute()
|
||||
require.NotNil(t, mgr)
|
||||
require.NotNil(t, server)
|
||||
}
|
||||
|
|
|
@ -22,18 +22,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// clientRPCConnMaxIdle controls how long we keep an idle connection
|
||||
// open to a server. 127s was chosen as the first prime above 120s
|
||||
// (arbitrarily chose to use a prime) with the intent of reusing
|
||||
// connections who are used by once-a-minute cron(8) jobs *and* who
|
||||
// use a 60s jitter window (e.g. in vixie cron job execution can
|
||||
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
||||
clientRPCConnMaxIdle = 127 * time.Second
|
||||
|
||||
// clientMaxStreams controls how many idle streams we keep
|
||||
// open to a server
|
||||
clientMaxStreams = 32
|
||||
|
||||
// serfEventBacklog is the maximum number of unprocessed Serf Events
|
||||
// that will be held in queue before new serf events block. A
|
||||
// blocking serf event queue is a bad thing.
|
||||
|
@ -89,12 +77,7 @@ type Client struct {
|
|||
}
|
||||
|
||||
// NewClient creates and returns a Client
|
||||
func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
||||
flat := flattenConsulOptions(options)
|
||||
|
||||
tlsConfigurator := flat.tlsConfigurator
|
||||
connPool := flat.connPool
|
||||
|
||||
func NewClient(config *Config, deps Deps) (*Client, error) {
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -104,35 +87,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
|||
if err := config.CheckACL(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if flat.logger == nil {
|
||||
return nil, fmt.Errorf("logger is required")
|
||||
}
|
||||
if flat.router == nil {
|
||||
return nil, fmt.Errorf("router is required")
|
||||
}
|
||||
|
||||
if connPool == nil {
|
||||
connPool = &pool.ConnPool{
|
||||
Server: false,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
Logger: flat.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
MaxTime: clientRPCConnMaxIdle,
|
||||
MaxStreams: clientMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
}
|
||||
}
|
||||
|
||||
logger := flat.logger.NamedIntercept(logging.ConsulClient)
|
||||
|
||||
// Create client
|
||||
c := &Client{
|
||||
config: config,
|
||||
connPool: connPool,
|
||||
connPool: deps.ConnPool,
|
||||
eventCh: make(chan serf.Event, serfEventBacklog),
|
||||
logger: logger,
|
||||
logger: deps.Logger.NamedIntercept(logging.ConsulClient),
|
||||
shutdownCh: make(chan struct{}),
|
||||
tlsConfigurator: tlsConfigurator,
|
||||
tlsConfigurator: deps.TLSConfigurator,
|
||||
}
|
||||
|
||||
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
||||
|
@ -164,11 +126,11 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
|||
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
||||
}
|
||||
|
||||
if err := flat.router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
|
||||
if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
|
||||
c.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
|
||||
}
|
||||
c.router = flat.router
|
||||
c.router = deps.Router
|
||||
|
||||
// Start LAN event handlers after the router is complete since the event
|
||||
// handlers depend on the router and the router depends on Serf.
|
||||
|
|
|
@ -9,8 +9,10 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
|
@ -66,22 +68,8 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
|
|||
if cb != nil {
|
||||
cb(config)
|
||||
}
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Name: config.NodeName,
|
||||
Level: hclog.Debug,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
})
|
||||
|
||||
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
||||
client, err := NewClient(config,
|
||||
WithLogger(logger),
|
||||
WithTLSConfigurator(tlsConf),
|
||||
WithRouter(r))
|
||||
client, err := NewClient(config, newDefaultDeps(t, config))
|
||||
return dir, client, err
|
||||
}
|
||||
|
||||
|
@ -472,18 +460,7 @@ func TestClient_RPC_TLS(t *testing.T) {
|
|||
func newClient(t *testing.T, config *Config) *Client {
|
||||
t.Helper()
|
||||
|
||||
c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
|
||||
require.NoError(t, err, "failed to create tls configuration")
|
||||
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Level: hclog.Debug,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
})
|
||||
r := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
||||
client, err := NewClient(config,
|
||||
WithLogger(logger),
|
||||
WithTLSConfigurator(c),
|
||||
WithRouter(r))
|
||||
client, err := NewClient(config, newDefaultDeps(t, config))
|
||||
require.NoError(t, err, "failed to create client")
|
||||
t.Cleanup(func() {
|
||||
client.Shutdown()
|
||||
|
@ -491,6 +468,39 @@ func newClient(t *testing.T, config *Config) *Client {
|
|||
return client
|
||||
}
|
||||
|
||||
func newDefaultDeps(t *testing.T, c *Config) Deps {
|
||||
t.Helper()
|
||||
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Name: c.NodeName,
|
||||
Level: hclog.Debug,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
})
|
||||
|
||||
tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
|
||||
require.NoError(t, err, "failed to create tls configuration")
|
||||
|
||||
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
Server: false,
|
||||
SrcAddr: c.RPCSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
MaxTime: 2 * time.Minute,
|
||||
MaxStreams: 4,
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: c.Datacenter,
|
||||
}
|
||||
|
||||
return Deps{
|
||||
Logger: logger,
|
||||
TLSConfigurator: tls,
|
||||
Tokens: new(token.Store),
|
||||
Router: r,
|
||||
ConnPool: connPool,
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_RateLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, conf1 := testServerConfig(t)
|
||||
|
|
|
@ -9,14 +9,11 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
@ -1304,15 +1301,11 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
|
|||
Level: hclog.Debug,
|
||||
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
|
||||
})
|
||||
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
rpcRouter := router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
||||
srv, err := NewServer(config,
|
||||
WithLogger(logger),
|
||||
WithTokenStore(new(token.Store)),
|
||||
WithTLSConfigurator(tlsConf),
|
||||
WithRouter(rpcRouter))
|
||||
deps := newDefaultDeps(t, config)
|
||||
deps.Logger = logger
|
||||
|
||||
srv, err := NewServer(config, deps)
|
||||
require.NoError(t, err)
|
||||
defer srv.Shutdown()
|
||||
|
||||
|
|
|
@ -8,50 +8,10 @@ import (
|
|||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
type consulOptions struct {
|
||||
logger hclog.InterceptLogger
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
connPool *pool.ConnPool
|
||||
tokens *token.Store
|
||||
router *router.Router
|
||||
}
|
||||
|
||||
type ConsulOption func(*consulOptions)
|
||||
|
||||
func WithLogger(logger hclog.InterceptLogger) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.tlsConfigurator = tlsConfigurator
|
||||
}
|
||||
}
|
||||
|
||||
func WithConnectionPool(connPool *pool.ConnPool) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.connPool = connPool
|
||||
}
|
||||
}
|
||||
|
||||
func WithTokenStore(tokens *token.Store) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.tokens = tokens
|
||||
}
|
||||
}
|
||||
|
||||
func WithRouter(router *router.Router) ConsulOption {
|
||||
return func(opt *consulOptions) {
|
||||
opt.router = router
|
||||
}
|
||||
}
|
||||
|
||||
func flattenConsulOptions(options []ConsulOption) consulOptions {
|
||||
var flat consulOptions
|
||||
for _, opt := range options {
|
||||
opt(&flat)
|
||||
}
|
||||
return flat
|
||||
type Deps struct {
|
||||
Logger hclog.InterceptLogger
|
||||
TLSConfigurator *tlsutil.Configurator
|
||||
Tokens *token.Store
|
||||
Router *router.Router
|
||||
ConnPool *pool.ConnPool
|
||||
}
|
||||
|
|
|
@ -70,14 +70,6 @@ const (
|
|||
raftState = "raft/"
|
||||
snapshotsRetained = 2
|
||||
|
||||
// serverRPCCache controls how long we keep an idle connection
|
||||
// open to a server
|
||||
serverRPCCache = 2 * time.Minute
|
||||
|
||||
// serverMaxStreams controls how many idle streams we keep
|
||||
// open to a server
|
||||
serverMaxStreams = 64
|
||||
|
||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||
// This is used to reduce disk I/O for the recently committed entries.
|
||||
raftLogCacheSize = 512
|
||||
|
@ -324,14 +316,8 @@ type connHandler interface {
|
|||
|
||||
// NewServer is used to construct a new Consul server from the configuration
|
||||
// and extra options, potentially returning an error.
|
||||
func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
||||
flat := flattenConsulOptions(options)
|
||||
|
||||
logger := flat.logger
|
||||
tokens := flat.tokens
|
||||
tlsConfigurator := flat.tlsConfigurator
|
||||
connPool := flat.connPool
|
||||
|
||||
func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||
logger := flat.Logger
|
||||
if err := config.CheckProtocolVersion(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -341,12 +327,6 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
if err := config.CheckACL(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if logger == nil {
|
||||
return nil, fmt.Errorf("logger is required")
|
||||
}
|
||||
if flat.router == nil {
|
||||
return nil, fmt.Errorf("router is required")
|
||||
}
|
||||
|
||||
// Check if TLS is enabled
|
||||
if config.CAFile != "" || config.CAPath != "" {
|
||||
|
@ -375,36 +355,24 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
|||
// Create the shutdown channel - this is closed but never written to.
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
if connPool == nil {
|
||||
connPool = &pool.ConnPool{
|
||||
Server: true,
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
MaxTime: serverRPCCache,
|
||||
MaxStreams: serverMaxStreams,
|
||||
TLSConfigurator: tlsConfigurator,
|
||||
Datacenter: config.Datacenter,
|
||||
}
|
||||
}
|
||||
|
||||
serverLogger := logger.NamedIntercept(logging.ConsulServer)
|
||||
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
|
||||
loggers := newLoggerStore(serverLogger)
|
||||
|
||||
// Create server.
|
||||
s := &Server{
|
||||
config: config,
|
||||
tokens: tokens,
|
||||
connPool: connPool,
|
||||
tokens: flat.Tokens,
|
||||
connPool: flat.ConnPool,
|
||||
eventChLAN: make(chan serf.Event, serfEventChSize),
|
||||
eventChWAN: make(chan serf.Event, serfEventChSize),
|
||||
logger: serverLogger,
|
||||
loggers: loggers,
|
||||
leaveCh: make(chan struct{}),
|
||||
reconcileCh: make(chan serf.Member, reconcileChSize),
|
||||
router: flat.router,
|
||||
router: flat.Router,
|
||||
rpcServer: rpc.NewServer(),
|
||||
insecureRPCServer: rpc.NewServer(),
|
||||
tlsConfigurator: tlsConfigurator,
|
||||
tlsConfigurator: flat.TLSConfigurator,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
|
||||
sessionTimers: NewSessionTimers(),
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
|
||||
"github.com/google/tcpproxy"
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/memberlist"
|
||||
|
||||
|
@ -31,7 +30,6 @@ import (
|
|||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
|
@ -293,22 +291,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
|
|||
}
|
||||
}
|
||||
|
||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||
Name: c.NodeName,
|
||||
Level: hclog.Debug,
|
||||
Output: testutil.NewLogBuffer(t),
|
||||
})
|
||||
tlsConf, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rpcRouter := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
|
||||
srv, err := NewServer(c,
|
||||
WithLogger(logger),
|
||||
WithTokenStore(new(token.Store)),
|
||||
WithTLSConfigurator(tlsConf),
|
||||
WithRouter(rpcRouter))
|
||||
srv, err := NewServer(c, newDefaultDeps(t, c))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1492,19 +1475,11 @@ func TestServer_CALogging(t *testing.T) {
|
|||
var buf bytes.Buffer
|
||||
logger := testutil.LoggerWithOutput(t, &buf)
|
||||
|
||||
c, err := tlsutil.NewConfigurator(conf1.ToTLSUtilConfig(), logger)
|
||||
deps := newDefaultDeps(t, conf1)
|
||||
deps.Logger = logger
|
||||
|
||||
s1, err := NewServer(conf1, deps)
|
||||
require.NoError(t, err)
|
||||
|
||||
rpcRouter := router.NewRouter(logger, "dc1", fmt.Sprintf("%s.%s", "nodename", "dc1"))
|
||||
|
||||
s1, err := NewServer(conf1,
|
||||
WithLogger(logger),
|
||||
WithTokenStore(new(token.Store)),
|
||||
WithTLSConfigurator(c),
|
||||
WithRouter(rpcRouter))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
|
@ -25,15 +26,12 @@ import (
|
|||
// has been moved out in front of Agent.New, and we can better see the setup
|
||||
// dependencies.
|
||||
type BaseDeps struct {
|
||||
Logger hclog.InterceptLogger
|
||||
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
|
||||
MetricsHandler MetricsHandler
|
||||
RuntimeConfig *config.RuntimeConfig
|
||||
Tokens *token.Store
|
||||
Cache *cache.Cache
|
||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||
ConnPool *pool.ConnPool // TODO: use an interface
|
||||
Router *router.Router
|
||||
consul.Deps // TODO: un-embed
|
||||
|
||||
RuntimeConfig *config.RuntimeConfig
|
||||
MetricsHandler MetricsHandler
|
||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||
Cache *cache.Cache
|
||||
}
|
||||
|
||||
// MetricsHandler provides an http.Handler for displaying metrics.
|
||||
|
@ -120,6 +118,12 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
|||
pool.MaxTime = 2 * time.Minute
|
||||
pool.MaxStreams = 64
|
||||
} else {
|
||||
// MaxTime controls how long we keep an idle connection open to a server.
|
||||
// 127s was chosen as the first prime above 120s
|
||||
// (arbitrarily chose to use a prime) with the intent of reusing
|
||||
// connections who are used by once-a-minute cron(8) jobs *and* who
|
||||
// use a 60s jitter window (e.g. in vixie cron job execution can
|
||||
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
||||
pool.MaxTime = 127 * time.Second
|
||||
pool.MaxStreams = 32
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue