Various peering fixes (#13979)

* Avoid logging StreamSecretID
* Wrap additional errors in stream handler
* Fix flakiness in leader test and rename servers for clarity. There was
  a race condition where the test deleted the peering before the stream
  was active. The test now waits for the stream to be connected on both
  sides before deleting the associated peering (see the sketch after
  this list).
* Run flaky test serially
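A compact view of the race fix from the third bullet above: before deleting
the peering, the test now blocks until both sides report a connected
replication stream. The helper below is a hypothetical, self-contained
version of that wait; in the diff itself the same effect comes from two
retry.Run blocks calling peerStreamServer.StreamStatus on the dialer and on
the acceptor.

// Hypothetical helper, not part of this commit: poll both servers until each
// reports a connected replication stream, so the peering is only deleted once
// the stream is active on the dialer and the acceptor.
package peeringtest

import (
	"errors"
	"time"
)

// statusFn stands in for peerStreamServer.StreamStatus; it reports whether a
// stream for the given peering ID exists and is connected.
type statusFn func(peeringID string) (connected, found bool)

func waitForStreamsConnected(dialerStatus, acceptorStatus statusFn, dialerID, acceptorID string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		dConnected, dFound := dialerStatus(dialerID)
		aConnected, aFound := acceptorStatus(acceptorID)
		if dFound && dConnected && aFound && aConnected {
			return nil // safe to delete the peering and assert stream teardown
		}
		time.Sleep(50 * time.Millisecond)
	}
	return errors.New("timed out waiting for peering streams on both sides")
}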
Freddy 2022-08-01 15:06:18 -06:00 committed by GitHub
parent e46a4b3cc1
commit 56144cf5f7
5 changed files with 144 additions and 64 deletions


@@ -1804,8 +1804,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()


@@ -237,7 +237,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
}
}
logger.Trace("checking connected streams", "streams", s.peerStreamServer.ConnectedStreams(), "sequence_id", seq)
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)
// Clean up active streams of peerings that were deleted from the state store.
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?


@@ -41,8 +41,8 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "bob"
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
if enableTLS {
@@ -51,25 +51,25 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
}
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
s1Client := pbpeering.NewPeeringServiceClient(conn)
acceptorClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-s2",
PeerName: "my-peer-dialer",
}
resp, err := s1Client.GenerateToken(ctx, &req)
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
@@ -78,14 +78,14 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
// S1 should not have a stream tracked for dc2 because s1 generated a token for baz, and therefore needs to wait to be dialed.
// The acceptor should not have a stream tracked for dc2 because it generated a token for the dialer, and therefore needs to wait to be dialed.
time.Sleep(1 * time.Second)
_, found := s1.peerStreamServer.StreamStatus(token.PeerID)
_, found := acceptor.peerStreamServer.StreamStatus(token.PeerID)
require.False(t, found)
// Bring up s2 and establish a peering with s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "betty"
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
if enableTLS {
@@ -94,33 +94,39 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
}
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Create a peering at s2 by establishing a peering with s1's token
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
s2Client := pbpeering.NewPeeringServiceClient(conn)
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-s1",
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = s2Client.Establish(ctx, &establishReq)
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
retry.Run(t, func(r *retry.R) {
status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.True(r, found)
require.True(r, status.Connected)
})
@@ -128,21 +134,21 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
// Delete the peering to trigger the termination sequence.
deleted := &pbpeering.Peering{
ID: p.Peering.ID,
Name: "my-peer-s1",
Name: "my-peer-acceptor",
DeletedAt: structs.TimeToProto(time.Now()),
}
require.NoError(t, s2.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
s2.logger.Trace("deleted peering for my-peer-s1")
require.NoError(t, dialer.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
dialer.logger.Trace("deleted peering for my-peer-acceptor")
retry.Run(t, func(r *retry.R) {
_, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
_, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.False(r, found)
})
// s1 should have also marked the peering as terminated.
// acceptor should have also marked the peering as terminated.
retry.Run(t, func(r *retry.R) {
_, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{
Value: "my-peer-s2",
_, peering, err := acceptor.fsm.State().PeeringRead(nil, state.Query{
Value: "my-peer-dialer",
})
require.NoError(r, err)
require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
@@ -151,20 +157,20 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ServerDeletion(t, false)
testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false)
})
t.Run("with-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_ServerDeletion(t, true)
testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, true)
})
}
func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS bool) {
func testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t *testing.T, enableTLS bool) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "bob"
_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
if enableTLS {
@@ -173,14 +179,14 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Bob.key"
}
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")
// Define a peering by generating a token for s2
// Define a peering by generating a token for dialer
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
@@ -189,7 +195,7 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-s2",
PeerName: "my-peer-dialer",
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
@@ -200,9 +206,9 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
// Bring up s2 and establish a peering with s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "betty"
// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
if enableTLS {
@@ -211,33 +217,39 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
c.TLSConfig.GRPC.KeyFile = "../../test/hostname/Betty.key"
}
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
testrpc.WaitForLeader(t, dialer.RPC, "dc2")
// Create a peering at s2 by establishing a peering with s1's token
// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
conn, err = grpc.DialContext(ctx, s2.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
s2Client := pbpeering.NewPeeringServiceClient(conn)
dialerClient := pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-s1",
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = s2Client.Establish(ctx, &establishReq)
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)
p, err := s2Client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
status, found := s2.peerStreamServer.StreamStatus(p.Peering.ID)
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})
retry.Run(t, func(r *retry.R) {
status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.True(r, found)
require.True(r, status.Connected)
})
@@ -245,21 +257,22 @@ func testLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T, enableTLS boo
// Delete the peering from the server peer to trigger the termination sequence.
deleted := &pbpeering.Peering{
ID: p.Peering.PeerID,
Name: "my-peer-s2",
Name: "my-peer-dialer",
DeletedAt: structs.TimeToProto(time.Now()),
}
require.NoError(t, s1.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
s2.logger.Trace("deleted peering for my-peer-s2")
require.NoError(t, acceptor.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{Peering: deleted}))
acceptor.logger.Trace("deleted peering for my-peer-dialer")
retry.Run(t, func(r *retry.R) {
_, found := s1.peerStreamServer.StreamStatus(p.Peering.PeerID)
_, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.False(r, found)
})
// s2 should have received the termination message and updated the peering state.
// dialer should have received the termination message and updated the peering state.
retry.Run(t, func(r *retry.R) {
_, peering, err := s2.fsm.State().PeeringRead(nil, state.Query{
Value: "my-peer-s1",
_, peering, err := dialer.fsm.State().PeeringRead(nil, state.Query{
Value: "my-peer-acceptor",
})
require.NoError(r, err)
require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)


@@ -447,6 +447,8 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// exits. After the method exits this code here won't receive any recv errors and those will be handled
// by DrainStream().
err = fmt.Errorf("stream ended unexpectedly")
} else {
err = fmt.Errorf("unexpected error receiving from the stream: %w", err)
}
status.TrackRecvError(err.Error())
return err
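The second bullet ("Wrap additional errors in stream handler") corresponds to the else branch added in this hunk. Below is a minimal, hypothetical sketch of the pattern, with the branch condition stubbed out as a boolean because the hunk does not show it: wrapping with %w keeps the original receive error reachable through errors.Is/errors.As, while the string handed to TrackRecvError still carries the added context.

// Hypothetical sketch of the wrapping above, not the actual stream handler.
package peeringexample

import (
	"errors"
	"fmt"
	"io"
)

// classifyRecvError mirrors the shape of the diff: an expected shutdown gets a
// plain error, anything else wraps the underlying error with %w.
func classifyRecvError(recvErr error, streamAlreadyClosed bool) error {
	if streamAlreadyClosed {
		// The stream was torn down elsewhere; recv errors here are expected.
		return fmt.Errorf("stream ended unexpectedly")
	}
	return fmt.Errorf("unexpected error receiving from the stream: %w", recvErr)
}

func exampleUsage() bool {
	err := classifyRecvError(io.EOF, false)
	return errors.Is(err, io.EOF) // true: the wrapped error is still inspectable
}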
@@ -684,10 +686,29 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
dir = "received"
}
// Redact the long-lived stream secret to avoid leaking it in trace logs.
pbToLog := pb
switch msg := pb.(type) {
case *pbpeerstream.ReplicationMessage:
clone := &pbpeerstream.ReplicationMessage{}
proto.Merge(clone, msg)
if clone.GetOpen() != nil {
clone.GetOpen().StreamSecretID = "hidden"
pbToLog = clone
}
case *pbpeerstream.ReplicationMessage_Open:
clone := &pbpeerstream.ReplicationMessage_Open{}
proto.Merge(clone, msg)
clone.StreamSecretID = "hidden"
pbToLog = clone
}
m := jsonpb.Marshaler{
Indent: " ",
}
out, err := m.MarshalToString(pb)
out, err := m.MarshalToString(pbToLog)
if err != nil {
out = "<ERROR: " + err.Error() + ">"
}
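The redaction above follows a copy-then-scrub approach: proto.Merge deep-copies the message into a fresh value so the caller's message is never mutated, the secret is overwritten only on the copy, and the copy is what gets marshaled for the trace log. A condensed sketch, assuming github.com/golang/protobuf's proto and jsonpb packages (the jsonpb.Marshaler usage matches the diff; the function name is made up for illustration):

// Sketch only: produce a redacted JSON form of a ReplicationMessage_Open.
package peeringexample

import (
	"github.com/golang/protobuf/jsonpb"
	"github.com/golang/protobuf/proto"

	"github.com/hashicorp/consul/proto/pbpeerstream"
)

func redactedOpenJSON(msg *pbpeerstream.ReplicationMessage_Open) (string, error) {
	clone := &pbpeerstream.ReplicationMessage_Open{}
	proto.Merge(clone, msg)         // deep-copy every field into the clone
	clone.StreamSecretID = "hidden" // scrub the secret on the copy only
	m := jsonpb.Marshaler{Indent: "  "}
	return m.MarshalToString(clone)
}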


@@ -1,6 +1,7 @@
package peerstream
import (
"bytes"
"context"
"fmt"
"io"
@@ -10,13 +11,14 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
@@ -26,6 +28,7 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
@@ -1657,7 +1660,7 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs.
return clusterID, rootA
}
func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any {
func makeAnyPB(t *testing.T, pb newproto.Message) *anypb.Any {
any, err := anypb.New(pb)
require.NoError(t, err)
return any
@@ -2592,6 +2595,51 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}
}
// TestLogTraceProto tests that all PB trace log helpers redact the
// long-lived StreamSecretID.
// We ensure it gets redacted when logging a ReplicationMessage_Open or a ReplicationMessage.
// In the stream handler we only log the ReplicationMessage_Open, but testing both guards against
// a change in that behavior.
func TestLogTraceProto(t *testing.T) {
type testCase struct {
input proto.Message
}
tt := map[string]testCase{
"replication message": {
input: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Open_{
Open: &pbpeerstream.ReplicationMessage_Open{
StreamSecretID: testPendingStreamSecretID,
},
},
},
},
"open message": {
input: &pbpeerstream.ReplicationMessage_Open{
StreamSecretID: testPendingStreamSecretID,
},
},
}
for name, tc := range tt {
t.Run(name, func(t *testing.T) {
var b bytes.Buffer
logger, err := logging.Setup(logging.Config{
LogLevel: "TRACE",
}, &b)
require.NoError(t, err)
logTraceRecv(logger, tc.input)
logTraceSend(logger, tc.input)
logTraceProto(logger, tc.input, false)
body, err := io.ReadAll(&b)
require.NoError(t, err)
require.NotContains(t, string(body), testPendingStreamSecretID)
})
}
}
func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) {
t.Helper()