From 542852786ce68ff1f14fb108745668ad080eb256 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Wed, 6 Jan 2021 17:21:22 +0100
Subject: [PATCH 1/2] [Streaming][bugfix] handle TLS signalisation when TLS is
disabled on client side
Tnis is an alternative to https://github.com/hashicorp/consul/pull/9494
---
.changelog/9512.txt | 3 +++
agent/grpc/client.go | 9 +++++----
agent/grpc/client_test.go | 15 ++++++++++-----
agent/setup.go | 2 +-
4 files changed, 19 insertions(+), 10 deletions(-)
create mode 100644 .changelog/9512.txt
diff --git a/.changelog/9512.txt b/.changelog/9512.txt
new file mode 100644
index 000000000..b1a8bdebd
--- /dev/null
+++ b/.changelog/9512.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+client: properly set GRPC over RPC magic numbers when encryption was not set or partially set in the cluster with streaming enabled
+```
diff --git a/agent/grpc/client.go b/agent/grpc/client.go
index 7eb80070f..9fdfa54e6 100644
--- a/agent/grpc/client.go
+++ b/agent/grpc/client.go
@@ -35,9 +35,10 @@ type TLSWrapper func(dc string, conn net.Conn) (net.Conn, error)
type dialer func(context.Context, string) (net.Conn, error)
-func NewClientConnPool(servers ServerLocator, tls TLSWrapper) *ClientConnPool {
+// NewClientConnPool create new GRPC client pool to connect to servers using GRPC over RPC
+func NewClientConnPool(servers ServerLocator, tls TLSWrapper, useTLSForDC func(dc string) bool) *ClientConnPool {
return &ClientConnPool{
- dialer: newDialer(servers, tls),
+ dialer: newDialer(servers, tls, useTLSForDC),
servers: servers,
conns: make(map[string]*grpc.ClientConn),
}
@@ -74,7 +75,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error)
// newDialer returns a gRPC dialer function that conditionally wraps the connection
// with TLS based on the Server.useTLS value.
-func newDialer(servers ServerLocator, wrapper TLSWrapper) func(context.Context, string) (net.Conn, error) {
+func newDialer(servers ServerLocator, wrapper TLSWrapper, useTLSForDC func(dc string) bool) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", addr)
@@ -88,7 +89,7 @@ func newDialer(servers ServerLocator, wrapper TLSWrapper) func(context.Context,
return nil, err
}
- if server.UseTLS {
+ if server.UseTLS && useTLSForDC(server.Datacenter) {
if wrapper == nil {
conn.Close()
return nil, fmt.Errorf("TLS enabled but got nil TLS wrapper")
diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go
index 38ecc40aa..5028e34fa 100644
--- a/agent/grpc/client_test.go
+++ b/agent/grpc/client_test.go
@@ -18,6 +18,11 @@ import (
"github.com/hashicorp/consul/tlsutil"
)
+// useTLSForDcAlwaysTrue tell GRPC to always return the TLS is enabled
+func useTLSForDcAlwaysTrue(_ string) bool {
+ return true
+}
+
func TestNewDialer_WithTLSWrapper(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
@@ -37,7 +42,7 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
called = true
return conn, nil
}
- dial := newDialer(builder, wrapper)
+ dial := newDialer(builder, wrapper, useTLSForDcAlwaysTrue)
ctx := context.Background()
conn, err := dial(ctx, lis.Addr().String())
require.NoError(t, err)
@@ -63,7 +68,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
- pool := NewClientConnPool(res, TLSWrapper(tlsConf.OutgoingRPCWrapper()))
+ pool := NewClientConnPool(res, TLSWrapper(tlsConf.OutgoingRPCWrapper()), tlsConf.UseTLS)
conn, err := pool.ClientConn("dc1")
require.NoError(t, err)
@@ -82,7 +87,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
count := 4
res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)
- pool := NewClientConnPool(res, nil)
+ pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
@@ -119,7 +124,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
count := 5
res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)
- pool := NewClientConnPool(res, nil)
+ pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
@@ -168,7 +173,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
res := resolver.NewServerResolverBuilder(newConfig(t))
registerWithGRPC(t, res)
- pool := NewClientConnPool(res, nil)
+ pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)
for _, dc := range dcs {
name := "server-0-" + dc
diff --git a/agent/setup.go b/agent/setup.go
index ccf9c08d1..34cc9ac89 100644
--- a/agent/setup.go
+++ b/agent/setup.go
@@ -101,7 +101,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
builder := resolver.NewServerResolverBuilder(resolver.Config{})
registerWithGRPC(builder)
- d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()))
+ d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS)
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)
From c43888c064a585ca434af78ec3507e2e43f1421a Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Wed, 6 Jan 2021 22:05:07 +0100
Subject: [PATCH 2/2] Added testing of GRPC with TLS combinations
This ensures that https://github.com/hashicorp/consul/issues/9474 will
not reproduce.
---
agent/streaming_test.go | 107 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 107 insertions(+)
create mode 100644 agent/streaming_test.go
diff --git a/agent/streaming_test.go b/agent/streaming_test.go
new file mode 100644
index 000000000..0f45ad9ed
--- /dev/null
+++ b/agent/streaming_test.go
@@ -0,0 +1,107 @@
+package agent
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/hashicorp/consul/sdk/testutil"
+ "github.com/hashicorp/consul/testrpc"
+)
+
+func testGRPCStreamingWorking(t *testing.T, config string) {
+ if testing.Short() {
+ t.Skip("too slow for testing.Short")
+ }
+
+ a := NewTestAgent(t, config)
+ defer a.Shutdown()
+
+ testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+ req, _ := http.NewRequest("GET", "/v1/health/service/consul?index=3", nil)
+ resp := httptest.NewRecorder()
+ _, err := a.srv.HealthServiceNodes(resp, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ assertIndex(t, resp)
+ require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
+}
+
+func TestGRPCWithTLSConfigs(t *testing.T) {
+ t.Parallel()
+ testCases := []struct {
+ name string
+ config string
+ }{
+ {
+ name: "no-tls",
+ config: "",
+ },
+ {
+ name: "tls-all-enabled",
+ config: `
+ # tls
+ ca_file = "../test/hostname/CertAuth.crt"
+ cert_file = "../test/hostname/Bob.crt"
+ key_file = "../test/hostname/Bob.key"
+ verify_incoming = true
+ verify_outgoing = true
+ verify_server_hostname = true
+ `,
+ },
+ {
+ name: "tls ready no verify incoming",
+ config: `
+ # tls
+ ca_file = "../test/hostname/CertAuth.crt"
+ cert_file = "../test/hostname/Bob.crt"
+ key_file = "../test/hostname/Bob.key"
+ verify_incoming = false
+ verify_outgoing = true
+ verify_server_hostname = false
+ `,
+ },
+ {
+ name: "tls ready no verify outgoing and incoming",
+ config: `
+ # tls
+ ca_file = "../test/hostname/CertAuth.crt"
+ cert_file = "../test/hostname/Bob.crt"
+ key_file = "../test/hostname/Bob.key"
+ verify_incoming = false
+ verify_outgoing = false
+ verify_server_hostname = false
+ `,
+ },
+ {
+ name: "tls ready, all defaults",
+ config: `
+ # tls
+ ca_file = "../test/hostname/CertAuth.crt"
+ cert_file = "../test/hostname/Bob.crt"
+ key_file = "../test/hostname/Bob.key"
+ `,
+ },
+ }
+ for _, tt := range testCases {
+ t.Run(tt.name, func(t *testing.T) {
+ dataDir := testutil.TempDir(t, "agent") // we manage the data dir
+ cfg := `data_dir = "` + dataDir + `"
+ domain = "consul"
+ node_name = "my-fancy-server"
+ datacenter = "dc1"
+ primary_datacenter = "dc1"
+ rpc {
+ enable_streaming = true
+ }
+ use_streaming_backend = true
+ ` + tt.config
+ testGRPCStreamingWorking(t, cfg)
+ })
+ }
+}