diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index bf02a83f0..f13f0128c 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -103,6 +103,24 @@ func makeCARootsResponse( }, nil } +func makeServerAddrsResponse( + update cache.UpdateEvent, +) (*pbpeerstream.ReplicationMessage_Response, error) { + any, _, err := marshalToProtoAny[*pbpeering.PeeringServerAddresses](update.Result) + if err != nil { + return nil, fmt.Errorf("failed to marshal: %w", err) + } + + return &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: "server-addrs", + Operation: pbpeerstream.Operation_OPERATION_UPSERT, + Resource: any, + }, nil +} + // marshalToProtoAny takes any input and returns: // the protobuf.Any type, the asserted T type, and any errors // during marshalling or type assertion. @@ -225,6 +243,13 @@ func (s *Server) handleUpsert( return s.handleUpsertRoots(peerName, partition, roots) + case pbpeerstream.TypeURLPeeringServerAddresses: + addrs := &pbpeering.PeeringServerAddresses{} + if err := resource.UnmarshalTo(addrs); err != nil { + return fmt.Errorf("failed to unmarshal resource: %w", err) + } + + return s.handleUpsertServerAddrs(peerName, partition, addrs) default: return fmt.Errorf("unexpected resourceURL: %s", resourceURL) } diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 47a66590d..21e5c1fb9 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -161,8 +161,20 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes if p == nil { return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID) } + if !p.IsActive() { + logger.Warn("peering is marked as deleted or terminated", "peer_id", req.PeerID) + term := &pbpeerstream.ReplicationMessage{ + Payload: &pbpeerstream.ReplicationMessage_Terminated_{ + Terminated: &pbpeerstream.ReplicationMessage_Terminated{}, + }, + } + logTraceSend(logger, term) - // TODO(peering): If the peering is marked as deleted, send a Terminated message and return + err := stream.Send(term) + if err != nil { + return grpcstatus.Error(codes.FailedPrecondition, "peering is marked as deleted: "+req.PeerID) + } + } secrets, err := s.GetStore().PeeringSecretsRead(nil, req.PeerID) if err != nil { @@ -347,6 +359,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { for _, resourceURL := range []string{ pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLPeeringTrustBundle, + pbpeerstream.TypeURLPeeringServerAddresses, } { sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ ResourceURL: resourceURL, @@ -630,6 +643,13 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { continue } + case update.CorrelationID == subServerAddrs: + resp, err = makeServerAddrsResponse(update) + if err != nil { + logger.Error("failed to create server address response", "error", err) + continue + } + default: logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID) continue @@ -640,6 +660,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { replResp := makeReplicationResponse(resp) if err := streamSend(replResp); err != nil { + // note: govet warns of context leak but it is cleaned up in a defer return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) } } diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 88b1e0c3c..1c4706c44 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1350,6 +1350,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s for _, resourceURL := range []string{ pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLPeeringTrustBundle, + pbpeerstream.TypeURLPeeringServerAddresses, } { init := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Request_{ diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index 2d7aa70ca..ce22b0cd7 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -70,7 +70,7 @@ func newSubscriptionManager( getStore: getStore, serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService), trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle), - serverAddrsSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLServerAddress), + serverAddrsSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringServerAddresses), } } diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index d121ffcf1..7c4025655 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -641,7 +641,7 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) { // Only configure a tracker for CA roots events. tracker := newResourceSubscriptionTracker() - tracker.Subscribe(pbpeerstream.TypeURLServerAddress) + tracker.Subscribe(pbpeerstream.TypeURLPeeringServerAddresses) mgr := newSubscriptionManager(ctx, testutil.Logger(t), diff --git a/proto/pbpeerstream/types.go b/proto/pbpeerstream/types.go index df300cccd..4bf114c0e 100644 --- a/proto/pbpeerstream/types.go +++ b/proto/pbpeerstream/types.go @@ -3,13 +3,14 @@ package pbpeerstream const ( apiTypePrefix = "type.googleapis.com/" - TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService" - TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle" + TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService" + TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle" + TypeURLPeeringServerAddresses = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringServerAddresses" ) func KnownTypeURL(s string) bool { switch s { - case TypeURLExportedService, TypeURLPeeringTrustBundle: + case TypeURLExportedService, TypeURLPeeringTrustBundle, TypeURLPeeringServerAddresses: return true } return false