Fix issue with peer services incorrectly appearing as connect-enabled. (#16339)
Prior to this commit, all peer services were transmitted as connect-enabled as long as a one or more mesh-gateways were healthy. With this change, there is now a difference between typical services and connect services transmitted via peering. A service will be reported as "connect-enabled" as long as any of these conditions are met: 1. a connect-proxy sidecar is registered for the service name. 2. a connect-native instance of the service is registered. 3. a service resolver / splitter / router is registered for the service name. 4. a terminating gateway has registered the service.
This commit is contained in:
parent
7685c14885
commit
1c4640f0df
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
peering: Fix bug where services were incorrectly imported as connect-enabled.
|
||||||
|
```
|
|
@ -900,12 +900,17 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
||||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if svc.PeerName == "" && sn.Name != "" {
|
||||||
|
if err := upsertKindServiceName(tx, idx, structs.ServiceKindConnectEnabled, sn); err != nil {
|
||||||
|
return fmt.Errorf("failed to persist service name as connect-enabled: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the virtual IP for the service
|
||||||
supported, err := virtualIPsSupported(tx, nil)
|
supported, err := virtualIPsSupported(tx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the virtual IP for the service
|
|
||||||
if supported {
|
if supported {
|
||||||
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
|
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
|
||||||
vip, err := assignServiceVirtualIP(tx, idx, psn)
|
vip, err := assignServiceVirtualIP(tx, idx, psn)
|
||||||
|
@ -1964,6 +1969,24 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanup ConnectEnabled for this service if none exist.
|
||||||
|
if svc.PeerName == "" && (svc.ServiceKind == structs.ServiceKindConnectProxy || svc.ServiceConnect.Native) {
|
||||||
|
service := svc.ServiceName
|
||||||
|
if svc.ServiceKind == structs.ServiceKindConnectProxy {
|
||||||
|
service = svc.ServiceProxy.DestinationServiceName
|
||||||
|
}
|
||||||
|
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
|
||||||
|
connectEnabled, err := serviceHasConnectEnabledInstances(tx, sn.Name, &sn.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to search for connect instances for service %q: %w", sn.Name, err)
|
||||||
|
}
|
||||||
|
if !connectEnabled {
|
||||||
|
if err := cleanupKindServiceName(tx, idx, sn, structs.ServiceKindConnectEnabled); err != nil {
|
||||||
|
return fmt.Errorf("failed to cleanup connect-enabled service name: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if svc.PeerName == "" {
|
if svc.PeerName == "" {
|
||||||
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
|
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
|
||||||
if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil {
|
if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil {
|
||||||
|
@ -3731,6 +3754,27 @@ func serviceHasConnectInstances(tx WriteTxn, serviceName string, entMeta *acl.En
|
||||||
return hasConnectInstance, hasNonConnectInstance, nil
|
return hasConnectInstance, hasNonConnectInstance, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serviceHasConnectEnabledInstances returns whether the given service name
|
||||||
|
// has a corresponding connect-proxy or connect-native instance.
|
||||||
|
// This function is mostly a clone of `serviceHasConnectInstances`, but it has
|
||||||
|
// an early return to improve performance and returns true if at least one
|
||||||
|
// connect-native instance exists.
|
||||||
|
func serviceHasConnectEnabledInstances(tx WriteTxn, serviceName string, entMeta *acl.EnterpriseMeta) (bool, error) {
|
||||||
|
query := Query{
|
||||||
|
Value: serviceName,
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
}
|
||||||
|
|
||||||
|
svc, err := tx.First(tableServices, indexConnect, query)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed service lookup: %w", err)
|
||||||
|
}
|
||||||
|
if svc != nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// updateGatewayService associates services with gateways after an eligible event
|
// updateGatewayService associates services with gateways after an eligible event
|
||||||
// ie. Registering a service in a namespace targeted by a gateway
|
// ie. Registering a service in a namespace targeted by a gateway
|
||||||
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
|
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
|
||||||
|
|
|
@ -8664,7 +8664,7 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
var idx uint64
|
var idx, connectEnabledIdx uint64
|
||||||
testRegisterNode(t, s, idx, "node1")
|
testRegisterNode(t, s, idx, "node1")
|
||||||
|
|
||||||
for _, svc := range services {
|
for _, svc := range services {
|
||||||
|
@ -8678,8 +8678,29 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
|
||||||
require.Len(t, gotNames, 1)
|
require.Len(t, gotNames, 1)
|
||||||
require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service)
|
require.Equal(t, svc.CompoundServiceName(), gotNames[0].Service)
|
||||||
require.Equal(t, svc.Kind, gotNames[0].Kind)
|
require.Equal(t, svc.Kind, gotNames[0].Kind)
|
||||||
|
if svc.Kind == structs.ServiceKindConnectProxy {
|
||||||
|
connectEnabledIdx = idx
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A ConnectEnabled service should exist if a corresponding ConnectProxy or ConnectNative service exists.
|
||||||
|
verifyConnectEnabled := func(expectIdx uint64) {
|
||||||
|
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectIdx, gotIdx)
|
||||||
|
require.Equal(t, []*KindServiceName{
|
||||||
|
{
|
||||||
|
Kind: structs.ServiceKindConnectEnabled,
|
||||||
|
Service: structs.NewServiceName("foo", entMeta),
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: connectEnabledIdx,
|
||||||
|
ModifyIndex: connectEnabledIdx,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, gotNames)
|
||||||
|
}
|
||||||
|
verifyConnectEnabled(connectEnabledIdx)
|
||||||
|
|
||||||
// Register another ingress gateway and there should be two names under the kind index
|
// Register another ingress gateway and there should be two names under the kind index
|
||||||
newIngress := structs.NodeService{
|
newIngress := structs.NodeService{
|
||||||
Kind: structs.ServiceKindIngressGateway,
|
Kind: structs.ServiceKindIngressGateway,
|
||||||
|
@ -8749,6 +8770,38 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, idx, gotIdx)
|
require.Equal(t, idx, gotIdx)
|
||||||
require.Empty(t, got)
|
require.Empty(t, got)
|
||||||
|
|
||||||
|
// A ConnectEnabled entry should not be removed until all corresponding services are removed.
|
||||||
|
{
|
||||||
|
verifyConnectEnabled(connectEnabledIdx)
|
||||||
|
// Add a connect-native service.
|
||||||
|
idx++
|
||||||
|
require.NoError(t, s.EnsureService(idx, "node1", &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTypical,
|
||||||
|
ID: "foo",
|
||||||
|
Service: "foo",
|
||||||
|
Address: "5.5.5.5",
|
||||||
|
Port: 5555,
|
||||||
|
EnterpriseMeta: *entMeta,
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
verifyConnectEnabled(connectEnabledIdx)
|
||||||
|
|
||||||
|
// Delete the proxy. This should not clean up the entry, because we still have a
|
||||||
|
// connect-native service registered.
|
||||||
|
idx++
|
||||||
|
require.NoError(t, s.DeleteService(idx, "node1", "connect-proxy", entMeta, ""))
|
||||||
|
verifyConnectEnabled(connectEnabledIdx)
|
||||||
|
|
||||||
|
// Remove the connect-native service to clear out the connect-enabled entry.
|
||||||
|
require.NoError(t, s.DeleteService(idx, "node1", "foo", entMeta, ""))
|
||||||
|
gotIdx, gotNames, err := s.ServiceNamesOfKind(nil, structs.ServiceKindConnectEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, idx, gotIdx)
|
||||||
|
require.Empty(t, gotNames)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) {
|
func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) {
|
||||||
|
|
|
@ -770,88 +770,181 @@ func exportedServicesForPeerTxn(
|
||||||
maxIdx := peering.ModifyIndex
|
maxIdx := peering.ModifyIndex
|
||||||
|
|
||||||
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
||||||
idx, conf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta)
|
idx, exportConf, err := getExportedServicesConfigEntryTxn(tx, ws, nil, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to fetch exported-services config entry: %w", err)
|
return 0, nil, fmt.Errorf("failed to fetch exported-services config entry: %w", err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
if conf == nil {
|
if exportConf == nil {
|
||||||
return maxIdx, &structs.ExportedServiceList{}, nil
|
return maxIdx, &structs.ExportedServiceList{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
normalSet = make(map[structs.ServiceName]struct{})
|
// exportedServices will contain the listing of all service names that are being exported
|
||||||
discoSet = make(map[structs.ServiceName]struct{})
|
// and will need to be queried for connect / discovery chain information.
|
||||||
|
exportedServices = make(map[structs.ServiceName]struct{})
|
||||||
|
|
||||||
|
// exportedConnectServices will contain the listing of all connect service names that are being exported.
|
||||||
|
exportedConnectServices = make(map[structs.ServiceName]struct{})
|
||||||
|
|
||||||
|
// namespaceConnectServices provides a listing of all connect service names for a particular partition+namespace pair.
|
||||||
|
namespaceConnectServices = make(map[acl.EnterpriseMeta]map[string]struct{})
|
||||||
|
|
||||||
|
// namespaceDiscoChains provides a listing of all disco chain names for a particular partition+namespace pair.
|
||||||
|
namespaceDiscoChains = make(map[acl.EnterpriseMeta]map[string]struct{})
|
||||||
)
|
)
|
||||||
|
|
||||||
// At least one of the following should be true for a name for it to
|
// Helper function for inserting data and auto-creating maps.
|
||||||
// replicate:
|
insertEntry := func(m map[acl.EnterpriseMeta]map[string]struct{}, entMeta acl.EnterpriseMeta, name string) {
|
||||||
//
|
names, ok := m[entMeta]
|
||||||
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
|
if !ok {
|
||||||
// - have an explicit sidecar kind=connect-proxy
|
names = make(map[string]struct{})
|
||||||
// - use connect native mode
|
m[entMeta] = names
|
||||||
|
}
|
||||||
|
names[name] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
for _, svc := range conf.Services {
|
// Build the set of all services that will be exported.
|
||||||
|
// Any possible namespace wildcards or "consul" services should be removed by this step.
|
||||||
|
for _, svc := range exportConf.Services {
|
||||||
// Prevent exporting the "consul" service.
|
// Prevent exporting the "consul" service.
|
||||||
if svc.Name == structs.ConsulServiceName {
|
if svc.Name == structs.ConsulServiceName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
svcMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
|
svcEntMeta := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), svc.Namespace)
|
||||||
|
svcName := structs.NewServiceName(svc.Name, &svcEntMeta)
|
||||||
|
|
||||||
sawPeer := false
|
peerFound := false
|
||||||
for _, consumer := range svc.Consumers {
|
for _, consumer := range svc.Consumers {
|
||||||
name := structs.NewServiceName(svc.Name, &svcMeta)
|
if consumer.Peer == peering.Name {
|
||||||
|
peerFound = true
|
||||||
if _, ok := normalSet[name]; ok {
|
break
|
||||||
// Service was covered by a wildcard that was already accounted for
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
if consumer.Peer != peering.Name {
|
}
|
||||||
continue
|
// Only look for more information if the matching peer was found.
|
||||||
}
|
if !peerFound {
|
||||||
sawPeer = true
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if svc.Name != structs.WildcardSpecifier {
|
// If this isn't a wildcard, we can simply add it to the list of services to watch and move to the next entry.
|
||||||
normalSet[name] = struct{}{}
|
if svc.Name != structs.WildcardSpecifier {
|
||||||
|
exportedServices[svcName] = struct{}{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all services in the namespace are exported by the wildcard, query those service names.
|
||||||
|
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcEntMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
for _, sn := range typicalServices {
|
||||||
|
// Prevent exporting the "consul" service.
|
||||||
|
if sn.Service.Name != structs.ConsulServiceName {
|
||||||
|
exportedServices[sn.Service] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the target peer is a consumer, and all services in the namespace are exported, query those service names.
|
// List all config entries of kind service-resolver, service-router, service-splitter, because they
|
||||||
if sawPeer && svc.Name == structs.WildcardSpecifier {
|
// will be exported as connect services.
|
||||||
idx, typicalServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, svcMeta)
|
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcEntMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to get typical service names: %w", err)
|
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
|
||||||
}
|
}
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
for _, s := range typicalServices {
|
for _, sn := range discoChains {
|
||||||
// Prevent exporting the "consul" service.
|
// Prevent exporting the "consul" service.
|
||||||
if s.Service.Name == structs.ConsulServiceName {
|
if sn.Name != structs.ConsulServiceName {
|
||||||
continue
|
exportedConnectServices[sn] = struct{}{}
|
||||||
}
|
insertEntry(namespaceDiscoChains, svcEntMeta, sn.Name)
|
||||||
normalSet[s.Service] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// list all config entries of kind service-resolver, service-router, service-splitter?
|
|
||||||
idx, discoChains, err := listDiscoveryChainNamesTxn(tx, ws, nil, svcMeta)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed to get discovery chain names: %w", err)
|
|
||||||
}
|
|
||||||
if idx > maxIdx {
|
|
||||||
maxIdx = idx
|
|
||||||
}
|
|
||||||
for _, sn := range discoChains {
|
|
||||||
discoSet[sn] = struct{}{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
normal := maps.SliceOfKeys(normalSet)
|
// At least one of the following should be true for a name to replicate it as a *connect* service:
|
||||||
disco := maps.SliceOfKeys(discoSet)
|
// - are a discovery chain by definition (service-router, service-splitter, service-resolver)
|
||||||
|
// - have an explicit sidecar kind=connect-proxy
|
||||||
|
// - use connect native mode
|
||||||
|
// - are registered with a terminating gateway
|
||||||
|
populateConnectService := func(sn structs.ServiceName) error {
|
||||||
|
// Load all disco-chains in this namespace if we haven't already.
|
||||||
|
if _, ok := namespaceDiscoChains[sn.EnterpriseMeta]; !ok {
|
||||||
|
// Check to see if we have a discovery chain with the same name.
|
||||||
|
idx, chains, err := listDiscoveryChainNamesTxn(tx, ws, nil, sn.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get connect services: %w", err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
for _, sn := range chains {
|
||||||
|
insertEntry(namespaceDiscoChains, sn.EnterpriseMeta, sn.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check to see if we have the connect service.
|
||||||
|
if _, ok := namespaceDiscoChains[sn.EnterpriseMeta][sn.Name]; ok {
|
||||||
|
exportedConnectServices[sn] = struct{}{}
|
||||||
|
// Do not early return because we have multiple watches that should be established.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load all services in this namespace if we haven't already.
|
||||||
|
if _, ok := namespaceConnectServices[sn.EnterpriseMeta]; !ok {
|
||||||
|
// This is more efficient than querying the service instance table.
|
||||||
|
idx, connectServices, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindConnectEnabled, sn.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get connect services: %w", err)
|
||||||
|
}
|
||||||
|
if idx > maxIdx {
|
||||||
|
maxIdx = idx
|
||||||
|
}
|
||||||
|
for _, ksn := range connectServices {
|
||||||
|
insertEntry(namespaceConnectServices, sn.EnterpriseMeta, ksn.Service.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check to see if we have the connect service.
|
||||||
|
if _, ok := namespaceConnectServices[sn.EnterpriseMeta][sn.Name]; ok {
|
||||||
|
exportedConnectServices[sn] = struct{}{}
|
||||||
|
// Do not early return because we have multiple watches that should be established.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the service is exposed via terminating gateways.
|
||||||
|
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed gateway lookup for %q: %w", sn.Name, err)
|
||||||
|
}
|
||||||
|
ws.Add(svcGateways.WatchCh())
|
||||||
|
for svc := svcGateways.Next(); svc != nil; svc = svcGateways.Next() {
|
||||||
|
gs, ok := svc.(*structs.GatewayService)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("failed converting to GatewayService for %q", sn.Name)
|
||||||
|
}
|
||||||
|
if gs.GatewayKind == structs.ServiceKindTerminatingGateway {
|
||||||
|
exportedConnectServices[sn] = struct{}{}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform queries and check if each service is connect-enabled.
|
||||||
|
for sn := range exportedServices {
|
||||||
|
// Do not query for data if we already know it's a connect service.
|
||||||
|
if _, ok := exportedConnectServices[sn]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := populateConnectService(sn); err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the protocol / targets for connect services.
|
||||||
chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo)
|
chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo)
|
||||||
populateChainInfo := func(svc structs.ServiceName) error {
|
populateChainInfo := func(svc structs.ServiceName) error {
|
||||||
if _, ok := chainInfo[svc]; ok {
|
if _, ok := chainInfo[svc]; ok {
|
||||||
|
@ -899,21 +992,17 @@ func exportedServicesForPeerTxn(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, svc := range normal {
|
for svc := range exportedConnectServices {
|
||||||
if err := populateChainInfo(svc); err != nil {
|
|
||||||
return 0, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, svc := range disco {
|
|
||||||
if err := populateChainInfo(svc); err != nil {
|
if err := populateChainInfo(svc); err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
structs.ServiceList(normal).Sort()
|
sortedServices := maps.SliceOfKeys(exportedServices)
|
||||||
|
structs.ServiceList(sortedServices).Sort()
|
||||||
|
|
||||||
list := &structs.ExportedServiceList{
|
list := &structs.ExportedServiceList{
|
||||||
Services: normal,
|
Services: sortedServices,
|
||||||
DiscoChains: chainInfo,
|
DiscoChains: chainInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1908,18 +1908,28 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
// Should be exported as both a normal and disco chain (resolver).
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{Peer: "my-peering"},
|
{Peer: "my-peering"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
// Should be exported as both a normal and disco chain (connect-proxy).
|
||||||
Name: "redis",
|
Name: "redis",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{Peer: "my-peering"},
|
{Peer: "my-peering"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
// Should only be exported as a normal service.
|
||||||
|
Name: "prometheus",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{Peer: "my-peering"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Should not be exported (different peer consumer)
|
||||||
Name: "mongo",
|
Name: "mongo",
|
||||||
Consumers: []structs.ServiceConsumer{
|
Consumers: []structs.ServiceConsumer{
|
||||||
{Peer: "my-other-peering"},
|
{Peer: "my-other-peering"},
|
||||||
|
@ -1932,12 +1942,37 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
|
// Register extra things so that disco chain entries appear.
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{
|
||||||
|
Node: "node1", Address: "10.0.0.1",
|
||||||
|
}))
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureService(lastIdx, "node1", &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "redis-sidecar-proxy",
|
||||||
|
Service: "redis-sidecar-proxy",
|
||||||
|
Port: 5005,
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "redis",
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "mysql",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
})
|
||||||
|
|
||||||
expect := &structs.ExportedServiceList{
|
expect := &structs.ExportedServiceList{
|
||||||
Services: []structs.ServiceName{
|
Services: []structs.ServiceName{
|
||||||
{
|
{
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
EnterpriseMeta: *defaultEntMeta,
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "prometheus",
|
||||||
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "redis",
|
Name: "redis",
|
||||||
EnterpriseMeta: *defaultEntMeta,
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
|
@ -1998,17 +2033,21 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
expect := &structs.ExportedServiceList{
|
expect := &structs.ExportedServiceList{
|
||||||
|
// Only "billing" shows up, because there are no other service instances running,
|
||||||
|
// and "consul" is never exported.
|
||||||
Services: []structs.ServiceName{
|
Services: []structs.ServiceName{
|
||||||
{
|
{
|
||||||
Name: "billing",
|
Name: "billing",
|
||||||
EnterpriseMeta: *defaultEntMeta,
|
EnterpriseMeta: *defaultEntMeta,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
// Only "mysql" appears because there it has a service resolver.
|
||||||
|
// "redis" does not appear, because it's a sidecar proxy without a corresponding service, so the wildcard doesn't find it.
|
||||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||||
newSN("billing"): {
|
newSN("mysql"): {
|
||||||
Protocol: "tcp",
|
Protocol: "tcp",
|
||||||
TCPTargets: []*structs.DiscoveryTarget{
|
TCPTargets: []*structs.DiscoveryTarget{
|
||||||
newTarget("billing", "", "dc1"),
|
newTarget("mysql", "", "dc1"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -2025,13 +2064,17 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
ID: "payments", Service: "payments", Port: 5000,
|
ID: "payments", Service: "payments", Port: 5000,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
// The proxy will be ignored.
|
// The proxy will cause "payments" to be output in the disco chains. It will NOT be output
|
||||||
|
// in the normal services list.
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
ID: "payments-proxy",
|
ID: "payments-proxy",
|
||||||
Service: "payments-proxy",
|
Service: "payments-proxy",
|
||||||
Port: 5000,
|
Port: 5000,
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "payments",
|
||||||
|
},
|
||||||
}))
|
}))
|
||||||
lastIdx++
|
lastIdx++
|
||||||
// The consul service should never be exported.
|
// The consul service should never be exported.
|
||||||
|
@ -2099,10 +2142,11 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
},
|
},
|
||||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||||
// NOTE: no consul-redirect here
|
// NOTE: no consul-redirect here
|
||||||
newSN("billing"): {
|
// NOTE: no billing here, because it does not have a proxy.
|
||||||
|
newSN("payments"): {
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
},
|
},
|
||||||
newSN("payments"): {
|
newSN("mysql"): {
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
},
|
},
|
||||||
newSN("resolver"): {
|
newSN("resolver"): {
|
||||||
|
@ -2129,6 +2173,9 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
|
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceSplitter, "splitter", nil))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ServiceResolver, "mysql", nil))
|
||||||
|
|
||||||
require.True(t, watchFired(ws))
|
require.True(t, watchFired(ws))
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
|
||||||
|
@ -2160,6 +2207,51 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
require.Equal(t, expect, got)
|
require.Equal(t, expect, got)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "terminating gateway services are exported", func(t *testing.T) {
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
|
ID: "term-svc", Service: "term-svc", Port: 6000,
|
||||||
|
}))
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTerminatingGateway,
|
||||||
|
Service: "some-terminating-gateway",
|
||||||
|
ID: "some-terminating-gateway",
|
||||||
|
Port: 9000,
|
||||||
|
}))
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, s.EnsureConfigEntry(lastIdx, &structs.TerminatingGatewayConfigEntry{
|
||||||
|
Kind: structs.TerminatingGateway,
|
||||||
|
Name: "some-terminating-gateway",
|
||||||
|
Services: []structs.LinkedService{{Name: "term-svc"}},
|
||||||
|
}))
|
||||||
|
|
||||||
|
expect := &structs.ExportedServiceList{
|
||||||
|
Services: []structs.ServiceName{
|
||||||
|
newSN("payments"),
|
||||||
|
newSN("term-svc"),
|
||||||
|
},
|
||||||
|
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||||
|
newSN("payments"): {
|
||||||
|
Protocol: "http",
|
||||||
|
},
|
||||||
|
newSN("resolver"): {
|
||||||
|
Protocol: "http",
|
||||||
|
},
|
||||||
|
newSN("router"): {
|
||||||
|
Protocol: "http",
|
||||||
|
},
|
||||||
|
newSN("term-svc"): {
|
||||||
|
Protocol: "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, lastIdx, idx)
|
||||||
|
require.Equal(t, expect, got)
|
||||||
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
|
testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
|
||||||
expect := &structs.ExportedServiceList{}
|
expect := &structs.ExportedServiceList{}
|
||||||
|
|
||||||
|
|
|
@ -844,6 +844,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||||
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
||||||
}
|
}
|
||||||
|
mysqlSidecar := &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
Service: "mysql-sidecar-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "mysql",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
|
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
|
||||||
|
@ -851,6 +858,9 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
|
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
|
||||||
|
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.EnsureService(lastIdx, "foo", mysqlSidecar))
|
||||||
|
|
||||||
mongoSvcDefaults := &structs.ServiceConfigEntry{
|
mongoSvcDefaults := &structs.ServiceConfigEntry{
|
||||||
Kind: structs.ServiceDefaults,
|
Kind: structs.ServiceDefaults,
|
||||||
Name: "mongo",
|
Name: "mongo",
|
||||||
|
@ -870,6 +880,24 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "initial stream data is received", func(t *testing.T) {
|
||||||
|
expectReplEvents(t, client,
|
||||||
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
|
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
|
||||||
|
// Roots tested in TestStreamResources_Server_CARootUpdates
|
||||||
|
},
|
||||||
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
|
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
|
||||||
|
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
|
||||||
|
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
|
||||||
|
|
||||||
|
var exportedServices pbpeerstream.ExportedServiceList
|
||||||
|
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices))
|
||||||
|
require.ElementsMatch(t, []string{}, exportedServices.Services)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
|
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
|
||||||
entry := &structs.ExportedServicesConfigEntry{
|
entry := &structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
|
@ -895,10 +923,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
|
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
|
||||||
|
|
||||||
expectReplEvents(t, client,
|
expectReplEvents(t, client,
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
|
||||||
require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
|
|
||||||
// Roots tested in TestStreamResources_Server_CARootUpdates
|
|
||||||
},
|
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
// no mongo instances exist
|
// no mongo instances exist
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
||||||
|
@ -909,16 +933,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
||||||
require.Len(t, nodes.Nodes, 0)
|
require.Len(t, nodes.Nodes, 0)
|
||||||
},
|
},
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
|
||||||
// proxies can't export because no mesh gateway exists yet
|
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
|
||||||
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
|
|
||||||
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
|
|
||||||
|
|
||||||
var nodes pbpeerstream.ExportedService
|
|
||||||
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
|
||||||
require.Len(t, nodes.Nodes, 0)
|
|
||||||
},
|
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
||||||
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
|
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
|
||||||
|
@ -938,17 +952,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
||||||
require.Len(t, nodes.Nodes, 0)
|
require.Len(t, nodes.Nodes, 0)
|
||||||
},
|
},
|
||||||
// This event happens because this is the first test case and there are
|
|
||||||
// no exported services when replication is initially set up.
|
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
|
|
||||||
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
|
|
||||||
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
|
|
||||||
|
|
||||||
var exportedServices pbpeerstream.ExportedServiceList
|
|
||||||
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&exportedServices))
|
|
||||||
require.ElementsMatch(t, []string{}, exportedServices.Services)
|
|
||||||
},
|
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
|
require.Equal(t, pbpeerstream.TypeURLExportedServiceList, msg.GetResponse().ResourceURL)
|
||||||
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
|
require.Equal(t, subExportedServiceList, msg.GetResponse().ResourceID)
|
||||||
|
@ -976,23 +979,6 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service))
|
require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service))
|
||||||
|
|
||||||
expectReplEvents(t, client,
|
expectReplEvents(t, client,
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
|
||||||
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
|
|
||||||
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
|
|
||||||
|
|
||||||
var nodes pbpeerstream.ExportedService
|
|
||||||
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
|
||||||
require.Len(t, nodes.Nodes, 1)
|
|
||||||
|
|
||||||
pm := nodes.Nodes[0].Service.Connect.PeerMeta
|
|
||||||
require.Equal(t, "grpc", pm.Protocol)
|
|
||||||
spiffeIDs := []string{
|
|
||||||
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
|
|
||||||
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
|
|
||||||
}
|
|
||||||
require.Equal(t, spiffeIDs, pm.SpiffeID)
|
|
||||||
},
|
|
||||||
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
||||||
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
|
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
|
||||||
|
@ -1013,6 +999,33 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "register service resolver to send proxy updates", func(t *testing.T) {
|
||||||
|
lastIdx++
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(lastIdx, &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "mongo",
|
||||||
|
}))
|
||||||
|
expectReplEvents(t, client,
|
||||||
|
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
|
||||||
|
require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
|
||||||
|
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
|
||||||
|
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
|
||||||
|
|
||||||
|
var nodes pbpeerstream.ExportedService
|
||||||
|
require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
|
||||||
|
require.Len(t, nodes.Nodes, 1)
|
||||||
|
|
||||||
|
pm := nodes.Nodes[0].Service.Connect.PeerMeta
|
||||||
|
require.Equal(t, "grpc", pm.Protocol)
|
||||||
|
spiffeIDs := []string{
|
||||||
|
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
|
||||||
|
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
|
||||||
|
}
|
||||||
|
require.Equal(t, spiffeIDs, pm.SpiffeID)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
mongo := &structs.CheckServiceNode{
|
mongo := &structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||||
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
||||||
|
|
|
@ -143,7 +143,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
|
||||||
pending := &pendingPayload{}
|
pending := &pendingPayload{}
|
||||||
m.syncNormalServices(ctx, state, evt.Services)
|
m.syncNormalServices(ctx, state, evt.Services)
|
||||||
if m.config.ConnectEnabled {
|
if m.config.ConnectEnabled {
|
||||||
m.syncDiscoveryChains(state, pending, evt.ListAllDiscoveryChains())
|
m.syncDiscoveryChains(state, pending, evt.DiscoChains)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := pending.Add(
|
err := pending.Add(
|
||||||
|
|
|
@ -472,15 +472,40 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
|
||||||
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
|
||||||
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
|
||||||
}
|
}
|
||||||
|
mysqlSidecar := structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
Service: "mysql-sidecar-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "mysql",
|
||||||
|
},
|
||||||
|
}
|
||||||
backend.ensureNode(t, mysql.Node)
|
backend.ensureNode(t, mysql.Node)
|
||||||
backend.ensureService(t, "foo", mysql.Service)
|
backend.ensureService(t, "foo", mysql.Service)
|
||||||
|
backend.ensureService(t, "foo", &mysqlSidecar)
|
||||||
|
|
||||||
mongo := &structs.CheckServiceNode{
|
mongo := &structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
|
||||||
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
|
Service: &structs.NodeService{
|
||||||
|
ID: "mongo-1",
|
||||||
|
Service: "mongo",
|
||||||
|
Port: 5000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
mongoSidecar := structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
Service: "mongo-sidecar-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "mongo",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
backend.ensureNode(t, mongo.Node)
|
backend.ensureNode(t, mongo.Node)
|
||||||
backend.ensureService(t, "zip", mongo.Service)
|
backend.ensureService(t, "zip", mongo.Service)
|
||||||
|
backend.ensureService(t, "zip", &mongoSidecar)
|
||||||
|
|
||||||
|
backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "chain",
|
||||||
|
})
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
|
||||||
|
|
|
@ -49,6 +49,14 @@ func TestServerExportedPeeredServices(t *testing.T) {
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
// Create resolvers for each of the services so that they are guaranteed to be replicated by the peer stream.
|
||||||
|
for _, s := range []string{"web", "api", "db"} {
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(0, &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: s,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
authz := policyAuthorizer(t, `
|
authz := policyAuthorizer(t, `
|
||||||
service "web" { policy = "read" }
|
service "web" { policy = "read" }
|
||||||
service "api" { policy = "read" }
|
service "api" { policy = "read" }
|
||||||
|
|
|
@ -62,8 +62,7 @@ func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListAllDiscoveryChains returns all discovery chains (union of Services and
|
// ListAllDiscoveryChains returns all discovery chains (union of Services and DiscoChains).
|
||||||
// DiscoChains).
|
|
||||||
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo {
|
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo {
|
||||||
chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo)
|
chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo)
|
||||||
if list == nil {
|
if list == nil {
|
||||||
|
|
|
@ -1235,6 +1235,12 @@ const (
|
||||||
// This service allows external traffic to exit the mesh through a terminating gateway
|
// This service allows external traffic to exit the mesh through a terminating gateway
|
||||||
// based on centralized configuration.
|
// based on centralized configuration.
|
||||||
ServiceKindDestination ServiceKind = "destination"
|
ServiceKindDestination ServiceKind = "destination"
|
||||||
|
|
||||||
|
// ServiceKindConnectEnabled is used to indicate whether a service is either
|
||||||
|
// connect-native or if the service has a corresponding sidecar. It is used for
|
||||||
|
// internal query purposes and should not be exposed to users as a valid Kind
|
||||||
|
// option.
|
||||||
|
ServiceKindConnectEnabled ServiceKind = "connect-enabled"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Type to hold a address and port of a service
|
// Type to hold a address and port of a service
|
||||||
|
|
Loading…
Reference in New Issue