diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go
index bbed2bf1b..3f79b3d1b 100644
--- a/agent/consul/config_endpoint_test.go
+++ b/agent/consul/config_endpoint_test.go
@@ -1804,8 +1804,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
 		t.Skip("too slow for testing.Short")
 	}
 
-	t.Parallel()
-
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go
index aa425c738..dd6185a19 100644
--- a/agent/consul/leader_peering.go
+++ b/agent/consul/leader_peering.go
@@ -237,7 +237,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
 		}
 	}
 
-	logger.Trace("checking connected streams", "streams", s.peerStreamServer.ConnectedStreams(), "sequence_id", seq)
+	logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)
 
 	// Clean up active streams of peerings that were deleted from the state store.
 	// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go
index fd724e373..add579c24 100644
--- a/agent/consul/leader_peering_test.go
+++ b/agent/consul/leader_peering_test.go
@@ -41,8 +41,8 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 		t.Skip("too slow for testing.Short")
 	}
 
-	_, s1 := testServerWithConfig(t, func(c *Config) {
-		c.NodeName = "bob"
+	_, acceptor := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "acceptor"
 		c.Datacenter = "dc1"
 		c.TLSConfig.Domain = "consul"
 		if enableTLS {
@@ -51,25 +51,25 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 			c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
 		}
 	})
-	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+	testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
 
 	// Create a peering by generating a token
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	t.Cleanup(cancel)
 
-	conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
-		grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
+	conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
+		grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
 		grpc.WithInsecure(),
 		grpc.WithBlock())
 	require.NoError(t, err)
 	defer conn.Close()
 
-	s1Client := pbpeering.NewPeeringServiceClient(conn)
+	acceptorClient := pbpeering.NewPeeringServiceClient(conn)
 
 	req := pbpeering.GenerateTokenRequest{
-		PeerName: "my-peer-s2",
+		PeerName: "my-peer-dialer",
 	}
-	resp, err := s1Client.GenerateToken(ctx, &req)
+	resp, err := acceptorClient.GenerateToken(ctx, &req)
 	require.NoError(t, err)
 
 	tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
@@ -78,14 +78,14 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 	var token structs.PeeringToken
 	require.NoError(t, json.Unmarshal(tokenJSON, &token))
 
-	// S1 should not have a stream tracked for dc2 because s1 generated a token for baz, and therefore needs to wait to be dialed.
+	// The acceptor should not have a stream tracked for dc2 because it generated a token for the dialer, and therefore needs to wait to be dialed.
 	time.Sleep(1 * time.Second)
-	_, found := s1.peerStreamServer.StreamStatus(token.PeerID)
+	_, found := acceptor.peerStreamServer.StreamStatus(token.PeerID)
 	require.False(t, found)
 
-	// Bring up s2 and establish a peering with s1's token so that it attempts to dial.
-	_, s2 := testServerWithConfig(t, func(c *Config) {
-		c.NodeName = "betty"
+	// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
+	_, dialer := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "dialer"
 		c.Datacenter = "dc2"
 		c.PrimaryDatacenter = "dc2"
 		if enableTLS {
@@ -94,33 +94,39 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 			c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
 		}
 	})
-	testrpc.WaitForLeader(t, s2.RPC, "dc2")
+	testrpc.WaitForLeader(t, dialer.RPC, "dc2")
 
-	// Create a peering at s2 by establishing a peering with s1's token
+	// Create a peering at dialer by establishing a peering with acceptor's token
 	ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
 	t.Cleanup(cancel)
 
-	conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(),
-		grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
+	conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
+		grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
 		grpc.WithInsecure(),
 		grpc.WithBlock())
 	require.NoError(t, err)
 	defer conn.Close()
 
-	s2Client := pbpeering.NewPeeringServiceClient(conn)
+	dialerClient := pbpeering.NewPeeringServiceClient(conn)
 
 	establishReq := pbpeering.EstablishRequest{
-		PeerName:     "my-peer-s1",
+		PeerName:     "my-peer-acceptor",
 		PeeringToken: resp.PeeringToken,
 	}
-	_, err = s2Client.Establish(ctx, &establishReq)
+	_, err = dialerClient.Establish(ctx, &establishReq)
 	require.NoError(t, err)
 
-	p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
+	p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
 	require.NoError(t, err)
 
 	retry.Run(t, func(r *retry.R) {
-		status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
+		status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
+		require.True(r, found)
+		require.True(r, status.Connected)
+	})
+
+	retry.Run(t, func(r *retry.R) {
+		status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
 		require.True(r, found)
 		require.True(r, status.Connected)
 	})
@@ -128,21 +134,21 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 	// Delete the peering to trigger the termination sequence.
 	deleted := &pbpeering.Peering{
 		ID:        p.Peering.ID,
-		Name:      "my-peer-s1",
+		Name:      "my-peer-acceptor",
 		DeletedAt: structs.TimeToProto(time.Now()),
 	}
-	require.NoError(t, s2.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
-	s2.logger.Trace("deleted peering for my-peer-s1")
+	require.NoError(t, dialer.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
+	dialer.logger.Trace("deleted peering for my-peer-acceptor")
 
 	retry.Run(t, func(r *retry.R) {
-		_, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
+		_, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
 		require.False(r, found)
 	})
 
-	// s1 should have also marked the peering as terminated.
+	// acceptor should have also marked the peering as terminated.
 	retry.Run(t, func(r *retry.R) {
-		_, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{
-			Value: "my-peer-s2",
+		_, peering, err := acceptor.fsm.State().PeeringRead(nil, state.Query{
+			Value: "my-peer-dialer",
 		})
 		require.NoError(r, err)
 		require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
@@ -151,20 +157,20 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
 
 func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
 	t.Run("without-tls", func(t *testing.T) {
-		testLeader_PeeringSync_Lifecycle_ServerDeletion(t, false)
+		testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false)
 	})
 	t.Run("with-tls", func(t *testing.T) {
-		testLeader_PeeringSync_Lifecycle_ServerDeletion(t, true)
+		testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, true)
 	})
 }
 
-func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS bool) {
+func testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t *testing.T, enableTLS bool) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
 
-	_, s1 := testServerWithConfig(t, func(c *Config) {
-		c.NodeName = "bob"
+	_, acceptor := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "acceptor"
 		c.Datacenter = "dc1"
 		c.TLSConfig.Domain = "consul"
 		if enableTLS {
@@ -173,14 +179,14 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
 			c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
 		}
 	})
-	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+	testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
 
-	// Define a peering by generating a token for s2
+	// Define a peering by generating a token for dialer
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	t.Cleanup(cancel)
 
-	conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
-		grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
+	conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
+		grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
 		grpc.WithInsecure(),
 		grpc.WithBlock())
 	require.NoError(t, err)
@@ -189,7 +195,7 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
 	peeringClient := pbpeering.NewPeeringServiceClient(conn)
 
 	req := pbpeering.GenerateTokenRequest{
-		PeerName: "my-peer-s2",
+		PeerName: "my-peer-dialer",
 	}
 	resp, err := peeringClient.GenerateToken(ctx, &req)
 	require.NoError(t, err)
@@ -200,9 +206,9 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
 	var token structs.PeeringToken
 	require.NoError(t, json.Unmarshal(tokenJSON, &token))
 
-	// Bring up s2 and establish a peering with s1's token so that it attempts to dial.
-	_, s2 := testServerWithConfig(t, func(c *Config) {
-		c.NodeName = "betty"
+	// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
+	_, dialer := testServerWithConfig(t, func(c *Config) {
+		c.NodeName = "dialer"
 		c.Datacenter = "dc2"
 		c.PrimaryDatacenter = "dc2"
 		if enableTLS {
@@ -211,33 +217,39 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
 			c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
 		}
 	})
-	testrpc.WaitForLeader(t, s2.RPC, "dc2")
+	testrpc.WaitForLeader(t, dialer.RPC, "dc2")
 
-	// Create a peering at s2 by establishing a peering with s1's token
+	// Create a peering at dialer by establishing a peering with acceptor's token
 	ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
 	t.Cleanup(cancel)
 
-	conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(),
-		grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
+	conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
+		grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
 		grpc.WithInsecure(),
 		grpc.WithBlock())
 	require.NoError(t, err)
 	defer conn.Close()
 
-	s2Client := pbpeering.NewPeeringServiceClient(conn)
+	dialerClient := pbpeering.NewPeeringServiceClient(conn)
 
 	establishReq := pbpeering.EstablishRequest{
-		PeerName:     "my-peer-s1",
+		PeerName:     "my-peer-acceptor",
 		PeeringToken: resp.PeeringToken,
 	}
-	_, err = s2Client.Establish(ctx, &establishReq)
+	_, err = dialerClient.Establish(ctx, &establishReq)
 	require.NoError(t, err)
 
-	p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
+	p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
 	require.NoError(t, err)
 
 	retry.Run(t, func(r *retry.R) {
-		status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
+		status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
+		require.True(r, found)
+		require.True(r, status.Connected)
+	})
+
+	retry.Run(t, func(r *retry.R) {
+		status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
 		require.True(r, found)
 		require.True(r, status.Connected)
 	})
@@ -245,21 +257,22 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
 	// Delete the peering from the server peer to trigger the termination sequence.
 	deleted := &pbpeering.Peering{
 		ID:        p.Peering.PeerID,
-		Name:      "my-peer-s2",
+		Name:      "my-peer-dialer",
 		DeletedAt: structs.TimeToProto(time.Now()),
 	}
-	require.NoError(t, s1.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
-	s2.logger.Trace("deleted peering for my-peer-s2")
+
+	require.NoError(t, acceptor.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
+	acceptor.logger.Trace("deleted peering for my-peer-dialer")
 
 	retry.Run(t, func(r *retry.R) {
-		_, found := s1.peerStreamServer.StreamStatus(p.Peering.PeerID)
+		_, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
 		require.False(r, found)
 	})
 
-	// s2 should have received the termination message and updated the peering state.
+	// dialer should have received the termination message and updated the peering state.
 	retry.Run(t, func(r *retry.R) {
-		_, peering, err := s2.fsm.State().PeeringRead(nil, state.Query{
-			Value: "my-peer-s1",
+		_, peering, err := dialer.fsm.State().PeeringRead(nil, state.Query{
+			Value: "my-peer-acceptor",
 		})
 		require.NoError(r, err)
 		require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go
index 2542c2e4a..af4938c16 100644
--- a/agent/grpc-external/services/peerstream/stream_resources.go
+++ b/agent/grpc-external/services/peerstream/stream_resources.go
@@ -447,6 +447,8 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
 				// exits. After the method exits this code here won't receive any recv errors and those will be handled
 				// by DrainStream().
 				err = fmt.Errorf("stream ended unexpectedly")
+			} else {
+				err = fmt.Errorf("unexpected error receiving from the stream: %w", err)
 			}
 			status.TrackRecvError(err.Error())
 			return err
@@ -684,10 +686,29 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
 		dir = "received"
 	}
 
+	// Redact the long-lived stream secret to avoid leaking it in trace logs.
+	pbToLog := pb
+	switch msg := pb.(type) {
+	case *pbpeerstream.ReplicationMessage:
+		clone := &pbpeerstream.ReplicationMessage{}
+		proto.Merge(clone, msg)
+
+		if clone.GetOpen() != nil {
+			clone.GetOpen().StreamSecretID = "hidden"
+			pbToLog = clone
+		}
+	case *pbpeerstream.ReplicationMessage_Open:
+		clone := &pbpeerstream.ReplicationMessage_Open{}
+		proto.Merge(clone, msg)
+
+		clone.StreamSecretID = "hidden"
+		pbToLog = clone
+	}
+
 	m := jsonpb.Marshaler{
 		Indent: "  ",
 	}
-	out, err := m.MarshalToString(pb)
+	out, err := m.MarshalToString(pbToLog)
 	if err != nil {
 		out = ""
 	}
diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go
index e6e26e81d..8264cce41 100644
--- a/agent/grpc-external/services/peerstream/stream_test.go
+++ b/agent/grpc-external/services/peerstream/stream_test.go
@@ -1,6 +1,7 @@
 package peerstream
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -10,13 +11,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/hashicorp/go-uuid"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/genproto/googleapis/rpc/code"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"google.golang.org/protobuf/proto"
+	newproto "google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 
 	"github.com/hashicorp/consul/acl"
@@ -26,6 +28,7 @@ import (
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/lib"
+	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/consul/proto/pbcommon"
 	"github.com/hashicorp/consul/proto/pbpeering"
 	"github.com/hashicorp/consul/proto/pbpeerstream"
@@ -1657,7 +1660,7 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs.
 	return clusterID, rootA
 }
 
-func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any {
+func makeAnyPB(t *testing.T, pb newproto.Message) *anypb.Any {
 	any, err := anypb.New(pb)
 	require.NoError(t, err)
 	return any
@@ -2592,6 +2595,51 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
 	}
 }
 
+// TestLogTraceProto tests that all PB trace log helpers redact the
+// long-lived StreamSecretID.
+// We ensure it gets redacted when logging a ReplicationMessage_Open or a ReplicationMessage.
+// In the stream handler we only log the ReplicationMessage_Open, but testing both guards against
+// a change in that behavior.
+func TestLogTraceProto(t *testing.T) {
+	type testCase struct {
+		input proto.Message
+	}
+
+	tt := map[string]testCase{
+		"replication message": {
+			input: &pbpeerstream.ReplicationMessage{
+				Payload: &pbpeerstream.ReplicationMessage_Open_{
+					Open: &pbpeerstream.ReplicationMessage_Open{
+						StreamSecretID: testPendingStreamSecretID,
+					},
+				},
+			},
+		},
+		"open message": {
+			input: &pbpeerstream.ReplicationMessage_Open{
+				StreamSecretID: testPendingStreamSecretID,
+			},
+		},
+	}
+	for name, tc := range tt {
+		t.Run(name, func(t *testing.T) {
+			var b bytes.Buffer
+			logger, err := logging.Setup(logging.Config{
+				LogLevel: "TRACE",
+			}, &b)
+			require.NoError(t, err)
+
+			logTraceRecv(logger, tc.input)
+			logTraceSend(logger, tc.input)
+			logTraceProto(logger, tc.input, false)
+
+			body, err := io.ReadAll(&b)
+			require.NoError(t, err)
+			require.NotContains(t, string(body), testPendingStreamSecretID)
+		})
+	}
+}
+
 func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) {
 	t.Helper()