diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index f0e4c069e..02731d1b0 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -506,6 +506,25 @@ var serviceGraphKinds = []string{ // discoveryChainTargets will return a list of services listed as a target for the input's discovery chain func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, service, entMeta) + if err != nil { + return 0, nil, err + } + + var resp []structs.ServiceName + for _, t := range targets { + em := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), t.Namespace) + target := structs.NewServiceName(t.Service, &em) + + // TODO (freddy): Allow upstream DC and encode in response + if t.Datacenter == dc { + resp = append(resp, target) + } + } + return idx, resp, nil +} + +func (s *Store) discoveryChainOriginalTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *acl.EnterpriseMeta) (uint64, []*structs.DiscoveryTarget, error) { source := structs.NewServiceName(service, entMeta) req := discoverychain.CompileRequest{ ServiceName: source.Name, @@ -518,17 +537,7 @@ func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, serv return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) } - var resp []structs.ServiceName - for _, t := range chain.Targets { - em := acl.NewEnterpriseMetaWithPartition(entMeta.PartitionOrDefault(), t.Namespace) - target := structs.NewServiceName(t.Service, &em) - - // TODO (freddy): Allow upstream DC and encode in response - if t.Datacenter == dc { - resp = append(resp, target) - } - } - return idx, resp, nil + return idx, maps.SliceOfValues(chain.Targets), nil } // discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 78595e4e6..34f99d06b 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -3,6 +3,7 @@ package state import ( "errors" "fmt" + "sort" "strings" "github.com/golang/protobuf/proto" @@ -309,7 +310,7 @@ func (s *Store) PeeringTerminateByID(idx uint64, id string) error { // gateway's config entry, which we wouldn't want to replicate. How would // client peers know to route through terminating gateways when they're not // dialing through a remote mesh gateway? -func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) { +func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc string) (uint64, *structs.ExportedServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() @@ -321,7 +322,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6 return 0, &structs.ExportedServiceList{}, nil } - return s.exportedServicesForPeerTxn(ws, tx, peering) + return s.exportedServicesForPeerTxn(ws, tx, peering, dc) } func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) { @@ -335,7 +336,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl out := make(map[string]structs.ServiceList) for _, peering := range peerings { - idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering) + idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering, "") if err != nil { return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err) } @@ -351,7 +352,11 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl return maxIdx, out, nil } -func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) { +// exportedServicesForPeerTxn will find all services that are exported to a +// specific peering, and optionally include information about discovery chain +// reachable targets for these exported services if the "dc" parameter is +// specified. +func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering, dc string) (uint64, *structs.ExportedServiceList, error) { maxIdx := peering.ModifyIndex entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition) @@ -437,42 +442,63 @@ func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peerin normal := maps.SliceOfKeys(normalSet) disco := maps.SliceOfKeys(discoSet) - structs.ServiceList(normal).Sort() - structs.ServiceList(disco).Sort() - - serviceProtocols := make(map[structs.ServiceName]string) - populateProtocol := func(svc structs.ServiceName) error { - if _, ok := serviceProtocols[svc]; ok { + chainInfo := make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo) + populateChainInfo := func(svc structs.ServiceName) error { + if _, ok := chainInfo[svc]; ok { return nil // already processed } + var info structs.ExportedDiscoveryChainInfo + idx, protocol, err := protocolForService(tx, ws, svc) if err != nil { - return fmt.Errorf("failed to get protocol for service: %w", err) + return fmt.Errorf("failed to get protocol for service %q: %w", svc, err) } if idx > maxIdx { maxIdx = idx } + info.Protocol = protocol - serviceProtocols[svc] = protocol + if dc != "" && !structs.IsProtocolHTTPLike(protocol) { + // We only need to populate the targets for replication purposes for L4 protocols, which + // do not ultimately get intercepted by the mesh gateways. + idx, targets, err := s.discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err) + } + + if idx > maxIdx { + maxIdx = idx + } + + sort.Slice(targets, func(i, j int) bool { + return targets[i].ID < targets[j].ID + }) + + info.TCPTargets = targets + } + + chainInfo[svc] = info return nil } + for _, svc := range normal { - if err := populateProtocol(svc); err != nil { + if err := populateChainInfo(svc); err != nil { return 0, nil, err } } for _, svc := range disco { - if err := populateProtocol(svc); err != nil { + if err := populateChainInfo(svc); err != nil { return 0, nil, err } } + structs.ServiceList(normal).Sort() + list := &structs.ExportedServiceList{ - Services: normal, - DiscoChains: disco, - ConnectProtocol: serviceProtocols, + Services: normal, + DiscoChains: chainInfo, } return maxIdx, list, nil @@ -521,25 +547,44 @@ func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, en return maxIdx, peerings, nil } -// TrustBundleListByService returns the trust bundles for all peers that the given service is exported to. -func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { +// TrustBundleListByService returns the trust bundles for all peers that the +// given service is exported to, via a discovery chain target. +func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) { tx := s.db.ReadTxn() defer tx.Abort() - maxIdx, peers, err := peeringsForServiceTxn(tx, ws, service, entMeta) + realSvc := structs.NewServiceName(service, &entMeta) + + maxIdx, chainNames, err := s.discoveryChainSourcesTxn(tx, ws, dc, realSvc) if err != nil { - return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", service, err) + return 0, nil, fmt.Errorf("failed to list all discovery chains referring to %q: %w", realSvc, err) } + peerNames := make(map[string]struct{}) + for _, chainSvc := range chainNames { + idx, peers, err := peeringsForServiceTxn(tx, ws, chainSvc.Name, chainSvc.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get peers for service %s: %v", chainSvc, err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, peer := range peers { + peerNames[peer.Name] = struct{}{} + } + } + peerNamesSlice := maps.SliceOfKeys(peerNames) + sort.Strings(peerNamesSlice) + var resp []*pbpeering.PeeringTrustBundle - for _, peer := range peers { + for _, peerName := range peerNamesSlice { pq := Query{ - Value: strings.ToLower(peer.Name), + Value: strings.ToLower(peerName), EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(entMeta.PartitionOrDefault()), } idx, trustBundle, err := peeringTrustBundleReadTxn(tx, ws, pq) if err != nil { - return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peer.Name, err) + return 0, nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peerName, err) } if idx > maxIdx { maxIdx = idx @@ -548,6 +593,7 @@ func (s *Store) TrustBundleListByService(ws memdb.WatchSet, service string, entM resp = append(resp, trustBundle) } } + return maxIdx, resp, nil } diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index 04389a8e9..0f7e6dc9d 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/prototest" @@ -674,6 +675,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { var lastIdx uint64 + ca := &structs.CAConfiguration{ + Provider: "consul", + ClusterID: connect.TestClusterID, + } + lastIdx++ + require.NoError(t, s.CASetConfig(lastIdx, ca)) + lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ ID: testUUID(), @@ -705,10 +713,18 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { require.NoError(t, s.EnsureConfigEntry(lastIdx, entry)) } + newTarget := func(service, serviceSubset, datacenter string) *structs.DiscoveryTarget { + t := structs.NewDiscoveryTarget(service, serviceSubset, "default", "default", datacenter) + t.SNI = connect.TargetSNI(t, connect.TestTrustDomain) + t.Name = t.SNI + t.ConnectTimeout = 5 * time.Second // default + return t + } + testutil.RunStep(t, "no exported services", func(t *testing.T) { expect := &structs.ExportedServiceList{} - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -754,13 +770,23 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, - ConnectProtocol: map[structs.ServiceName]string{ - newSN("mysql"): "tcp", - newSN("redis"): "tcp", + DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("mysql"): { + Protocol: "tcp", + TCPTargets: []*structs.DiscoveryTarget{ + newTarget("mysql", "", "dc1"), + }, + }, + newSN("redis"): { + Protocol: "tcp", + TCPTargets: []*structs.DiscoveryTarget{ + newTarget("redis", "", "dc1"), + }, + }, }, } - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -800,11 +826,16 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { EnterpriseMeta: *defaultEntMeta, }, }, - ConnectProtocol: map[structs.ServiceName]string{ - newSN("billing"): "tcp", + DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("billing"): { + Protocol: "tcp", + TCPTargets: []*structs.DiscoveryTarget{ + newTarget("billing", "", "dc1"), + }, + }, }, } - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -869,29 +900,25 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { }, // NOTE: no payments-proxy here }, - DiscoChains: []structs.ServiceName{ - { - Name: "resolver", - EnterpriseMeta: *defaultEntMeta, + DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("billing"): { + Protocol: "http", }, - { - Name: "router", - EnterpriseMeta: *defaultEntMeta, + newSN("payments"): { + Protocol: "http", }, - { - Name: "splitter", - EnterpriseMeta: *defaultEntMeta, + newSN("resolver"): { + Protocol: "http", + }, + newSN("router"): { + Protocol: "http", + }, + newSN("splitter"): { + Protocol: "http", }, - }, - ConnectProtocol: map[structs.ServiceName]string{ - newSN("billing"): "http", - newSN("payments"): "http", - newSN("resolver"): "http", - newSN("router"): "http", - newSN("splitter"): "http", }, } - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -915,23 +942,19 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { }, // NOTE: no payments-proxy here }, - DiscoChains: []structs.ServiceName{ - { - Name: "resolver", - EnterpriseMeta: *defaultEntMeta, + DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{ + newSN("payments"): { + Protocol: "http", }, - { - Name: "router", - EnterpriseMeta: *defaultEntMeta, + newSN("resolver"): { + Protocol: "http", + }, + newSN("router"): { + Protocol: "http", }, - }, - ConnectProtocol: map[structs.ServiceName]string{ - newSN("payments"): "http", - newSN("resolver"): "http", - newSN("router"): "http", }, } - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -941,7 +964,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { expect := &structs.ExportedServiceList{} require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", defaultEntMeta)) - idx, got, err := s.ExportedServicesForPeer(ws, id) + idx, got, err := s.ExportedServicesForPeer(ws, id, "dc1") require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Equal(t, expect, got) @@ -1218,15 +1241,22 @@ func TestStore_TrustBundleListByService(t *testing.T) { entMeta := *acl.DefaultEnterpriseMeta() var lastIdx uint64 - ws := memdb.NewWatchSet() + + ca := &structs.CAConfiguration{ + Provider: "consul", + ClusterID: connect.TestClusterID, + } + lastIdx++ + require.NoError(t, store.CASetConfig(lastIdx, ca)) var ( peerID1 = testUUID() peerID2 = testUUID() ) + ws := memdb.NewWatchSet() testutil.RunStep(t, "no results on initial setup", func(t *testing.T) { - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 0) @@ -1248,7 +1278,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.False(t, watchFired(ws)) - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Len(t, resp, 0) require.Equal(t, lastIdx-2, idx) @@ -1264,10 +1294,10 @@ func TestStore_TrustBundleListByService(t *testing.T) { // The peering is only watched after the service is exported via config entry. require.False(t, watchFired(ws)) - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) - require.Equal(t, uint64(0), idx) require.Len(t, resp, 0) + require.Equal(t, lastIdx-3, idx) }) testutil.RunStep(t, "exporting the service does not yield trust bundles", func(t *testing.T) { @@ -1290,7 +1320,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 0) @@ -1307,7 +1337,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 1) @@ -1321,7 +1351,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 0) @@ -1346,7 +1376,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 1) @@ -1371,7 +1401,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.False(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx-2, idx) require.Len(t, resp, 1) @@ -1400,7 +1430,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 2) @@ -1419,7 +1449,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.True(t, watchFired(ws)) ws = memdb.NewWatchSet() - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx, idx) require.Len(t, resp, 1) @@ -1432,7 +1462,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { require.False(t, watchFired(ws)) - idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) + idx, resp, err := store.TrustBundleListByService(ws, "foo", "dc1", entMeta) require.NoError(t, err) require.Equal(t, lastIdx-1, idx) require.Len(t, resp, 1) diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 5638702aa..fd5e474d4 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -130,12 +130,12 @@ type Store interface { PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error) PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error) PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) - ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error) + ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error) NodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServices, error) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) - TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) + TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) AbandonCh() <-chan struct{} } @@ -533,7 +533,7 @@ func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.T switch { case req.ServiceName != "": - idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, entMeta) + idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, s.config.Datacenter, entMeta) case req.Kind == string(structs.ServiceKindMeshGateway): idx, bundles, err = s.Backend.Store().PeeringTrustBundleList(nil, entMeta) case req.Kind != "": diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index af089f56c..48e45e27b 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -490,8 +490,8 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { resp, err := client.TrustBundleListByService(context.Background(), &req) require.NoError(t, err) require.Len(t, resp.Bundles, 2) - require.Equal(t, []string{"foo-root-1"}, resp.Bundles[0].RootPEMs) - require.Equal(t, []string{"bar-root-1"}, resp.Bundles[1].RootPEMs) + require.Equal(t, []string{"bar-root-1"}, resp.Bundles[0].RootPEMs) + require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs) } func Test_StreamHandler_UpsertServices(t *testing.T) { @@ -912,7 +912,10 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { testrpc.WaitForLeader(t, server.RPC, conf.Datacenter) backend := consul.NewPeeringBackend(server, deps.GRPCConnPool) - handler := &peering.Service{Backend: backend} + handler := peering.NewService(testutil.Logger(t), peering.Config{ + Datacenter: "dc1", + ConnectEnabled: true, + }, backend) grpcServer := gogrpc.NewServer() pbpeering.RegisterPeeringServiceServer(grpcServer, handler) diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 9bc8eff4e..84b750723 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -655,6 +655,18 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { lastIdx++ require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) + mongoSvcDefaults := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "mongo", + Protocol: "grpc", + } + require.NoError(t, mongoSvcDefaults.Normalize()) + require.NoError(t, mongoSvcDefaults.Validate()) + lastIdx++ + require.NoError(t, store.EnsureConfigEntry(lastIdx, mongoSvcDefaults)) + + // NOTE: for this test we'll just live in a fantasy realm where we assume + // that mongo understands gRPC var ( mongoSN = structs.NewServiceName("mongo", nil).String() mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String() @@ -681,6 +693,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { }, }, } + require.NoError(t, entry.Normalize()) + require.NoError(t, entry.Validate()) lastIdx++ require.NoError(t, store.EnsureConfigEntry(lastIdx, entry)) @@ -744,8 +758,13 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.Len(t, nodes.Nodes, 1) - svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo" - require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID) + pm := nodes.Nodes[0].Service.Connect.PeerMeta + require.Equal(t, "grpc", pm.Protocol) + spiffeIDs := []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo", + "spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1", + } + require.Equal(t, spiffeIDs, pm.SpiffeID) }, func(t *testing.T, msg *pbpeering.ReplicationMessage) { require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL) @@ -756,8 +775,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.Len(t, nodes.Nodes, 1) - svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql" - require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID) + pm := nodes.Nodes[0].Service.Connect.PeerMeta + require.Equal(t, "tcp", pm.Protocol) + spiffeIDs := []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql", + } + require.Equal(t, spiffeIDs, pm.SpiffeID) }, ) }) @@ -800,6 +823,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) { }, }, } + require.NoError(t, entry.Normalize()) + require.NoError(t, entry.Validate()) lastIdx++ err := store.EnsureConfigEntry(lastIdx, entry) require.NoError(t, err) diff --git a/agent/rpc/peering/subscription_blocking.go b/agent/rpc/peering/subscription_blocking.go index 0a8a07d9b..c8f771e8b 100644 --- a/agent/rpc/peering/subscription_blocking.go +++ b/agent/rpc/peering/subscription_blocking.go @@ -23,7 +23,7 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex // match the list of services exported to the peer. m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store Store, ws memdb.WatchSet) (interface{}, error) { // Get exported services for peer id - _, list, err := store.ExportedServicesForPeer(ws, peerID) + _, list, err := store.ExportedServicesForPeer(ws, peerID, m.config.Datacenter) if err != nil { return nil, fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err) } diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go index cfccdeb75..9dfa135b5 100644 --- a/agent/rpc/peering/subscription_manager.go +++ b/agent/rpc/peering/subscription_manager.go @@ -238,8 +238,8 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti if state.exportList != nil { // Trigger public events for all synthetic discovery chain replies. - for chainName, protocol := range state.connectServices { - m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) + for chainName, info := range state.connectServices { + m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info) } } @@ -456,17 +456,17 @@ func (m *subscriptionManager) syncDiscoveryChains( ctx context.Context, state *subscriptionState, pending *pendingPayload, - chainsByName map[structs.ServiceName]string, // TODO(peering):rename variable + chainsByName map[structs.ServiceName]structs.ExportedDiscoveryChainInfo, ) { // if it was newly added, then try to emit an UPDATE event - for chainName, protocol := range chainsByName { - if oldProtocol, ok := state.connectServices[chainName]; ok && protocol == oldProtocol { + for chainName, info := range chainsByName { + if oldInfo, ok := state.connectServices[chainName]; ok && info.Equal(oldInfo) { continue } - state.connectServices[chainName] = protocol + state.connectServices[chainName] = info - m.emitEventForDiscoveryChain(ctx, state, pending, chainName, protocol) + m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info) } // if it was dropped, try to emit an DELETE event @@ -498,7 +498,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain( state *subscriptionState, pending *pendingPayload, chainName structs.ServiceName, - protocol string, + info structs.ExportedDiscoveryChainInfo, ) { if _, ok := state.connectServices[chainName]; !ok { return // not found @@ -519,7 +519,7 @@ func (m *subscriptionManager) emitEventForDiscoveryChain( m.config.Datacenter, m.trustDomain, chainName, - protocol, + info, state.meshGateway, ), ) @@ -532,7 +532,7 @@ func createDiscoChainHealth( peerName string, datacenter, trustDomain string, sn structs.ServiceName, - protocol string, + info structs.ExportedDiscoveryChainInfo, pb *pbservice.IndexedCheckServiceNodes, ) *pbservice.IndexedCheckServiceNodes { fakeProxyName := sn.Name + syntheticProxyNameSuffix @@ -546,6 +546,7 @@ func createDiscoChainHealth( Datacenter: datacenter, Service: sn.Name, } + mainSpiffeIDString := spiffeID.URI().String() sni := connect.PeeredServiceSNI( sn.Name, @@ -559,9 +560,35 @@ func createDiscoChainHealth( // // TODO(peering): should this be replicated by service and not by instance? peerMeta = &pbservice.PeeringServiceMeta{ - SNI: []string{sni}, - SpiffeID: []string{spiffeID.URI().String()}, - Protocol: protocol, + SNI: []string{sni}, + SpiffeID: []string{ + mainSpiffeIDString, + }, + Protocol: info.Protocol, + } + + if structs.IsProtocolHTTPLike(info.Protocol) { + gwSpiffeID := connect.SpiffeIDMeshGateway{ + Host: trustDomain, + Partition: sn.PartitionOrDefault(), + Datacenter: datacenter, + } + + peerMeta.SpiffeID = append(peerMeta.SpiffeID, gwSpiffeID.URI().String()) + } else { + for _, target := range info.TCPTargets { + targetSpiffeID := connect.SpiffeIDService{ + Host: trustDomain, + Partition: target.Partition, + Namespace: target.Namespace, + Datacenter: target.Datacenter, + Service: target.Service, + } + targetSpiffeIDString := targetSpiffeID.URI().String() + if targetSpiffeIDString != mainSpiffeIDString { + peerMeta.SpiffeID = append(peerMeta.SpiffeID, targetSpiffeIDString) + } + } } } diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index d556ff23e..3a5b0b891 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -281,6 +281,156 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { ) }) + testutil.RunStep(t, "peer meta changes when L4 disco chain changes", func(t *testing.T) { + backend.ensureConfigEntry(t, &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "mysql", + Failover: map[string]structs.ServiceResolverFailover{ + "*": { + Service: "failover", + Datacenters: []string{"dc2", "dc3"}, + }, + }, + }) + + // ensure we get updated peer meta + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlProxyCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 1) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("mgw", "10.1.1.1", partition), + Service: &pbservice.NodeService{ + Kind: "connect-proxy", + ID: "mysql-sidecar-proxy-instance-0", + Service: "mysql-sidecar-proxy", + Port: 8443, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, + Proxy: &pbservice.ConnectProxyConfig{ + DestinationServiceID: "mysql-instance-0", + DestinationServiceName: "mysql", + }, + Connect: &pbservice.ServiceConnect{ + PeerMeta: &pbservice.PeeringServiceMeta{ + SNI: []string{ + "mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul", + }, + SpiffeID: []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql", + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc2/svc/failover", + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc3/svc/failover", + }, + Protocol: "tcp", + }, + }, + }, + }, res.Nodes[0]) + }, + ) + + // reset so the next subtest is valid + backend.deleteConfigEntry(t, structs.ServiceResolver, "mysql") + + // ensure we get peer meta is restored + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlProxyCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 1) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("mgw", "10.1.1.1", partition), + Service: &pbservice.NodeService{ + Kind: "connect-proxy", + ID: "mysql-sidecar-proxy-instance-0", + Service: "mysql-sidecar-proxy", + Port: 8443, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, + Proxy: &pbservice.ConnectProxyConfig{ + DestinationServiceID: "mysql-instance-0", + DestinationServiceName: "mysql", + }, + Connect: &pbservice.ServiceConnect{ + PeerMeta: &pbservice.PeeringServiceMeta{ + SNI: []string{ + "mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul", + }, + SpiffeID: []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql", + }, + Protocol: "tcp", + }, + }, + }, + }, res.Nodes[0]) + }, + ) + }) + + testutil.RunStep(t, "peer meta changes when protocol switches from L4 to L7", func(t *testing.T) { + // NOTE: for this test we'll just live in a fantasy realm where we assume + // that mysql understands gRPC + backend.ensureConfigEntry(t, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "mysql", + Protocol: "grpc", + }) + + expectEvents(t, subCh, + func(t *testing.T, got cache.UpdateEvent) { + require.Equal(t, mysqlProxyCorrID, got.CorrelationID) + res := got.Result.(*pbservice.IndexedCheckServiceNodes) + require.Equal(t, uint64(0), res.Index) + + require.Len(t, res.Nodes, 1) + prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{ + Node: pbNode("mgw", "10.1.1.1", partition), + Service: &pbservice.NodeService{ + Kind: "connect-proxy", + ID: "mysql-sidecar-proxy-instance-0", + Service: "mysql-sidecar-proxy", + Port: 8443, + Weights: &pbservice.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, + Proxy: &pbservice.ConnectProxyConfig{ + DestinationServiceID: "mysql-instance-0", + DestinationServiceName: "mysql", + }, + Connect: &pbservice.ServiceConnect{ + PeerMeta: &pbservice.PeeringServiceMeta{ + SNI: []string{ + "mysql.default.default.my-peering.external.11111111-2222-3333-4444-555555555555.consul", + }, + SpiffeID: []string{ + "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mysql", + "spiffe://11111111-2222-3333-4444-555555555555.consul/gateway/mesh/dc/dc1", + }, + Protocol: "grpc", + }, + }, + }, + }, res.Nodes[0]) + }, + ) + }) + testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { backend.deleteService(t, "bar", mysql2.Service.ID) @@ -514,6 +664,11 @@ func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend { store: store, } + backend.ensureCAConfig(t, &structs.CAConfiguration{ + Provider: "consul", + ClusterID: connect.TestClusterID, + }) + // Create some placeholder data to ensure raft index > 0 // // TODO(peering): is there some extremely subtle max-index table reading bug in play? @@ -545,6 +700,12 @@ func (b *testSubscriptionBackend) ensureConfigEntry(t *testing.T, entry structs. return b.lastIdx } +func (b *testSubscriptionBackend) deleteConfigEntry(t *testing.T, kind, name string) uint64 { + b.lastIdx++ + require.NoError(t, b.store.DeleteConfigEntry(b.lastIdx, kind, name, nil)) + return b.lastIdx +} + func (b *testSubscriptionBackend) ensureNode(t *testing.T, node *structs.Node) uint64 { b.lastIdx++ require.NoError(t, b.store.EnsureNode(b.lastIdx, node)) diff --git a/agent/rpc/peering/subscription_state.go b/agent/rpc/peering/subscription_state.go index bd9da52e8..dfb6942e2 100644 --- a/agent/rpc/peering/subscription_state.go +++ b/agent/rpc/peering/subscription_state.go @@ -26,7 +26,7 @@ type subscriptionState struct { exportList *structs.ExportedServiceList watchedServices map[structs.ServiceName]context.CancelFunc - connectServices map[structs.ServiceName]string // value:protocol + connectServices map[structs.ServiceName]structs.ExportedDiscoveryChainInfo // eventVersions is a duplicate event suppression system keyed by the "id" // not the "correlationID" @@ -48,7 +48,7 @@ func newSubscriptionState(peerName, partition string) *subscriptionState { peerName: peerName, partition: partition, watchedServices: make(map[structs.ServiceName]context.CancelFunc), - connectServices: make(map[structs.ServiceName]string), + connectServices: make(map[structs.ServiceName]structs.ExportedDiscoveryChainInfo), eventVersions: make(map[string]string), } } diff --git a/agent/structs/discovery_chain.go b/agent/structs/discovery_chain.go index 046ec1c4d..9d4a8ef91 100644 --- a/agent/structs/discovery_chain.go +++ b/agent/structs/discovery_chain.go @@ -6,7 +6,6 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/lib" ) @@ -290,3 +289,7 @@ func (t *DiscoveryTarget) String() string { func (t *DiscoveryTarget) ServiceID() ServiceID { return NewServiceID(t.Service, t.GetEnterpriseMetadata()) } + +func (t *DiscoveryTarget) ServiceName() ServiceName { + return NewServiceName(t.Service, t.GetEnterpriseMetadata()) +} diff --git a/agent/structs/peering.go b/agent/structs/peering.go index 70a59c486..c414b40a3 100644 --- a/agent/structs/peering.go +++ b/agent/structs/peering.go @@ -19,26 +19,54 @@ type ExportedServiceList struct { // service discovery and service mesh. Services []ServiceName - // DiscoChains is a list of exported service that ONLY apply to service mesh. - DiscoChains []ServiceName + // DiscoChains is a map of service names to their exported discovery chains + // for service mesh purposes as defined in the exported-services + // configuration entry. + DiscoChains map[ServiceName]ExportedDiscoveryChainInfo +} - // TODO(peering): reduce duplication here in the response - ConnectProtocol map[ServiceName]string +// NOTE: this is not serialized via msgpack so it can be changed without concern. +type ExportedDiscoveryChainInfo struct { + // Protocol is the overall protocol associated with this discovery chain. + Protocol string + + // TCPTargets is the list of discovery chain targets that are reachable by + // this discovery chain. + // + // NOTE: this is only populated if Protocol=tcp. + TCPTargets []*DiscoveryTarget +} + +func (i ExportedDiscoveryChainInfo) Equal(o ExportedDiscoveryChainInfo) bool { + switch { + case i.Protocol != o.Protocol: + return false + case len(i.TCPTargets) != len(o.TCPTargets): + return false + } + + for j := 0; j < len(i.TCPTargets); j++ { + if i.TCPTargets[j].ID != o.TCPTargets[j].ID { + return false + } + } + + return true } // ListAllDiscoveryChains returns all discovery chains (union of Services and // DiscoChains). -func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]string { - chainsByName := make(map[ServiceName]string) +func (list *ExportedServiceList) ListAllDiscoveryChains() map[ServiceName]ExportedDiscoveryChainInfo { + chainsByName := make(map[ServiceName]ExportedDiscoveryChainInfo) if list == nil { return chainsByName } for _, svc := range list.Services { - chainsByName[svc] = list.ConnectProtocol[svc] + chainsByName[svc] = list.DiscoChains[svc] } - for _, chainName := range list.DiscoChains { - chainsByName[chainName] = list.ConnectProtocol[chainName] + for chainName, info := range list.DiscoChains { + chainsByName[chainName] = info } return chainsByName } diff --git a/lib/maps/maps.go b/lib/maps/maps.go index f65c06619..eea90683c 100644 --- a/lib/maps/maps.go +++ b/lib/maps/maps.go @@ -10,3 +10,14 @@ func SliceOfKeys[K comparable, V any](m map[K]V) []K { } return res } + +func SliceOfValues[K comparable, V any](m map[K]V) []V { + if len(m) == 0 { + return nil + } + res := make([]V, 0, len(m)) + for _, v := range m { + res = append(res, v) + } + return res +} diff --git a/lib/maps/maps_test.go b/lib/maps/maps_test.go index 365abb597..bf9268d7a 100644 --- a/lib/maps/maps_test.go +++ b/lib/maps/maps_test.go @@ -39,3 +39,37 @@ func TestSliceOfKeys(t *testing.T) { require.ElementsMatch(t, []id{{Name: "foo"}, {Name: "bar"}}, SliceOfKeys(m)) }) } + +func TestSliceOfValues(t *testing.T) { + t.Run("string to int", func(t *testing.T) { + m := make(map[string]int) + require.Equal(t, []int(nil), SliceOfValues(m)) + m["foo"] = 5 + m["bar"] = 6 + require.ElementsMatch(t, []int{5, 6}, SliceOfValues(m)) + }) + + type blah struct { + V string + } + + t.Run("int to struct", func(t *testing.T) { + m := make(map[int]blah) + require.Equal(t, []blah(nil), SliceOfValues(m)) + m[5] = blah{V: "foo"} + m[6] = blah{V: "bar"} + require.ElementsMatch(t, []blah{{V: "foo"}, {V: "bar"}}, SliceOfValues(m)) + }) + + type id struct { + Name string + } + + t.Run("struct to struct pointer", func(t *testing.T) { + m := make(map[id]*blah) + require.Equal(t, []*blah(nil), SliceOfValues(m)) + m[id{Name: "foo"}] = &blah{V: "oof"} + m[id{Name: "bar"}] = &blah{V: "rab"} + require.ElementsMatch(t, []*blah{{V: "oof"}, {V: "rab"}}, SliceOfValues(m)) + }) +}