diff --git a/.changelog/16339.txt b/.changelog/16339.txt new file mode 100644 index 000000000..cf44f010a --- /dev/null +++ b/.changelog/16339.txt @@ -0,0 +1,3 @@ +```release-note:bug +peering: Fix bug where services were incorrectly imported as connect-enabled. +``` diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 4d6450136..39a40d9ca 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -900,12 +900,17 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool return fmt.Errorf("failed updating gateway mapping: %s", err) } + if svc.PeerName == "" && sn.Name != "" { + if err := upsertKindServiceName(tx, idx, structs.ServiceKindConnectEnabled, sn); err != nil { + return fmt.Errorf("failed to persist service name as connect-enabled: %v", err) + } + } + + // Update the virtual IP for the service supported, err := virtualIPsSupported(tx, nil) if err != nil { return err } - - // Update the virtual IP for the service if supported { psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn} vip, err := assignServiceVirtualIP(tx, idx, psn) @@ -1964,6 +1969,24 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) } + // Cleanup ConnectEnabled for this service if none exist. + if svc.PeerName == "" && (svc.ServiceKind == structs.ServiceKindConnectProxy || svc.ServiceConnect.Native) { + service := svc.ServiceName + if svc.ServiceKind == structs.ServiceKindConnectProxy { + service = svc.ServiceProxy.DestinationServiceName + } + sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta} + connectEnabled, err := serviceHasConnectEnabledInstances(tx, sn.Name, &sn.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to search for connect instances for service %q: %w", sn.Name, err) + } + if !connectEnabled { + if err := cleanupKindServiceName(tx, idx, sn, structs.ServiceKindConnectEnabled); err != nil { + return fmt.Errorf("failed to cleanup connect-enabled service name: %v", err) + } + } + } + if svc.PeerName == "" { sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta} if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil { @@ -3731,6 +3754,27 @@ func serviceHasConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.En return hasConnectInstance, hasNonConnectInstance, nil } +// serviceHasConnectEnabledInstances returns whether the given service name +// has a corresponding connect-proxy or connect-native instance. +// This function is mostly a clone of `serviceHasConnectInstances`, but it has +// an early return to improve performance and returns true if at least one +// connect-native instance exists. +func serviceHasConnectEnabledInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, error) { + query := Query{ + Value: serviceName, + EnterpriseMeta: *entMeta, + } + + svc, err := tx.First(tableServices, indexConnect, query) + if err != nil { + return false, fmt.Errorf("failed service lookup: %w", err) + } + if svc != nil { + return true, nil + } + return false, nil +} + // updateGatewayService associates services with gateways after an eligible event // ie. Registering a service in a namespace targeted by a gateway func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index d354b9b09..cef5ba0a0 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -8664,7 +8664,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) { }, } - var idx uint64 + var idx, connectEnabledIdx uint64 testRegisterNode(t, s, idx, "node1") for _, svc := range services { @@ -8678,8 +8678,29 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) { require.Len(t, gotNames, 1) require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service) require.Equal(t, svc.Kind, gotNames[0].Kind) + if svc.Kind == structs.ServiceKindConnectProxy { + connectEnabledIdx = idx + } } + // A ConnectEnabled service should exist if a corresponding ConnectProxy or ConnectNative service exists. + verifyConnectEnabled := func(expectIdx uint64) { + gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled) + require.NoError(t, err) + require.Equal(t, expectIdx, gotIdx) + require.Equal(t, []*KindServiceName{ + { + Kind: structs.ServiceKindConnectEnabled, + Service: structs.NewServiceName("foo", entMeta), + RaftIndex: structs.RaftIndex{ + CreateIndex: connectEnabledIdx, + ModifyIndex: connectEnabledIdx, + }, + }, + }, gotNames) + } + verifyConnectEnabled(connectEnabledIdx) + // Register another ingress gateway and there should be two names under the kind index newIngress := structs.NodeService{ Kind: structs.ServiceKindIngressGateway, @@ -8749,6 +8770,38 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) { require.NoError(t, err) require.Equal(t, idx, gotIdx) require.Empty(t, got) + + // A ConnectEnabled entry should not be removed until all corresponding services are removed. + { + verifyConnectEnabled(connectEnabledIdx) + // Add a connect-native service. + idx++ + require.NoError(t, s.EnsureService(idx, "node1", &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "foo", + Service: "foo", + Address: "5.5.5.5", + Port: 5555, + EnterpriseMeta: *entMeta, + Connect: structs.ServiceConnect{ + Native: true, + }, + })) + verifyConnectEnabled(connectEnabledIdx) + + // Delete the proxy. This should not clean up the entry, because we still have a + // connect-native service registered. + idx++ + require.NoError(t, s.DeleteService(idx, "node1", "connect-proxy", entMeta, "")) + verifyConnectEnabled(connectEnabledIdx) + + // Remove the connect-native service to clear out the connect-enabled entry. + require.NoError(t, s.DeleteService(idx, "node1", "foo", entMeta, "")) + gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled) + require.NoError(t, err) + require.Equal(t, idx, gotIdx) + require.Empty(t, gotNames) + } } func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) { diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 708b28848..491d4887a 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -770,88 +770,181 @@ func exportedServicesForPeerTxn( maxIdx := peering.ModifyIndex entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) - idx, conf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta) + idx, exportConf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed to fetch exported-services config entry: %w", err) } if idx > maxIdx { maxIdx = idx } - if conf == nil { + if exportConf == nil { return maxIdx, &structs.ExportedServiceList{}, nil } var ( - normalSet = make(map[structs.ServiceName]struct{}) - discoSet = make(map[structs.ServiceName]struct{}) + // exportedServices will contain the listing of all service names that are being exported + // and will need to be queried for connect / discovery chain information. + exportedServices = make(map[structs.ServiceName]struct{}) + + // exportedConnectServices will contain the listing of all connect service names that are being exported. + exportedConnectServices = make(map[structs.ServiceName]struct{}) + + // namespaceConnectServices provides a listing of all connect service names for a particular partition+namespace pair. + namespaceConnectServices = make(map[acl.EnterpriseMeta]map[string]struct{}) + + // namespaceDiscoChains provides a listing of all disco chain names for a particular partition+namespace pair. + namespaceDiscoChains = make(map[acl.EnterpriseMeta]map[string]struct{}) ) - // At least one of the following should be true for a name for it to - // replicate: - // - // - are a discovery chain by definition (service-router, service-splitter, service-resolver) - // - have an explicit sidecar kind=connect-proxy - // - use connect native mode + // Helper function for inserting data and auto-creating maps. + insertEntry := func(m map[acl.EnterpriseMeta]map[string]struct{}, entMeta acl.EnterpriseMeta, name string) { + names, ok := m[entMeta] + if !ok { + names = make(map[string]struct{}) + m[entMeta] = names + } + names[name] = struct{}{} + } - for _, svc := range conf.Services { + // Build the set of all services that will be exported. + // Any possible namespace wildcards or "consul" services should be removed by this step. + for _, svc := range exportConf.Services { // Prevent exporting the "consul" service. if svc.Name == structs.ConsulServiceName { continue } - svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) + svcEntMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace) + svcName := structs.NewServiceName(svc.Name, &svcEntMeta) - sawPeer := false + peerFound := false for _, consumer := range svc.Consumers { - name := structs.NewServiceName(svc.Name, &svcMeta) - - if _, ok := normalSet[name]; ok { - // Service was covered by a wildcard that was already accounted for - continue + if consumer.Peer == peering.Name { + peerFound = true + break } - if consumer.Peer != peering.Name { - continue - } - sawPeer = true + } + // Only look for more information if the matching peer was found. + if !peerFound { + continue + } - if svc.Name != structs.WildcardSpecifier { - normalSet[name] = struct{}{} + // If this isn't a wildcard, we can simply add it to the list of services to watch and move to the next entry. + if svc.Name != structs.WildcardSpecifier { + exportedServices[svcName] = struct{}{} + continue + } + + // If all services in the namespace are exported by the wildcard, query those service names. + idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcEntMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get typical service names: %w", err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, sn := range typicalServices { + // Prevent exporting the "consul" service. + if sn.Service.Name != structs.ConsulServiceName { + exportedServices[sn.Service] = struct{}{} } } - // If the target peer is a consumer, and all services in the namespace are exported, query those service names. - if sawPeer && svc.Name == structs.WildcardSpecifier { - idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get typical service names: %w", err) - } - if idx > maxIdx { - maxIdx = idx - } - for _, s := range typicalServices { - // Prevent exporting the "consul" service. - if s.Service.Name == structs.ConsulServiceName { - continue - } - normalSet[s.Service] = struct{}{} - } - - // list all config entries of kind service-resolver, service-router, service-splitter? - idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err) - } - if idx > maxIdx { - maxIdx = idx - } - for _, sn := range discoChains { - discoSet[sn] = struct{}{} + // List all config entries of kind service-resolver, service-router, service-splitter, because they + // will be exported as connect services. + idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcEntMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, sn := range discoChains { + // Prevent exporting the "consul" service. + if sn.Name != structs.ConsulServiceName { + exportedConnectServices[sn] = struct{}{} + insertEntry(namespaceDiscoChains, svcEntMeta, sn.Name) } } } - normal := maps.SliceOfKeys(normalSet) - disco := maps.SliceOfKeys(discoSet) + // At least one of the following should be true for a name to replicate it as a *connect* service: + // - are a discovery chain by definition (service-router, service-splitter, service-resolver) + // - have an explicit sidecar kind=connect-proxy + // - use connect native mode + // - are registered with a terminating gateway + populateConnectService := func(sn structs.ServiceName) error { + // Load all disco-chains in this namespace if we haven't already. + if _, ok := namespaceDiscoChains[sn.EnterpriseMeta]; !ok { + // Check to see if we have a discovery chain with the same name. + idx, chains, err := listDiscoveryChainNamesTxn(tx, ws, nil, sn.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to get connect services: %w", err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, sn := range chains { + insertEntry(namespaceDiscoChains, sn.EnterpriseMeta, sn.Name) + } + } + // Check to see if we have the connect service. + if _, ok := namespaceDiscoChains[sn.EnterpriseMeta][sn.Name]; ok { + exportedConnectServices[sn] = struct{}{} + // Do not early return because we have multiple watches that should be established. + } + // Load all services in this namespace if we haven't already. + if _, ok := namespaceConnectServices[sn.EnterpriseMeta]; !ok { + // This is more efficient than querying the service instance table. + idx, connectServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectEnabled, sn.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to get connect services: %w", err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, ksn := range connectServices { + insertEntry(namespaceConnectServices, sn.EnterpriseMeta, ksn.Service.Name) + } + } + // Check to see if we have the connect service. + if _, ok := namespaceConnectServices[sn.EnterpriseMeta][sn.Name]; ok { + exportedConnectServices[sn] = struct{}{} + // Do not early return because we have multiple watches that should be established. + } + + // Check if the service is exposed via terminating gateways. + svcGateways, err := tx.Get(tableGatewayServices, indexService, sn) + if err != nil { + return fmt.Errorf("failed gateway lookup for %q: %w", sn.Name, err) + } + ws.Add(svcGateways.WatchCh()) + for svc := svcGateways.Next(); svc != nil; svc = svcGateways.Next() { + gs, ok := svc.(*structs.GatewayService) + if !ok { + return fmt.Errorf("failed converting to GatewayService for %q", sn.Name) + } + if gs.GatewayKind == structs.ServiceKindTerminatingGateway { + exportedConnectServices[sn] = struct{}{} + break + } + } + + return nil + } + + // Perform queries and check if each service is connect-enabled. + for sn := range exportedServices { + // Do not query for data if we already know it's a connect service. + if _, ok := exportedConnectServices[sn]; ok { + continue + } + if err := populateConnectService(sn); err != nil { + return 0, nil, err + } + } + + // Fetch the protocol / targets for connect services. chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo) populateChainInfo := func(svc structs.ServiceName) error { if _, ok := chainInfo[svc]; ok { @@ -899,21 +992,17 @@ func exportedServicesForPeerTxn( return nil } - for _, svc := range normal { - if err := populateChainInfo(svc); err != nil { - return 0, nil, err - } - } - for _, svc := range disco { + for svc := range exportedConnectServices { if err := populateChainInfo(svc); err != nil { return 0, nil, err } } - structs.ServiceList(normal).Sort() + sortedServices := maps.SliceOfKeys(exportedServices) + structs.ServiceList(sortedServices).Sort() list := &structs.ExportedServiceList{ - Services: normal, + Services: sortedServices, DiscoChains: chainInfo, } diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index dc8dd8974..f7232ca1a 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -1908,18 +1908,28 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { }, }, { + // Should be exported as both a normal and disco chain (resolver). Name: "mysql", Consumers: []structs.ServiceConsumer{ {Peer: "my-peering"}, }, }, { + // Should be exported as both a normal and disco chain (connect-proxy). Name: "redis", Consumers: []structs.ServiceConsumer{ {Peer: "my-peering"}, }, }, { + // Should only be exported as a normal service. + Name: "prometheus", + Consumers: []structs.ServiceConsumer{ + {Peer: "my-peering"}, + }, + }, + { + // Should not be exported (different peer consumer) Name: "mongo", Consumers: []structs.ServiceConsumer{ {Peer: "my-other-peering"}, @@ -1932,12 +1942,37 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() + // Register extra things so that disco chain entries appear. + lastIdx++ + require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{ + Node: "node1", Address: "10.0.0.1", + })) + lastIdx++ + require.NoError(t, s.EnsureService(lastIdx, "node1", &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "redis-sidecar-proxy", + Service: "redis-sidecar-proxy", + Port: 5005, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "redis", + }, + })) + ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "mysql", + EnterpriseMeta: *defaultEntMeta, + }) + expect := &structs.ExportedServiceList{ Services: []structs.ServiceName{ { Name: "mysql", EnterpriseMeta: *defaultEntMeta, }, + { + Name: "prometheus", + EnterpriseMeta: *defaultEntMeta, + }, { Name: "redis", EnterpriseMeta: *defaultEntMeta, @@ -1998,17 +2033,21 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { ws = memdb.NewWatchSet() expect := &structs.ExportedServiceList{ + // Only "billing" shows up, because there are no other service instances running, + // and "consul" is never exported. Services: []structs.ServiceName{ { Name: "billing", EnterpriseMeta: *defaultEntMeta, }, }, + // Only "mysql" appears because there it has a service resolver. + // "redis" does not appear, because it's a sidecar proxy without a corresponding service, so the wildcard doesn't find it. DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ - newSN("billing"): { + newSN("mysql"): { Protocol: "tcp", TCPTargets: []*structs.DiscoveryTarget{ - newTarget("billing", "", "dc1"), + newTarget("mysql", "", "dc1"), }, }, }, @@ -2025,13 +2064,17 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { ID: "payments", Service: "payments", Port: 5000, })) - // The proxy will be ignored. + // The proxy will cause "payments" to be output in the disco chains. It will NOT be output + // in the normal services list. lastIdx++ require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, ID: "payments-proxy", Service: "payments-proxy", Port: 5000, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "payments", + }, })) lastIdx++ // The consul service should never be exported. @@ -2099,10 +2142,11 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { }, DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ // NOTE: no consul-redirect here - newSN("billing"): { + // NOTE: no billing here, because it does not have a proxy. + newSN("payments"): { Protocol: "http", }, - newSN("payments"): { + newSN("mysql"): { Protocol: "http", }, newSN("resolver"): { @@ -2129,6 +2173,9 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { lastIdx++ require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil)) + lastIdx++ + require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceResolver, "mysql", nil)) + require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() @@ -2160,6 +2207,51 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { require.Equal(t, expect, got) }) + testutil.RunStep(t, "terminating gateway services are exported", func(t *testing.T) { + lastIdx++ + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + ID: "term-svc", Service: "term-svc", Port: 6000, + })) + lastIdx++ + require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "some-terminating-gateway", + ID: "some-terminating-gateway", + Port: 9000, + })) + lastIdx++ + require.NoError(t, s.EnsureConfigEntry(lastIdx, &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "some-terminating-gateway", + Services: []structs.LinkedService{{Name: "term-svc"}}, + })) + + expect := &structs.ExportedServiceList{ + Services: []structs.ServiceName{ + newSN("payments"), + newSN("term-svc"), + }, + DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("payments"): { + Protocol: "http", + }, + newSN("resolver"): { + Protocol: "http", + }, + newSN("router"): { + Protocol: "http", + }, + newSN("term-svc"): { + Protocol: "http", + }, + }, + } + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") + require.NoError(t, err) + require.Equal(t, lastIdx, idx) + require.Equal(t, expect, got) + }) + testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) { expect := &structs.ExportedServiceList{} diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 63df91aa7..904c2c28b 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -844,6 +844,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000}, } + mysqlSidecar := &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + Service: "mysql-sidecar-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "mysql", + }, + } lastIdx++ require.NoError(t, store.EnsureNode(lastIdx, mysql.Node)) @@ -851,6 +858,9 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { lastIdx++ require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) + lastIdx++ + require.NoError(t, store.EnsureService(lastIdx, "foo", mysqlSidecar)) + mongoSvcDefaults := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "mongo", @@ -870,6 +880,24 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String() ) + testutil.RunStep(t, "initial stream data is received", func(t *testing.T) { + expectReplEvents(t, client, + func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { + require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) + // Roots tested in TestStreamResources_Server_CARootUpdates + }, + func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { + require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL) + require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID) + require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) + + var exportedServices pbpeerstream.ExportedServiceList + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices)) + require.ElementsMatch(t, []string{}, exportedServices.Services) + }, + ) + }) + testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) { entry := &structs.ExportedServicesConfigEntry{ Name: "default", @@ -895,10 +923,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) expectReplEvents(t, client, - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL) - // Roots tested in TestStreamResources_Server_CARootUpdates - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { // no mongo instances exist require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) @@ -909,16 +933,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(t, nodes.Nodes, 0) }, - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - // proxies can't export because no mesh gateway exists yet - require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) - require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - - var nodes pbpeerstream.ExportedService - require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) - require.Len(t, nodes.Nodes, 0) - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlSN, msg.GetResponse().ResourceID) @@ -938,17 +952,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) require.Len(t, nodes.Nodes, 0) }, - // This event happens because this is the first test case and there are - // no exported services when replication is initially set up. - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL) - require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID) - require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - - var exportedServices pbpeerstream.ExportedServiceList - require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices)) - require.ElementsMatch(t, []string{}, exportedServices.Services) - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL) require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID) @@ -976,23 +979,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service)) expectReplEvents(t, client, - func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { - require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) - require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) - require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) - - var nodes pbpeerstream.ExportedService - require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) - require.Len(t, nodes.Nodes, 1) - - pm := nodes.Nodes[0].Service.Connect.PeerMeta - require.Equal(t, "grpc", pm.Protocol) - spiffeIDs := []string{ - "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo", - "spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1", - } - require.Equal(t, spiffeIDs, pm.SpiffeID) - }, func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) @@ -1013,6 +999,33 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { ) }) + testutil.RunStep(t, "register service resolver to send proxy updates", func(t *testing.T) { + lastIdx++ + require.NoError(t, store.EnsureConfigEntry(lastIdx, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "mongo", + })) + expectReplEvents(t, client, + func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { + require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL) + require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) + require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) + + var nodes pbpeerstream.ExportedService + require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes)) + require.Len(t, nodes.Nodes, 1) + + pm := nodes.Nodes[0].Service.Connect.PeerMeta + require.Equal(t, "grpc", pm.Protocol) + spiffeIDs := []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo", + "spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1", + } + require.Equal(t, spiffeIDs, pm.SpiffeID) + }, + ) + }) + mongo := &structs.CheckServiceNode{ Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go index d290d2dc1..58c629d5c 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager.go +++ b/agent/grpc-external/services/peerstream/subscription_manager.go @@ -143,7 +143,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti pending := &pendingPayload{} m.syncNormalServices(ctx, state, evt.Services) if m.config.ConnectEnabled { - m.syncDiscoveryChains(state, pending, evt.ListAllDiscoveryChains()) + m.syncDiscoveryChains(state, pending, evt.DiscoChains) } err := pending.Add( diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index a74978774..1e663ba4f 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -472,15 +472,40 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) { Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000}, } + mysqlSidecar := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + Service: "mysql-sidecar-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "mysql", + }, + } backend.ensureNode(t, mysql.Node) backend.ensureService(t, "foo", mysql.Service) + backend.ensureService(t, "foo", &mysqlSidecar) mongo := &structs.CheckServiceNode{ - Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, - Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, + Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, + Service: &structs.NodeService{ + ID: "mongo-1", + Service: "mongo", + Port: 5000, + }, + } + mongoSidecar := structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + Service: "mongo-sidecar-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "mongo", + }, } backend.ensureNode(t, mongo.Node) backend.ensureService(t, "zip", mongo.Service) + backend.ensureService(t, "zip", &mongoSidecar) + + backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "chain", + }) var ( mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String() diff --git a/agent/proxycfg-glue/exported_peered_services_test.go b/agent/proxycfg-glue/exported_peered_services_test.go index 697c0d52c..ed07404ad 100644 --- a/agent/proxycfg-glue/exported_peered_services_test.go +++ b/agent/proxycfg-glue/exported_peered_services_test.go @@ -49,6 +49,14 @@ func TestServerExportedPeeredServices(t *testing.T) { }, })) + // Create resolvers for each of the services so that they are guaranteed to be replicated by the peer stream. + for _, s := range []string{"web", "api", "db"} { + require.NoError(t, store.EnsureConfigEntry(0, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: s, + })) + } + authz := policyAuthorizer(t, ` service "web" { policy = "read" } service "api" { policy = "read" } diff --git a/agent/structs/peering.go b/agent/structs/peering.go index 52d2f32a3..714a442e8 100644 --- a/agent/structs/peering.go +++ b/agent/structs/peering.go @@ -62,8 +62,7 @@ func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool { return true } -// ListAllDiscoveryChains returns all discovery chains (union of Services and -// DiscoChains). +// ListAllDiscoveryChains returns all discovery chains (union of Services and DiscoChains). func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo { chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo) if list == nil { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 0a95da695..5191272a6 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1235,6 +1235,12 @@ const ( // This service allows external traffic to exit the mesh through a terminating gateway // based on centralized configuration. ServiceKindDestination ServiceKind = "destination" + + // ServiceKindConnectEnabled is used to indicate whether a service is either + // connect-native or if the service has a corresponding sidecar. It is used for + // internal query purposes and should not be exposed to users as a valid Kind + // option. + ServiceKindConnectEnabled ServiceKind = "connect-enabled" ) // Type to hold a address and port of a service