Merge pull request #8680 from hashicorp/dnephin/replace-consul-opts-with-base-deps

agent: Replace ConsulOptions with a new struct from agent.BaseDeps
Daniel Nephin 2020-09-24 12:45:54 -04:00 committed by GitHub
commit e0119a6e92
10 changed files with 95 additions and 231 deletions
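This PR replaces the variadic functional options (consul.WithLogger, consul.WithTokenStore, and friends) with a single exported consul.Deps struct, embedded in agent.BaseDeps. Before the per-file diffs, here is a minimal, self-contained sketch of the general pattern being applied; the names (Option, Deps, client) are simplified stand-ins, not the real Consul types:

package main

import "log"

// Before: every dependency gets a functional option, and the constructor
// must nil-check each field and build hidden fallbacks.
type options struct{ logger *log.Logger }

type Option func(*options)

func WithLogger(l *log.Logger) Option {
	return func(o *options) { o.logger = l }
}

func newWithOptions(opts ...Option) *client {
	var flat options
	for _, opt := range opts {
		opt(&flat)
	}
	if flat.logger == nil { // hidden default, easy to miss in review
		flat.logger = log.Default()
	}
	return &client{logger: flat.logger}
}

// After: one plain struct. The full dependency set is visible at the call
// site, and default construction moves to a single setup location.
type Deps struct{ Logger *log.Logger }

func newWithDeps(deps Deps) *client {
	return &client{logger: deps.Logger}
}

type client struct{ logger *log.Logger }

func main() {
	a := newWithOptions(WithLogger(log.Default()))
	b := newWithDeps(Deps{Logger: log.Default()})
	a.logger.Println("options style")
	b.logger.Println("deps style")
}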

View File

@@ -18,7 +18,6 @@ import (
"time"
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
@@ -29,14 +28,12 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
@@ -156,7 +153,8 @@ type notifier interface {
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
autoConf *autoconf.AutoConfig
// TODO: remove fields that are already in BaseDeps
baseDeps BaseDeps
// config is the agent configuration.
config *config.RuntimeConfig
@@ -164,9 +162,6 @@ type Agent struct {
// Used for writing our logs
logger hclog.InterceptLogger
// In-memory sink used for collecting metrics
MemSink MetricsHandler
// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate delegate
@@ -295,12 +290,6 @@ type Agent struct {
// IP.
httpConnLimiter connlimit.Limiter
// Connection Pool
connPool *pool.ConnPool
// Shared RPC Router
router *router.Router
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
@@ -337,16 +326,12 @@ func New(bd BaseDeps) (*Agent, error) {
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
// TODO: store the BaseDeps instead of copying them over to Agent
baseDeps: bd,
tokens: bd.Tokens,
logger: bd.Logger,
tlsConfigurator: bd.TLSConfigurator,
config: bd.RuntimeConfig,
cache: bd.Cache,
MemSink: bd.MetricsHandler,
connPool: bd.ConnPool,
autoConf: bd.AutoConfig,
router: bd.Router,
}
a.serviceManager = NewServiceManager(&a)
@@ -407,7 +392,7 @@ func (a *Agent) Start(ctx context.Context) error {
// This needs to be done early on as it will potentially alter the configuration
// and then how other bits are brought up
c, err := a.autoConf.InitialConfiguration(ctx)
c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx)
if err != nil {
return err
}
@@ -454,23 +439,15 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}
options := []consul.ConsulOption{
consul.WithLogger(a.logger),
consul.WithTokenStore(a.tokens),
consul.WithTLSConfigurator(a.tlsConfigurator),
consul.WithConnectionPool(a.connPool),
consul.WithRouter(a.router),
}
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServer(consulCfg, options...)
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
a.delegate = server
} else {
client, err := consul.NewClient(consulCfg, options...)
client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
}
@@ -487,7 +464,7 @@ func (a *Agent) Start(ctx context.Context) error {
a.State.Delegate = a.delegate
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
if err := a.autoConf.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
}
a.serviceManager.Start()
@@ -1297,7 +1274,7 @@ func (a *Agent) ShutdownAgent() error {
// this would be cancelled anyways (by the closing of the shutdown ch) but
// this should help them to be stopped more quickly
a.autoConf.Stop()
a.baseDeps.AutoConfig.Stop()
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
if a.serviceManager != nil {
@@ -3472,7 +3449,7 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
// all services, checks, tokens, metadata, dnsServer configs, etc.
// It will also reload all ongoing watches.
func (a *Agent) ReloadConfig() error {
newCfg, err := a.autoConf.ReadConfig()
newCfg, err := a.baseDeps.AutoConfig.ReadConfig()
if err != nil {
return err
}

View File

@@ -152,7 +152,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
handler.ServeHTTP(resp, req)
return nil, nil
}
return s.agent.MemSink.DisplayMetrics(resp, req)
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
}
func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@@ -4609,7 +4609,7 @@ func TestSharedRPCRouter(t *testing.T) {
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
mgr, server := srv.Agent.router.FindLANRoute()
mgr, server := srv.Agent.baseDeps.Router.FindLANRoute()
require.NotNil(t, mgr)
require.NotNil(t, server)
@@ -4621,7 +4621,7 @@
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
mgr, server = client.Agent.router.FindLANRoute()
mgr, server = client.Agent.baseDeps.Router.FindLANRoute()
require.NotNil(t, mgr)
require.NotNil(t, server)
}

View File

@@ -22,18 +22,6 @@ import (
)
const (
// clientRPCConnMaxIdle controls how long we keep an idle connection
// open to a server. 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
clientRPCConnMaxIdle = 127 * time.Second
// clientMaxStreams controls how many idle streams we keep
// open to a server
clientMaxStreams = 32
// serfEventBacklog is the maximum number of unprocessed Serf Events
// that will be held in queue before new serf events block. A
// blocking serf event queue is a bad thing.
@@ -68,8 +56,7 @@ type Client struct {
// from an agent.
rpcLimiter atomic.Value
// eventCh is used to receive events from the
// serf cluster in the datacenter
// eventCh is used to receive events from the serf cluster in the datacenter
eventCh chan serf.Event
// Logger uses the provided LogOutput
@@ -90,12 +77,7 @@ type Client struct {
}
// NewClient creates and returns a Client
func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
flat := flattenConsulOptions(options)
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
func NewClient(config *Config, deps Deps) (*Client, error) {
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
@@ -105,32 +87,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
if err := config.CheckACL(); err != nil {
return nil, err
}
if flat.logger == nil {
return nil, fmt.Errorf("logger is required")
}
if connPool == nil {
connPool = &pool.ConnPool{
Server: false,
SrcAddr: config.RPCSrcAddr,
Logger: flat.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
logger := flat.logger.NamedIntercept(logging.ConsulClient)
// Create client
c := &Client{
config: config,
connPool: connPool,
connPool: deps.ConnPool,
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
logger: deps.Logger.NamedIntercept(logging.ConsulClient),
shutdownCh: make(chan struct{}),
tlsConfigurator: tlsConfigurator,
tlsConfigurator: deps.TLSConfigurator,
}
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
@@ -156,23 +120,17 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
}
// Initialize the LAN Serf
c.serf, err = c.setupSerf(config.SerfLANConfig,
c.eventCh, serfLANSnapshot)
c.serf, err = c.setupSerf(config.SerfLANConfig, c.eventCh, serfLANSnapshot)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
}
rpcRouter := flat.router
if rpcRouter == nil {
rpcRouter = router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
}
if err := rpcRouter.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
}
c.router = rpcRouter
c.router = deps.Router
// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.

View File

@@ -2,13 +2,17 @@ package consul
import (
"bytes"
"fmt"
"net"
"os"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
@@ -64,18 +68,8 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
if cb != nil {
cb(config)
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: config.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
if err != nil {
t.Fatalf("err: %v", err)
}
client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(tlsConf))
client, err := NewClient(config, newDefaultDeps(t, config))
return dir, client, err
}
@@ -466,14 +460,7 @@ func TestClient_RPC_TLS(t *testing.T) {
func newClient(t *testing.T, config *Config) *Client {
t.Helper()
c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
require.NoError(t, err, "failed to create tls configuration")
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(c))
client, err := NewClient(config, newDefaultDeps(t, config))
require.NoError(t, err, "failed to create client")
t.Cleanup(func() {
client.Shutdown()
@@ -481,6 +468,39 @@ func newClient(t *testing.T, config *Config) *Client {
return client
}
func newDefaultDeps(t *testing.T, c *Config) Deps {
t.Helper()
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: c.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
require.NoError(t, err, "failed to create tls configuration")
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
connPool := &pool.ConnPool{
Server: false,
SrcAddr: c.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: 2 * time.Minute,
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
}
return Deps{
Logger: logger,
TLSConfigurator: tls,
Tokens: new(token.Store),
Router: r,
ConnPool: connPool,
}
}
func TestClient_RPC_RateLimit(t *testing.T) {
t.Parallel()
_, conf1 := testServerConfig(t)
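Note: newDefaultDeps also gives tests a clean seam for swapping out a single dependency, which the leader_test.go and server_test.go hunks below rely on. Condensed, that pattern is (with logger standing in for whatever custom logger the test builds):

// Start from full defaults, overwrite only the dependency under test,
// and pass the rest through unchanged.
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps)
require.NoError(t, err)
defer srv.Shutdown()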

View File

@@ -10,12 +10,10 @@ import (
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
@@ -1303,12 +1301,11 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
Level: hclog.Debug,
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
})
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
require.NoError(t, err)
srv, err := NewServer(config,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(tlsConf))
deps := newDefaultDeps(t, config)
deps.Logger = logger
srv, err := NewServer(config, deps)
require.NoError(t, err)
defer srv.Shutdown()

View File

@@ -8,50 +8,10 @@ import (
"github.com/hashicorp/go-hclog"
)
type consulOptions struct {
logger hclog.InterceptLogger
tlsConfigurator *tlsutil.Configurator
connPool *pool.ConnPool
tokens *token.Store
router *router.Router
}
type ConsulOption func(*consulOptions)
func WithLogger(logger hclog.InterceptLogger) ConsulOption {
return func(opt *consulOptions) {
opt.logger = logger
}
}
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption {
return func(opt *consulOptions) {
opt.tlsConfigurator = tlsConfigurator
}
}
func WithConnectionPool(connPool *pool.ConnPool) ConsulOption {
return func(opt *consulOptions) {
opt.connPool = connPool
}
}
func WithTokenStore(tokens *token.Store) ConsulOption {
return func(opt *consulOptions) {
opt.tokens = tokens
}
}
func WithRouter(router *router.Router) ConsulOption {
return func(opt *consulOptions) {
opt.router = router
}
}
func flattenConsulOptions(options []ConsulOption) consulOptions {
var flat consulOptions
for _, opt := range options {
opt(&flat)
}
return flat
type Deps struct {
Logger hclog.InterceptLogger
TLSConfigurator *tlsutil.Configurator
Tokens *token.Store
Router *router.Router
ConnPool *pool.ConnPool
}
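With the With* options gone, a caller assembles consul.Deps itself and passes it to NewServer or NewClient. The sketch below shows that wiring using only identifiers that appear in this diff; buildDeps is a hypothetical helper name (in the real tree, newDefaultDeps does this for tests and agent.BaseDeps for production):

package example

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/go-hclog"
)

// buildDeps fills every field of consul.Deps explicitly; after this
// change there are no fallbacks left inside NewServer/NewClient.
func buildDeps(cfg *consul.Config, logger hclog.InterceptLogger, tls *tlsutil.Configurator) consul.Deps {
	return consul.Deps{
		Logger:          logger,
		TLSConfigurator: tls,
		Tokens:          new(token.Store),
		Router: router.NewRouter(logger, cfg.Datacenter,
			fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter)),
		ConnPool: &pool.ConnPool{
			SrcAddr:         cfg.RPCSrcAddr,
			Logger:          logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
			MaxTime:         127 * time.Second, // client-side defaults; see newConnPool below
			MaxStreams:      32,
			TLSConfigurator: tls,
			Datacenter:      cfg.Datacenter,
		},
	}
}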

View File

@@ -70,14 +70,6 @@ const (
raftState = "raft/"
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection
// open to a server
serverRPCCache = 2 * time.Minute
// serverMaxStreams controls how many idle streams we keep
// open to a server
serverMaxStreams = 64
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
@@ -324,15 +316,8 @@ type connHandler interface {
// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
flat := flattenConsulOptions(options)
logger := flat.logger
tokens := flat.tokens
tlsConfigurator := flat.tlsConfigurator
connPool := flat.connPool
rpcRouter := flat.router
func NewServer(config *Config, flat Deps) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
@@ -342,9 +327,6 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
if err := config.CheckACL(); err != nil {
return nil, err
}
if logger == nil {
return nil, fmt.Errorf("logger is required")
}
// Check if TLS is enabled
if config.CAFile != "" || config.CAPath != "" {
@@ -373,40 +355,24 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
if connPool == nil {
connPool = &pool.ConnPool{
Server: true,
SrcAddr: config.RPCSrcAddr,
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
Datacenter: config.Datacenter,
}
}
serverLogger := logger.NamedIntercept(logging.ConsulServer)
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
loggers := newLoggerStore(serverLogger)
if rpcRouter == nil {
rpcRouter = router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
}
// Create server.
s := &Server{
config: config,
tokens: tokens,
connPool: connPool,
tokens: flat.Tokens,
connPool: flat.ConnPool,
eventChLAN: make(chan serf.Event, serfEventChSize),
eventChWAN: make(chan serf.Event, serfEventChSize),
logger: serverLogger,
loggers: loggers,
leaveCh: make(chan struct{}),
reconcileCh: make(chan serf.Member, reconcileChSize),
router: rpcRouter,
router: flat.Router,
rpcServer: rpc.NewServer(),
insecureRPCServer: rpc.NewServer(),
tlsConfigurator: tlsConfigurator,
tlsConfigurator: flat.TLSConfigurator,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),

View File

@@ -30,7 +30,6 @@ import (
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
@@ -292,19 +291,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
}
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Name: c.NodeName,
Level: hclog.Debug,
Output: testutil.NewLogBuffer(t),
})
tlsConf, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
if err != nil {
return nil, err
}
srv, err := NewServer(c,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(tlsConf))
srv, err := NewServer(c, newDefaultDeps(t, c))
if err != nil {
return nil, err
}
@@ -1488,16 +1475,11 @@ func TestServer_CALogging(t *testing.T) {
var buf bytes.Buffer
logger := testutil.LoggerWithOutput(t, &buf)
c, err := tlsutil.NewConfigurator(conf1.ToTLSUtilConfig(), logger)
require.NoError(t, err)
deps := newDefaultDeps(t, conf1)
deps.Logger = logger
s1, err := NewServer(conf1,
WithLogger(logger),
WithTokenStore(new(token.Store)),
WithTLSConfigurator(c))
if err != nil {
t.Fatalf("err: %v", err)
}
s1, err := NewServer(conf1, deps)
require.NoError(t, err)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@@ -10,6 +10,7 @@ import (
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
@@ -25,15 +26,12 @@ import (
// has been moved out in front of Agent.New, and we can better see the setup
// dependencies.
type BaseDeps struct {
Logger hclog.InterceptLogger
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
MetricsHandler MetricsHandler
RuntimeConfig *config.RuntimeConfig
Tokens *token.Store
Cache *cache.Cache
AutoConfig *autoconf.AutoConfig // TODO: use an interface
ConnPool *pool.ConnPool // TODO: use an interface
Router *router.Router
consul.Deps // TODO: un-embed
RuntimeConfig *config.RuntimeConfig
MetricsHandler MetricsHandler
AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache
}
// MetricsHandler provides an http.Handler for displaying metrics.
@@ -120,6 +118,12 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
pool.MaxTime = 2 * time.Minute
pool.MaxStreams = 64
} else {
// MaxTime controls how long we keep an idle connection open to a server.
// 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
pool.MaxTime = 127 * time.Second
pool.MaxStreams = 32
}