package agent

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/hashicorp/consul/agent/consul/fsm"

	"github.com/armon/go-metrics/prometheus"

	"github.com/hashicorp/consul/agent/consul/usagemetrics"
	"github.com/hashicorp/consul/agent/local"

	"github.com/hashicorp/go-hclog"
	"google.golang.org/grpc/grpclog"
	grpcresolver "google.golang.org/grpc/resolver"

	autoconf "github.com/hashicorp/consul/agent/auto-config"
	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/config"
	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/grpc"
	"github.com/hashicorp/consul/agent/grpc/resolver"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/ipaddr"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/proto/pbsubscribe"
	"github.com/hashicorp/consul/tlsutil"
)

// BaseDeps holds the dependencies assembled by NewBaseDeps: the embedded
// consul.Deps plus the runtime configuration, metrics handler, auto-config
// and agent cache.
//
// TODO: BaseDeps should be renamed in the future once more of Agent.Start
// has been moved out in front of Agent.New, and we can better see the setup
// dependencies.
type BaseDeps struct {
	consul.Deps // TODO: un-embed

	// RuntimeConfig is the configuration returned by the ConfigLoader passed
	// to NewBaseDeps.
	RuntimeConfig *config.RuntimeConfig
	// MetricsHandler is the handler returned by lib.InitTelemetry.
	MetricsHandler MetricsHandler
	// AutoConfig is created by autoconf.New in NewBaseDeps.
	AutoConfig *autoconf.AutoConfig // TODO: use an interface
	// Cache is created by cache.New in NewBaseDeps; it is also handed to
	// AutoConfig and used by registerCacheTypes.
	Cache *cache.Cache
}

// MetricsHandler provides an http.Handler for displaying metrics.
type MetricsHandler interface { DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) } type ConfigLoader func(source config.Source) (cfg *config.RuntimeConfig, warnings []string, err error) func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) { d := BaseDeps{} cfg, warnings, err := configLoader(nil) if err != nil { return d, err } logConf := cfg.Logging logConf.Name = logging.Agent d.Logger, err = logging.Setup(logConf, logOut) if err != nil { return d, err } grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger)) for _, w := range warnings { d.Logger.Warn(w) } cfg.NodeID, err = newNodeIDFromConfig(cfg, d.Logger) if err != nil { return d, fmt.Errorf("failed to setup node ID: %w", err) } gauges, counters, summaries := getPrometheusDefs() cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries d.MetricsHandler, err = lib.InitTelemetry(cfg.Telemetry) if err != nil { return d, fmt.Errorf("failed to initialize telemetry: %w", err) } d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.ToTLSUtilConfig(), d.Logger) if err != nil { return d, err } d.RuntimeConfig = cfg d.Tokens = new(token.Store) // cache-types are not registered yet, but they won't be used until the components are started. 
d.Cache = cache.New(cfg.Cache) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) registerWithGRPC(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper())) d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) acConf := autoconf.Config{ DirectRPC: d.ConnPool, Logger: d.Logger, Loader: configLoader, ServerProvider: d.Router, TLSConfigurator: d.TLSConfigurator, Cache: d.Cache, Tokens: d.Tokens, } d.AutoConfig, err = autoconf.New(acConf) if err != nil { return d, err } if err := registerCacheTypes(d); err != nil { return d, err } return d, nil } // registerCacheTypes on bd.Cache. // // Note: most cache types are still registered in Agent.registerCache. This // function is for registering newer cache-types which no longer have a dependency // on Agent. func registerCacheTypes(bd BaseDeps) error { if bd.RuntimeConfig.UseStreamingBackend { conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) if err != nil { return err } matDeps := cachetype.MaterializerDeps{ Client: pbsubscribe.NewStateChangeSubscriptionClient(conn), Logger: bd.Logger, } bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps)) } return nil } func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool { var rpcSrcAddr *net.TCPAddr if !ipaddr.IsAny(config.RPCBindAddr) { rpcSrcAddr = &net.TCPAddr{IP: config.RPCBindAddr.IP} } pool := &pool.ConnPool{ Server: config.ServerMode, SrcAddr: rpcSrcAddr, Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), TLSConfigurator: tls, Datacenter: config.Datacenter, } if config.ServerMode { pool.MaxTime = 2 * time.Minute pool.MaxStreams = 64 } else { // MaxTime controls how long we keep an idle connection open to a server. 
// 127s was chosen as the first prime above 120s // (arbitrarily chose to use a prime) with the intent of reusing // connections who are used by once-a-minute cron(8) jobs *and* who // use a 60s jitter window (e.g. in vixie cron job execution can // drift by up to 59s per job, or 119s for a once-a-minute cron job). pool.MaxTime = 127 * time.Second pool.MaxStreams = 32 } return pool } var registerLock sync.Mutex // registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver. // This function exists to synchronize registrations with a lock. // grpc/resolver.Register expects all registration to happen at init and does // not allow for concurrent registration. This function exists to support // parallel testing. func registerWithGRPC(b grpcresolver.Builder) { registerLock.Lock() defer registerLock.Unlock() grpcresolver.Register(b) } // getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends // all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. func getPrometheusDefs() ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) { serviceName := []string{"consul"} var gauges = [][]prometheus.GaugeDefinition{ cache.Gauges, consul.AutopilotGauges, consul.RPCGauges, consul.SessionGauges, grpc.StatsGauges, usagemetrics.Gauges, } var gaugeDefs []prometheus.GaugeDefinition for _, g := range gauges { // Set Consul to each definition's namespace var withService []prometheus.GaugeDefinition for _, gauge := range g { gauge.Name = append(serviceName, gauge.Name...) withService = append(withService, gauge) } gaugeDefs = append(gaugeDefs, withService...) } raftCounters := []prometheus.CounterDefinition{ // TODO(kit): "raft..." metrics come from the raft lib and we should migrate these to a telemetry // package within. In the mean time, we're going to define a few here because they're key to monitoring Consul. 
{ Name: []string{"raft", "apply"}, Help: "This counts the number of Raft transactions occurring over the interval.", }, { Name: []string{"raft", "state", "candidate"}, Help: "This increments whenever a Consul server starts an election.", }, { Name: []string{"raft", "state", "leader"}, Help: "This increments whenever a Consul server becomes a leader.", }, } var counters = [][]prometheus.CounterDefinition{ CatalogCounters, cache.Counters, consul.ACLCounters, consul.CatalogCounters, consul.ClientCounters, consul.RPCCounters, grpc.StatsCounters, local.StateCounters, raftCounters, } var counterDefs []prometheus.CounterDefinition for _, c := range counters { // Set Consul to each definition's namespace var withService []prometheus.CounterDefinition for _, counter := range c { counter.Name = append(serviceName, counter.Name...) withService = append(withService, counter) } counterDefs = append(counterDefs, withService...) } raftSummaries := []prometheus.SummaryDefinition{ // TODO(kit): "raft..." metrics come from the raft lib and we should migrate these to a telemetry // package within. In the mean time, we're going to define a few here because they're key to monitoring Consul. 
{ Name: []string{"raft", "commitTime"}, Help: "This measures the time it takes to commit a new entry to the Raft log on the leader.", }, { Name: []string{"raft", "leader", "lastContact"}, Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.", }, } var summaries = [][]prometheus.SummaryDefinition{ HTTPSummaries, consul.ACLSummaries, consul.ACLEndpointSummaries, consul.ACLEndpointLegacySummaries, consul.CatalogSummaries, consul.FederationStateSummaries, consul.IntentionSummaries, consul.KVSummaries, consul.LeaderSummaries, consul.PreparedQuerySummaries, consul.RPCSummaries, consul.SegmentOSSSummaries, consul.SessionSummaries, consul.SessionEndpointSummaries, consul.TxnSummaries, fsm.CommandsSummaries, fsm.SnapshotSummaries, raftSummaries, } var summaryDefs []prometheus.SummaryDefinition for _, s := range summaries { // Set Consul to each definition's namespace var withService []prometheus.SummaryDefinition for _, summary := range s { summary.Name = append(serviceName, summary.Name...) withService = append(withService, summary) } summaryDefs = append(summaryDefs, withService...) } return gaugeDefs, counterDefs, summaryDefs }