server: ensure peer replication can successfully use TLS over external gRPC (#13733)

Ensure that the peer stream replication rpc can successfully be used with TLS activated.

Also:

- If key material is configured for the gRPC port but HTTPS is not
  enabled now TLS will still be activated for the gRPC port.

- peerstream replication stream opened by the establishing-side will now
  ignore grpc.WithBlock so that TLS errors will bubble up instead of
  being awkwardly delayed or suppressed
This commit is contained in:
R.B. Boyer 2022-07-15 13:15:50 -05:00 committed by GitHub
parent 70ad4804b6
commit 61ebb38092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 244 additions and 46 deletions

View File

@ -761,12 +761,7 @@ func (a *Agent) Failed() <-chan struct{} {
}
func (a *Agent) buildExternalGRPCServer() {
// TLS is only enabled on the gRPC server if there's an HTTPS port configured.
var tls *tlsutil.Configurator
if a.config.HTTPSPort > 0 {
tls = a.tlsConfigurator
}
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), tls)
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"), a.tlsConfigurator)
}
func (a *Agent) listenAndServeGRPC() error {

View File

@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
@ -14,6 +15,7 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@ -225,6 +227,11 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
retryCtx, cancel := context.WithCancel(ctx)
cancelFns[peer.ID] = cancel
streamStatus, err := s.peerStreamTracker.Register(peer.ID)
if err != nil {
return fmt.Errorf("failed to register stream: %v", err)
}
// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoff(retryCtx, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
@ -238,8 +245,15 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(retryCtx, addr,
grpc.WithBlock(),
// TODO(peering): use a grpc.WithStatsHandler here?)
tlsOption,
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
// send keepalive pings even if there is no active streams
PermitWithoutStream: true,
}),
)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
@ -277,8 +291,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
return err
}, func(err error) {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service?
streamStatus.TrackSendError(err.Error())
logger.Error("error managing peering stream", "peer_id", peer.ID, "error", err)
})

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"io/ioutil"
"testing"
"time"
@ -21,15 +22,27 @@ import (
)
func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ClientDeletion(t, false)
})
t.Run("with-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ClientDeletion(t, true)
})
}
func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS bool) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
// TODO(peering): Configure with TLS
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.NodeName = "bob"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
if enableTLS {
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
}
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -69,9 +82,14 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
// Bring up s2 and store s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s2.dc2"
c.NodeName = "betty"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
if enableTLS {
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
}
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
@ -121,15 +139,27 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
}
func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ServerDeletion(t, false)
})
t.Run("with-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ServerDeletion(t, true)
})
}
func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS bool) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
// TODO(peering): Configure with TLS
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.NodeName = "bob"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
if enableTLS {
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
}
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -165,9 +195,14 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
// Bring up s2 and store s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s2.dc2"
c.NodeName = "betty"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
if enableTLS {
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
}
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
@ -216,6 +251,111 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
})
}
func TestLeader_PeeringSync_FailsForTLSError(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Run("server-name-validation", func(t *testing.T) {
testLeader_PeeringSync_failsForTLSError(t, func(p *pbpeering.Peering) {
p.PeerServerName = "wrong.name"
}, `transport: authentication handshake failed: x509: certificate is valid for server.dc1.consul, bob.server.dc1.consul, not wrong.name`)
})
t.Run("bad-ca-roots", func(t *testing.T) {
wrongRoot, err := ioutil.ReadFile("../../test/client_certs/rootca.crt")
require.NoError(t, err)
testLeader_PeeringSync_failsForTLSError(t, func(p *pbpeering.Peering) {
p.PeerCAPems = []string{string(wrongRoot)}
}, `transport: authentication handshake failed: x509: certificate signed by unknown authority`)
})
}
func testLeader_PeeringSync_failsForTLSError(t *testing.T, peerMutateFn func(p *pbpeering.Peering), expectErr string) {
require.NotNil(t, peerMutateFn)
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "bob"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Bob.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-s2",
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
// S1 should not have a stream tracked for dc2 because s1 generated a token
// for baz, and therefore needs to wait to be dialed.
time.Sleep(1 * time.Second)
_, found := s1.peerStreamServer.StreamStatus(token.PeerID)
require.False(t, found)
var (
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
)
// Bring up s2 and store s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "betty"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.TLSConfig.GRPC.CAFile = "../../test/hostname/CertAuth.crt"
c.TLSConfig.GRPC.CertFile = "../../test/hostname/Betty.crt"
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Simulate a peering initiation event by writing a peering with data from a peering token.
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
p := &pbpeering.Peering{
ID: s2PeerID,
Name: "my-peer-s1",
PeerID: token.PeerID,
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
peerMutateFn(p)
require.True(t, p.ShouldDial())
// We maintain a pointer to the peering on the write so that we can get the ID without needing to re-query the state store.
require.NoError(t, s2.fsm.State().PeeringWrite(1000, p))
retry.Run(t, func(r *retry.R) {
status, found := s2.peerStreamTracker.StreamStatus(p.ID)
require.True(r, found)
require.False(r, status.Connected)
require.Contains(r, status.LastSendErrorMessage, expectErr)
})
}
func TestLeader_Peering_DeferredDeletion(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -52,7 +52,7 @@ func (b *PeeringBackend) GetLeaderAddress() string {
// GetAgentCACertificates gets the server's raw CA data from its TLS Configurator.
func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) {
// TODO(peering): handle empty CA pems
return b.srv.tlsConfigurator.ManualCAPems(), nil
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
}
// GetServerAddresses looks up server node addresses from the state store.

View File

@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
@ -299,8 +300,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
}
}
srv, err := NewServer(c, deps, grpc.NewServer())
srv, err := NewServer(c, deps, external.NewServer(deps.Logger.Named("grpc.external"), deps.TLSConfigurator))
if err != nil {
return nil, err
}

View File

@ -33,16 +33,37 @@ func (t *Tracker) SetClock(clock func() time.Time) {
}
}
// Register a stream for a given peer but do not mark it as connected.
func (t *Tracker) Register(id string) (*MutableStatus, error) {
t.mu.Lock()
defer t.mu.Unlock()
status, _, err := t.registerLocked(id, false)
return status, err
}
func (t *Tracker) registerLocked(id string, initAsConnected bool) (*MutableStatus, bool, error) {
status, ok := t.streams[id]
if !ok {
status = newMutableStatus(t.timeNow, initAsConnected)
t.streams[id] = status
return status, true, nil
}
return status, false, nil
}
// Connected registers a stream for a given peer, and marks it as connected.
// It also enforces that there is only one active stream for a peer.
func (t *Tracker) Connected(id string) (*MutableStatus, error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.connectedLocked(id)
}
status, ok := t.streams[id]
if !ok {
status = newMutableStatus(t.timeNow)
t.streams[id] = status
func (t *Tracker) connectedLocked(id string) (*MutableStatus, error) {
status, newlyRegistered, err := t.registerLocked(id, true)
if err != nil {
return nil, err
} else if newlyRegistered {
return status, nil
}
@ -150,10 +171,10 @@ type Status struct {
ImportedServices map[string]struct{}
}
func newMutableStatus(now func() time.Time) *MutableStatus {
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
return &MutableStatus{
Status: Status{
Connected: true,
Connected: connected,
},
timeNow: now,
doneCh: make(chan struct{}),

View File

@ -59,7 +59,7 @@ func TestPeeringService_GenerateToken(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, func(c *consul.Config) {
c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.1"
c.TLSConfig.InternalRPC.CAFile = cafile
c.TLSConfig.GRPC.CAFile = cafile
c.DataDir = dir
})
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))

View File

@ -1,6 +1,10 @@
ca_file = "/workdir/primary/tls/consul-agent-ca.pem"
cert_file = "/workdir/primary/tls/primary-server-consul-0.pem"
key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
tls {
internal_rpc {
ca_file = "/workdir/primary/tls/consul-agent-ca.pem"
cert_file = "/workdir/primary/tls/primary-server-consul-0.pem"
key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
}
}

View File

@ -3,9 +3,13 @@ connect {
enabled = true
enable_mesh_gateway_wan_federation = true
}
ca_file = "/workdir/primary/tls/consul-agent-ca.pem"
cert_file = "/workdir/primary/tls/primary-server-consul-0.pem"
key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
tls {
internal_rpc {
ca_file = "/workdir/primary/tls/consul-agent-ca.pem"
cert_file = "/workdir/primary/tls/primary-server-consul-0.pem"
key_file = "/workdir/primary/tls/primary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
}
}

View File

@ -1,6 +1,10 @@
ca_file = "/workdir/secondary/tls/consul-agent-ca.pem"
cert_file = "/workdir/secondary/tls/secondary-server-consul-0.pem"
key_file = "/workdir/secondary/tls/secondary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
tls {
internal_rpc {
ca_file = "/workdir/secondary/tls/consul-agent-ca.pem"
cert_file = "/workdir/secondary/tls/secondary-server-consul-0.pem"
key_file = "/workdir/secondary/tls/secondary-server-consul-0-key.pem"
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
}
}

View File

@ -88,7 +88,7 @@ type ProtocolConfig struct {
// certificate authority. This is used to verify authenticity of server
// nodes.
//
// Note: this setting doesn't apply to the gRPC configuration, as Consul
// Note: this setting doesn't apply to the external gRPC configuration, as Consul
// makes no outgoing connections using this protocol.
VerifyOutgoing bool
@ -233,6 +233,13 @@ func (c *Configurator) ManualCAPems() []string {
return c.internalRPC.manualCAPEMs
}
// GRPCManualCAPems returns the currently loaded CAs for the gRPC in PEM format.
func (c *Configurator) GRPCManualCAPems() []string {
c.lock.RLock()
defer c.lock.RUnlock()
return c.grpc.manualCAPEMs
}
// Update updates the internal configuration which is used to generate
// *tls.Config.
// This function acquires a write lock because it writes the new config.

View File

@ -1998,8 +1998,6 @@ specially crafted certificate signed by the CA can be used to gain full access t
- `grpc` ((#tls_grpc)) Provides settings for the gRPC/xDS interface. To enable
the gRPC interface you must define a port via [`ports.grpc`](#grpc_port).
To enable TLS on the gRPC interface you also must define an HTTPS port via
[`ports.https`](#https_port).
- `ca_file` ((#tls_grpc_ca_file)) Overrides [`tls.defaults.ca_file`](#tls_defaults_ca_file).

View File

@ -16,6 +16,18 @@ upgrade flow.
## Consul 1.13.0
### gRPC TLS
In prior Consul versions if HTTPS was enabled for the client API and exposed
via `ports { https = NUMBER }` then the same TLS material was used to encrypt
the gRPC port used for xDS. Now this is decoupled and activating TLS on the
gRPC endpoint is controlled solely with the gRPC section of the new
[`tls` stanza](/docs/agent/config/config-files#tls-configuration-reference).
If you have not yet switched to the new `tls` stanza and were NOT using HTTPS
for the API then updating to Consul 1.13 will activate TLS for gRPC since the
deprecated TLS settings are used as defaults.
### 1.9 Telemetry Compatibility
#### Removing configuration options