peering: replicate all SpiffeID values necessary for the importing side to do SAN validation (#13612)

When traversing an exported peered service, the discovery chain
evaluation at the other side may re-route the request to a variety of
endpoints. Furthermore we intend to terminate mTLS at the mesh gateway
for arriving peered traffic that is http-like (L7), so the caller needs
to know the mesh gateway's SpiffeID in that case as well.

The following new SpiffeID values will be shipped back in the peerstream
replication:

- tcp: all possible SpiffeIDs resulting from the service-resolver
        component of the exported discovery chain

- http-like: the SpiffeID of the mesh gateway
This commit is contained in:
R.B. Boyer 2022-06-27 14:37:18 -05:00 committed by GitHub
parent 891a864b75
commit 2dba16be52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 501 additions and 124 deletions

View File

@ -506,6 +506,25 @@ var serviceGraphKinds = []string{
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain // discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) { func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta)
if err != nil {
return 0, nil, err
}
var resp []structs.ServiceName
for _, t := range targets {
em := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), t.Namespace)
target := structs.NewServiceName(t.Service, &em)
// TODO (freddy): Allow upstream DC and encode in response
if t.Datacenter == dc {
resp = append(resp, target)
}
}
return idx, resp, nil
}
func (s *Store) discoveryChainOriginalTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []*structs.DiscoveryTarget, error) {
source := structs.NewServiceName(service, entMeta) source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{ req := discoverychain.CompileRequest{
ServiceName: source.Name, ServiceName: source.Name,
@ -518,17 +537,7 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
} }
var resp []structs.ServiceName return idx, maps.SliceOfValues(chain.Targets), nil
for _, t := range chain.Targets {
em := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), t.Namespace)
target := structs.NewServiceName(t.Service, &em)
// TODO (freddy): Allow upstream DC and encode in response
if t.Datacenter == dc {
resp = append(resp, target)
}
}
return idx, resp, nil
} }
// discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target // discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target

View File

@ -3,6 +3,7 @@ package state
import ( import (
"errors" "errors"
"fmt" "fmt"
"sort"
"strings" "strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -309,7 +310,7 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error {
// gateway's config entry, which we wouldn't want to replicate. How would // gateway's config entry, which we wouldn't want to replicate. How would
// client peers know to route through terminating gateways when they're not // client peers know to route through terminating gateways when they're not
// dialing through a remote mesh gateway? // dialing through a remote mesh gateway?
func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) { func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc string) (uint64, *structs.ExportedServiceList, error) {
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
defer tx.Abort() defer tx.Abort()
@ -321,7 +322,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
return 0, &structs.ExportedServiceList{}, nil return 0, &structs.ExportedServiceList{}, nil
} }
return s.exportedServicesForPeerTxn(ws, tx, peering) return s.exportedServicesForPeerTxn(ws, tx, peering, dc)
} }
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
@ -335,7 +336,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
out := make(map[string]structs.ServiceList) out := make(map[string]structs.ServiceList)
for _, peering := range peerings { for _, peering := range peerings {
idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering) idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, "")
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
} }
@ -351,7 +352,11 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
return maxIdx, out, nil return maxIdx, out, nil
} }
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) { // exportedServicesForPeerTxn will find all services that are exported to a
// specific peering, and optionally include information about discovery chain
// reachable targets for these exported services if the "dc" parameter is
// specified.
func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering, dc string) (uint64, *structs.ExportedServiceList, error) {
maxIdx := peering.ModifyIndex maxIdx := peering.ModifyIndex
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
@ -437,42 +442,63 @@ func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peerin
normal := maps.SliceOfKeys(normalSet) normal := maps.SliceOfKeys(normalSet)
disco := maps.SliceOfKeys(discoSet) disco := maps.SliceOfKeys(discoSet)
structs.ServiceList(normal).Sort() chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo)
structs.ServiceList(disco).Sort() populateChainInfo := func(svc structs.ServiceName) error {
if _, ok := chainInfo[svc]; ok {
serviceProtocols := make(map[structs.ServiceName]string)
populateProtocol := func(svc structs.ServiceName) error {
if _, ok := serviceProtocols[svc]; ok {
return nil // already processed return nil // already processed
} }
var info structs.ExportedDiscoveryChainInfo
idx, protocol, err := protocolForService(tx, ws, svc) idx, protocol, err := protocolForService(tx, ws, svc)
if err != nil { if err != nil {
return fmt.Errorf("failed to get protocol for service: %w", err) return fmt.Errorf("failed to get protocol for service %q: %w", svc, err)
}
if idx > maxIdx {
maxIdx = idx
}
info.Protocol = protocol
if dc != "" && !structs.IsProtocolHTTPLike(protocol) {
// We only need to populate the targets for replication purposes for L4 protocols, which
// do not ultimately get intercepted by the mesh gateways.
idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
} }
if idx > maxIdx { if idx > maxIdx {
maxIdx = idx maxIdx = idx
} }
serviceProtocols[svc] = protocol sort.Slice(targets, func(i, j int) bool {
return targets[i].ID < targets[j].ID
})
info.TCPTargets = targets
}
chainInfo[svc] = info
return nil return nil
} }
for _, svc := range normal { for _, svc := range normal {
if err := populateProtocol(svc); err != nil { if err := populateChainInfo(svc); err != nil {
return 0, nil, err return 0, nil, err
} }
} }
for _, svc := range disco { for _, svc := range disco {
if err := populateProtocol(svc); err != nil { if err := populateChainInfo(svc); err != nil {
return 0, nil, err return 0, nil, err
} }
} }
structs.ServiceList(normal).Sort()
list := &structs.ExportedServiceList{ list := &structs.ExportedServiceList{
Services: normal, Services: normal,
DiscoChains: disco, DiscoChains: chainInfo,
ConnectProtocol: serviceProtocols,
} }
return maxIdx, list, nil return maxIdx, list, nil
@ -521,25 +547,44 @@ func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, en
return maxIdx, peerings, nil return maxIdx, peerings, nil
} }
// TrustBundleListByService returns the trust bundles for all peers that the given service is exported to. // TrustBundleListByService returns the trust bundles for all peers that the
func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { // given service is exported to, via a discovery chain target.
func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) {
tx := s.db.ReadTxn() tx := s.db.ReadTxn()
defer tx.Abort() defer tx.Abort()
maxIdx, peers, err := peeringsForServiceTxn(tx, ws, service, entMeta) realSvc := structs.NewServiceName(service, &entMeta)
maxIdx, chainNames, err := s.discoveryChainSourcesTxn(tx, ws, dc, realSvc)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", service, err) return 0, nil, fmt.Errorf("failed to list all discovery chains referring to %q: %w", realSvc, err)
} }
var resp []*pbpeering.PeeringTrustBundle peerNames := make(map[string]struct{})
for _, chainSvc := range chainNames {
idx, peers, err := peeringsForServiceTxn(tx, ws, chainSvc.Name, chainSvc.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", chainSvc, err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, peer := range peers { for _, peer := range peers {
peerNames[peer.Name] = struct{}{}
}
}
peerNamesSlice := maps.SliceOfKeys(peerNames)
sort.Strings(peerNamesSlice)
var resp []*pbpeering.PeeringTrustBundle
for _, peerName := range peerNamesSlice {
pq := Query{ pq := Query{
Value: strings.ToLower(peer.Name), Value: strings.ToLower(peerName),
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()), EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()),
} }
idx, trustBundle, err := peeringTrustBundleReadTxn(tx, ws, pq) idx, trustBundle, err := peeringTrustBundleReadTxn(tx, ws, pq)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peer.Name, err) return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peerName, err)
} }
if idx > maxIdx { if idx > maxIdx {
maxIdx = idx maxIdx = idx
@ -548,6 +593,7 @@ func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entM
resp = append(resp, trustBundle) resp = append(resp, trustBundle)
} }
} }
return maxIdx, resp, nil return maxIdx, resp, nil
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
@ -674,6 +675,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
var lastIdx uint64 var lastIdx uint64
ca := &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
}
lastIdx++
require.NoError(t, s.CASetConfig(lastIdx, ca))
lastIdx++ lastIdx++
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
ID: testUUID(), ID: testUUID(),
@ -705,10 +713,18 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.NoError(t, s.EnsureConfigEntry(lastIdx, entry)) require.NoError(t, s.EnsureConfigEntry(lastIdx, entry))
} }
newTarget := func(service, serviceSubset, datacenter string) *structs.DiscoveryTarget {
t := structs.NewDiscoveryTarget(service, serviceSubset, "default", "default", datacenter)
t.SNI = connect.TargetSNI(t, connect.TestTrustDomain)
t.Name = t.SNI
t.ConnectTimeout = 5 * time.Second // default
return t
}
testutil.RunStep(t, "no exported services", func(t *testing.T) { testutil.RunStep(t, "no exported services", func(t *testing.T) {
expect := &structs.ExportedServiceList{} expect := &structs.ExportedServiceList{}
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -754,13 +770,23 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("mysql"): "tcp", newSN("mysql"): {
newSN("redis"): "tcp", Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("mysql", "", "dc1"),
},
},
newSN("redis"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("redis", "", "dc1"),
},
},
}, },
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -800,11 +826,16 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
EnterpriseMeta: *defaultEntMeta, EnterpriseMeta: *defaultEntMeta,
}, },
}, },
ConnectProtocol: map[structs.ServiceName]string{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
newSN("billing"): "tcp", newSN("billing"): {
Protocol: "tcp",
TCPTargets: []*structs.DiscoveryTarget{
newTarget("billing", "", "dc1"),
},
},
}, },
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -869,29 +900,25 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
}, },
// NOTE: no payments-proxy here // NOTE: no payments-proxy here
}, },
DiscoChains: []structs.ServiceName{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
{ newSN("billing"): {
Name: "resolver", Protocol: "http",
EnterpriseMeta: *defaultEntMeta,
}, },
{ newSN("payments"): {
Name: "router", Protocol: "http",
EnterpriseMeta: *defaultEntMeta,
}, },
{ newSN("resolver"): {
Name: "splitter", Protocol: "http",
EnterpriseMeta: *defaultEntMeta,
}, },
newSN("router"): {
Protocol: "http",
},
newSN("splitter"): {
Protocol: "http",
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("billing"): "http",
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
newSN("splitter"): "http",
}, },
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -915,23 +942,19 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
}, },
// NOTE: no payments-proxy here // NOTE: no payments-proxy here
}, },
DiscoChains: []structs.ServiceName{ DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
{ newSN("payments"): {
Name: "resolver", Protocol: "http",
EnterpriseMeta: *defaultEntMeta,
}, },
{ newSN("resolver"): {
Name: "router", Protocol: "http",
EnterpriseMeta: *defaultEntMeta,
}, },
newSN("router"): {
Protocol: "http",
}, },
ConnectProtocol: map[structs.ServiceName]string{
newSN("payments"): "http",
newSN("resolver"): "http",
newSN("router"): "http",
}, },
} }
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -941,7 +964,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
expect := &structs.ExportedServiceList{} expect := &structs.ExportedServiceList{}
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta)) require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta))
idx, got, err := s.ExportedServicesForPeer(ws, id) idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Equal(t, expect, got) require.Equal(t, expect, got)
@ -1218,15 +1241,22 @@ func TestStore_TrustBundleListByService(t *testing.T) {
entMeta := *acl.DefaultEnterpriseMeta() entMeta := *acl.DefaultEnterpriseMeta()
var lastIdx uint64 var lastIdx uint64
ws := memdb.NewWatchSet()
ca := &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
}
lastIdx++
require.NoError(t, store.CASetConfig(lastIdx, ca))
var ( var (
peerID1 = testUUID() peerID1 = testUUID()
peerID2 = testUUID() peerID2 = testUUID()
) )
ws := memdb.NewWatchSet()
testutil.RunStep(t, "no results on initial setup", func(t *testing.T) { testutil.RunStep(t, "no results on initial setup", func(t *testing.T) {
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0) require.Len(t, resp, 0)
@ -1248,7 +1278,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws)) require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, resp, 0) require.Len(t, resp, 0)
require.Equal(t, lastIdx-2, idx) require.Equal(t, lastIdx-2, idx)
@ -1264,10 +1294,10 @@ func TestStore_TrustBundleListByService(t *testing.T) {
// The peering is only watched after the service is exported via config entry. // The peering is only watched after the service is exported via config entry.
require.False(t, watchFired(ws)) require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(0), idx)
require.Len(t, resp, 0) require.Len(t, resp, 0)
require.Equal(t, lastIdx-3, idx)
}) })
testutil.RunStep(t, "exporting the service does not yield trust bundles", func(t *testing.T) { testutil.RunStep(t, "exporting the service does not yield trust bundles", func(t *testing.T) {
@ -1290,7 +1320,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0) require.Len(t, resp, 0)
@ -1307,7 +1337,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1) require.Len(t, resp, 1)
@ -1321,7 +1351,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 0) require.Len(t, resp, 0)
@ -1346,7 +1376,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1) require.Len(t, resp, 1)
@ -1371,7 +1401,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws)) require.False(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx-2, idx) require.Equal(t, lastIdx-2, idx)
require.Len(t, resp, 1) require.Len(t, resp, 1)
@ -1400,7 +1430,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 2) require.Len(t, resp, 2)
@ -1419,7 +1449,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.True(t, watchFired(ws)) require.True(t, watchFired(ws))
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Len(t, resp, 1) require.Len(t, resp, 1)
@ -1432,7 +1462,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
require.False(t, watchFired(ws)) require.False(t, watchFired(ws))
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx-1, idx) require.Equal(t, lastIdx-1, idx)
require.Len(t, resp, 1) require.Len(t, resp, 1)

View File

@ -130,12 +130,12 @@ type Store interface {
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
AbandonCh() <-chan struct{} AbandonCh() <-chan struct{}
} }
@ -533,7 +533,7 @@ func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.T
switch { switch {
case req.ServiceName != "": case req.ServiceName != "":
idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, entMeta) idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, s.config.Datacenter, entMeta)
case req.Kind == string(structs.ServiceKindMeshGateway): case req.Kind == string(structs.ServiceKindMeshGateway):
idx, bundles, err = s.Backend.Store().PeeringTrustBundleList(nil, entMeta) idx, bundles, err = s.Backend.Store().PeeringTrustBundleList(nil, entMeta)
case req.Kind != "": case req.Kind != "":

View File

@ -490,8 +490,8 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
resp, err := client.TrustBundleListByService(context.Background(), &req) resp, err := client.TrustBundleListByService(context.Background(), &req)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, resp.Bundles, 2) require.Len(t, resp.Bundles, 2)
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[0].RootPEMs) require.Equal(t, []string{"bar-root-1"}, resp.Bundles[0].RootPEMs)
require.Equal(t, []string{"bar-root-1"}, resp.Bundles[1].RootPEMs) require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
} }
func Test_StreamHandler_UpsertServices(t *testing.T) { func Test_StreamHandler_UpsertServices(t *testing.T) {
@ -912,7 +912,10 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
testrpc.WaitForLeader(t, server.RPC, conf.Datacenter) testrpc.WaitForLeader(t, server.RPC, conf.Datacenter)
backend := consul.NewPeeringBackend(server, deps.GRPCConnPool) backend := consul.NewPeeringBackend(server, deps.GRPCConnPool)
handler := &peering.Service{Backend: backend} handler := peering.NewService(testutil.Logger(t), peering.Config{
Datacenter: "dc1",
ConnectEnabled: true,
}, backend)
grpcServer := gogrpc.NewServer() grpcServer := gogrpc.NewServer()
pbpeering.RegisterPeeringServiceServer(grpcServer, handler) pbpeering.RegisterPeeringServiceServer(grpcServer, handler)

View File

@ -655,6 +655,18 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
mongoSvcDefaults := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mongo",
Protocol: "grpc",
}
require.NoError(t, mongoSvcDefaults.Normalize())
require.NoError(t, mongoSvcDefaults.Validate())
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, mongoSvcDefaults))
// NOTE: for this test we'll just live in a fantasy realm where we assume
// that mongo understands gRPC
var ( var (
mongoSN = structs.NewServiceName("mongo", nil).String() mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String() mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
@ -681,6 +693,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
}, },
}, },
} }
require.NoError(t, entry.Normalize())
require.NoError(t, entry.Validate())
lastIdx++ lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
@ -744,8 +758,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1) require.Len(t, nodes.Nodes, 1)
svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo" pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID) require.Equal(t, "grpc", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
}, },
func(t *testing.T, msg *pbpeering.ReplicationMessage) { func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
@ -756,8 +775,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1) require.Len(t, nodes.Nodes, 1)
svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql" pm := nodes.Nodes[0].Service.Connect.PeerMeta
require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID) require.Equal(t, "tcp", pm.Protocol)
spiffeIDs := []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
}
require.Equal(t, spiffeIDs, pm.SpiffeID)
}, },
) )
}) })
@ -800,6 +823,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
}, },
}, },
} }
require.NoError(t, entry.Normalize())
require.NoError(t, entry.Validate())
lastIdx++ lastIdx++
err := store.EnsureConfigEntry(lastIdx, entry) err := store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err) require.NoError(t, err)

View File

@ -23,7 +23,7 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex
// match the list of services exported to the peer. // match the list of services exported to the peer.
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) { m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) {
// Get exported services for peer id // Get exported services for peer id
_, list, err := store.ExportedServicesForPeer(ws, peerID) _, list, err := store.ExportedServicesForPeer(ws, peerID, m.config.Datacenter)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err) return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
} }

View File

@ -238,8 +238,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
if state.exportList != nil { if state.exportList != nil {
// Trigger public events for all synthetic discovery chain replies. // Trigger public events for all synthetic discovery chain replies.
for chainName, protocol := range state.connectServices { for chainName, info := range state.connectServices {
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
} }
} }
@ -456,17 +456,17 @@ func (m *subscriptionManager) syncDiscoveryChains(
ctx context.Context, ctx context.Context,
state *subscriptionState, state *subscriptionState,
pending *pendingPayload, pending *pendingPayload,
chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable chainsByName map[structs.ServiceName]structs.ExportedDiscoveryChainInfo,
) { ) {
// if it was newly added, then try to emit an UPDATE event // if it was newly added, then try to emit an UPDATE event
for chainName, protocol := range chainsByName { for chainName, info := range chainsByName {
if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol { if oldInfo, ok := state.connectServices[chainName]; ok && info.Equal(oldInfo) {
continue continue
} }
state.connectServices[chainName] = protocol state.connectServices[chainName] = info
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info)
} }
// if it was dropped, try to emit an DELETE event // if it was dropped, try to emit an DELETE event
@ -498,7 +498,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
state *subscriptionState, state *subscriptionState,
pending *pendingPayload, pending *pendingPayload,
chainName structs.ServiceName, chainName structs.ServiceName,
protocol string, info structs.ExportedDiscoveryChainInfo,
) { ) {
if _, ok := state.connectServices[chainName]; !ok { if _, ok := state.connectServices[chainName]; !ok {
return // not found return // not found
@ -519,7 +519,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain(
m.config.Datacenter, m.config.Datacenter,
m.trustDomain, m.trustDomain,
chainName, chainName,
protocol, info,
state.meshGateway, state.meshGateway,
), ),
) )
@ -532,7 +532,7 @@ func createDiscoChainHealth(
peerName string, peerName string,
datacenter, trustDomain string, datacenter, trustDomain string,
sn structs.ServiceName, sn structs.ServiceName,
protocol string, info structs.ExportedDiscoveryChainInfo,
pb *pbservice.IndexedCheckServiceNodes, pb *pbservice.IndexedCheckServiceNodes,
) *pbservice.IndexedCheckServiceNodes { ) *pbservice.IndexedCheckServiceNodes {
fakeProxyName := sn.Name + syntheticProxyNameSuffix fakeProxyName := sn.Name + syntheticProxyNameSuffix
@ -546,6 +546,7 @@ func createDiscoChainHealth(
Datacenter: datacenter, Datacenter: datacenter,
Service: sn.Name, Service: sn.Name,
} }
mainSpiffeIDString := spiffeID.URI().String()
sni := connect.PeeredServiceSNI( sni := connect.PeeredServiceSNI(
sn.Name, sn.Name,
@ -560,8 +561,34 @@ func createDiscoChainHealth(
// TODO(peering): should this be replicated by service and not by instance? // TODO(peering): should this be replicated by service and not by instance?
peerMeta = &pbservice.PeeringServiceMeta{ peerMeta = &pbservice.PeeringServiceMeta{
SNI: []string{sni}, SNI: []string{sni},
SpiffeID: []string{spiffeID.URI().String()}, SpiffeID: []string{
Protocol: protocol, mainSpiffeIDString,
},
Protocol: info.Protocol,
}
if structs.IsProtocolHTTPLike(info.Protocol) {
gwSpiffeID := connect.SpiffeIDMeshGateway{
Host: trustDomain,
Partition: sn.PartitionOrDefault(),
Datacenter: datacenter,
}
peerMeta.SpiffeID = append(peerMeta.SpiffeID, gwSpiffeID.URI().String())
} else {
for _, target := range info.TCPTargets {
targetSpiffeID := connect.SpiffeIDService{
Host: trustDomain,
Partition: target.Partition,
Namespace: target.Namespace,
Datacenter: target.Datacenter,
Service: target.Service,
}
targetSpiffeIDString := targetSpiffeID.URI().String()
if targetSpiffeIDString != mainSpiffeIDString {
peerMeta.SpiffeID = append(peerMeta.SpiffeID, targetSpiffeIDString)
}
}
} }
} }

View File

@ -281,6 +281,156 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
) )
}) })
testutil.RunStep(t, "peer meta changes when L4 disco chain changes", func(t *testing.T) {
backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mysql",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "failover",
Datacenters: []string{"dc2", "dc3"},
},
},
})
// ensure we get updated peer meta
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc2/svc/failover",
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc3/svc/failover",
},
Protocol: "tcp",
},
},
},
}, res.Nodes[0])
},
)
// reset so the next subtest is valid
backend.deleteConfigEntry(t, structs.ServiceResolver, "mysql")
// ensure we get peer meta is restored
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
},
Protocol: "tcp",
},
},
},
}, res.Nodes[0])
},
)
})
testutil.RunStep(t, "peer meta changes when protocol switches from L4 to L7", func(t *testing.T) {
// NOTE: for this test we'll just live in a fantasy realm where we assume
// that mysql understands gRPC
backend.ensureConfigEntry(t, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mysql",
Protocol: "grpc",
})
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, mysqlProxyCorrID, got.CorrelationID)
res := got.Result.(*pbservice.IndexedCheckServiceNodes)
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("mgw", "10.1.1.1", partition),
Service: &pbservice.NodeService{
Kind: "connect-proxy",
ID: "mysql-sidecar-proxy-instance-0",
Service: "mysql-sidecar-proxy",
Port: 8443,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
Proxy: &pbservice.ConnectProxyConfig{
DestinationServiceID: "mysql-instance-0",
DestinationServiceName: "mysql",
},
Connect: &pbservice.ServiceConnect{
PeerMeta: &pbservice.PeeringServiceMeta{
SNI: []string{
"mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul",
},
SpiffeID: []string{
"spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql",
"spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1",
},
Protocol: "grpc",
},
},
},
}, res.Nodes[0])
},
)
})
testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
backend.deleteService(t, "bar", mysql2.Service.ID) backend.deleteService(t, "bar", mysql2.Service.ID)
@ -514,6 +664,11 @@ func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
store: store, store: store,
} }
backend.ensureCAConfig(t, &structs.CAConfiguration{
Provider: "consul",
ClusterID: connect.TestClusterID,
})
// Create some placeholder data to ensure raft index > 0 // Create some placeholder data to ensure raft index > 0
// //
// TODO(peering): is there some extremely subtle max-index table reading bug in play? // TODO(peering): is there some extremely subtle max-index table reading bug in play?
@ -545,6 +700,12 @@ func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs.
return b.lastIdx return b.lastIdx
} }
func (b *testSubscriptionBackend) deleteConfigEntry(t *testing.T, kind, name string) uint64 {
b.lastIdx++
require.NoError(t, b.store.DeleteConfigEntry(b.lastIdx, kind, name, nil))
return b.lastIdx
}
func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 { func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 {
b.lastIdx++ b.lastIdx++
require.NoError(t, b.store.EnsureNode(b.lastIdx, node)) require.NoError(t, b.store.EnsureNode(b.lastIdx, node))

View File

@ -26,7 +26,7 @@ type subscriptionState struct {
exportList *structs.ExportedServiceList exportList *structs.ExportedServiceList
watchedServices map[structs.ServiceName]context.CancelFunc watchedServices map[structs.ServiceName]context.CancelFunc
connectServices map[structs.ServiceName]string // value:protocol connectServices map[structs.ServiceName]structs.ExportedDiscoveryChainInfo
// eventVersions is a duplicate event suppression system keyed by the "id" // eventVersions is a duplicate event suppression system keyed by the "id"
// not the "correlationID" // not the "correlationID"
@ -48,7 +48,7 @@ func newSubscriptionState(peerName, partition string) *subscriptionState {
peerName: peerName, peerName: peerName,
partition: partition, partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc), watchedServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]string), connectServices: make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo),
eventVersions: make(map[string]string), eventVersions: make(map[string]string),
} }
} }

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
) )
@ -290,3 +289,7 @@ func (t *DiscoveryTarget) String() string {
func (t *DiscoveryTarget) ServiceID() ServiceID { func (t *DiscoveryTarget) ServiceID() ServiceID {
return NewServiceID(t.Service, t.GetEnterpriseMetadata()) return NewServiceID(t.Service, t.GetEnterpriseMetadata())
} }
func (t *DiscoveryTarget) ServiceName() ServiceName {
return NewServiceName(t.Service, t.GetEnterpriseMetadata())
}

View File

@ -19,26 +19,54 @@ type ExportedServiceList struct {
// service discovery and service mesh. // service discovery and service mesh.
Services []ServiceName Services []ServiceName
// DiscoChains is a list of exported service that ONLY apply to service mesh. // DiscoChains is a map of service names to their exported discovery chains
DiscoChains []ServiceName // for service mesh purposes as defined in the exported-services
// configuration entry.
DiscoChains map[ServiceName]ExportedDiscoveryChainInfo
}
// TODO(peering): reduce duplication here in the response // NOTE: this is not serialized via msgpack so it can be changed without concern.
ConnectProtocol map[ServiceName]string type ExportedDiscoveryChainInfo struct {
// Protocol is the overall protocol associated with this discovery chain.
Protocol string
// TCPTargets is the list of discovery chain targets that are reachable by
// this discovery chain.
//
// NOTE: this is only populated if Protocol=tcp.
TCPTargets []*DiscoveryTarget
}
func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool {
switch {
case i.Protocol != o.Protocol:
return false
case len(i.TCPTargets) != len(o.TCPTargets):
return false
}
for j := 0; j < len(i.TCPTargets); j++ {
if i.TCPTargets[j].ID != o.TCPTargets[j].ID {
return false
}
}
return true
} }
// ListAllDiscoveryChains returns all discovery chains (union of Services and // ListAllDiscoveryChains returns all discovery chains (union of Services and
// DiscoChains). // DiscoChains).
func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]string { func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo {
chainsByName := make(map[ServiceName]string) chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo)
if list == nil { if list == nil {
return chainsByName return chainsByName
} }
for _, svc := range list.Services { for _, svc := range list.Services {
chainsByName[svc] = list.ConnectProtocol[svc] chainsByName[svc] = list.DiscoChains[svc]
} }
for _, chainName := range list.DiscoChains { for chainName, info := range list.DiscoChains {
chainsByName[chainName] = list.ConnectProtocol[chainName] chainsByName[chainName] = info
} }
return chainsByName return chainsByName
} }

View File

@ -10,3 +10,14 @@ func SliceOfKeys[K comparable, V any](m map[K]V) []K {
} }
return res return res
} }
func SliceOfValues[K comparable, V any](m map[K]V) []V {
if len(m) == 0 {
return nil
}
res := make([]V, 0, len(m))
for _, v := range m {
res = append(res, v)
}
return res
}

View File

@ -39,3 +39,37 @@ func TestSliceOfKeys(t *testing.T) {
require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m)) require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m))
}) })
} }
func TestSliceOfValues(t *testing.T) {
t.Run("string to int", func(t *testing.T) {
m := make(map[string]int)
require.Equal(t, []int(nil), SliceOfValues(m))
m["foo"] = 5
m["bar"] = 6
require.ElementsMatch(t, []int{5, 6}, SliceOfValues(m))
})
type blah struct {
V string
}
t.Run("int to struct", func(t *testing.T) {
m := make(map[int]blah)
require.Equal(t, []blah(nil), SliceOfValues(m))
m[5] = blah{V: "foo"}
m[6] = blah{V: "bar"}
require.ElementsMatch(t, []blah{{V: "foo"}, {V: "bar"}}, SliceOfValues(m))
})
type id struct {
Name string
}
t.Run("struct to struct pointer", func(t *testing.T) {
m := make(map[id]*blah)
require.Equal(t, []*blah(nil), SliceOfValues(m))
m[id{Name: "foo"}] = &blah{V: "oof"}
m[id{Name: "bar"}] = &blah{V: "rab"}
require.ElementsMatch(t, []*blah{{V: "oof"}, {V: "rab"}}, SliceOfValues(m))
})
}