Address PR comments.

This commit is contained in:
Derek Menteer 2022-09-01 12:32:11 -05:00
parent ab9d421ba2
commit cb478b0e61
13 changed files with 364 additions and 142 deletions

View File

@ -217,10 +217,6 @@ type Agent struct {
// opposed to the multiplexed "server" port). // opposed to the multiplexed "server" port).
externalGRPCServer *grpc.Server externalGRPCServer *grpc.Server
// externalGRPCTLSServer is the gRPC server exposed on a dedicated gRPC-TLS port (as
// opposed to the multiplexed "server" port).
externalGRPCTLSServer *grpc.Server
// state stores a local representation of the node, // state stores a local representation of the node,
// services and checks. Used for anti-entropy. // services and checks. Used for anti-entropy.
State *local.State State *local.State
@ -543,8 +539,7 @@ func (a *Agent) Start(ctx context.Context) error {
// This needs to happen after the initial auto-config is loaded, because TLS // This needs to happen after the initial auto-config is loaded, because TLS
// can only be configured on the gRPC server at the point of creation. // can only be configured on the gRPC server at the point of creation.
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers( a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"))
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
if err := a.startLicenseManager(ctx); err != nil { if err := a.startLicenseManager(ctx); err != nil {
return err return err
@ -583,14 +578,7 @@ func (a *Agent) Start(ctx context.Context) error {
// Setup either the client or the server. // Setup either the client or the server.
if c.ServerMode { if c.ServerMode {
var externalGRPCServers []*grpc.Server server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer)
if a.externalGRPCServer != nil {
externalGRPCServers = append(externalGRPCServers, a.externalGRPCServer)
}
if a.externalGRPCTLSServer != nil {
externalGRPCServers = append(externalGRPCServers, a.externalGRPCTLSServer)
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, externalGRPCServers)
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err) return fmt.Errorf("Failed to start Consul server: %v", err)
} }
@ -804,29 +792,35 @@ func (a *Agent) listenAndServeGRPC() error {
}, },
a, a,
) )
a.xdsServer.Register(a.externalGRPCServer)
// Spawn listeners and register xds servers. // Attempt to spawn listeners
var listeners []net.Listener var listeners []net.Listener
start := func(port_name string, addrs []net.Addr, srv *grpc.Server) error { start := func(port_name string, addrs []net.Addr, tlsConf *tls.Config) error {
if len(addrs) < 1 || srv == nil { if len(addrs) < 1 {
return nil return nil
} }
a.xdsServer.Register(srv)
ln, err := a.startListeners(addrs) ln, err := a.startListeners(addrs)
if err != nil { if err != nil {
return err return err
} }
listeners = append(listeners, ln...) for i := range ln {
// Wrap with TLS, if provided.
if tlsConf != nil {
ln[i] = tls.NewListener(ln[i], tlsConf)
}
listeners = append(listeners, ln[i])
}
for _, l := range ln { for _, l := range ln {
go func(innerL net.Listener) { go func(innerL net.Listener) {
a.logger.Info("Started gRPC server", a.logger.Info("Started gRPC listeners",
"port_name", port_name, "port_name", port_name,
"address", innerL.Addr().String(), "address", innerL.Addr().String(),
"network", innerL.Addr().Network(), "network", innerL.Addr().Network(),
) )
err := srv.Serve(innerL) err := a.externalGRPCServer.Serve(innerL)
if err != nil { if err != nil {
a.logger.Error("gRPC server failed", "port_name", port_name, "error", err) a.logger.Error("gRPC server failed", "port_name", port_name, "error", err)
} }
@ -835,13 +829,26 @@ func (a *Agent) listenAndServeGRPC() error {
return nil return nil
} }
if err := start("grpc", a.config.GRPCAddrs, a.externalGRPCServer); err != nil { // The original grpc port may spawn in either plain-text or TLS mode (for backwards compatibility).
closeListeners(listeners) // TODO: Simplify this block to only spawn plain-text after 1.14 when deprecated TLS support is removed.
return err if a.config.GRPCPort > 0 {
// Only allow the grpc port to spawn TLS connections if the other grpc_tls port is NOT defined.
var tlsConf *tls.Config = nil
if a.config.GRPCTLSPort <= 0 && a.tlsConfigurator.GRPCServerUseTLS() {
a.logger.Warn("deprecated gRPC TLS configuration detected. Consider using `ports.grpc_tls` instead")
tlsConf = a.tlsConfigurator.IncomingGRPCConfig()
}
if err := start("grpc", a.config.GRPCAddrs, tlsConf); err != nil {
closeListeners(listeners)
return err
}
} }
if err := start("grpc_tls", a.config.GRPCTLSAddrs, a.externalGRPCTLSServer); err != nil { // Only allow grpc_tls to spawn with a TLS listener.
closeListeners(listeners) if a.config.GRPCTLSPort > 0 {
return err if err := start("grpc_tls", a.config.GRPCTLSAddrs, a.tlsConfigurator.IncomingGRPCConfig()); err != nil {
closeListeners(listeners)
return err
}
} }
return nil return nil
} }
@ -1535,9 +1542,6 @@ func (a *Agent) ShutdownAgent() error {
if a.externalGRPCServer != nil { if a.externalGRPCServer != nil {
a.externalGRPCServer.Stop() a.externalGRPCServer.Stop()
} }
if a.externalGRPCTLSServer != nil {
a.externalGRPCTLSServer.Stop()
}
// Stop the proxy config manager // Stop the proxy config manager
if a.proxyConfig != nil { if a.proxyConfig != nil {
@ -3966,9 +3970,8 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading tls configuration: %s", err) return fmt.Errorf("Failed reloading tls configuration: %s", err)
} }
// Setup the external GRPC servers. // Setup the external GRPC server.
a.externalGRPCServer, a.externalGRPCTLSServer = external.BuildExternalGRPCServers( a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"))
a.config.GRPCPort, a.config.GRPCTLSPort, a.tlsConfigurator, a.logger)
// Reload service/check definitions and metadata. // Reload service/check definitions and metadata.
if err := a.loadServices(newCfg, snap); err != nil { if err := a.loadServices(newCfg, snap); err != nil {

View File

@ -48,8 +48,16 @@ type XDSSelf struct {
// Port could be used for either TLS or plain-text communication // Port could be used for either TLS or plain-text communication
// up through version 1.14. In order to maintain backwards-compatibility, // up through version 1.14. In order to maintain backwards-compatibility,
// Port will now default to TLS and fallback to the standard port value. // Port will now default to TLS and fallback to the standard port value.
Port int // DEPRECATED: Use Ports field instead
PortTLS int Port int
Ports GRPCPorts
}
// GRPCPorts is used to hold the external GRPC server's port numbers.
type GRPCPorts struct {
// Technically, this port is not always plain-text as of 1.14, but will be in a future release.
Plaintext int
TLS int
} }
func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -83,11 +91,14 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
"envoy": proxysupport.EnvoyVersions, "envoy": proxysupport.EnvoyVersions,
}, },
// Prefer the TLS port. See comment on the XDSSelf struct for details. // Prefer the TLS port. See comment on the XDSSelf struct for details.
Port: s.agent.config.GRPCTLSPort, Port: s.agent.config.GRPCTLSPort,
PortTLS: s.agent.config.GRPCTLSPort, Ports: GRPCPorts{
Plaintext: s.agent.config.GRPCPort,
TLS: s.agent.config.GRPCTLSPort,
},
} }
// Fallback to standard port if TLS is not enabled. // Fallback to standard port if TLS is not enabled.
if xds.PortTLS <= 0 { if s.agent.config.GRPCTLSPort <= 0 {
xds.Port = s.agent.config.GRPCPort xds.Port = s.agent.config.GRPCPort
} }
} }

View File

@ -1502,7 +1502,8 @@ func TestAgent_Self(t *testing.T) {
map[string][]string{"envoy": proxysupport.EnvoyVersions}, map[string][]string{"envoy": proxysupport.EnvoyVersions},
val.XDS.SupportedProxies, val.XDS.SupportedProxies,
) )
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.PortTLS) require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Ports.TLS)
require.Equal(t, a.Config.GRPCPort, val.XDS.Ports.Plaintext)
if tc.grpcTLS { if tc.grpcTLS {
require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Port) require.Equal(t, a.Config.GRPCTLSPort, val.XDS.Port)
} else { } else {

View File

@ -552,7 +552,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1) deps := newDefaultDeps(t, conf1)
deps.Logger = logger deps.Logger = logger
s1, err := NewServer(conf1, deps, []*grpc.Server{grpc.NewServer()}) s1, err := NewServer(conf1, deps, grpc.NewServer())
require.NoError(t, err) require.NoError(t, err)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -580,9 +580,7 @@ func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
require.NoError(t, acceptingServer.Shutdown()) require.NoError(t, acceptingServer.Shutdown())
// Have to manually shut down the gRPC server otherwise it stays bound to the port. // Have to manually shut down the gRPC server otherwise it stays bound to the port.
for i := range acceptingServer.externalGRPCServers { acceptingServer.externalGRPCServer.Stop()
acceptingServer.externalGRPCServers[i].Stop()
}
// Restart the server by re-using the previous acceptor's data directory and node id. // Restart the server by re-using the previous acceptor's data directory and node id.
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) { _, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
@ -1492,9 +1490,7 @@ func Test_Leader_PeeringSync_ServerAddressUpdates(t *testing.T) {
testutil.RunStep(t, "updated server addresses are picked up by the leader", func(t *testing.T) { testutil.RunStep(t, "updated server addresses are picked up by the leader", func(t *testing.T) {
// force close the acceptor's gRPC server so the dialier retries with a new address. // force close the acceptor's gRPC server so the dialier retries with a new address.
for i := range acceptor.externalGRPCServers { acceptor.externalGRPCServer.Stop()
acceptor.externalGRPCServers[i].Stop()
}
clone := proto.Clone(p.Peering) clone := proto.Clone(p.Peering)
updated := clone.(*pbpeering.Peering) updated := clone.(*pbpeering.Peering)

View File

@ -1543,7 +1543,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config) deps := newDefaultDeps(t, config)
deps.Logger = logger deps.Logger = logger
srv, err := NewServer(config, deps, []*grpc.Server{grpc.NewServer()}) srv, err := NewServer(config, deps, grpc.NewServer())
require.NoError(t, err) require.NoError(t, err)
defer srv.Shutdown() defer srv.Shutdown()

View File

@ -253,9 +253,9 @@ type Server struct {
// enable RPC forwarding. // enable RPC forwarding.
externalConnectCAServer *connectca.Server externalConnectCAServer *connectca.Server
// externalGRPCServers has all gRPC servers exposed on the dedicated gRPC ports, as // externalGRPCServers has a gRPC server exposed on the dedicated gRPC ports, as
// opposed to the multiplexed "server" port which is served by grpcHandler. // opposed to the multiplexed "server" port which is served by grpcHandler.
externalGRPCServers []*grpc.Server externalGRPCServer *grpc.Server
// router is used to map out Consul servers in the WAN and in Consul // router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas. // Enterprise user-defined areas.
@ -388,7 +388,7 @@ type connHandler interface {
// NewServer is used to construct a new Consul server from the configuration // NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error. // and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*Server, error) { func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
logger := flat.Logger logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil { if err := config.CheckProtocolVersion(); err != nil {
return nil, err return nil, err
@ -434,7 +434,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
reconcileCh: make(chan serf.Member, reconcileChSize), reconcileCh: make(chan serf.Member, reconcileChSize),
router: flat.Router, router: flat.Router,
tlsConfigurator: flat.TLSConfigurator, tlsConfigurator: flat.TLSConfigurator,
externalGRPCServers: externalGRPCServers, externalGRPCServer: externalGRPCServer,
reassertLeaderCh: make(chan chan error), reassertLeaderCh: make(chan chan error),
sessionTimers: NewSessionTimers(), sessionTimers: NewSessionTimers(),
tombstoneGC: gc, tombstoneGC: gc,
@ -681,13 +681,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval) s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Helper function for registering to all GRPC servers
registerGrpc := func(g GRPCService) {
for _, srv := range s.externalGRPCServers {
g.Register(srv)
}
}
// Initialize external gRPC server - register services on external gRPC server. // Initialize external gRPC server - register services on external gRPC server.
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{ s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled, ACLsEnabled: s.config.ACLsEnabled,
@ -705,7 +698,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
PrimaryDatacenter: s.config.PrimaryDatacenter, PrimaryDatacenter: s.config.PrimaryDatacenter,
ValidateEnterpriseRequest: s.validateEnterpriseRequest, ValidateEnterpriseRequest: s.validateEnterpriseRequest,
}) })
registerGrpc(s.externalACLServer) s.externalACLServer.Register(externalGRPCServer)
s.externalConnectCAServer = connectca.NewServer(connectca.Config{ s.externalConnectCAServer = connectca.NewServer(connectca.Config{
Publisher: s.publisher, Publisher: s.publisher,
@ -718,7 +711,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
}, },
ConnectEnabled: s.config.ConnectEnabled, ConnectEnabled: s.config.ConnectEnabled,
}) })
registerGrpc(s.externalConnectCAServer) s.externalConnectCAServer.Register(externalGRPCServer)
dataplaneServer := dataplane.NewServer(dataplane.Config{ dataplaneServer := dataplane.NewServer(dataplane.Config{
GetStore: func() dataplane.StateStore { return s.FSM().State() }, GetStore: func() dataplane.StateStore { return s.FSM().State() },
@ -726,14 +719,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
ACLResolver: s.ACLResolver, ACLResolver: s.ACLResolver,
Datacenter: s.config.Datacenter, Datacenter: s.config.Datacenter,
}) })
registerGrpc(dataplaneServer) dataplaneServer.Register(externalGRPCServer)
serverDiscoveryServer := serverdiscovery.NewServer(serverdiscovery.Config{ serverDiscoveryServer := serverdiscovery.NewServer(serverdiscovery.Config{
Publisher: s.publisher, Publisher: s.publisher,
ACLResolver: s.ACLResolver, ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.server-discovery"), Logger: logger.Named("grpc-api.server-discovery"),
}) })
registerGrpc(serverDiscoveryServer) serverDiscoveryServer.Register(externalGRPCServer)
s.peerStreamTracker = peerstream.NewTracker() s.peerStreamTracker = peerstream.NewTracker()
s.peeringBackend = NewPeeringBackend(s) s.peeringBackend = NewPeeringBackend(s)
@ -754,7 +747,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServers []*grpc.Server) (*
}, },
}) })
s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout) s.peerStreamTracker.SetHeartbeatTimeout(s.peerStreamServer.Config.IncomingHeartbeatTimeout)
registerGrpc(s.peerStreamServer) s.peerStreamServer.Register(externalGRPCServer)
// Initialize internal gRPC server. // Initialize internal gRPC server.
// //

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"crypto/tls"
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"net" "net"
@ -218,9 +219,10 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
var dir string var dir string
var srv *Server var srv *Server
var config *Config
var deps Deps
// Retry added to avoid cases where bind addr is already in use // Retry added to avoid cases where bind addr is already in use
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) { retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
var config *Config
dir, config = testServerConfig(t) dir, config = testServerConfig(t)
for _, fn := range configOpts { for _, fn := range configOpts {
fn(config) fn(config)
@ -234,7 +236,8 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
config.ACLResolverSettings.EnterpriseMeta = *config.AgentEnterpriseMeta() config.ACLResolverSettings.EnterpriseMeta = *config.AgentEnterpriseMeta()
var err error var err error
srv, err = newServer(t, config) deps = newDefaultDeps(t, config)
srv, err = newServerWithDeps(t, config, deps)
if err != nil { if err != nil {
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
@ -245,13 +248,18 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
// Normally the gRPC server listener is created at the agent level and // Normally the gRPC server listener is created at the agent level and
// passed down into the Server creation. // passed down into the Server creation.
externalGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort) externalGRPCAddr := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCPort)
ln, err := net.Listen("tcp", externalGRPCAddr) ln, err := net.Listen("tcp", externalGRPCAddr)
require.NoError(t, err) require.NoError(t, err)
// Wrap the listener with TLS
if deps.TLSConfigurator.GRPCServerUseTLS() {
ln = tls.NewListener(ln, deps.TLSConfigurator.IncomingGRPCConfig())
}
go func() { go func() {
_ = srv.externalGRPCServers[0].Serve(ln) _ = srv.externalGRPCServer.Serve(ln)
}() }()
t.Cleanup(srv.externalGRPCServers[0].Stop) t.Cleanup(srv.externalGRPCServer.Stop)
} }
return dir, srv return dir, srv
@ -300,19 +308,8 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
oldNotify() oldNotify()
} }
} }
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"))
// setup grpc servers srv, err := NewServer(c, deps, grpcServer)
srvGRPC, srvGRPCTLS := external.BuildExternalGRPCServers(
c.GRPCPort, c.GRPCTLSPort, deps.TLSConfigurator, deps.Logger)
var grpcServers []*grpc.Server
if srvGRPC != nil {
grpcServers = append(grpcServers, srvGRPC)
}
if srvGRPCTLS != nil {
grpcServers = append(grpcServers, srvGRPCTLS)
}
srv, err := NewServer(c, deps, grpcServers)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1219,7 +1216,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
} }
} }
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()}) s1, err := NewServer(conf, deps, grpc.NewServer())
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1257,7 +1254,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
return nil return nil
} }
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()}) s2, err := NewServer(conf, deps, grpc.NewServer())
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1291,7 +1288,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
deps := newDefaultDeps(t, conf) deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil deps.NewRequestRecorderFunc = nil
s1, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()}) s1, err := NewServer(conf, deps, grpc.NewServer())
require.Error(t, err, "need err when provider func is nil") require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider") require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
@ -1310,7 +1307,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
return nil return nil
} }
s2, err := NewServer(conf, deps, []*grpc.Server{grpc.NewServer()}) s2, err := NewServer(conf, deps, grpc.NewServer())
require.Error(t, err, "need err when RequestRecorder is nil") require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder") require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")

View File

@ -6,17 +6,14 @@ import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware" middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
) )
// NewServer constructs a gRPC server for the external gRPC port, to which // NewServer constructs a gRPC server for the external gRPC port, to which
// handlers can be registered. // handlers can be registered.
func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.Server { func NewServer(logger agentmiddleware.Logger) *grpc.Server {
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger) recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)
opts := []grpc.ServerOption{ opts := []grpc.ServerOption{
@ -36,38 +33,5 @@ func NewServer(logger agentmiddleware.Logger, tls *tlsutil.Configurator) *grpc.S
MinTime: 15 * time.Second, MinTime: 15 * time.Second,
}), }),
} }
if tls != nil && tls.GRPCServerUseTLS() {
creds := credentials.NewTLS(tls.IncomingGRPCConfig())
opts = append(opts, grpc.Creds(creds))
}
return grpc.NewServer(opts...) return grpc.NewServer(opts...)
} }
// BuildExternalGRPCServers constructs two gRPC servers for the external gRPC ports.
// This function exists because behavior for the original `ports.grpc` is dependent on
// whether the new `ports.grpc_tls` is defined. This behavior should be simplified in
// a future release so that the `ports.grpc` is always plain-text and not dependent on
// the `ports.grpc_tls` configuration.
func BuildExternalGRPCServers(grpcPort int, grpcTLSPort int, t *tlsutil.Configurator, l hclog.InterceptLogger) (grpc, grpcTLS *grpc.Server) {
if grpcPort > 0 {
// TODO: remove this deprecated behavior in a future version and only support plain-text for this port.
if grpcTLSPort > 0 {
// Use plain-text if the new grpc_tls port is configured.
grpc = NewServer(l.Named("grpc.external"), nil)
} else {
// Otherwise, check TLS configuration to determine whether to encrypt (for backwards compatibility).
grpc = NewServer(l.Named("grpc.external"), t)
if t != nil && t.GRPCServerUseTLS() {
l.Warn("deprecated gRPC TLS configuration detected. Consider using `ports.grpc_tls` instead")
}
}
}
if grpcTLSPort > 0 {
if t.GRPCServerUseTLS() {
grpcTLS = NewServer(l.Named("grpc_tls.external"), t)
} else {
l.Error("error starting gRPC TLS server", "error", "port is set, but invalid TLS configuration detected")
}
}
return
}

View File

@ -1325,7 +1325,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
externalGRPCServer := gogrpc.NewServer() externalGRPCServer := gogrpc.NewServer()
deps := newDefaultDeps(t, conf) deps := newDefaultDeps(t, conf)
server, err := consul.NewServer(conf, deps, []*gogrpc.Server{externalGRPCServer}) server, err := consul.NewServer(conf, deps, externalGRPCServer)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, server.Shutdown()) require.NoError(t, server.Shutdown())

View File

@ -705,20 +705,21 @@ func (c *cmd) lookupXDSPort() (int, string, error) {
type response struct { type response struct {
XDS struct { XDS struct {
Port int Ports struct {
PortTLS int Plaintext int
TLS int
}
} }
} }
var resp response var resp response
if err := mapstructure.Decode(self, &resp); err == nil && resp.XDS.Port != 0 { if err := mapstructure.Decode(self, &resp); err == nil {
// Determine the protocol based on the provided Port matching PortTLS if resp.XDS.Ports.TLS > 0 {
proto := "http://" return resp.XDS.Ports.TLS, "https://", nil
// TODO: Simplify this check after 1.14 when Port is guaranteed to be plain-text. }
if resp.XDS.Port == resp.XDS.PortTLS { if resp.XDS.Ports.Plaintext > 0 {
proto = "https://" return resp.XDS.Ports.Plaintext, "http://", nil
} }
return resp.XDS.Port, proto, nil
} }
// Fallback to old API for the case where a new consul CLI is being used with // Fallback to old API for the case where a new consul CLI is being used with

View File

@ -117,8 +117,8 @@ type generateConfigTestCase struct {
Files map[string]string Files map[string]string
ProxyConfig map[string]interface{} ProxyConfig map[string]interface{}
NamespacesEnabled bool NamespacesEnabled bool
XDSPort int // only used for testing custom-configured grpc port XDSPorts agent.GRPCPorts // only used for testing custom-configured grpc port
AgentSelf110 bool // fake the agent API from versions v1.10 and earlier AgentSelf110 bool // fake the agent API from versions v1.10 and earlier
WantArgs BootstrapTplArgs WantArgs BootstrapTplArgs
WantErr string WantErr string
} }
@ -447,9 +447,9 @@ func TestGenerateConfig(t *testing.T) {
}, },
}, },
{ {
Name: "xds-addr-config", Name: "xds-addr-config",
Flags: []string{"-proxy-id", "test-proxy"}, Flags: []string{"-proxy-id", "test-proxy"},
XDSPort: 9999, XDSPorts: agent.GRPCPorts{Plaintext: 9999, TLS: 0},
WantArgs: BootstrapTplArgs{ WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy", ProxyCluster: "test-proxy",
ProxyID: "test-proxy", ProxyID: "test-proxy",
@ -470,10 +470,36 @@ func TestGenerateConfig(t *testing.T) {
PrometheusScrapePath: "/metrics", PrometheusScrapePath: "/metrics",
}, },
}, },
{
Name: "grpc-tls-addr-config",
Flags: []string{"-proxy-id", "test-proxy"},
XDSPorts: agent.GRPCPorts{Plaintext: 9997, TLS: 9998},
AgentSelf110: false,
WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy",
ProxyID: "test-proxy",
// We don't know this til after the lookup so it will be empty in the
// initial args call we are testing here.
ProxySourceService: "",
// Should resolve IP, note this might not resolve the same way
// everywhere which might make this test brittle but not sure what else
// to do.
GRPC: GRPC{
AgentAddress: "127.0.0.1",
AgentPort: "9998",
AgentTLS: true,
},
AdminAccessLogPath: "/dev/null",
AdminBindAddress: "127.0.0.1",
AdminBindPort: "19000",
LocalAgentClusterName: xds.LocalAgentClusterName,
PrometheusScrapePath: "/metrics",
},
},
{ {
Name: "deprecated-grpc-addr-config", Name: "deprecated-grpc-addr-config",
Flags: []string{"-proxy-id", "test-proxy"}, Flags: []string{"-proxy-id", "test-proxy"},
XDSPort: 9999, XDSPorts: agent.GRPCPorts{Plaintext: 9999, TLS: 0},
AgentSelf110: true, AgentSelf110: true,
WantArgs: BootstrapTplArgs{ WantArgs: BootstrapTplArgs{
ProxyCluster: "test-proxy", ProxyCluster: "test-proxy",
@ -1138,7 +1164,7 @@ func testMockAgent(tc generateConfigTestCase) http.HandlerFunc {
case strings.Contains(r.URL.Path, "/agent/service"): case strings.Contains(r.URL.Path, "/agent/service"):
testMockAgentProxyConfig(tc.ProxyConfig, tc.NamespacesEnabled)(w, r) testMockAgentProxyConfig(tc.ProxyConfig, tc.NamespacesEnabled)(w, r)
case strings.Contains(r.URL.Path, "/agent/self"): case strings.Contains(r.URL.Path, "/agent/self"):
testMockAgentSelf(tc.XDSPort, tc.AgentSelf110)(w, r) testMockAgentSelf(tc.XDSPorts, tc.AgentSelf110)(w, r)
case strings.Contains(r.URL.Path, "/catalog/node-services"): case strings.Contains(r.URL.Path, "/catalog/node-services"):
testMockCatalogNodeServiceList()(w, r) testMockCatalogNodeServiceList()(w, r)
default: default:
@ -1378,7 +1404,7 @@ func TestEnvoyCommand_canBindInternal(t *testing.T) {
// testMockAgentSelf returns an empty /v1/agent/self response except GRPC // testMockAgentSelf returns an empty /v1/agent/self response except GRPC
// port is filled in to match the given wantXDSPort argument. // port is filled in to match the given wantXDSPort argument.
func testMockAgentSelf(wantXDSPort int, agentSelf110 bool) http.HandlerFunc { func testMockAgentSelf(wantXDSPorts agent.GRPCPorts, agentSelf110 bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
resp := agent.Self{ resp := agent.Self{
Config: map[string]interface{}{ Config: map[string]interface{}{
@ -1388,10 +1414,17 @@ func testMockAgentSelf(wantXDSPort int, agentSelf110 bool) http.HandlerFunc {
if agentSelf110 { if agentSelf110 {
resp.DebugConfig = map[string]interface{}{ resp.DebugConfig = map[string]interface{}{
"GRPCPort": wantXDSPort, "GRPCPort": wantXDSPorts.Plaintext,
} }
} else { } else {
resp.XDS = &agent.XDSSelf{Port: wantXDSPort} resp.XDS = &agent.XDSSelf{
// The deprecated Port field should default to TLS if it's available.
Port: wantXDSPorts.TLS,
Ports: wantXDSPorts,
}
if wantXDSPorts.TLS <= 0 {
resp.XDS.Port = wantXDSPorts.Plaintext
}
} }
selfJSON, err := json.Marshal(resp) selfJSON, err := json.Marshal(resp)

View File

@ -0,0 +1,223 @@
{
"admin": {
"access_log_path": "/dev/null",
"address": {
"socket_address": {
"address": "127.0.0.1",
"port_value": 19000
}
}
},
"node": {
"cluster": "test",
"id": "test-proxy",
"metadata": {
"namespace": "default",
"partition": "default"
}
},
"layered_runtime": {
"layers": [
{
"name": "base",
"static_layer": {
"re2.max_program_size.error_level": 1048576
}
}
]
},
"static_resources": {
"clusters": [
{
"name": "local_agent",
"ignore_health_on_host_removal": false,
"connect_timeout": "1s",
"type": "STATIC",
"transport_socket": {
"name": "tls",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"common_tls_context": {
"validation_context": {
"trusted_ca": {
"inline_string": ""
}
}
}
}
},
"http2_protocol_options": {},
"loadAssignment": {
"clusterName": "local_agent",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socket_address": {
"address": "127.0.0.1",
"port_value": 9998
}
}
}
}
]
}
]
}
}
]
},
"stats_config": {
"stats_tags": [
{
"regex": "^cluster\\.(?:passthrough~)?((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.custom_hash"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.service_subset"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.service"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.namespace"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:([^.]+)\\.)?[^.]+\\.internal[^.]*\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.partition"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.datacenter"
},
{
"regex": "^cluster\\.([^.]+\\.(?:[^.]+\\.)?([^.]+)\\.external\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.peer"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.routing_type"
},
{
"regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)",
"tag_name": "consul.destination.trust_domain"
},
{
"regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.destination.target"
},
{
"regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)",
"tag_name": "consul.destination.full_target"
},
{
"regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.(([^.]+)(?:\\.[^.]+)?(?:\\.[^.]+)?\\.[^.]+\\.)",
"tag_name": "consul.upstream.service"
},
{
"regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.[^.]+)?\\.([^.]+)\\.)",
"tag_name": "consul.upstream.datacenter"
},
{
"regex": "^(?:tcp|http)\\.upstream_peered\\.([^.]+(?:\\.[^.]+)?\\.([^.]+)\\.)",
"tag_name": "consul.upstream.peer"
},
{
"regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.([^.]+(?:\\.([^.]+))?(?:\\.[^.]+)?\\.[^.]+\\.)",
"tag_name": "consul.upstream.namespace"
},
{
"regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.([^.]+))?\\.[^.]+\\.)",
"tag_name": "consul.upstream.partition"
},
{
"regex": "^cluster\\.((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.custom_hash"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.service_subset"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.service"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.namespace"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)",
"tag_name": "consul.datacenter"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)",
"tag_name": "consul.routing_type"
},
{
"regex": "^cluster\\.((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)",
"tag_name": "consul.trust_domain"
},
{
"regex": "^cluster\\.(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)",
"tag_name": "consul.target"
},
{
"regex": "^cluster\\.(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)",
"tag_name": "consul.full_target"
},
{
"tag_name": "local_cluster",
"fixed_value": "test"
},
{
"tag_name": "consul.source.service",
"fixed_value": "test"
},
{
"tag_name": "consul.source.namespace",
"fixed_value": "default"
},
{
"tag_name": "consul.source.partition",
"fixed_value": "default"
},
{
"tag_name": "consul.source.datacenter",
"fixed_value": "dc1"
}
],
"use_all_default_tags": true
},
"dynamic_resources": {
"lds_config": {
"ads": {},
"resource_api_version": "V3"
},
"cds_config": {
"ads": {},
"resource_api_version": "V3"
},
"ads_config": {
"api_type": "DELTA_GRPC",
"transport_api_version": "V3",
"grpc_services": {
"initial_metadata": [
{
"key": "x-consul-token",
"value": ""
}
],
"envoy_grpc": {
"cluster_name": "local_agent"
}
}
}
}
}