package agent

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/armon/go-metrics/prometheus"
	"github.com/hashicorp/go-connlimit"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/net/http2"
	"google.golang.org/grpc"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/ae"
	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/checks"
	"github.com/hashicorp/consul/agent/config"
	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/dns"
	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/rpcclient/health"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/systemd"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/agent/xds"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/api/watch"
	"github.com/hashicorp/consul/ipaddr"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/lib/file"
	"github.com/hashicorp/consul/lib/mutex"
	"github.com/hashicorp/consul/lib/routine"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/consul/types"
)
const (
	// Path to save agent service definitions
	servicesDir      = "services"
	serviceConfigDir = "services/configs"

	// Path to save agent proxy definitions
	proxyDir = "proxies"

	// Path to save local agent checks
	checksDir     = "checks"
	checkStateDir = "checks/state"

	// Default reasons for node/service maintenance mode
	defaultNodeMaintReason = "Maintenance mode is enabled for this node, " +
		"but no reason was provided. This is a default message."
	defaultServiceMaintReason = "Maintenance mode is enabled for this " +
		"service, but no reason was provided. This is a default message."

	// ID of the roots watch
	rootsWatchID = "roots"

	// ID of the leaf watch
	leafWatchID = "leaf"

	// maxQueryTime is used to bound the limit of a blocking query
	maxQueryTime = 600 * time.Second

	// defaultQueryTime is the amount of time we block waiting for a change
	// if no time is specified. Previously we would wait the maxQueryTime.
	defaultQueryTime = 300 * time.Second
)
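// Illustrative sketch (assumes a caller-supplied `wait` duration parsed from a
// query string): endpoints that honor these bounds typically clamp the
// blocking time like so:
//
//	if wait == 0 {
//		wait = defaultQueryTime
//	}
//	if wait > maxQueryTime {
//		wait = maxQueryTime
//	}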
var (
	httpAddrRE = regexp.MustCompile(`^(http[s]?://)(\[.*?\]|\[?[\w\-\.]+)(:\d+)?([^?]*)(\?.*)?$`)
	grpcAddrRE = regexp.MustCompile("(.*)((?::)(?:[0-9]+))(.*)$")
)
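// Illustrative example of what httpAddrRE captures (scheme, host, optional
// port, path, query):
//
//	m := httpAddrRE.FindStringSubmatch("https://127.0.0.1:8500/ui?dc=dc1")
//	// m[1]=="https://" m[2]=="127.0.0.1" m[3]==":8500" m[4]=="/ui" m[5]=="?dc=dc1"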
type configSource int

const (
	ConfigSourceLocal configSource = iota
	ConfigSourceRemote
)

var configSourceToName = map[configSource]string{
	ConfigSourceLocal:  "local",
	ConfigSourceRemote: "remote",
}
var configSourceFromName = map[string]configSource{
	"local":  ConfigSourceLocal,
	"remote": ConfigSourceRemote,
	// If the value is not found in the persisted config file, then use the
	// former default.
	"": ConfigSourceLocal,
}

func (s configSource) String() string {
	return configSourceToName[s]
}

// ConfigSourceFromName will unmarshal the string form of a configSource.
func ConfigSourceFromName(name string) (configSource, bool) {
	s, ok := configSourceFromName[name]
	return s, ok
}
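// Illustrative example: the maps above make the string form of a configSource
// round-trip cleanly, which is how persisted service/check files can record
// where a registration came from:
//
//	if src, ok := ConfigSourceFromName(ConfigSourceRemote.String()); ok {
//		fmt.Println(src) // "remote"
//	}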
// delegate defines the interface shared by both
// consul.Client and consul.Server.
type delegate interface {
	// Leave is used to prepare for a graceful shutdown.
	Leave() error

	// AgentLocalMember is used to retrieve the LAN member for the local node.
	AgentLocalMember() serf.Member

	// LANMembersInAgentPartition returns the LAN members for this agent's
	// canonical serf pool. For clients this is the only pool that exists. For
	// servers it's the pool in the default segment and the default partition.
	LANMembersInAgentPartition() []serf.Member

	// LANMembers returns the LAN members for one of:
	//
	// - the requested partition
	// - the requested segment
	// - all segments
	//
	// This is limited to segments and partitions that the node is a member of.
	LANMembers(f consul.LANMemberFilter) ([]serf.Member, error)

	// GetLANCoordinate returns the coordinate of the node in the LAN gossip
	// pool.
	//
	// - Clients return a single coordinate for the single gossip pool they are
	//   in (default, segment, or partition).
	//
	// - Servers return one coordinate for their canonical gossip pool (i.e.
	//   default partition/segment) and one per segment they are also ancillary
	//   members of.
	//
	// NOTE: servers do not emit coordinates for partitioned gossip pools they
	// are ancillary members of.
	//
	// NOTE: This assumes coordinates are enabled, so check that before calling.
	GetLANCoordinate() (lib.CoordinateSet, error)

	// JoinLAN is used to have Consul join the inner-DC pool. The target address
	// should be another node inside the DC listening on the Serf LAN address.
	JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error)

	// RemoveFailedNode is used to remove a failed node from the cluster.
	RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error

	// ResolveTokenAndDefaultMeta returns an acl.Authorizer which authorizes
	// actions based on the permissions granted to the token.
	// If either entMeta or authzContext are non-nil they will be populated with the
	// default partition and namespace from the token.
	ResolveTokenAndDefaultMeta(token string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (consul.ACLResolveResult, error)

	RPC(method string, args interface{}, reply interface{}) error
	SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
	Shutdown() error
	Stats() map[string]map[string]string
	ReloadConfig(config consul.ReloadableConfig) error
	enterpriseDelegate
}
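// Illustrative sketch (not necessarily how other call sites do it): since the
// delegate is either a *consul.Server or a *consul.Client, code that needs
// server-only behavior can type-assert it:
//
//	if srv, ok := a.delegate.(*consul.Server); ok {
//		_ = srv // server-specific handling
//	}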
// notifier is called after a successful JoinLAN.
type notifier interface {
	Notify(string) error
}
// Agent is the long running process that is run on every machine.
// It exposes an RPC interface that is used by the CLI to control the
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
// However, it can run in either client or server mode. In server
// mode, it runs a full Consul server. In client-only mode, it only forwards
// requests to other Consul servers.
type Agent struct {
	// TODO: remove fields that are already in BaseDeps
	baseDeps BaseDeps

	// config is the agent configuration.
	config *config.RuntimeConfig

	// Used for writing our logs
	logger hclog.InterceptLogger

	// delegate is either a *consul.Server or *consul.Client
	// depending on the configuration
	delegate delegate

	// state stores a local representation of the node,
	// services and checks. Used for anti-entropy.
	State *local.State

	// sync manages the synchronization of the local
	// and the remote state.
	sync *ae.StateSyncer

	// syncMu and syncCh are used to coordinate agent endpoints that are blocking
	// on local state during a config reload.
	syncMu sync.Mutex
	syncCh chan struct{}

	// cache is the in-memory cache for data the Agent requests.
	cache *cache.Cache

	// checkReapAfter maps the check ID to a timeout after which we should
	// reap its associated service
	checkReapAfter map[structs.CheckID]time.Duration

	// checkMonitors maps the check ID to an associated monitor
	checkMonitors map[structs.CheckID]*checks.CheckMonitor

	// checkHTTPs maps the check ID to an associated HTTP check
	checkHTTPs map[structs.CheckID]*checks.CheckHTTP

	// checkH2PINGs maps the check ID to an associated HTTP2 PING check
	checkH2PINGs map[structs.CheckID]*checks.CheckH2PING

	// checkTCPs maps the check ID to an associated TCP check
	checkTCPs map[structs.CheckID]*checks.CheckTCP

	// checkGRPCs maps the check ID to an associated GRPC check
	checkGRPCs map[structs.CheckID]*checks.CheckGRPC

	// checkTTLs maps the check ID to an associated check TTL
	checkTTLs map[structs.CheckID]*checks.CheckTTL

	// checkDockers maps the check ID to an associated Docker Exec based check
	checkDockers map[structs.CheckID]*checks.CheckDocker

	// checkAliases maps the check ID to an associated Alias check
	checkAliases map[structs.CheckID]*checks.CheckAlias

	// exposedPorts tracks listener ports for checks exposed through a proxy
	exposedPorts map[string]int

	// stateLock protects the agent state
	stateLock *mutex.Mutex

	// dockerClient is the client for performing docker health checks.
	dockerClient *checks.DockerClient

	// eventCh is used to receive user events
	eventCh chan serf.UserEvent

	// eventBuf stores the most recent events in a ring buffer
	// using eventIndex as the next index to insert into. This
	// is guarded by eventLock. When an insert happens, the
	// eventNotify group is notified.
	eventBuf    []*UserEvent
	eventIndex  int
	eventLock   sync.RWMutex
	eventNotify NotifyGroup

	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex

	// joinLANNotifier is called after a successful JoinLAN.
	joinLANNotifier notifier

	// retryJoinCh transports errors from the retry join
	// attempts.
	retryJoinCh chan error

	// endpoints maps unique RPC endpoint names to common ones
	// to allow overriding of RPC handlers since the golang
	// net/rpc server does not allow this.
	endpoints     map[string]string
	endpointsLock sync.RWMutex

	// dnsServers provides the DNS API
	dnsServers []*DNSServer

	// apiServers listening for connections. If any of these server goroutines
	// fail, the agent will be shut down.
	apiServers *apiServers

	// httpHandlers provides direct access to (one of) the HTTPHandlers started by
	// this agent. This is used in tests to test HTTP endpoints without overhead
	// of TCP connections etc.
	//
	// TODO: this is a temporary re-introduction after we removed a list of
	// HTTPServers in favour of the apiServers abstraction. Now that HTTPHandlers
	// is stateful and has config reloading, it's not OK to use a different
	// instance of handlers in tests from the ones the agent is wired up to:
	// config reloads would not affect the handlers under test, while plumbing
	// the external handlers in the TestAgent through bypasses testing that the
	// agent itself actually reloads the state correctly. Once we move
	// `apiServers` to be a passed-in dependency for NewAgent, we should be able
	// to remove this and have the TestAgent create the HTTPHandlers and pass
	// them in, removing the need to pull them back out again.
	httpHandlers *HTTPHandlers

	// wgServers is the wait group for all HTTP and DNS servers
	// TODO: remove once dnsServers are handled by apiServers
	wgServers sync.WaitGroup

	// watchPlans tracks all the currently-running watch plans for the
	// agent.
	watchPlans []*watch.Plan

	// tokens holds ACL tokens initially from the configuration, but can
	// be updated at runtime, so should always be used instead of going to
	// the configuration directly.
	tokens *token.Store

	// proxyConfig is the manager for proxy service (Kind = connect-proxy)
	// configuration state. This ensures all state needed by a proxy registration
	// is maintained in cache and handles pushing updates to that state into XDS
	// server to be pushed out to Envoy.
	proxyConfig *proxycfg.Manager

	// serviceManager is the manager for combining local service registrations with
	// the centrally configured proxy/service defaults.
	serviceManager *ServiceManager

	// grpcServer is the server instance used currently to serve xDS API for
	// Envoy.
	grpcServer *grpc.Server

	// tlsConfigurator is the central instance to provide a *tls.Config
	// based on the current consul configuration.
	tlsConfigurator *tlsutil.Configurator

	// httpConnLimiter is used to limit connections to the HTTP server by client
	// IP.
	httpConnLimiter connlimit.Limiter

	// configReloaders are subcomponents that need to be notified on a reload so
	// they can update their internal state.
	configReloaders []ConfigReloader

	// TODO: pass directly to HTTPHandlers and DNSServer once those are passed
	// into Agent, which will allow us to remove this field.
	rpcClientHealth *health.Client

	// routineManager is responsible for managing longer-running goroutines
	// run by the Agent
	routineManager *routine.Manager

	// enterpriseAgent embeds fields that we only access in consul-enterprise builds
	enterpriseAgent
}
// New processes the desired options and creates a new Agent.
// This process will
//   * parse the config given the config Flags
//   * set up logging
//       * using the predefined logger given in an option
//         OR
//       * initializing a new logger from the configuration,
//         including setting up gRPC logging
//   * initialize telemetry
//   * create a TLS Configurator
//   * build a shared connection pool
//   * create the ServiceManager
//   * set up the NodeID if one isn't provided in the configuration
//   * create the AutoConfig object for future use in fully
//     resolving the configuration
func New(bd BaseDeps) (*Agent, error) {
	a := Agent{
		checkReapAfter:  make(map[structs.CheckID]time.Duration),
		checkMonitors:   make(map[structs.CheckID]*checks.CheckMonitor),
		checkTTLs:       make(map[structs.CheckID]*checks.CheckTTL),
		checkHTTPs:      make(map[structs.CheckID]*checks.CheckHTTP),
		checkH2PINGs:    make(map[structs.CheckID]*checks.CheckH2PING),
		checkTCPs:       make(map[structs.CheckID]*checks.CheckTCP),
		checkGRPCs:      make(map[structs.CheckID]*checks.CheckGRPC),
		checkDockers:    make(map[structs.CheckID]*checks.CheckDocker),
		checkAliases:    make(map[structs.CheckID]*checks.CheckAlias),
		eventCh:         make(chan serf.UserEvent, 1024),
		eventBuf:        make([]*UserEvent, 256),
		joinLANNotifier: &systemd.Notifier{},
		retryJoinCh:     make(chan error),
		shutdownCh:      make(chan struct{}),
		endpoints:       make(map[string]string),
		stateLock:       mutex.New(),

		baseDeps:        bd,
		tokens:          bd.Tokens,
		logger:          bd.Logger,
		tlsConfigurator: bd.TLSConfigurator,
		config:          bd.RuntimeConfig,
		cache:           bd.Cache,
		routineManager:  routine.NewManager(bd.Logger),
	}

	// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
	conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
	if err != nil {
		return nil, err
	}

	a.rpcClientHealth = &health.Client{
		Cache:     bd.Cache,
		NetRPC:    &a,
		CacheName: cachetype.HealthServicesName,
		ViewStore: bd.ViewStore,
		MaterializerDeps: health.MaterializerDeps{
			Conn:   conn,
			Logger: bd.Logger.Named("rpcclient.health"),
		},
		UseStreamingBackend: a.config.UseStreamingBackend,
		QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
	}

	a.serviceManager = NewServiceManager(&a)

	// We used to do this in the Start method. However it doesn't need to go
	// there any longer. Originally it did because we passed the agent
	// delegate to some of the cache registrations. Now we just
	// pass the agent itself so it's safe to move here.
	a.registerCache()

	// TODO: why do we ignore failure to load persisted tokens?
	_ = a.tokens.Load(bd.RuntimeConfig.ACLTokens, a.logger)

	// TODO: pass in a fully populated apiServers into Agent.New
	a.apiServers = NewAPIServers(a.logger)

	return &a, nil
}
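// Illustrative sketch of the expected wiring (assumes the caller has already
// constructed a BaseDeps value and a context):
//
//	a, err := New(bd)
//	if err != nil {
//		return err
//	}
//	if err := a.Start(ctx); err != nil {
//		return err
//	}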
// GetConfig retrieves the agent's config.
// TODO: export the config field and get rid of this method. This is here for
// now to simplify the work I am doing and make reviewing the final PR easier.
func (a *Agent) GetConfig() *config.RuntimeConfig {
	a.stateLock.Lock()
	defer a.stateLock.Unlock()
	return a.config
}

// LocalConfig takes a config.RuntimeConfig and maps the fields to a local.Config
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
	lc := local.Config{
		AdvertiseAddr:       cfg.AdvertiseAddrLAN.String(),
		CheckUpdateInterval: cfg.CheckUpdateInterval,
		Datacenter:          cfg.Datacenter,
		DiscardCheckOutput:  cfg.DiscardCheckOutput,
		NodeID:              cfg.NodeID,
		NodeName:            cfg.NodeName,
		Partition:           cfg.PartitionOrDefault(),
		TaggedAddresses:     map[string]string{},
	}
	for k, v := range cfg.TaggedAddresses {
		lc.TaggedAddresses[k] = v
	}
	return lc
}
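// Example (see Start below): the agent's local state is built from this
// mapping:
//
//	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)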
// Start verifies its configuration and runs an agent's various subprocesses.
func (a *Agent) Start(ctx context.Context) error {
	a.stateLock.Lock()
	defer a.stateLock.Unlock()

	// This needs to be done early on as it will potentially alter the
	// configuration and, in turn, how other components are brought up.
	c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx)
	if err != nil {
		return err
	}

	// Copy over the existing node ID. It cannot be changed while running anyway,
	// but this prevents breaking some existing behavior. Then overwrite the
	// configuration.
	c.NodeID = a.config.NodeID
	a.config = c

	if err := a.tlsConfigurator.Update(a.config.ToTLSUtilConfig()); err != nil {
		return fmt.Errorf("Failed to load TLS configurations after applying auto-config settings: %w", err)
	}

	if err := a.startLicenseManager(ctx); err != nil {
		return err
	}

	// create the local state
	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)

	// create the state synchronization manager which performs
	// regular and on-demand state synchronizations (anti-entropy).
	a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)

	// create the config for the rpc server/client
	consulCfg, err := newConsulConfig(a.config, a.logger)
	if err != nil {
		return err
	}

	// Setup the user event callback
	consulCfg.UserEventHandler = func(e serf.UserEvent) {
		select {
		case a.eventCh <- e:
		case <-a.shutdownCh:
		}
	}

	// ServerUp is used to inform that a new consul server is now
	// up. This can be used to speed up the sync process if we are blocking
	// waiting to discover a consul server.
	consulCfg.ServerUp = a.sync.SyncFull.Trigger

	err = a.initEnterprise(consulCfg)
	if err != nil {
		return fmt.Errorf("failed to start Consul enterprise component: %v", err)
	}

	// Setup either the client or the server.
	if c.ServerMode {
		server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul server: %v", err)
		}
		a.delegate = server
	} else {
		client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
		if err != nil {
			return fmt.Errorf("Failed to start Consul client: %v", err)
		}
		a.delegate = client
	}

	// The staggering of the state syncing depends on the cluster size.
	//
	// NOTE: we will use the agent's canonical serf pool for this since that's
	// similarly scoped with the state store side of anti-entropy.
	a.sync.ClusterSize = func() int { return len(a.delegate.LANMembersInAgentPartition()) }

	// link the state with the consul server/client and the state syncer
	// via callbacks. After several attempts this was easier than using
	// channels since the event notification needs to be non-blocking
	// and that should be hidden in the state syncer implementation.
	a.State.Delegate = a.delegate
	a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger

	if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
		return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
	}

	// Load checks/services/metadata.
	emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}
	if err := a.loadServices(c, emptyCheckSnapshot); err != nil {
		return err
	}
	if err := a.loadChecks(c, nil); err != nil {
		return err
	}
	if err := a.loadMetadata(c); err != nil {
		return err
	}

	var intentionDefaultAllow bool
	switch a.config.ACLResolverSettings.ACLDefaultPolicy {
	case "allow":
		intentionDefaultAllow = true
	case "deny":
		intentionDefaultAllow = false
	default:
		return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
	}

	go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

	// Start the proxy config manager.
	a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
		Cache:  a.cache,
		Health: a.rpcClientHealth,
		Logger: a.logger.Named(logging.ProxyConfig),
		State:  a.State,
		Tokens: a.baseDeps.Tokens,
		Source: &structs.QuerySource{
			Datacenter:    a.config.Datacenter,
			Segment:       a.config.SegmentName,
			NodePartition: a.config.PartitionOrEmpty(),
		},
		DNSConfig: proxycfg.DNSConfig{
			Domain:    a.config.DNSDomain,
			AltDomain: a.config.DNSAltDomain,
		},
		TLSConfigurator:       a.tlsConfigurator,
		IntentionDefaultAllow: intentionDefaultAllow,
	})
	if err != nil {
		return err
	}
	go func() {
		if err := a.proxyConfig.Run(); err != nil {
			a.logger.Error("proxy config manager exited with error", "error", err)
		}
	}()

	// Start watching for critical services to deregister, based on their
	// checks.
	go a.reapServices()

	// Start handling events.
	go a.handleEvents()

	// Start sending network coordinates to the server.
	if !c.DisableCoordinates {
		go a.sendCoordinate()
	}

	// Write out the PID file if necessary.
	if err := a.storePid(); err != nil {
		return err
	}

	// start DNS servers
	if err := a.listenAndServeDNS(); err != nil {
		return err
	}

	// Configure the http connection limiter.
	a.httpConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: a.config.HTTPMaxConnsPerClient,
	})

	// Create listeners and unstarted servers; see comment on listenHTTP why
	// we are doing this.
	servers, err := a.listenHTTP()
	if err != nil {
		return err
	}

	// Start HTTP and HTTPS servers.
	for _, srv := range servers {
		a.apiServers.Start(srv)
	}

	// Start gRPC server.
	if err := a.listenAndServeGRPC(); err != nil {
		return err
	}

	// register watches
	if err := a.reloadWatches(a.config); err != nil {
		return err
	}

	// start retry join
	go a.retryJoinLAN()
	if a.config.ServerMode {
		go a.retryJoinWAN()
	}

	// DEPRECATED: Warn users if they're emitting deprecated metrics. Remove this warning and the flagged metrics in a
	// future release of Consul.
	if !a.config.Telemetry.DisableCompatOneNine {
		a.logger.Warn("DEPRECATED Backwards compatibility with pre-1.9 metrics enabled. These metrics will be removed in a future version of Consul. Set `telemetry { disable_compat_1.9 = true }` to disable them.")
	}

	if a.tlsConfigurator.Cert() != nil {
		m := tlsCertExpirationMonitor(a.tlsConfigurator, a.logger)
		go m.Monitor(&lib.StopChannelContext{StopCh: a.shutdownCh})
	}

	// consul version metric with labels
	metrics.SetGaugeWithLabels([]string{"version"}, 1, []metrics.Label{
		{Name: "version", Value: a.config.Version},
		{Name: "pre_release", Value: a.config.VersionPrerelease},
	})

	return nil
}
var Gauges = []prometheus.GaugeDefinition{
	{
		Name: []string{"version"},
		Help: "Represents the Consul version.",
	},
}
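// Illustrative note: definitions like this are assumed to be handed to the
// go-metrics prometheus sink (e.g. via PrometheusOpts.GaugeDefinitions) during
// telemetry setup, so the gauge is pre-registered with its help text; the
// actual wiring lives outside this file.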
// Failed returns a channel which is closed when the first server goroutine exits
// with a non-nil error.
func (a *Agent) Failed() <-chan struct{} {
	return a.apiServers.failed
}
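// Illustrative sketch (shutdownCh here is a hypothetical caller-owned channel):
// a supervisor would typically treat a closed Failed channel as fatal:
//
//	select {
//	case <-a.Failed():
//		// an API server goroutine exited with an error; begin shutdown
//	case <-shutdownCh:
//		// normal shutdown path
//	}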
func (a *Agent) listenAndServeGRPC() error {
	if len(a.config.GRPCAddrs) < 1 {
		return nil
	}

	xdsServer := xds.NewServer(
		a.logger.Named(logging.Envoy),
		a.proxyConfig,
		func(id string) (acl.Authorizer, error) {
			return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
		},
		a,
		a,
	)

	tlsConfig := a.tlsConfigurator
	// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled
	// then gRPC should not use TLS.
	if a.config.HTTPSPort <= 0 {
		tlsConfig = nil
	}
	var err error
	a.grpcServer = xds.NewGRPCServer(xdsServer, tlsConfig)

	ln, err := a.startListeners(a.config.GRPCAddrs)
	if err != nil {
		return err
	}

	for _, l := range ln {
		go func(innerL net.Listener) {
			a.logger.Info("Started gRPC server",
				"address", innerL.Addr().String(),
				"network", innerL.Addr().Network(),
			)
			err := a.grpcServer.Serve(innerL)
			if err != nil {
				a.logger.Error("gRPC server failed", "error", err)
			}
		}(l)
	}
	return nil
}
func (a *Agent) listenAndServeDNS() error {
	notif := make(chan net.Addr, len(a.config.DNSAddrs))
	errCh := make(chan error, len(a.config.DNSAddrs))
	for _, addr := range a.config.DNSAddrs {
		// create server
		s, err := NewDNSServer(a)
		if err != nil {
			return err
		}
		a.dnsServers = append(a.dnsServers, s)

		// start server
		a.wgServers.Add(1)
		go func(addr net.Addr) {
			defer a.wgServers.Done()
			err := s.ListenAndServe(addr.Network(), addr.String(), func() { notif <- addr })
			if err != nil && !strings.Contains(err.Error(), "accept") {
				errCh <- err
			}
		}(addr)
	}

	// wait for servers to be up
	timeout := time.After(time.Second)
	var merr *multierror.Error
	for range a.config.DNSAddrs {
		select {
		case addr := <-notif:
			a.logger.Info("Started DNS server",
				"address", addr.String(),
				"network", addr.Network(),
			)

		case err := <-errCh:
			merr = multierror.Append(merr, err)
		case <-timeout:
			merr = multierror.Append(merr, fmt.Errorf("agent: timeout starting DNS servers"))
			return merr.ErrorOrNil()
		}
	}
	return merr.ErrorOrNil()
}
func (a *Agent) startListeners(addrs []net.Addr) ([]net.Listener, error) {
	var ln []net.Listener
	for _, addr := range addrs {
		var l net.Listener
		var err error

		switch x := addr.(type) {
		case *net.UnixAddr:
			l, err = a.listenSocket(x.Name)
			if err != nil {
				return nil, err
			}

		case *net.TCPAddr:
			l, err = net.Listen("tcp", x.String())
			if err != nil {
				return nil, err
			}
			l = &tcpKeepAliveListener{l.(*net.TCPListener)}

		default:
			return nil, fmt.Errorf("unsupported address type %T", addr)
		}
		ln = append(ln, l)
	}
	return ln, nil
}
// listenHTTP binds listeners to the provided addresses and also returns
// pre-configured HTTP servers which are not yet started. The motivation is
// that in the current startup/shutdown setup we de-couple the listener
// creation from the server startup assuming that if any of the listeners
// cannot be bound we fail immediately and later failures do not occur.
// Therefore, starting a server with a running listener is assumed to not
// produce an error.
//
// The second motivation is that an HTTPS server needs to use the same TLSConfig
// on both the listener and the HTTP server. When listeners and servers are
// created at different times this becomes difficult to handle without keeping
// the TLS configuration somewhere or recreating it.
//
// This approach should ultimately be refactored to the point where we just
// start the server and any error should trigger a proper shutdown of the agent.
func (a *Agent) listenHTTP() ([]apiServer, error) {
	var ln []net.Listener
	var servers []apiServer

	start := func(proto string, addrs []net.Addr) error {
		listeners, err := a.startListeners(addrs)
		if err != nil {
			return err
		}
		ln = append(ln, listeners...)

		for _, l := range listeners {
			var tlscfg *tls.Config
			_, isTCP := l.(*tcpKeepAliveListener)
			if isTCP && proto == "https" {
				tlscfg = a.tlsConfigurator.IncomingHTTPSConfig()
				l = tls.NewListener(l, tlscfg)
			}

			srv := &HTTPHandlers{
				agent:    a,
				denylist: NewDenylist(a.config.HTTPBlockEndpoints),
			}
			a.configReloaders = append(a.configReloaders, srv.ReloadConfig)
			a.httpHandlers = srv
			httpServer := &http.Server{
				Addr:           l.Addr().String(),
				TLSConfig:      tlscfg,
				Handler:        srv.handler(a.config.EnableDebug),
				MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
			}

			// Load the connlimit helper into the server
			connLimitFn := a.httpConnLimiter.HTTPConnStateFuncWithDefault429Handler(10 * time.Millisecond)

			if proto == "https" {
				if err := setupHTTPS(httpServer, connLimitFn, a.config.HTTPSHandshakeTimeout); err != nil {
					return err
				}
			} else {
				httpServer.ConnState = connLimitFn
			}

			servers = append(servers, newAPIServerHTTP(proto, l, httpServer))
		}
		return nil
	}

	if err := start("http", a.config.HTTPAddrs); err != nil {
		closeListeners(ln)
		return nil, err
	}
	if err := start("https", a.config.HTTPSAddrs); err != nil {
		closeListeners(ln)
		return nil, err
	}
	return servers, nil
}
func closeListeners(lns []net.Listener) {
	for _, l := range lns {
		l.Close()
	}
}
// setupHTTPS adds HTTP/2 support, ConnState, and a connection handshake timeout
// to the http.Server.
func setupHTTPS(server *http.Server, connState func(net.Conn, http.ConnState), timeout time.Duration) error {
	// Enforce TLS handshake timeout
	server.ConnState = func(conn net.Conn, state http.ConnState) {
		switch state {
		case http.StateNew:
			// Set deadline to prevent slow send before TLS handshake or first
			// byte of request.
			conn.SetReadDeadline(time.Now().Add(timeout))
		case http.StateActive:
			// Clear read deadline. We should maybe set read timeouts more
			// generally but that's a bigger task as some HTTP endpoints may
			// stream large requests and responses (e.g. snapshot) so we can't
			// set sensible blanket timeouts here.
			conn.SetReadDeadline(time.Time{})
		}
		// Pass through to conn limit. This is OK because we didn't change
		// state (i.e. Close conn).
		connState(conn, state)
	}

	// This will enable upgrading connections to HTTP/2 as
	// part of TLS negotiation.
	return http2.ConfigureServer(server, nil)
}
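// Example (see listenHTTP above): the HTTPS path routes the connection limiter
// and handshake timeout through this helper:
//
//	if err := setupHTTPS(httpServer, connLimitFn, a.config.HTTPSHandshakeTimeout); err != nil {
//		return err
//	}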
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used so dead TCP connections eventually go away.
type tcpKeepAliveListener struct {
	*net.TCPListener
}

func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
	tc, err := ln.AcceptTCP()
	if err != nil {
		return
	}
	tc.SetKeepAlive(true)
	tc.SetKeepAlivePeriod(30 * time.Second)
	return tc, nil
}
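// Example (see startListeners above): plain TCP listeners are wrapped so that
// accepted connections get keep-alives:
//
//	l = &tcpKeepAliveListener{l.(*net.TCPListener)}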
func (a *Agent) listenSocket(path string) (net.Listener, error) {
	if _, err := os.Stat(path); !os.IsNotExist(err) {
		a.logger.Warn("Replacing socket", "path", path)
	}
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("error removing socket file: %s", err)
	}
	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	user, group, mode := a.config.UnixSocketUser, a.config.UnixSocketGroup, a.config.UnixSocketMode
	if err := setFilePermissions(path, user, group, mode); err != nil {
		return nil, fmt.Errorf("Failed setting up socket: %s", err)
	}
	return l, nil
}
// stopAllWatches stops all the currently running watches
func (a *Agent) stopAllWatches() {