diff --git a/agent/agent.go b/agent/agent.go index 5412436e5..4b78b69a9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -761,12 +761,7 @@ func (a *Agent) Failed() <-chan struct{} { } func (a *Agent) buildExternalGRPCServer() { - // TLS is only enabled on the gRPC server if there's an HTTPS port configured. - var tls *tlsutil.Configurator - if a.config.HTTPSPort > 0 { - tls = a.tlsConfigurator - } - a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), tls) + a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), a.tlsConfigurator) } func (a *Agent) listenAndServeGRPC() error { diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 49369bbf7..28a8397df 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -14,6 +15,7 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -225,6 +227,11 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer retryCtx, cancel := context.WithCancel(ctx) cancelFns[peer.ID] = cancel + streamStatus, err := s.peerStreamTracker.Register(peer.ID) + if err != nil { + return fmt.Errorf("failed to register stream: %v", err) + } + // Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes. go retryLoopBackoff(retryCtx, func() error { // Try a new address on each iteration by advancing the ring buffer on errors. @@ -238,8 +245,15 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer logger.Trace("dialing peer", "addr", addr) conn, err := grpc.DialContext(retryCtx, addr, - grpc.WithBlock(), + // TODO(peering): use a grpc.WithStatsHandler here?) tlsOption, + // For keep alive parameters there is a larger comment in ClientConnPool.dial about that. + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 30 * time.Second, + Timeout: 10 * time.Second, + // send keepalive pings even if there is no active streams + PermitWithoutStream: true, + }), ) if err != nil { return fmt.Errorf("failed to dial: %w", err) @@ -277,8 +291,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer return err }, func(err error) { - // TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs. - // Lockable status isn't available here though. Could report it via the peering.Service? + streamStatus.TrackSendError(err.Error()) logger.Error("error managing peering stream", "peer_id", peer.ID, "error", err) }) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 1587fc30c..33ef26d61 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "encoding/json" + "io/ioutil" "testing" "time" @@ -21,15 +22,27 @@ import ( ) func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { + t.Run("without-tls", func(t *testing.T) { + testLeader_PeeringSync_Lifecycle_ClientDeletion(t, false) + }) + t.Run("with-tls", func(t *testing.T) { + testLeader_PeeringSync_Lifecycle_ClientDeletion(t, true) + }) +} +func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS bool) { if testing.Short() { t.Skip("too slow for testing.Short") } - // TODO(peering): Configure with TLS _, s1 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "s1.dc1" + c.NodeName = "bob" c.Datacenter = "dc1" c.TLSConfig.Domain = "consul" + if enableTLS { + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key" + } }) testrpc.WaitForLeader(t, s1.RPC, "dc1") @@ -69,9 +82,14 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "s2.dc2" + c.NodeName = "betty" c.Datacenter = "dc2" c.PrimaryDatacenter = "dc2" + if enableTLS { + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key" + } }) testrpc.WaitForLeader(t, s2.RPC, "dc2") @@ -121,15 +139,27 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { } func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { + t.Run("without-tls", func(t *testing.T) { + testLeader_PeeringSync_Lifecycle_ServerDeletion(t, false) + }) + t.Run("with-tls", func(t *testing.T) { + testLeader_PeeringSync_Lifecycle_ServerDeletion(t, true) + }) +} +func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS bool) { if testing.Short() { t.Skip("too slow for testing.Short") } - // TODO(peering): Configure with TLS _, s1 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "s1.dc1" + c.NodeName = "bob" c.Datacenter = "dc1" c.TLSConfig.Domain = "consul" + if enableTLS { + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key" + } }) testrpc.WaitForLeader(t, s1.RPC, "dc1") @@ -165,9 +195,14 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "s2.dc2" + c.NodeName = "betty" c.Datacenter = "dc2" c.PrimaryDatacenter = "dc2" + if enableTLS { + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key" + } }) testrpc.WaitForLeader(t, s2.RPC, "dc2") @@ -216,6 +251,111 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { }) } +func TestLeader_PeeringSync_FailsForTLSError(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Run("server-name-validation", func(t *testing.T) { + testLeader_PeeringSync_failsForTLSError(t, func(p *pbpeering.Peering) { + p.PeerServerName = "wrong.name" + }, `transport: authentication handshake failed: x509: certificate is valid for server.dc1.consul, bob.server.dc1.consul, not wrong.name`) + }) + t.Run("bad-ca-roots", func(t *testing.T) { + wrongRoot, err := ioutil.ReadFile("../../test/client_certs/rootca.crt") + require.NoError(t, err) + + testLeader_PeeringSync_failsForTLSError(t, func(p *pbpeering.Peering) { + p.PeerCAPems = []string{string(wrongRoot)} + }, `transport: authentication handshake failed: x509: certificate signed by unknown authority`) + }) +} + +func testLeader_PeeringSync_failsForTLSError(t *testing.T, peerMutateFn func(p *pbpeering.Peering), expectErr string) { + require.NotNil(t, peerMutateFn) + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "bob" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-s2", + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) + require.NoError(t, err) + + var token structs.PeeringToken + require.NoError(t, json.Unmarshal(tokenJSON, &token)) + + // S1 should not have a stream tracked for dc2 because s1 generated a token + // for baz, and therefore needs to wait to be dialed. + time.Sleep(1 * time.Second) + _, found := s1.peerStreamServer.StreamStatus(token.PeerID) + require.False(t, found) + + var ( + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + + // Bring up s2 and store s1's token so that it attempts to dial. + _, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "betty" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + + c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt" + c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt" + c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key" + }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Simulate a peering initiation event by writing a peering with data from a peering token. + // Eventually the leader in dc2 should dial and connect to the leader in dc1. + p := &pbpeering.Peering{ + ID: s2PeerID, + Name: "my-peer-s1", + PeerID: token.PeerID, + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + peerMutateFn(p) + require.True(t, p.ShouldDial()) + + // We maintain a pointer to the peering on the write so that we can get the ID without needing to re-query the state store. + require.NoError(t, s2.fsm.State().PeeringWrite(1000, p)) + + retry.Run(t, func(r *retry.R) { + status, found := s2.peerStreamTracker.StreamStatus(p.ID) + require.True(r, found) + require.False(r, status.Connected) + require.Contains(r, status.LastSendErrorMessage, expectErr) + }) +} + func TestLeader_Peering_DeferredDeletion(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 4014bbdd2..589b4e95b 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -52,7 +52,7 @@ func (b *PeeringBackend) GetLeaderAddress() string { // GetAgentCACertificates gets the server's raw CA data from its TLS Configurator. func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) { // TODO(peering): handle empty CA pems - return b.srv.tlsConfigurator.ManualCAPems(), nil + return b.srv.tlsConfigurator.GRPCManualCAPems(), nil } // GetServerAddresses looks up server node addresses from the state store. diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 77f761f68..b9f9cc4f1 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/connect" + external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" @@ -299,8 +300,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) { } } - srv, err := NewServer(c, deps, grpc.NewServer()) - + srv, err := NewServer(c, deps, external.NewServer(deps.Logger.Named("grpc.external"), deps.TLSConfigurator)) if err != nil { return nil, err } diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index 4d4c8746c..4244bbe09 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -33,16 +33,37 @@ func (t *Tracker) SetClock(clock func() time.Time) { } } +// Register a stream for a given peer but do not mark it as connected. +func (t *Tracker) Register(id string) (*MutableStatus, error) { + t.mu.Lock() + defer t.mu.Unlock() + status, _, err := t.registerLocked(id, false) + return status, err +} + +func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) { + status, ok := t.streams[id] + if !ok { + status = newMutableStatus(t.timeNow, initAsConnected) + t.streams[id] = status + return status, true, nil + } + return status, false, nil +} + // Connected registers a stream for a given peer, and marks it as connected. // It also enforces that there is only one active stream for a peer. func (t *Tracker) Connected(id string) (*MutableStatus, error) { t.mu.Lock() defer t.mu.Unlock() + return t.connectedLocked(id) +} - status, ok := t.streams[id] - if !ok { - status = newMutableStatus(t.timeNow) - t.streams[id] = status +func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) { + status, newlyRegistered, err := t.registerLocked(id, true) + if err != nil { + return nil, err + } else if newlyRegistered { return status, nil } @@ -150,10 +171,10 @@ type Status struct { ImportedServices map[string]struct{} } -func newMutableStatus(now func() time.Time) *MutableStatus { +func newMutableStatus(now func() time.Time, connected bool) *MutableStatus { return &MutableStatus{ Status: Status{ - Connected: true, + Connected: connected, }, timeNow: now, doneCh: make(chan struct{}), diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 6a8f32915..e4ab2947a 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -59,7 +59,7 @@ func TestPeeringService_GenerateToken(t *testing.T) { // TODO(peering): see note on newTestServer, refactor to not use this s := newTestServer(t, func(c *consul.Config) { c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.1" - c.TLSConfig.InternalRPC.CAFile = cafile + c.TLSConfig.GRPC.CAFile = cafile c.DataDir = dir }) client := pbpeering.NewPeeringServiceClient(s.ClientConn(t)) diff --git a/test/integration/connect/envoy/case-wanfed-gw/primary/common.hcl b/test/integration/connect/envoy/case-wanfed-gw/primary/common.hcl index c8c086fb4..d586b280b 100644 --- a/test/integration/connect/envoy/case-wanfed-gw/primary/common.hcl +++ b/test/integration/connect/envoy/case-wanfed-gw/primary/common.hcl @@ -1,6 +1,10 @@ -ca_file = "/workdir/primary/tls/consul-agent-ca.pem" -cert_file = "/workdir/primary/tls/primary-server-consul-0.pem" -key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem" -verify_incoming = true -verify_outgoing = true -verify_server_hostname = true +tls { + internal_rpc { + ca_file = "/workdir/primary/tls/consul-agent-ca.pem" + cert_file = "/workdir/primary/tls/primary-server-consul-0.pem" + key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem" + verify_incoming = true + verify_outgoing = true + verify_server_hostname = true + } +} diff --git a/test/integration/connect/envoy/case-wanfed-gw/primary/server.hcl b/test/integration/connect/envoy/case-wanfed-gw/primary/server.hcl index f3c0bc000..8dd5b39dc 100644 --- a/test/integration/connect/envoy/case-wanfed-gw/primary/server.hcl +++ b/test/integration/connect/envoy/case-wanfed-gw/primary/server.hcl @@ -3,9 +3,13 @@ connect { enabled = true enable_mesh_gateway_wan_federation = true } -ca_file = "/workdir/primary/tls/consul-agent-ca.pem" -cert_file = "/workdir/primary/tls/primary-server-consul-0.pem" -key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem" -verify_incoming = true -verify_outgoing = true -verify_server_hostname = true +tls { + internal_rpc { + ca_file = "/workdir/primary/tls/consul-agent-ca.pem" + cert_file = "/workdir/primary/tls/primary-server-consul-0.pem" + key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem" + verify_incoming = true + verify_outgoing = true + verify_server_hostname = true + } +} diff --git a/test/integration/connect/envoy/case-wanfed-gw/secondary/common.hcl b/test/integration/connect/envoy/case-wanfed-gw/secondary/common.hcl index 36bfc41c3..8e536a87a 100644 --- a/test/integration/connect/envoy/case-wanfed-gw/secondary/common.hcl +++ b/test/integration/connect/envoy/case-wanfed-gw/secondary/common.hcl @@ -1,6 +1,10 @@ -ca_file = "/workdir/secondary/tls/consul-agent-ca.pem" -cert_file = "/workdir/secondary/tls/secondary-server-consul-0.pem" -key_file = "/workdir/secondary/tls/secondary-server-consul-0-key.pem" -verify_incoming = true -verify_outgoing = true -verify_server_hostname = true +tls { + internal_rpc { + ca_file = "/workdir/secondary/tls/consul-agent-ca.pem" + cert_file = "/workdir/secondary/tls/secondary-server-consul-0.pem" + key_file = "/workdir/secondary/tls/secondary-server-consul-0-key.pem" + verify_incoming = true + verify_outgoing = true + verify_server_hostname = true + } +} diff --git a/tlsutil/config.go b/tlsutil/config.go index da85c2e72..7c9e6d2ad 100644 --- a/tlsutil/config.go +++ b/tlsutil/config.go @@ -88,7 +88,7 @@ type ProtocolConfig struct { // certificate authority. This is used to verify authenticity of server // nodes. // - // Note: this setting doesn't apply to the gRPC configuration, as Consul + // Note: this setting doesn't apply to the external gRPC configuration, as Consul // makes no outgoing connections using this protocol. VerifyOutgoing bool @@ -233,6 +233,13 @@ func (c *Configurator) ManualCAPems() []string { return c.internalRPC.manualCAPEMs } +// GRPCManualCAPems returns the currently loaded CAs for the gRPC in PEM format. +func (c *Configurator) GRPCManualCAPems() []string { + c.lock.RLock() + defer c.lock.RUnlock() + return c.grpc.manualCAPEMs +} + // Update updates the internal configuration which is used to generate // *tls.Config. // This function acquires a write lock because it writes the new config. diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index 5fb0c849c..4506d80ff 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -1998,8 +1998,6 @@ specially crafted certificate signed by the CA can be used to gain full access t - `grpc` ((#tls_grpc)) Provides settings for the gRPC/xDS interface. To enable the gRPC interface you must define a port via [`ports.grpc`](#grpc_port). - To enable TLS on the gRPC interface you also must define an HTTPS port via - [`ports.https`](#https_port). - `ca_file` ((#tls_grpc_ca_file)) Overrides [`tls.defaults.ca_file`](#tls_defaults_ca_file). diff --git a/website/content/docs/upgrading/upgrade-specific.mdx b/website/content/docs/upgrading/upgrade-specific.mdx index a9a72c3f9..ec0cf54d5 100644 --- a/website/content/docs/upgrading/upgrade-specific.mdx +++ b/website/content/docs/upgrading/upgrade-specific.mdx @@ -16,6 +16,18 @@ upgrade flow. ## Consul 1.13.0 +### gRPC TLS + +In prior Consul versions if HTTPS was enabled for the client API and exposed +via `ports { https = NUMBER }` then the same TLS material was used to encrypt +the gRPC port used for xDS. Now this is decoupled and activating TLS on the +gRPC endpoint is controlled solely with the gRPC section of the new +[`tls` stanza](/docs/agent/config/config-files#tls-configuration-reference). + +If you have not yet switched to the new `tls` stanza and were NOT using HTTPS +for the API then updating to Consul 1.13 will activate TLS for gRPC since the +deprecated TLS settings are used as defaults. + ### 1.9 Telemetry Compatibility #### Removing configuration options