From 56144cf5f72799ed8b575c97ad63a45b97118f09 Mon Sep 17 00:00:00 2001 From: Freddy Date: Mon, 1 Aug 2022 15:06:18 -0600 Subject: [PATCH] Various peering fixes (#13979) * Avoid logging StreamSecretID * Wrap additional errors in stream handler * Fix flakiness in leader test and rename servers for clarity. There was a race condition where the peering was being deleted in the test before the stream was active. Now the test waits for the stream to be connected on both sides before deleting the associated peering. * Run flaky test serially --- agent/consul/config_endpoint_test.go | 2 - agent/consul/leader_peering.go | 2 +- agent/consul/leader_peering_test.go | 129 ++++++++++-------- .../services/peerstream/stream_resources.go | 23 +++- .../services/peerstream/stream_test.go | 52 ++++++- 5 files changed, 144 insertions(+), 64 deletions(-) diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index bbed2bf1b..3f79b3d1b 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1804,8 +1804,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) { t.Skip("too slow for testing.Short") } - t.Parallel() - dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index aa425c738..dd6185a19 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -237,7 +237,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, } } - logger.Trace("checking connected streams", "streams", s.peerStreamServer.ConnectedStreams(), "sequence_id", seq) + logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq) // Clean up active streams of peerings that were deleted from the state store. // TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK? 
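
The race fix described in the commit message comes down to one pattern: before the test deletes a peering, both servers must report an established replication stream for it. A minimal sketch of that wait, assuming the retry helpers and StreamStatus accessor used in the test changes below (the waitForConnected helper and its name are hypothetical, not part of this patch):

    // waitForConnected is an illustrative helper (not part of this patch). It
    // blocks until srv reports an established stream for the given peering ID.
    // (Uses testing, sdk/testutil/retry, and testify/require, all already
    // imported by leader_peering_test.go.)
    func waitForConnected(t *testing.T, srv *Server, peeringID string) {
        t.Helper()
        retry.Run(t, func(r *retry.R) {
            status, found := srv.peerStreamServer.StreamStatus(peeringID)
            require.True(r, found)
            require.True(r, status.Connected)
        })
    }

    // The dialer tracks the stream by its local peering ID, while the acceptor
    // tracks it by the PeerID it issued, mirroring the retry blocks added below.
    waitForConnected(t, dialer, p.Peering.ID)
    waitForConnected(t, acceptor, p.Peering.PeerID)
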
diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index fd724e373..add579c24 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -41,8 +41,8 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "bob" + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" c.Datacenter = "dc1" c.TLSConfig.Domain = "consul" if enableTLS { @@ -51,25 +51,25 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key" } }) - testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") // Create a peering by generating a token ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) t.Cleanup(cancel) - conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), grpc.WithInsecure(), grpc.WithBlock()) require.NoError(t, err) defer conn.Close() - s1Client := pbpeering.NewPeeringServiceClient(conn) + acceptorClient := pbpeering.NewPeeringServiceClient(conn) req := pbpeering.GenerateTokenRequest{ - PeerName: "my-peer-s2", + PeerName: "my-peer-dialer", } - resp, err := s1Client.GenerateToken(ctx, &req) + resp, err := acceptorClient.GenerateToken(ctx, &req) require.NoError(t, err) tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) @@ -78,14 +78,14 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo var token structs.PeeringToken require.NoError(t, json.Unmarshal(tokenJSON, &token)) - // S1 should not have a stream tracked for dc2 because s1 generated a token for baz, and therefore needs to wait to be dialed. + // S1 should not have a stream tracked for dc2 because acceptor generated a token for baz, and therefore needs to wait to be dialed. time.Sleep(1 * time.Second) - _, found := s1.peerStreamServer.StreamStatus(token.PeerID) + _, found := acceptor.peerStreamServer.StreamStatus(token.PeerID) require.False(t, found) - // Bring up s2 and establish a peering with s1's token so that it attempts to dial. - _, s2 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "betty" + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. 
+ _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" c.Datacenter = "dc2" c.PrimaryDatacenter = "dc2" if enableTLS { @@ -94,33 +94,39 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key" } }) - testrpc.WaitForLeader(t, s2.RPC, "dc2") + testrpc.WaitForLeader(t, dialer.RPC, "dc2") - // Create a peering at s2 by establishing a peering with s1's token + // Create a peering at dialer by establishing a peering with acceptor's token ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) t.Cleanup(cancel) - conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())), + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), grpc.WithInsecure(), grpc.WithBlock()) require.NoError(t, err) defer conn.Close() - s2Client := pbpeering.NewPeeringServiceClient(conn) + dialerClient := pbpeering.NewPeeringServiceClient(conn) establishReq := pbpeering.EstablishRequest{ - PeerName: "my-peer-s1", + PeerName: "my-peer-acceptor", PeeringToken: resp.PeeringToken, } - _, err = s2Client.Establish(ctx, &establishReq) + _, err = dialerClient.Establish(ctx, &establishReq) require.NoError(t, err) - p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"}) + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) require.NoError(t, err) retry.Run(t, func(r *retry.R) { - status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID) + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + retry.Run(t, func(r *retry.R) { + status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID) require.True(r, found) require.True(r, status.Connected) }) @@ -128,21 +134,21 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo // Delete the peering to trigger the termination sequence. deleted := &pbpeering.Peering{ ID: p.Peering.ID, - Name: "my-peer-s1", + Name: "my-peer-acceptor", DeletedAt: structs.TimeToProto(time.Now()), } - require.NoError(t, s2.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted})) - s2.logger.Trace("deleted peering for my-peer-s1") + require.NoError(t, dialer.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted})) + dialer.logger.Trace("deleted peering for my-peer-acceptor") retry.Run(t, func(r *retry.R) { - _, found := s2.peerStreamServer.StreamStatus(p.Peering.ID) + _, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) require.False(r, found) }) - // s1 should have also marked the peering as terminated. + // acceptor should have also marked the peering as terminated. 
retry.Run(t, func(r *retry.R) { - _, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{ - Value: "my-peer-s2", + _, peering, err := acceptor.fsm.State().PeeringRead(nil, state.Query{ + Value: "my-peer-dialer", }) require.NoError(r, err) require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) @@ -151,20 +157,20 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { t.Run("without-tls", func(t *testing.T) { - testLeader_PeeringSync_Lifecycle_ServerDeletion(t, false) + testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false) }) t.Run("with-tls", func(t *testing.T) { - testLeader_PeeringSync_Lifecycle_ServerDeletion(t, true) + testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, true) }) } -func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS bool) { +func testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t *testing.T, enableTLS bool) { if testing.Short() { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "bob" + _, acceptor := testServerWithConfig(t, func(c *Config) { + c.NodeName = "acceptor" c.Datacenter = "dc1" c.TLSConfig.Domain = "consul" if enableTLS { @@ -173,14 +179,14 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key" } }) - testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, acceptor.RPC, "dc1") - // Define a peering by generating a token for s2 + // Define a peering by generating a token for dialer ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) t.Cleanup(cancel) - conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())), grpc.WithInsecure(), grpc.WithBlock()) require.NoError(t, err) @@ -189,7 +195,7 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo peeringClient := pbpeering.NewPeeringServiceClient(conn) req := pbpeering.GenerateTokenRequest{ - PeerName: "my-peer-s2", + PeerName: "my-peer-dialer", } resp, err := peeringClient.GenerateToken(ctx, &req) require.NoError(t, err) @@ -200,9 +206,9 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo var token structs.PeeringToken require.NoError(t, json.Unmarshal(tokenJSON, &token)) - // Bring up s2 and establish a peering with s1's token so that it attempts to dial. - _, s2 := testServerWithConfig(t, func(c *Config) { - c.NodeName = "betty" + // Bring up dialer and establish a peering with acceptor's token so that it attempts to dial. 
+ _, dialer := testServerWithConfig(t, func(c *Config) { + c.NodeName = "dialer" c.Datacenter = "dc2" c.PrimaryDatacenter = "dc2" if enableTLS { @@ -211,33 +217,39 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key" } }) - testrpc.WaitForLeader(t, s2.RPC, "dc2") + testrpc.WaitForLeader(t, dialer.RPC, "dc2") - // Create a peering at s2 by establishing a peering with s1's token + // Create a peering at dialer by establishing a peering with acceptor's token ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) t.Cleanup(cancel) - conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())), + conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())), grpc.WithInsecure(), grpc.WithBlock()) require.NoError(t, err) defer conn.Close() - s2Client := pbpeering.NewPeeringServiceClient(conn) + dialerClient := pbpeering.NewPeeringServiceClient(conn) establishReq := pbpeering.EstablishRequest{ - PeerName: "my-peer-s1", + PeerName: "my-peer-acceptor", PeeringToken: resp.PeeringToken, } - _, err = s2Client.Establish(ctx, &establishReq) + _, err = dialerClient.Establish(ctx, &establishReq) require.NoError(t, err) - p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"}) + p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"}) require.NoError(t, err) retry.Run(t, func(r *retry.R) { - status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID) + status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID) + require.True(r, found) + require.True(r, status.Connected) + }) + + retry.Run(t, func(r *retry.R) { + status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID) require.True(r, found) require.True(r, status.Connected) }) @@ -245,21 +257,22 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo // Delete the peering from the server peer to trigger the termination sequence. deleted := &pbpeering.Peering{ ID: p.Peering.PeerID, - Name: "my-peer-s2", + Name: "my-peer-dialer", DeletedAt: structs.TimeToProto(time.Now()), } - require.NoError(t, s1.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted})) - s2.logger.Trace("deleted peering for my-peer-s2") + + require.NoError(t, acceptor.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted})) + acceptor.logger.Trace("deleted peering for my-peer-dialer") retry.Run(t, func(r *retry.R) { - _, found := s1.peerStreamServer.StreamStatus(p.Peering.PeerID) + _, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID) require.False(r, found) }) - // s2 should have received the termination message and updated the peering state. + // dialer should have received the termination message and updated the peering state. 
retry.Run(t, func(r *retry.R) { - _, peering, err := s2.fsm.State().PeeringRead(nil, state.Query{ - Value: "my-peer-s1", + _, peering, err := dialer.fsm.State().PeeringRead(nil, state.Query{ + Value: "my-peer-acceptor", }) require.NoError(r, err) require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State) diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 2542c2e4a..af4938c16 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -447,6 +447,8 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { // exits. After the method exits this code here won't receive any recv errors and those will be handled // by DrainStream(). err = fmt.Errorf("stream ended unexpectedly") + } else { + err = fmt.Errorf("unexpected error receiving from the stream: %w", err) } status.TrackRecvError(err.Error()) return err @@ -684,10 +686,29 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) { dir = "received" } + // Redact the long-lived stream secret to avoid leaking it in trace logs. + pbToLog := pb + switch msg := pb.(type) { + case *pbpeerstream.ReplicationMessage: + clone := &pbpeerstream.ReplicationMessage{} + proto.Merge(clone, msg) + + if clone.GetOpen() != nil { + clone.GetOpen().StreamSecretID = "hidden" + pbToLog = clone + } + case *pbpeerstream.ReplicationMessage_Open: + clone := &pbpeerstream.ReplicationMessage_Open{} + proto.Merge(clone, msg) + + clone.StreamSecretID = "hidden" + pbToLog = clone + } + m := jsonpb.Marshaler{ Indent: " ", } - out, err := m.MarshalToString(pb) + out, err := m.MarshalToString(pbToLog) if err != nil { out = "" } diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index e6e26e81d..8264cce41 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1,6 +1,7 @@ package peerstream import ( + "bytes" "context" "fmt" "io" @@ -10,13 +11,14 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" + newproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/acl" @@ -26,6 +28,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeerstream" @@ -1657,7 +1660,7 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs. return clusterID, rootA } -func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any { +func makeAnyPB(t *testing.T, pb newproto.Message) *anypb.Any { any, err := anypb.New(pb) require.NoError(t, err) return any @@ -2592,6 +2595,51 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { } } +// TestLogTraceProto tests that all PB trace log helpers redact the +// long-lived SecretStreamID. +// We ensure it gets redacted when logging a ReplicationMessage_Open or a ReplicationMessage. 
+// In the stream handler we only log the ReplicationMessage_Open, but testing both guards against +// a change in that behavior. +func TestLogTraceProto(t *testing.T) { + type testCase struct { + input proto.Message + } + + tt := map[string]testCase{ + "replication message": { + input: &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Open_{ + Open: &pbpeerstream.ReplicationMessage_Open{ + StreamSecretID: testPendingStreamSecretID, + }, + }, + }, + }, + "open message": { + input: &pbpeerstream.ReplicationMessage_Open{ + StreamSecretID: testPendingStreamSecretID, + }, + }, + } + for name, tc := range tt { + t.Run(name, func(t *testing.T) { + var b bytes.Buffer + logger, err := logging.Setup(logging.Config{ + LogLevel: "TRACE", + }, &b) + require.NoError(t, err) + + logTraceRecv(logger, tc.input) + logTraceSend(logger, tc.input) + logTraceProto(logger, tc.input, false) + + body, err := io.ReadAll(&b) + require.NoError(t, err) + require.NotContains(t, string(body), testPendingStreamSecretID) + }) + } +} + func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) { t.Helper()
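
For reference, the effect of the redaction exercised by TestLogTraceProto above, seen from a caller's point of view: logTraceProto scrubs a cloned copy of the message, so the trace output shows "hidden" while the message that actually goes over the stream keeps its real secret. A minimal usage sketch, assuming a logger built with logging.Setup as in the test; the secret literal is a made-up placeholder (pbpeerstream and fmt are already imported by stream_test.go):

    // Illustrative only; the secret below is a placeholder, not a real ID.
    open := &pbpeerstream.ReplicationMessage_Open{
        StreamSecretID: "placeholder-stream-secret",
    }
    logTraceSend(logger, open)       // trace line renders StreamSecretID as "hidden"
    fmt.Println(open.StreamSecretID) // still "placeholder-stream-secret"; only the logged clone is scrubbed
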