From bc10055edc6197f54951c3ab5e2e81cd59320ee8 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Wed, 25 May 2022 12:37:44 -0500 Subject: [PATCH] peering: replicate expected SNI, SPIFFE, and service protocol to peers (#13218) The importing peer will need to know what SNI and SPIFFE name corresponds to each exported service. Additionally it will need to know at a high level the protocol in use (L4/L7) to generate the appropriate connection pool and local metrics. For replicated connect synthetic entities we edit the `Connect{}` part of a `NodeService` to have a new section: { "PeerMeta": { "SNI": [ "web.default.default.owt.external.183150d5-1033-3672-c426-c29205a576b8.consul" ], "SpiffeID": [ "spiffe://183150d5-1033-3672-c426-c29205a576b8.consul/ns/default/dc/dc1/svc/web" ], "Protocol": "tcp" } } This data is then replicated and saved as-is at the importing side. Both SNI and SpiffeID are slices for now until I can be sure we don't need them for how mesh gateways will ultimately work. --- agent/connect/sni.go | 16 + agent/connect/sni_test.go | 8 +- agent/connect/testing_ca.go | 1 + agent/consul/server.go | 4 + agent/consul/state/peering.go | 40 +- agent/consul/state/peering_test.go | 23 + agent/rpc/peering/service.go | 55 +- agent/rpc/peering/service_test.go | 13 +- agent/rpc/peering/stream_test.go | 327 ++++-------- agent/rpc/peering/subscription_manager.go | 156 +++++- .../rpc/peering/subscription_manager_test.go | 478 ++++++++++++++---- agent/rpc/peering/subscription_state.go | 9 +- agent/rpc/peering/subscription_state_test.go | 4 +- agent/structs/peering.go | 11 +- agent/structs/structs.go | 9 + proto/pbservice/service.gen.go | 26 + proto/pbservice/service.pb.binary.go | 10 + proto/pbservice/service.pb.go | 246 ++++++--- proto/pbservice/service.proto | 17 + proto/prototest/testing.go | 4 +- 20 files changed, 1014 insertions(+), 443 deletions(-) diff --git a/agent/connect/sni.go b/agent/connect/sni.go index e693985b5..17fce7e27 100644 --- a/agent/connect/sni.go +++ b/agent/connect/sni.go @@ -11,6 +11,7 @@ const ( internal = "internal" version = "v1" internalVersion = internal + "-" + version + external = "external" ) func UpstreamSNI(u *structs.Upstream, subset string, dc string, trustDomain string) string { @@ -64,6 +65,21 @@ func ServiceSNI(service string, subset string, namespace string, partition strin } } +func PeeredServiceSNI(service, namespace, partition, peerName, trustDomain string) string { + if peerName == "" { + panic("peer name is a requirement for this function and does not make sense without it") + } + if namespace == "" { + namespace = structs.IntentionDefaultNamespace + } + if partition == "" { + // TODO(partitions) Make default available in OSS as a constant for uses like this one + partition = "default" + } + + return dotJoin(service, namespace, partition, peerName, external, trustDomain) +} + func dotJoin(parts ...string) string { return strings.Join(parts, ".") } diff --git a/agent/connect/sni_test.go b/agent/connect/sni_test.go index 0ead8b485..26fae1da7 100644 --- a/agent/connect/sni_test.go +++ b/agent/connect/sni_test.go @@ -3,8 +3,9 @@ package connect import ( "testing" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" ) const ( @@ -164,6 +165,11 @@ func TestServiceSNI(t *testing.T) { ServiceSNI("api", "canary", "neighbor", "part1", "foo", testTrustDomain2)) } +func TestPeeredServiceSNI(t *testing.T) { + require.Equal(t, "api.billing.default.webstuff.external."+testTrustDomainSuffix1, + PeeredServiceSNI("api", "billing", "", "webstuff", testTrustDomainSuffix1)) +} + func TestQuerySNI(t *testing.T) { require.Equal(t, "magicquery.default.foo.query."+testTrustDomain1, QuerySNI("magicquery", "foo", testTrustDomain1)) diff --git a/agent/connect/testing_ca.go b/agent/connect/testing_ca.go index 16ffb6536..4fb47cdb9 100644 --- a/agent/connect/testing_ca.go +++ b/agent/connect/testing_ca.go @@ -23,6 +23,7 @@ import ( // // NOTE: this is duplicated in the api package as testClusterID const TestClusterID = "11111111-2222-3333-4444-555555555555" +const TestTrustDomain = TestClusterID + ".consul" // testCACounter is just an atomically incremented counter for creating // unique names for the CA certs. diff --git a/agent/consul/server.go b/agent/consul/server.go index 8468ab314..e2ad87fd5 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -760,6 +760,10 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { p := peering.NewService( deps.Logger.Named("grpc-api.peering"), + peering.Config{ + Datacenter: config.Datacenter, + ConnectEnabled: config.ConnectEnabled, + }, NewPeeringBackend(s, deps.GRPCConnPool), ) s.peeringService = p diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 954ad590c..e566532b6 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -398,10 +398,42 @@ func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peerin structs.ServiceList(normal).Sort() structs.ServiceList(disco).Sort() - return maxIdx, &structs.ExportedServiceList{ - Services: normal, - DiscoChains: disco, - }, nil + serviceProtocols := make(map[structs.ServiceName]string) + populateProtocol := func(svc structs.ServiceName) error { + if _, ok := serviceProtocols[svc]; ok { + return nil // already processed + } + + idx, protocol, err := protocolForService(tx, ws, svc) + if err != nil { + return fmt.Errorf("failed to get protocol for service: %w", err) + } + + if idx > maxIdx { + maxIdx = idx + } + + serviceProtocols[svc] = protocol + return nil + } + for _, svc := range normal { + if err := populateProtocol(svc); err != nil { + return 0, nil, err + } + } + for _, svc := range disco { + if err := populateProtocol(svc); err != nil { + return 0, nil, err + } + } + + list := &structs.ExportedServiceList{ + Services: normal, + DiscoChains: disco, + ConnectProtocol: serviceProtocols, + } + + return maxIdx, list, nil } // PeeringsForService returns the list of peerings that are associated with the service name provided in the query. diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index 9a93111b5..53b80cb9c 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -644,6 +644,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { defaultEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition() + newSN := func(name string) structs.ServiceName { + return structs.NewServiceName(name, defaultEntMeta) + } + ws := memdb.NewWatchSet() ensureConfigEntry := func(t *testing.T, entry structs.ConfigEntry) { @@ -704,6 +708,10 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, + ConnectProtocol: map[structs.ServiceName]string{ + newSN("mysql"): "tcp", + newSN("redis"): "tcp", + }, } idx, got, err := s.ExportedServicesForPeer(ws, id) @@ -746,6 +754,9 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, + ConnectProtocol: map[structs.ServiceName]string{ + newSN("billing"): "tcp", + }, } idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) @@ -826,6 +837,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, + ConnectProtocol: map[structs.ServiceName]string{ + newSN("billing"): "http", + newSN("payments"): "http", + newSN("resolver"): "http", + newSN("router"): "http", + newSN("splitter"): "http", + }, } idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) @@ -861,6 +879,11 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, + ConnectProtocol: map[structs.ServiceName]string{ + newSN("payments"): "http", + newSN("resolver"): "http", + newSN("router"): "http", + }, } idx, got, err := s.ExportedServicesForPeer(ws, id) require.NoError(t, err) diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index d38d80ed5..81243eae3 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -19,6 +19,7 @@ import ( grpcstatus "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/dns" @@ -44,25 +45,30 @@ func (e *errPeeringInvalidServerAddress) Error() string { return fmt.Sprintf("%s is not a valid peering server address", e.addr) } +type Config struct { + Datacenter string + ConnectEnabled bool + // TODO(peering): remove this when we're ready + DisableMeshGatewayMode bool +} + // Service implements pbpeering.PeeringService to provide RPC operations for // managing peering relationships. type Service struct { Backend Backend logger hclog.Logger + config Config streams *streamTracker - - // TODO(peering): remove this when we're ready - DisableMeshGatewayMode bool } -func NewService(logger hclog.Logger, backend Backend) *Service { - srv := &Service{ +func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service { + cfg.DisableMeshGatewayMode = true + return &Service{ Backend: backend, logger: logger, + config: cfg, streams: newStreamTracker(), } - srv.DisableMeshGatewayMode = true - return srv } var _ pbpeering.PeeringServiceServer = (*Service)(nil) @@ -112,6 +118,7 @@ type Store interface { ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) + CAConfig(memdb.WatchSet) (uint64, *structs.CAConfiguration, error) AbandonCh() <-chan struct{} } @@ -521,9 +528,24 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { // TODO(peering) Also need to clear subscriptions associated with the peer defer s.streams.disconnected(req.LocalID) - mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend) - mgr.DisableMeshGatewayMode = s.DisableMeshGatewayMode - subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition) + var trustDomain string + if s.config.ConnectEnabled { + // Read the TrustDomain up front - we do not allow users to change the ClusterID + // so reading it once at the beginning of the stream is sufficient. + trustDomain, err = getTrustDomain(s.Backend.Store(), logger) + if err != nil { + return err + } + } + + mgr := newSubscriptionManager( + req.Stream.Context(), + logger, + s.config, + trustDomain, + s.Backend, + ) + subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.PeerName, req.Partition) sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -674,6 +696,19 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { } } +func getTrustDomain(store Store, logger hclog.Logger) (string, error) { + _, cfg, err := store.CAConfig(nil) + switch { + case err != nil: + logger.Error("failed to read Connect CA Config", "error", err) + return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config") + case cfg == nil: + logger.Warn("cannot begin stream because Connect CA is not yet initialized") + return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized") + } + return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil +} + func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { return s.streams.streamStatus(peer) } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index c661be925..587d1ac40 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -679,8 +679,16 @@ func Test_StreamHandler_UpsertServices(t *testing.T) { s := newTestServer(t, nil) testrpc.WaitForLeader(t, s.Server.RPC, "dc1") + testrpc.WaitForActiveCARoot(t, s.Server.RPC, "dc1", nil) - srv := peering.NewService(testutil.Logger(t), consul.NewPeeringBackend(s.Server, nil)) + srv := peering.NewService( + testutil.Logger(t), + peering.Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, + consul.NewPeeringBackend(s.Server, nil), + ) require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{ Name: "my-peer", @@ -1038,6 +1046,9 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { conf.SerfWANConfig.MemberlistConfig.BindPort = ports[2] conf.SerfWANConfig.MemberlistConfig.AdvertisePort = ports[2] + conf.PrimaryDatacenter = "dc1" + conf.ConnectEnabled = true + nodeID, err := uuid.GenerateUUID() if err != nil { t.Fatal(err) diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 0ebdadf6f..6f58a9dbf 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" @@ -34,13 +35,19 @@ func TestStreamResources_Server_Follower(t *testing.T) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - leader: func() bool { - return false + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, }, - }) + &testStreamBackend{ + store: store, + pub: publisher, + leader: func() bool { + return false + }, + }) client := NewMockClient(context.Background()) @@ -76,11 +83,18 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { } return false } - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - leader: leaderFunc, - }) + + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, + &testStreamBackend{ + store: store, + pub: publisher, + leader: leaderFunc, + }) client := NewMockClient(context.Background()) @@ -97,6 +111,9 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) { p := writeInitiatedPeering(t, store, 1, "my-peer") peerID := p.ID + // Set the initial roots and CA configuration. + _ = writeInitialRootsAndCA(t, store) + // Receive a subscription from a peer sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -142,10 +159,15 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, &testStreamBackend{ + store: store, + pub: publisher, + }) client := NewMockClient(context.Background()) @@ -243,10 +265,15 @@ func TestStreamResources_Server_Terminate(t *testing.T) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, &testStreamBackend{ + store: store, + pub: publisher, + }) it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), @@ -272,6 +299,9 @@ func TestStreamResources_Server_Terminate(t *testing.T) { remotePeerID = p.PeerID // for Recv ) + // Set the initial roots and CA configuration. + _ = writeInitialRootsAndCA(t, store) + // Receive a subscription from a peer sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -330,10 +360,15 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, &testStreamBackend{ + store: store, + pub: publisher, + }) it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), @@ -353,6 +388,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { remotePeerID = p.PeerID // for Recv ) + // Set the initial roots and CA configuration. + _ = writeInitialRootsAndCA(t, store) + // Receive a subscription from a peer sub := &pbpeering.ReplicationMessage{ Payload: &pbpeering.ReplicationMessage_Request_{ @@ -602,200 +640,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { } func TestStreamResources_Server_ServiceUpdates(t *testing.T) { - publisher := stream.NewEventPublisher(10 * time.Second) - store := newStateStore(t, publisher) - - // Create a peering - var lastIdx uint64 = 1 - p := writeInitiatedPeering(t, store, lastIdx, "my-peering") - var ( - peerID = p.ID // for Send - remotePeerID = p.PeerID // for Recv - ) - - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) - srv.DisableMeshGatewayMode = false - - client := NewMockClient(context.Background()) - - errCh := make(chan error, 1) - client.ErrCh = errCh - - go func() { - // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). - // This matches gRPC's behavior when an error is returned by a server. - if err := srv.StreamResources(client.ReplicationStream); err != nil { - errCh <- err - } - }() - - // Issue a services subscription to server - init := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Request_{ - Request: &pbpeering.ReplicationMessage_Request{ - PeerID: peerID, - ResourceURL: pbpeering.TypeURLService, - }, - }, - } - require.NoError(t, client.Send(init)) - - // Receive a services subscription from server - receivedSub, err := client.Recv() - require.NoError(t, err) - - expect := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Request_{ - Request: &pbpeering.ReplicationMessage_Request{ - ResourceURL: pbpeering.TypeURLService, - PeerID: remotePeerID, - }, - }, - } - prototest.AssertDeepEqual(t, expect, receivedSub) - - // Register a service that is not yet exported - mysql := &structs.CheckServiceNode{ - Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, - Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000}, - } - - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mysql.Node)) - - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) - - var ( - mongoSN = structs.NewServiceName("mongo", nil).String() - mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String() - mysqlSN = structs.NewServiceName("mysql", nil).String() - mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String() - ) - - testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) { - entry := &structs.ExportedServicesConfigEntry{ - Name: "default", - Services: []structs.ExportedService{ - { - Name: "mysql", - Consumers: []structs.ServiceConsumer{ - {PeerName: "my-peering"}, - }, - }, - { - // Mongo does not get pushed because it does not have instances registered. - Name: "mongo", - Consumers: []structs.ServiceConsumer{ - {PeerName: "my-peering"}, - }, - }, - }, - } - lastIdx++ - require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) - - expectReplEvents(t, client, - func(t *testing.T, msg *pbpeering.ReplicationMessage) { - require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) - require.Equal(t, mongoSN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) - require.Nil(t, msg.GetResponse().Resource) - }, - func(t *testing.T, msg *pbpeering.ReplicationMessage) { - require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) - require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) - require.Nil(t, msg.GetResponse().Resource) - }, - func(t *testing.T, msg *pbpeering.ReplicationMessage) { - require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) - require.Equal(t, mysqlSN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) - - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) - require.Len(t, nodes.Nodes, 1) - }, - func(t *testing.T, msg *pbpeering.ReplicationMessage) { - require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) - require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) - require.Nil(t, msg.GetResponse().Resource) - }, - ) - }) - - mongo := &structs.CheckServiceNode{ - Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, - Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, - } - - testutil.RunStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) { - lastIdx++ - require.NoError(t, store.EnsureNode(lastIdx, mongo.Node)) - - lastIdx++ - require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service)) - - retry.Run(t, func(r *retry.R) { - msg, err := client.RecvWithTimeout(100 * time.Millisecond) - require.NoError(r, err) - require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) - require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) - - var nodes pbservice.IndexedCheckServiceNodes - require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) - require.Len(r, nodes.Nodes, 1) - }) - }) - - testutil.RunStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) { - entry := &structs.ExportedServicesConfigEntry{ - Name: "default", - Services: []structs.ExportedService{ - { - Name: "mongo", - Consumers: []structs.ServiceConsumer{ - { - PeerName: "my-peering", - }, - }, - }, - }, - } - lastIdx++ - err = store.EnsureConfigEntry(lastIdx, entry) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - msg, err := client.RecvWithTimeout(100 * time.Millisecond) - require.NoError(r, err) - require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) - require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) - require.Nil(r, msg.GetResponse().Resource) - }) - }) - - testutil.RunStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) { - lastIdx++ - err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - msg, err := client.RecvWithTimeout(100 * time.Millisecond) - require.NoError(r, err) - require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) - require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) - require.Nil(r, msg.GetResponse().Resource) - }) - }) + testStreamResources_Server_ServiceUpdates(t, true) } - -func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing.T) { +func TestStreamResources_Server_ServiceUpdates_EnableMeshGateways(t *testing.T) { + testStreamResources_Server_ServiceUpdates(t, false) +} +func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways bool) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) @@ -807,11 +657,19 @@ func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing remotePeerID = p.PeerID // for Recv ) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) - srv.DisableMeshGatewayMode = true + // Set the initial roots and CA configuration. + _ = writeInitialRootsAndCA(t, store) + + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + DisableMeshGatewayMode: disableMeshGateways, + }, &testStreamBackend{ + store: store, + pub: publisher, + }) client := NewMockClient(context.Background()) @@ -1052,10 +910,16 @@ func Test_processResponse_Validation(t *testing.T) { publisher := stream.NewEventPublisher(10 * time.Second) store := newStateStore(t, publisher) - srv := NewService(testutil.Logger(t), &testStreamBackend{ - store: store, - pub: publisher, - }) + + srv := NewService( + testutil.Logger(t), + Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, &testStreamBackend{ + store: store, + pub: publisher, + }) run := func(t *testing.T, tc testCase) { reply, err := srv.processResponse("", "", tc.in) @@ -1194,6 +1058,19 @@ func writeInitiatedPeering(t *testing.T, store *state.Store, idx uint64, peerNam return p } +func writeInitialRootsAndCA(t *testing.T, store *state.Store) string { + const clusterID = connect.TestClusterID + + rootA := connect.TestCA(t, nil) + _, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA}) + require.NoError(t, err) + + err = store.CASetConfig(0, &structs.CAConfiguration{ClusterID: clusterID}) + require.NoError(t, err) + + return clusterID +} + func makeAnyPB(t *testing.T, pb proto.Message) *any.Any { any, err := ptypes.MarshalAny(pb) require.NoError(t, err) diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go index 928d32e54..d4ac7d769 100644 --- a/agent/rpc/peering/subscription_manager.go +++ b/agent/rpc/peering/subscription_manager.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/api" @@ -29,41 +30,48 @@ type SubscriptionBackend interface { // subscriptionManager handlers requests to subscribe to events from an events publisher. type subscriptionManager struct { - logger hclog.Logger - viewStore MaterializedViewStore - backend SubscriptionBackend - - // TODO(peering): remove this when we're ready - DisableMeshGatewayMode bool + logger hclog.Logger + config Config + trustDomain string + viewStore MaterializedViewStore + backend SubscriptionBackend } // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. -func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend SubscriptionBackend) *subscriptionManager { +func newSubscriptionManager( + ctx context.Context, + logger hclog.Logger, + config Config, + trustDomain string, + backend SubscriptionBackend, +) *subscriptionManager { logger = logger.Named("subscriptions") store := submatview.NewStore(logger.Named("viewstore")) go store.Run(ctx) return &subscriptionManager{ - logger: logger, - viewStore: store, - backend: backend, + logger: logger, + config: config, + trustDomain: trustDomain, + viewStore: store, + backend: backend, } } // subscribe returns a channel that will contain updates to exported service instances for a given peer. -func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition string) <-chan cache.UpdateEvent { +func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, partition string) <-chan cache.UpdateEvent { var ( updateCh = make(chan cache.UpdateEvent, 1) publicUpdateCh = make(chan cache.UpdateEvent, 1) ) - state := newSubscriptionState(partition) + state := newSubscriptionState(peerName, partition) state.publicUpdateCh = publicUpdateCh state.updateCh = updateCh // Wrap our bare state store queries in goroutines that emit events. go m.notifyExportedServicesForPeerID(ctx, state, peerID) - if !m.DisableMeshGatewayMode { + if !m.config.DisableMeshGatewayMode && m.config.ConnectEnabled { go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) } @@ -112,10 +120,12 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti pending := &pendingPayload{} m.syncNormalServices(ctx, state, pending, evt.Services) - if m.DisableMeshGatewayMode { + if m.config.DisableMeshGatewayMode { m.syncProxyServices(ctx, state, pending, evt.Services) } else { - m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) + if m.config.ConnectEnabled { + m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains()) + } } state.sendPendingEvents(ctx, m.logger, pending) @@ -133,7 +143,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti // Clear this raft index before exporting. csn.Index = 0 - if !m.DisableMeshGatewayMode { + if !m.config.DisableMeshGatewayMode { // Ensure that connect things are scrubbed so we don't mix-and-match // with the synthetic entries that point to mesh gateways. filterConnectReferences(csn) @@ -150,6 +160,18 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti } } + // Scrub raft indexes + for _, instance := range csn.Nodes { + instance.Node.RaftIndex = nil + instance.Service.RaftIndex = nil + if m.config.DisableMeshGatewayMode { + for _, chk := range instance.Checks { + chk.RaftIndex = nil + } + } + // skip checks since we just generated one from scratch + } + id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService) // Just ferry this one directly along to the destination. @@ -165,7 +187,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti return fmt.Errorf("invalid type for response: %T", u.Result) } - if !m.DisableMeshGatewayMode { + if !m.config.DisableMeshGatewayMode { return nil // ignore event } @@ -183,6 +205,18 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti // ) // } + // Scrub raft indexes + for _, instance := range csn.Nodes { + instance.Node.RaftIndex = nil + instance.Service.RaftIndex = nil + if m.config.DisableMeshGatewayMode { + for _, chk := range instance.Checks { + chk.RaftIndex = nil + } + } + // skip checks since we just generated one from scratch + } + id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService) // Just ferry this one directly along to the destination. @@ -200,7 +234,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway) - if m.DisableMeshGatewayMode { + if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled { return nil // ignore event } @@ -211,6 +245,30 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti // Clear this raft index before exporting. csn.Index = 0 + // Flatten health checks + for _, instance := range csn.Nodes { + instance.Checks = flattenChecks( + instance.Node.Node, + instance.Service.ID, + instance.Service.Service, + instance.Service.EnterpriseMeta, + instance.Checks, + ) + } + + // Scrub raft indexes + for _, instance := range csn.Nodes { + instance.Node.RaftIndex = nil + instance.Service.RaftIndex = nil + // skip checks since we just generated one from scratch + + // Remove connect things like native mode. + if instance.Service.Connect != nil || instance.Service.Proxy != nil { + instance.Service.Connect = nil + instance.Service.Proxy = nil + } + } + state.meshGateway = csn pending := &pendingPayload{} @@ -223,8 +281,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti if state.exportList != nil { // Trigger public events for all synthetic discovery chain replies. - for chainName := range state.connectServices { - m.emitEventForDiscoveryChain(ctx, state, pending, chainName) + for chainName, protocol := range state.connectServices { + m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) } } @@ -369,17 +427,17 @@ func (m *subscriptionManager) syncDiscoveryChains( ctx context.Context, state *subscriptionState, pending *pendingPayload, - chainsByName map[structs.ServiceName]struct{}, + chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable ) { // if it was newly added, then try to emit an UPDATE event - for chainName := range chainsByName { - if _, ok := state.connectServices[chainName]; ok { + for chainName, protocol := range chainsByName { + if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol { continue } - state.connectServices[chainName] = struct{}{} + state.connectServices[chainName] = protocol - m.emitEventForDiscoveryChain(ctx, state, pending, chainName) + m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) } // if it was dropped, try to emit an DELETE event @@ -411,6 +469,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain( state *subscriptionState, pending *pendingPayload, chainName structs.ServiceName, + protocol string, ) { if _, ok := state.connectServices[chainName]; !ok { return // not found @@ -427,7 +486,11 @@ func (m *subscriptionManager) emitEventForDiscoveryChain( discoveryChainPayloadIDPrefix+chainName.String(), subExportedService+proxyName.String(), createDiscoChainHealth( + state.peerName, + m.config.Datacenter, + m.trustDomain, chainName, + protocol, state.meshGateway, ), ) @@ -436,9 +499,43 @@ func (m *subscriptionManager) emitEventForDiscoveryChain( } } -func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckServiceNodes) *pbservice.IndexedCheckServiceNodes { +func createDiscoChainHealth( + peerName string, + datacenter, trustDomain string, + sn structs.ServiceName, + protocol string, + pb *pbservice.IndexedCheckServiceNodes, +) *pbservice.IndexedCheckServiceNodes { fakeProxyName := sn.Name + syntheticProxyNameSuffix + var peerMeta *pbservice.PeeringServiceMeta + { + spiffeID := connect.SpiffeIDService{ + Host: trustDomain, + Partition: sn.PartitionOrDefault(), + Namespace: sn.NamespaceOrDefault(), + Datacenter: datacenter, + Service: sn.Name, + } + + sni := connect.PeeredServiceSNI( + sn.Name, + sn.NamespaceOrDefault(), + sn.PartitionOrDefault(), + peerName, + trustDomain, + ) + + // Create common peer meta. + // + // TODO(peering): should this be replicated by service and not by instance? + peerMeta = &pbservice.PeeringServiceMeta{ + SNI: []string{sni}, + SpiffeID: []string{spiffeID.URI().String()}, + Protocol: protocol, + } + } + newNodes := make([]*pbservice.CheckServiceNode, 0, len(pb.Nodes)) for i := range pb.Nodes { gwNode := pb.Nodes[i].Node @@ -448,10 +545,12 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe pbEntMeta := pbcommon.NewEnterpriseMetaFromStructs(sn.EnterpriseMeta) fakeProxyID := fakeProxyName + destServiceID := sn.Name if gwService.ID != "" { // This is only going to be relevant if multiple mesh gateways are // on the same exporting node. fakeProxyID = fmt.Sprintf("%s-instance-%d", fakeProxyName, i) + destServiceID = fmt.Sprintf("%s-instance-%d", sn.Name, i) } csn := &pbservice.CheckServiceNode{ @@ -464,7 +563,7 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe PeerName: structs.DefaultPeerKeyword, Proxy: &pbservice.ConnectProxyConfig{ DestinationServiceName: sn.Name, - DestinationServiceID: sn.Name, + DestinationServiceID: destServiceID, }, // direct Address: gwService.Address, @@ -472,6 +571,9 @@ func createDiscoChainHealth(sn structs.ServiceName, pb *pbservice.IndexedCheckSe Port: gwService.Port, SocketPath: gwService.SocketPath, Weights: gwService.Weights, + Connect: &pbservice.ServiceConnect{ + PeerMeta: peerMeta, + }, }, Checks: flattenChecks(gwNode.Node, fakeProxyID, fakeProxyName, pbEntMeta, gwChecks), } diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index b1530fc70..28583cfaf 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -7,18 +7,28 @@ import ( "time" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/sdk/testutil" ) func TestSubscriptionManager_RegisterDeregister(t *testing.T) { + testSubscriptionManager_RegisterDeregister(t, true) +} +func TestSubscriptionManager_RegisterDeregister_EnableMeshGateways(t *testing.T) { + testSubscriptionManager_RegisterDeregister(t, false) +} +func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateways bool) { backend := newTestSubscriptionBackend(t) // initialCatalogIdx := backend.lastIdx @@ -29,8 +39,12 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { _, id := backend.ensurePeering(t, "my-peering") partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() - mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) - subCh := mgr.subscribe(ctx, id, partition) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ + Datacenter: "dc1", + ConnectEnabled: true, + DisableMeshGatewayMode: disableMeshGateways, + }, connect.TestTrustDomain, backend) + subCh := mgr.subscribe(ctx, id, "my-peering", partition) var ( gatewayCorrID = subMeshGateway + partition @@ -38,12 +52,18 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() + + mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String() ) - // Expect just the empty mesh gateway event to replicate. - expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, gatewayCorrID, 0) - }) + if disableMeshGateways { + expectEvents(t, subCh) + } else { + // Expect just the empty mesh gateway event to replicate. + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) + }) + } testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) { backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{ @@ -64,14 +84,25 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { }, }) - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID, 0) - }, - ) + if disableMeshGateways { + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID_temp, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 0) + }, + ) + } else { + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 0) + }, + ) + } }) mysql1 := &structs.CheckServiceNode{ @@ -93,12 +124,18 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 1) - node := res.Nodes[0] - require.NotNil(t, node.Node) - require.Equal(t, "foo", node.Node.Node) - require.NotNil(t, node.Service) - require.Equal(t, "mysql-1", node.Service.ID) - require.Len(t, node.Checks, 0) + + if disableMeshGateways { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), + }, res.Nodes[0]) + } else { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + }, res.Nodes[0]) + } }) backend.ensureCheck(t, mysql1.Checks[0]) @@ -110,13 +147,24 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 1) - node := res.Nodes[0] - require.NotNil(t, node.Node) - require.Equal(t, "foo", node.Node.Node) - require.NotNil(t, node.Service) - require.Equal(t, "mysql-1", node.Service.ID) - require.Len(t, node.Checks, 1) - require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) + + if disableMeshGateways { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), + }, + }, res.Nodes[0]) + } else { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[0]) + } }) }) @@ -139,21 +187,31 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 2) - { - node := res.Nodes[0] - require.NotNil(t, node.Node) - require.Equal(t, "bar", node.Node.Node) - require.NotNil(t, node.Service) - require.Equal(t, "mysql-2", node.Service.ID) - require.Len(t, node.Checks, 0) - } - { - node := res.Nodes[1] - require.NotNil(t, node.Node) - require.Equal(t, "foo", node.Node.Node) - require.NotNil(t, node.Service) - require.Len(t, node.Checks, 1) - require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) + + if disableMeshGateways { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), + }, + }, res.Nodes[1]) + } else { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[1]) } }) @@ -166,22 +224,36 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 2) - { - node := res.Nodes[0] - require.NotNil(t, node.Node) - require.Equal(t, "bar", node.Node.Node) - require.NotNil(t, node.Service) - require.Equal(t, "mysql-2", node.Service.ID) - require.Len(t, node.Checks, 1) - require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) - } - { - node := res.Nodes[1] - require.NotNil(t, node.Node) - require.Equal(t, "foo", node.Node.Node) - require.NotNil(t, node.Service) - require.Len(t, node.Checks, 1) - require.Equal(t, "mysql-1:overall-check", node.Checks[0].CheckID) + if disableMeshGateways { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil), + }, + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService_temp("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil), + }, + }, res.Nodes[1]) + } else { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("bar", "mysql-2", "mysql", "critical", nil), + }, + }, res.Nodes[0]) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("foo", "10.0.0.1", partition), + Service: pbService("", "mysql-1", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("foo", "mysql-1", "mysql", "critical", nil), + }, + }, res.Nodes[1]) } }) }) @@ -212,17 +284,90 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Equal(t, uint64(0), res.Index) require.Len(t, res.Nodes, 1) - - node := res.Nodes[0] - require.NotNil(t, node.Node) - require.Equal(t, "bar", node.Node.Node) - require.NotNil(t, node.Service) - require.Equal(t, "mysql-2", node.Service.ID) - require.Len(t, node.Checks, 1) - require.Equal(t, "mysql-2:overall-check", node.Checks[0].CheckID) + if disableMeshGateways { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService_temp("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil), + }, + }, res.Nodes[0]) + } else { + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("bar", "10.0.0.2", partition), + Service: pbService("", "mysql-2", "mysql", 5000, nil), + Checks: []*pbservice.HealthCheck{ + pbCheck("bar", "mysql-2", "mysql", "critical", nil), + }, + }, res.Nodes[0]) + } }) }) + testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) { + if disableMeshGateways { + t.Skip() + return + } + gateway := &structs.CheckServiceNode{ + Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, + Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, + // TODO: checks + } + backend.ensureNode(t, gateway.Node) + backend.ensureService(t, "mgw", gateway.Service) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlProxyCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 1) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("mgw", "10.1.1.1", partition), + Service: &pbservice.NodeService{ + Kind: "connect-proxy", + ID: "mysql-sidecar-proxy-instance-0", + Service: "mysql-sidecar-proxy", + Port: 8443, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, + Proxy: &pbservice.ConnectProxyConfig{ + DestinationServiceID: "mysql-instance-0", + DestinationServiceName: "mysql", + }, + Connect: &pbservice.ServiceConnect{ + PeerMeta: &pbservice.PeeringServiceMeta{ + SNI: []string{ + "mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul", + }, + SpiffeID: []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql", + }, + Protocol: "tcp", + }, + }, + }, + }, res.Nodes[0]) + }, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, gatewayCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 1) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("mgw", "10.1.1.1", partition), + Service: pbService("mesh-gateway", "gateway-1", "gateway", 8443, nil), + }, res.Nodes[0]) + }, + ) + }) + testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { backend.deleteService(t, "bar", mysql2.Service.ID) @@ -234,9 +379,40 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { require.Len(t, res.Nodes, 0) }) }) + + testutil.RunStep(t, "deregister mesh gateway to send proxy removals", func(t *testing.T) { + if disableMeshGateways { + t.Skip() + return + } + backend.deleteService(t, "mgw", "gateway-1") + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlProxyCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, gatewayCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 0) + }, + ) + }) } func TestSubscriptionManager_InitialSnapshot(t *testing.T) { + testSubscriptionManager_InitialSnapshot(t, true) +} +func TestSubscriptionManager_InitialSnapshot_EnableMeshGateways(t *testing.T) { + testSubscriptionManager_InitialSnapshot(t, false) +} +func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways bool) { backend := newTestSubscriptionBackend(t) // initialCatalogIdx := backend.lastIdx @@ -247,8 +423,12 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) { _, id := backend.ensurePeering(t, "my-peering") partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() - mgr := newSubscriptionManager(ctx, testutil.Logger(t), backend) - subCh := mgr.subscribe(ctx, id, partition) + mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ + Datacenter: "dc1", + ConnectEnabled: true, + DisableMeshGatewayMode: disableMeshGateways, + }, connect.TestTrustDomain, backend) + subCh := mgr.subscribe(ctx, id, "my-peering", partition) // Register two services that are not yet exported mysql := &structs.CheckServiceNode{ @@ -275,12 +455,20 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) { mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String() mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String() chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String() + + mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String() + mongoProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mongo", nil).String() + chainProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("chain", nil).String() ) - // Expect just the empty mesh gateway event to replicate. - expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, gatewayCorrID, 0) - }) + if disableMeshGateways { + expectEvents(t, subCh) + } else { + // Expect just the empty mesh gateway event to replicate. + expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, gatewayCorrID, 0) + }) + } // At this point in time we'll have a mesh-gateway notification with no // content stored and handled. @@ -309,29 +497,56 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) { }, }) - expectEvents(t, subCh, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, chainProxyCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mongoProxyCorrID, 0) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) - }, - func(t *testing.T, got cache.UpdateEvent) { - checkEvent(t, got, mysqlProxyCorrID, 0) - }, - ) + if disableMeshGateways { + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainProxyCorrID_temp, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoProxyCorrID_temp, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID_temp, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) + }, + ) + } else { + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, chainProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mongoProxyCorrID, 0) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical)) + }, + func(t *testing.T, got cache.UpdateEvent) { + checkEvent(t, got, mysqlProxyCorrID, 0) + }, + ) + } }) testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) { + if disableMeshGateways { + t.Skip() + return + } gateway := &structs.CheckServiceNode{ Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"}, Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443}, @@ -535,3 +750,82 @@ func checkEvent( } } } + +func pbNode(node, addr, partition string) *pbservice.Node { + return &pbservice.Node{Node: node, Partition: partition, Address: addr} +} + +func pbService(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService { + if entMeta == nil { + entMeta = pbcommon.DefaultEnterpriseMeta + } + return &pbservice.NodeService{ + ID: id, + Kind: kind, + Service: name, + Port: port, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: entMeta, + } +} + +func pbService_temp(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService { + if entMeta == nil { + entMeta = pbcommon.DefaultEnterpriseMeta + } + return &pbservice.NodeService{ + ID: id, + Kind: kind, + Service: name, + Port: port, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: entMeta, + Connect: &pbservice.ServiceConnect{}, + Proxy: &pbservice.ConnectProxyConfig{ + MeshGateway: &pbservice.MeshGatewayConfig{}, + Expose: &pbservice.ExposeConfig{}, + TransparentProxy: &pbservice.TransparentProxyConfig{}, + }, + } +} + +func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck { + if entMeta == nil { + entMeta = pbcommon.DefaultEnterpriseMeta + } + return &pbservice.HealthCheck{ + Node: node, + CheckID: svcID + ":overall-check", + Name: "overall-check", + Status: status, + ServiceID: svcID, + ServiceName: svcName, + EnterpriseMeta: entMeta, + } +} + +func pbCheck_temp(node, svcID, svcName, checkID, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck { + if entMeta == nil { + entMeta = pbcommon.DefaultEnterpriseMeta + } + return &pbservice.HealthCheck{ + Node: node, + CheckID: checkID, + Status: status, + ServiceID: svcID, + ServiceName: svcName, + EnterpriseMeta: entMeta, + Definition: &pbservice.HealthCheckDefinition{ + DeregisterCriticalServiceAfter: durationpb.New(0), + Interval: durationpb.New(0), + TTL: durationpb.New(0), + Timeout: durationpb.New(0), + }, + } +} diff --git a/agent/rpc/peering/subscription_state.go b/agent/rpc/peering/subscription_state.go index 55f1fda00..093f3aa6b 100644 --- a/agent/rpc/peering/subscription_state.go +++ b/agent/rpc/peering/subscription_state.go @@ -17,6 +17,8 @@ import ( // subscriptionState is a collection of working state tied to a peerID subscription. type subscriptionState struct { + // peerName is immutable and is the LOCAL name for the peering + peerName string // partition is immutable partition string @@ -25,7 +27,7 @@ type subscriptionState struct { watchedServices map[structs.ServiceName]context.CancelFunc watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove - connectServices map[structs.ServiceName]struct{} + connectServices map[structs.ServiceName]string // value:protocol // eventVersions is a duplicate event suppression system keyed by the "id" // not the "correlationID" @@ -42,12 +44,13 @@ type subscriptionState struct { publicUpdateCh chan<- cache.UpdateEvent } -func newSubscriptionState(partition string) *subscriptionState { +func newSubscriptionState(peerName, partition string) *subscriptionState { return &subscriptionState{ + peerName: peerName, partition: partition, watchedServices: make(map[structs.ServiceName]context.CancelFunc), watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc), - connectServices: make(map[structs.ServiceName]struct{}), + connectServices: make(map[structs.ServiceName]string), eventVersions: make(map[string]string), } } diff --git a/agent/rpc/peering/subscription_state_test.go b/agent/rpc/peering/subscription_state_test.go index d71fea425..e94459ac6 100644 --- a/agent/rpc/peering/subscription_state_test.go +++ b/agent/rpc/peering/subscription_state_test.go @@ -21,7 +21,7 @@ func TestSubscriptionState_Events(t *testing.T) { partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() - state := newSubscriptionState(partition) + state := newSubscriptionState("my-peering", partition) testutil.RunStep(t, "empty", func(t *testing.T) { pending := &pendingPayload{} @@ -193,7 +193,7 @@ func testNewSubscriptionState(partition string) ( publicUpdateCh = make(chan cache.UpdateEvent, 1) ) - state := newSubscriptionState(partition) + state := newSubscriptionState("my-peering", partition) state.publicUpdateCh = publicUpdateCh return state, publicUpdateCh diff --git a/agent/structs/peering.go b/agent/structs/peering.go index afde38269..bd0351bb3 100644 --- a/agent/structs/peering.go +++ b/agent/structs/peering.go @@ -22,21 +22,24 @@ type ExportedServiceList struct { // DiscoChains is a list of exported service that ONLY apply to service mesh. DiscoChains []ServiceName + + // TODO(peering): reduce duplication here in the response + ConnectProtocol map[ServiceName]string } // ListAllDiscoveryChains returns all discovery chains (union of Services and // DiscoChains). -func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]struct{} { - chainsByName := make(map[ServiceName]struct{}) +func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]string { + chainsByName := make(map[ServiceName]string) if list == nil { return chainsByName } for _, svc := range list.Services { - chainsByName[svc] = struct{}{} + chainsByName[svc] = list.ConnectProtocol[svc] } for _, chainName := range list.DiscoChains { - chainsByName[chainName] = struct{}{} + chainsByName[chainName] = list.ConnectProtocol[chainName] } return chainsByName } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e1083ba7f..6b2ab7bf1 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1227,6 +1227,13 @@ type NodeService struct { RaftIndex `bexpr:"-"` } +// PeeringServiceMeta is read-only information provided from an exported peer. +type PeeringServiceMeta struct { + SNI []string `json:",omitempty"` + SpiffeID []string `json:",omitempty"` + Protocol string `json:",omitempty"` +} + func (ns *NodeService) BestAddress(wan bool) (string, int) { addr := ns.Address port := ns.Port @@ -1297,6 +1304,8 @@ type ServiceConnect struct { // result is identical to just making a second service registration via any // other means. SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` + + PeerMeta *PeeringServiceMeta `json:",omitempty" bexpr:"-"` } func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) { diff --git a/proto/pbservice/service.gen.go b/proto/pbservice/service.gen.go index 0f4d0603b..952133bf8 100644 --- a/proto/pbservice/service.gen.go +++ b/proto/pbservice/service.gen.go @@ -100,12 +100,33 @@ func MeshGatewayConfigFromStructs(t *structs.MeshGatewayConfig, s *MeshGatewayCo } s.Mode = string(t.Mode) } +func PeeringServiceMetaToStructs(s *PeeringServiceMeta, t *structs.PeeringServiceMeta) { + if s == nil { + return + } + t.SNI = s.SNI + t.SpiffeID = s.SpiffeID + t.Protocol = s.Protocol +} +func PeeringServiceMetaFromStructs(t *structs.PeeringServiceMeta, s *PeeringServiceMeta) { + if s == nil { + return + } + s.SNI = t.SNI + s.SpiffeID = t.SpiffeID + s.Protocol = t.Protocol +} func ServiceConnectToStructs(s *ServiceConnect, t *structs.ServiceConnect) { if s == nil { return } t.Native = s.Native t.SidecarService = ServiceDefinitionPtrToStructs(s.SidecarService) + if s.PeerMeta != nil { + var x structs.PeeringServiceMeta + PeeringServiceMetaToStructs(s.PeerMeta, &x) + t.PeerMeta = &x + } } func ServiceConnectFromStructs(t *structs.ServiceConnect, s *ServiceConnect) { if s == nil { @@ -113,6 +134,11 @@ func ServiceConnectFromStructs(t *structs.ServiceConnect, s *ServiceConnect) { } s.Native = t.Native s.SidecarService = NewServiceDefinitionPtrFromStructs(t.SidecarService) + if t.PeerMeta != nil { + var x PeeringServiceMeta + PeeringServiceMetaFromStructs(t.PeerMeta, &x) + s.PeerMeta = &x + } } func ServiceDefinitionToStructs(s *ServiceDefinition, t *structs.ServiceDefinition) { if s == nil { diff --git a/proto/pbservice/service.pb.binary.go b/proto/pbservice/service.pb.binary.go index aa449f239..9819f4dcd 100644 --- a/proto/pbservice/service.pb.binary.go +++ b/proto/pbservice/service.pb.binary.go @@ -37,6 +37,16 @@ func (msg *ServiceConnect) UnmarshalBinary(b []byte) error { return proto.Unmarshal(b, msg) } +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *PeeringServiceMeta) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *PeeringServiceMeta) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + // MarshalBinary implements encoding.BinaryMarshaler func (msg *ExposeConfig) MarshalBinary() ([]byte, error) { return proto.Marshal(msg) diff --git a/proto/pbservice/service.pb.go b/proto/pbservice/service.pb.go index 80d4b474e..89ff546dc 100644 --- a/proto/pbservice/service.pb.go +++ b/proto/pbservice/service.pb.go @@ -389,7 +389,8 @@ type ServiceConnect struct { // result is identical to just making a second service registration via any // other means. // mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs - SidecarService *ServiceDefinition `protobuf:"bytes,3,opt,name=SidecarService,proto3" json:"SidecarService,omitempty"` + SidecarService *ServiceDefinition `protobuf:"bytes,3,opt,name=SidecarService,proto3" json:"SidecarService,omitempty"` + PeerMeta *PeeringServiceMeta `protobuf:"bytes,4,opt,name=PeerMeta,proto3" json:"PeerMeta,omitempty"` } func (x *ServiceConnect) Reset() { @@ -438,6 +439,83 @@ func (x *ServiceConnect) GetSidecarService() *ServiceDefinition { return nil } +func (x *ServiceConnect) GetPeerMeta() *PeeringServiceMeta { + if x != nil { + return x.PeerMeta + } + return nil +} + +// PeeringServiceMeta is read-only information provided from an exported peer. +// +// mog annotation: +// +// target=github.com/hashicorp/consul/agent/structs.PeeringServiceMeta +// output=service.gen.go +// name=Structs +type PeeringServiceMeta struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SNI []string `protobuf:"bytes,1,rep,name=SNI,proto3" json:"SNI,omitempty"` + SpiffeID []string `protobuf:"bytes,2,rep,name=SpiffeID,proto3" json:"SpiffeID,omitempty"` + Protocol string `protobuf:"bytes,3,opt,name=Protocol,proto3" json:"Protocol,omitempty"` +} + +func (x *PeeringServiceMeta) Reset() { + *x = PeeringServiceMeta{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_pbservice_service_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PeeringServiceMeta) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeeringServiceMeta) ProtoMessage() {} + +func (x *PeeringServiceMeta) ProtoReflect() protoreflect.Message { + mi := &file_proto_pbservice_service_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeeringServiceMeta.ProtoReflect.Descriptor instead. +func (*PeeringServiceMeta) Descriptor() ([]byte, []int) { + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{3} +} + +func (x *PeeringServiceMeta) GetSNI() []string { + if x != nil { + return x.SNI + } + return nil +} + +func (x *PeeringServiceMeta) GetSpiffeID() []string { + if x != nil { + return x.SpiffeID + } + return nil +} + +func (x *PeeringServiceMeta) GetProtocol() string { + if x != nil { + return x.Protocol + } + return "" +} + // ExposeConfig describes HTTP paths to expose through Envoy outside of Connect. // Users can expose individual paths and/or all HTTP/GRPC paths for checks. // @@ -462,7 +540,7 @@ type ExposeConfig struct { func (x *ExposeConfig) Reset() { *x = ExposeConfig{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[3] + mi := &file_proto_pbservice_service_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -475,7 +553,7 @@ func (x *ExposeConfig) String() string { func (*ExposeConfig) ProtoMessage() {} func (x *ExposeConfig) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[3] + mi := &file_proto_pbservice_service_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -488,7 +566,7 @@ func (x *ExposeConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use ExposeConfig.ProtoReflect.Descriptor instead. func (*ExposeConfig) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{3} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{4} } func (x *ExposeConfig) GetChecks() bool { @@ -533,7 +611,7 @@ type ExposePath struct { func (x *ExposePath) Reset() { *x = ExposePath{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[4] + mi := &file_proto_pbservice_service_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -546,7 +624,7 @@ func (x *ExposePath) String() string { func (*ExposePath) ProtoMessage() {} func (x *ExposePath) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[4] + mi := &file_proto_pbservice_service_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -559,7 +637,7 @@ func (x *ExposePath) ProtoReflect() protoreflect.Message { // Deprecated: Use ExposePath.ProtoReflect.Descriptor instead. func (*ExposePath) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{4} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{5} } func (x *ExposePath) GetListenerPort() int32 { @@ -614,7 +692,7 @@ type MeshGatewayConfig struct { func (x *MeshGatewayConfig) Reset() { *x = MeshGatewayConfig{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[5] + mi := &file_proto_pbservice_service_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -627,7 +705,7 @@ func (x *MeshGatewayConfig) String() string { func (*MeshGatewayConfig) ProtoMessage() {} func (x *MeshGatewayConfig) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[5] + mi := &file_proto_pbservice_service_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -640,7 +718,7 @@ func (x *MeshGatewayConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use MeshGatewayConfig.ProtoReflect.Descriptor instead. func (*MeshGatewayConfig) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{5} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{6} } func (x *MeshGatewayConfig) GetMode() string { @@ -671,7 +749,7 @@ type TransparentProxyConfig struct { func (x *TransparentProxyConfig) Reset() { *x = TransparentProxyConfig{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[6] + mi := &file_proto_pbservice_service_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -684,7 +762,7 @@ func (x *TransparentProxyConfig) String() string { func (*TransparentProxyConfig) ProtoMessage() {} func (x *TransparentProxyConfig) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[6] + mi := &file_proto_pbservice_service_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -697,7 +775,7 @@ func (x *TransparentProxyConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use TransparentProxyConfig.ProtoReflect.Descriptor instead. func (*TransparentProxyConfig) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{6} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{7} } func (x *TransparentProxyConfig) GetOutboundListenerPort() int32 { @@ -768,7 +846,7 @@ type ServiceDefinition struct { func (x *ServiceDefinition) Reset() { *x = ServiceDefinition{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[7] + mi := &file_proto_pbservice_service_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -781,7 +859,7 @@ func (x *ServiceDefinition) String() string { func (*ServiceDefinition) ProtoMessage() {} func (x *ServiceDefinition) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[7] + mi := &file_proto_pbservice_service_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -794,7 +872,7 @@ func (x *ServiceDefinition) ProtoReflect() protoreflect.Message { // Deprecated: Use ServiceDefinition.ProtoReflect.Descriptor instead. func (*ServiceDefinition) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{7} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{8} } func (x *ServiceDefinition) GetKind() string { @@ -930,7 +1008,7 @@ type ServiceAddress struct { func (x *ServiceAddress) Reset() { *x = ServiceAddress{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[8] + mi := &file_proto_pbservice_service_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -943,7 +1021,7 @@ func (x *ServiceAddress) String() string { func (*ServiceAddress) ProtoMessage() {} func (x *ServiceAddress) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[8] + mi := &file_proto_pbservice_service_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -956,7 +1034,7 @@ func (x *ServiceAddress) ProtoReflect() protoreflect.Message { // Deprecated: Use ServiceAddress.ProtoReflect.Descriptor instead. func (*ServiceAddress) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{8} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{9} } func (x *ServiceAddress) GetAddress() string { @@ -988,7 +1066,7 @@ type Weights struct { func (x *Weights) Reset() { *x = Weights{} if protoimpl.UnsafeEnabled { - mi := &file_proto_pbservice_service_proto_msgTypes[9] + mi := &file_proto_pbservice_service_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1001,7 +1079,7 @@ func (x *Weights) String() string { func (*Weights) ProtoMessage() {} func (x *Weights) ProtoReflect() protoreflect.Message { - mi := &file_proto_pbservice_service_proto_msgTypes[9] + mi := &file_proto_pbservice_service_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1014,7 +1092,7 @@ func (x *Weights) ProtoReflect() protoreflect.Message { // Deprecated: Use Weights.ProtoReflect.Descriptor instead. func (*Weights) Descriptor() ([]byte, []int) { - return file_proto_pbservice_service_proto_rawDescGZIP(), []int{9} + return file_proto_pbservice_service_proto_rawDescGZIP(), []int{10} } func (x *Weights) GetPassing() int32 { @@ -1117,14 +1195,24 @@ var file_proto_pbservice_service_proto_rawDesc = []byte{ 0x6b, 0x65, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x30, 0x0a, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x42, 0x69, 0x6e, 0x64, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x42, 0x69, 0x6e, 0x64, 0x53, - 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x22, 0x6c, 0x0a, 0x0e, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x4e, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x4e, 0x61, 0x74, - 0x69, 0x76, 0x65, 0x12, 0x42, 0x0a, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65, 0x66, - 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0x51, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x6f, 0x73, + 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x4d, 0x6f, 0x64, 0x65, 0x22, 0xab, 0x01, 0x0a, 0x0e, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x0a, 0x06, + 0x4e, 0x61, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x4e, 0x61, + 0x74, 0x69, 0x76, 0x65, 0x12, 0x42, 0x0a, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65, + 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0e, 0x53, 0x69, 0x64, 0x65, 0x63, 0x61, + 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, + 0x4d, 0x65, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4d, 0x65, 0x74, + 0x61, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0x5e, 0x0a, 0x12, 0x50, 0x65, 0x65, 0x72, 0x69, + 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x10, 0x0a, + 0x03, 0x53, 0x4e, 0x49, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x03, 0x53, 0x4e, 0x49, 0x12, + 0x1a, 0x0a, 0x08, 0x53, 0x70, 0x69, 0x66, 0x66, 0x65, 0x49, 0x44, 0x18, 0x02, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x08, 0x53, 0x70, 0x69, 0x66, 0x66, 0x65, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x51, 0x0a, 0x0c, 0x45, 0x78, 0x70, 0x6f, 0x73, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x12, 0x29, 0x0a, 0x05, 0x50, 0x61, 0x74, 0x68, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, @@ -1235,48 +1323,50 @@ func file_proto_pbservice_service_proto_rawDescGZIP() []byte { return file_proto_pbservice_service_proto_rawDescData } -var file_proto_pbservice_service_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_proto_pbservice_service_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_proto_pbservice_service_proto_goTypes = []interface{}{ (*ConnectProxyConfig)(nil), // 0: service.ConnectProxyConfig (*Upstream)(nil), // 1: service.Upstream (*ServiceConnect)(nil), // 2: service.ServiceConnect - (*ExposeConfig)(nil), // 3: service.ExposeConfig - (*ExposePath)(nil), // 4: service.ExposePath - (*MeshGatewayConfig)(nil), // 5: service.MeshGatewayConfig - (*TransparentProxyConfig)(nil), // 6: service.TransparentProxyConfig - (*ServiceDefinition)(nil), // 7: service.ServiceDefinition - (*ServiceAddress)(nil), // 8: service.ServiceAddress - (*Weights)(nil), // 9: service.Weights - nil, // 10: service.ServiceDefinition.TaggedAddressesEntry - nil, // 11: service.ServiceDefinition.MetaEntry - (*structpb.Struct)(nil), // 12: google.protobuf.Struct - (*CheckType)(nil), // 13: service.CheckType - (*pbcommon.EnterpriseMeta)(nil), // 14: common.EnterpriseMeta + (*PeeringServiceMeta)(nil), // 3: service.PeeringServiceMeta + (*ExposeConfig)(nil), // 4: service.ExposeConfig + (*ExposePath)(nil), // 5: service.ExposePath + (*MeshGatewayConfig)(nil), // 6: service.MeshGatewayConfig + (*TransparentProxyConfig)(nil), // 7: service.TransparentProxyConfig + (*ServiceDefinition)(nil), // 8: service.ServiceDefinition + (*ServiceAddress)(nil), // 9: service.ServiceAddress + (*Weights)(nil), // 10: service.Weights + nil, // 11: service.ServiceDefinition.TaggedAddressesEntry + nil, // 12: service.ServiceDefinition.MetaEntry + (*structpb.Struct)(nil), // 13: google.protobuf.Struct + (*CheckType)(nil), // 14: service.CheckType + (*pbcommon.EnterpriseMeta)(nil), // 15: common.EnterpriseMeta } var file_proto_pbservice_service_proto_depIdxs = []int32{ - 12, // 0: service.ConnectProxyConfig.Config:type_name -> google.protobuf.Struct + 13, // 0: service.ConnectProxyConfig.Config:type_name -> google.protobuf.Struct 1, // 1: service.ConnectProxyConfig.Upstreams:type_name -> service.Upstream - 5, // 2: service.ConnectProxyConfig.MeshGateway:type_name -> service.MeshGatewayConfig - 3, // 3: service.ConnectProxyConfig.Expose:type_name -> service.ExposeConfig - 6, // 4: service.ConnectProxyConfig.TransparentProxy:type_name -> service.TransparentProxyConfig - 12, // 5: service.Upstream.Config:type_name -> google.protobuf.Struct - 5, // 6: service.Upstream.MeshGateway:type_name -> service.MeshGatewayConfig - 7, // 7: service.ServiceConnect.SidecarService:type_name -> service.ServiceDefinition - 4, // 8: service.ExposeConfig.Paths:type_name -> service.ExposePath - 10, // 9: service.ServiceDefinition.TaggedAddresses:type_name -> service.ServiceDefinition.TaggedAddressesEntry - 11, // 10: service.ServiceDefinition.Meta:type_name -> service.ServiceDefinition.MetaEntry - 13, // 11: service.ServiceDefinition.Check:type_name -> service.CheckType - 13, // 12: service.ServiceDefinition.Checks:type_name -> service.CheckType - 9, // 13: service.ServiceDefinition.Weights:type_name -> service.Weights - 0, // 14: service.ServiceDefinition.Proxy:type_name -> service.ConnectProxyConfig - 14, // 15: service.ServiceDefinition.EnterpriseMeta:type_name -> common.EnterpriseMeta - 2, // 16: service.ServiceDefinition.Connect:type_name -> service.ServiceConnect - 8, // 17: service.ServiceDefinition.TaggedAddressesEntry.value:type_name -> service.ServiceAddress - 18, // [18:18] is the sub-list for method output_type - 18, // [18:18] is the sub-list for method input_type - 18, // [18:18] is the sub-list for extension type_name - 18, // [18:18] is the sub-list for extension extendee - 0, // [0:18] is the sub-list for field type_name + 6, // 2: service.ConnectProxyConfig.MeshGateway:type_name -> service.MeshGatewayConfig + 4, // 3: service.ConnectProxyConfig.Expose:type_name -> service.ExposeConfig + 7, // 4: service.ConnectProxyConfig.TransparentProxy:type_name -> service.TransparentProxyConfig + 13, // 5: service.Upstream.Config:type_name -> google.protobuf.Struct + 6, // 6: service.Upstream.MeshGateway:type_name -> service.MeshGatewayConfig + 8, // 7: service.ServiceConnect.SidecarService:type_name -> service.ServiceDefinition + 3, // 8: service.ServiceConnect.PeerMeta:type_name -> service.PeeringServiceMeta + 5, // 9: service.ExposeConfig.Paths:type_name -> service.ExposePath + 11, // 10: service.ServiceDefinition.TaggedAddresses:type_name -> service.ServiceDefinition.TaggedAddressesEntry + 12, // 11: service.ServiceDefinition.Meta:type_name -> service.ServiceDefinition.MetaEntry + 14, // 12: service.ServiceDefinition.Check:type_name -> service.CheckType + 14, // 13: service.ServiceDefinition.Checks:type_name -> service.CheckType + 10, // 14: service.ServiceDefinition.Weights:type_name -> service.Weights + 0, // 15: service.ServiceDefinition.Proxy:type_name -> service.ConnectProxyConfig + 15, // 16: service.ServiceDefinition.EnterpriseMeta:type_name -> common.EnterpriseMeta + 2, // 17: service.ServiceDefinition.Connect:type_name -> service.ServiceConnect + 9, // 18: service.ServiceDefinition.TaggedAddressesEntry.value:type_name -> service.ServiceAddress + 19, // [19:19] is the sub-list for method output_type + 19, // [19:19] is the sub-list for method input_type + 19, // [19:19] is the sub-list for extension type_name + 19, // [19:19] is the sub-list for extension extendee + 0, // [0:19] is the sub-list for field type_name } func init() { file_proto_pbservice_service_proto_init() } @@ -1323,7 +1413,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ExposeConfig); i { + switch v := v.(*PeeringServiceMeta); i { case 0: return &v.state case 1: @@ -1335,7 +1425,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ExposePath); i { + switch v := v.(*ExposeConfig); i { case 0: return &v.state case 1: @@ -1347,7 +1437,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MeshGatewayConfig); i { + switch v := v.(*ExposePath); i { case 0: return &v.state case 1: @@ -1359,7 +1449,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TransparentProxyConfig); i { + switch v := v.(*MeshGatewayConfig); i { case 0: return &v.state case 1: @@ -1371,7 +1461,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ServiceDefinition); i { + switch v := v.(*TransparentProxyConfig); i { case 0: return &v.state case 1: @@ -1383,7 +1473,7 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ServiceAddress); i { + switch v := v.(*ServiceDefinition); i { case 0: return &v.state case 1: @@ -1395,6 +1485,18 @@ func file_proto_pbservice_service_proto_init() { } } file_proto_pbservice_service_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServiceAddress); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_pbservice_service_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Weights); i { case 0: return &v.state @@ -1413,7 +1515,7 @@ func file_proto_pbservice_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_pbservice_service_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/pbservice/service.proto b/proto/pbservice/service.proto index 011460b35..9c7956a0d 100644 --- a/proto/pbservice/service.proto +++ b/proto/pbservice/service.proto @@ -139,6 +139,8 @@ message ServiceConnect { // Native is true when this service can natively understand Connect. bool Native = 1; + reserved 2; + // SidecarService is a nested Service Definition to register at the same time. // It's purely a convenience mechanism to allow specifying a sidecar service // along with the application service definition. It's nested nature allows @@ -148,6 +150,21 @@ message ServiceConnect { // other means. // mog: func-to=ServiceDefinitionPtrToStructs func-from=NewServiceDefinitionPtrFromStructs ServiceDefinition SidecarService = 3; + + PeeringServiceMeta PeerMeta = 4; +} + +// PeeringServiceMeta is read-only information provided from an exported peer. +// +// mog annotation: +// +// target=github.com/hashicorp/consul/agent/structs.PeeringServiceMeta +// output=service.gen.go +// name=Structs +message PeeringServiceMeta { + repeated string SNI = 1; + repeated string SpiffeID = 2; + string Protocol = 3; } // ExposeConfig describes HTTP paths to expose through Envoy outside of Connect. diff --git a/proto/prototest/testing.go b/proto/prototest/testing.go index 1dbe03618..b17f359b3 100644 --- a/proto/prototest/testing.go +++ b/proto/prototest/testing.go @@ -3,14 +3,14 @@ package prototest import ( "testing" - "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/testing/protocmp" ) func AssertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() - opts = append(opts, cmp.Comparer(proto.Equal)) + opts = append(opts, protocmp.Transform()) if diff := cmp.Diff(x, y, opts...); diff != "" { t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)