diff --git a/agent/agent.go b/agent/agent.go index 47d16a10e..c565919c8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -647,6 +647,7 @@ func (a *Agent) Start(ctx context.Context) error { IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache), InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache), LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache), + PeeredUpstreams: proxycfgglue.CachePeeredUpstreams(a.cache), PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache), ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache), ServiceList: proxycfgglue.CacheServiceList(a.cache), diff --git a/agent/cache-types/intention_upstreams.go b/agent/cache-types/intention_upstreams.go index d3f662b08..489bf4cd9 100644 --- a/agent/cache-types/intention_upstreams.go +++ b/agent/cache-types/intention_upstreams.go @@ -10,7 +10,7 @@ import ( // Recommended name for registration. const IntentionUpstreamsName = "intention-upstreams" -// GatewayUpstreams supports fetching upstreams for a given gateway name. +// IntentionUpstreams supports fetching upstreams for a given service name. type IntentionUpstreams struct { RegisterOptionsBlockingRefresh RPC RPC diff --git a/agent/cache-types/peered_upstreams.go b/agent/cache-types/peered_upstreams.go new file mode 100644 index 000000000..8e8f9001a --- /dev/null +++ b/agent/cache-types/peered_upstreams.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const PeeredUpstreamsName = "peered-upstreams" + +// PeeredUpstreams supports fetching imported upstream candidates of a given partition. +type PeeredUpstreams struct { + RegisterOptionsBlockingRefresh + RPC RPC +} + +func (i *PeeredUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + reqReal, ok := req.(*structs.PartitionSpecificRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Lightweight copy this object so that manipulating QueryOptions doesn't race. + dup := *reqReal + reqReal = &dup + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and end up arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedPeeredServiceList + if err := i.RPC.RPC("Internal.PeeredUpstreams", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} diff --git a/agent/cache-types/peered_upstreams_test.go b/agent/cache-types/peered_upstreams_test.go new file mode 100644 index 000000000..31312f4fa --- /dev/null +++ b/agent/cache-types/peered_upstreams_test.go @@ -0,0 +1,60 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +func TestPeeredUpstreams(t *testing.T) { + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &PeeredUpstreams{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedPeeredServiceList + rpc.On("RPC", "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.PartitionSpecificRequest) + require.Equal(t, uint64(24), req.MinQueryIndex) + require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) + require.True(t, req.AllowStale) + + reply := args.Get(2).(*structs.IndexedPeeredServiceList) + reply.Index = 48 + resp = reply + }) + + // Fetch + result, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.PartitionSpecificRequest{ + Datacenter: "dc1", + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, result) +} + +func TestPeeredUpstreams_badReqType(t *testing.T) { + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &PeeredUpstreams{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") +} diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 75048b913..a817880f2 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -1209,6 +1210,104 @@ func registerTestRoutingConfigTopologyEntries(t *testing.T, codec rpc.ClientCode } } +func registerLocalAndRemoteServicesVIPEnabled(t *testing.T, state *state.Store) { + t.Helper() + + _, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled) + require.NoError(t, err) + require.NotNil(t, entry) + require.Equal(t, "true", entry.Value) + + // Register a local connect-native service + require.NoError(t, state.EnsureRegistration(10, &structs.RegisterRequest{ + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Service: "api", + Connect: structs.ServiceConnect{ + Native: true, + }, + }, + })) + // Should be assigned VIP + psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)} + vip, err := state.VirtualIPForService(psn) + require.NoError(t, err) + require.Equal(t, "240.0.0.1", vip) + + // Register an imported service and its proxy + require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{ + Node: "bar", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + Service: "web", + ID: "web-1", + }, + PeerName: "peer-a", + })) + require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{ + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + }, + LocallyRegisteredAsSidecar: true, + }, + PeerName: "peer-a", + })) + // Should be assigned one VIP for the real service name + psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web", nil)} + vip, err = state.VirtualIPForService(psn) + require.NoError(t, err) + require.Equal(t, "240.0.0.2", vip) + // web-proxy should not have a VIP + psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web-proxy", nil)} + vip, err = state.VirtualIPForService(psn) + require.NoError(t, err) + require.Empty(t, vip) + + // Register an imported service and its proxy from another peer + require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{ + Node: "gir", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + Service: "web", + ID: "web-1", + }, + PeerName: "peer-b", + })) + require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{ + Node: "gir", + Address: "127.0.0.3", + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + }, + LocallyRegisteredAsSidecar: true, + }, + PeerName: "peer-b", + })) + // Should be assigned one VIP for the real service name + psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web", nil)} + vip, err = state.VirtualIPForService(psn) + require.NoError(t, err) + require.Equal(t, "240.0.0.3", vip) + // web-proxy should not have a VIP + psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web-proxy", nil)} + vip, err = state.VirtualIPForService(psn) + require.NoError(t, err) + require.Empty(t, vip) +} + func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) { t.Helper() diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 5ed07524e..a041c7eeb 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -292,7 +292,7 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply * }) } -// IntentionUpstreams returns the upstreams of a service. Upstreams are inferred from intentions. +// IntentionUpstreams returns a service's upstreams which are inferred from intentions. // If intentions allow a connection from the target to some candidate service, the candidate service is considered // an upstream of the target. func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error { @@ -309,9 +309,9 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl return m.internalUpstreams(args, reply, structs.IntentionTargetService) } -// IntentionUpstreamsDestination returns the upstreams of a service. Upstreams are inferred from intentions. +// IntentionUpstreamsDestination returns a service's upstreams which are inferred from intentions. // If intentions allow a connection from the target to some candidate destination, the candidate destination is considered -// an upstream of the target.this is performs the same logic as IntentionUpstreams endpoint but for destination upstreams only. +// an upstream of the target. This performs the same logic as IntentionUpstreams endpoint but for destination upstreams only. func (m *Internal) IntentionUpstreamsDestination(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error { // Exit early if Connect hasn't been enabled. if !m.srv.config.ConnectEnabled { @@ -571,6 +571,49 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply }) } +// PeeredUpstreams returns all imported services as upstreams for any service in a given partition. +// Cluster peering does not replicate intentions so all imported services are considered potential upstreams. +func (m *Internal) PeeredUpstreams(args *structs.PartitionSpecificRequest, reply *structs.IndexedPeeredServiceList) error { + // Exit early if Connect hasn't been enabled. + if !m.srv.config.ConnectEnabled { + return ErrConnectNotEnabled + } + if done, err := m.srv.ForwardRPC("Internal.PeeredUpstreams", args, reply); done { + return err + } + + // TODO(peering): ACL for filtering + // authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) + // if err != nil { + // return err + // } + + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + + return m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, vips, err := state.VirtualIPsForAllImportedServices(ws, args.EnterpriseMeta) + if err != nil { + return err + } + + result := make([]structs.PeeredServiceName, 0, len(vips)) + for _, vip := range vips { + result = append(result, vip.Service) + } + + reply.Index, reply.Services = index, result + + // TODO(peering): low priority: consider ACL filtering + // m.srv.filterACLWithAuthorizer(authz, reply) + return nil + }) +} + // EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC // call to fire an event. The primary use case is to enable user events being // triggered in a remote DC. diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index d24f08d1b..3f9ff1832 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -2776,3 +2776,40 @@ func TestInternal_CatalogOverview_ACLDeny(t *testing.T) { arg.Token = opReadToken.SecretID require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out)) } + +func TestInternal_PeeredUpstreams(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + _, s1 := testServerWithConfig(t) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + state := s1.fsm.State() + + // Services + // api local + // web peer: peer-a + // web-proxy peer: peer-a + // web peer: peer-b + // web-proxy peer: peer-b + registerLocalAndRemoteServicesVIPEnabled(t, state) + + codec := rpcClient(t, s1) + + args := structs.PartitionSpecificRequest{ + Datacenter: "dc1", + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + } + var out structs.IndexedPeeredServiceList + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.PeeredUpstreams", &args, &out)) + + require.Len(t, out.Services, 2) + expect := []structs.PeeredServiceName{ + {Peer: "peer-a", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())}, + {Peer: "peer-b", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())}, + } + require.Equal(t, expect, out.Services) +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 2777b2fd1..2f116ab0f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -117,7 +117,13 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { } func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error { - return s.tx.Insert(tableServiceVirtualIPs, req) + if err := s.tx.Insert(tableServiceVirtualIPs, req); err != nil { + return err + } + if err := updateVirtualIPMaxIndexes(s.tx, req.ModifyIndex, req.Service.ServiceName.PartitionOrDefault(), req.Service.Peer); err != nil { + return err + } + return nil } func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error { @@ -898,7 +904,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta} psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn} - vip, err := assignServiceVirtualIP(tx, psn) + vip, err := assignServiceVirtualIP(tx, idx, psn) if err != nil { return fmt.Errorf("failed updating virtual IP: %s", err) } @@ -923,7 +929,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool } if conf != nil { termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry) - addrs, err := getTermGatewayVirtualIPs(tx, termGatewayConf.Services, &svc.EnterpriseMeta) + addrs, err := getTermGatewayVirtualIPs(tx, idx, termGatewayConf.Services, &svc.EnterpriseMeta) if err != nil { return err } @@ -978,7 +984,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool // assignServiceVirtualIP assigns a virtual IP to the target service and updates // the global virtual IP counter if necessary. -func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, error) { +func assignServiceVirtualIP(tx WriteTxn, idx uint64, psn structs.PeeredServiceName) (string, error) { serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn) if err != nil { return "", fmt.Errorf("failed service virtual IP lookup: %s", err) @@ -1052,10 +1058,17 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, assignedVIP := ServiceVirtualIP{ Service: psn, IP: newEntry.IP, + RaftIndex: structs.RaftIndex{ + ModifyIndex: idx, + CreateIndex: idx, + }, } if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil { return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err) } + if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil { + return "", err + } result, err := addIPOffset(startingVirtualIP, assignedVIP.IP) if err != nil { @@ -1064,6 +1077,20 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, return result.String(), nil } +func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error { + // update per-partition max index + if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil { + return fmt.Errorf("failed while updating partitioned index: %w", err) + } + if peerName != "" { + // track a separate max index for imported services + if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", partition)); err != nil { + return fmt.Errorf("failed while updating partitioned index for imported services: %w", err) + } + } + return nil +} + func addIPOffset(a, b net.IP) (net.IP, error) { a4 := a.To4() b4 := b.To4() @@ -2899,6 +2926,35 @@ func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, erro return result.String(), nil } +// VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all +// VirtualIP-assignable services that have been imported by the partition represented in entMeta. +// Namespace is ignored. +func (s *Store) VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []ServiceVirtualIP, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + q := Query{ + EnterpriseMeta: entMeta, + // Wildcard peername is used by prefix index to fetch all remote peers for a partition. + PeerName: "*", + } + iter, err := tx.Get(tableServiceVirtualIPs, indexID+"_prefix", q) + if err != nil { + return 0, nil, fmt.Errorf("failed service virtual IP lookup: %s", err) + } + ws.Add(iter.WatchCh()) + + idx := maxIndexTxn(tx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", entMeta.PartitionOrDefault())) + + var vips []ServiceVirtualIP + for raw := iter.Next(); raw != nil; raw = iter.Next() { + vip := raw.(ServiceVirtualIP) + vips = append(vips, vip) + } + + return idx, vips, nil +} + func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -3333,13 +3389,18 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en return nil } -func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, entMeta *acl.EnterpriseMeta) (map[string]structs.ServiceAddress, error) { +func getTermGatewayVirtualIPs( + tx WriteTxn, + idx uint64, + services []structs.LinkedService, + entMeta *acl.EnterpriseMeta, +) (map[string]structs.ServiceAddress, error) { addrs := make(map[string]structs.ServiceAddress, len(services)) for _, s := range services { sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta} // Terminating Gateways cannot route to services in peered clusters psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword} - vip, err := assignServiceVirtualIP(tx, psn) + vip, err := assignServiceVirtualIP(tx, idx, psn) if err != nil { return nil, err } @@ -3353,7 +3414,7 @@ func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, ent func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *acl.EnterpriseMeta) error { // Build the current map of services with virtual IPs for this gateway services := conf.Services - addrs, err := getTermGatewayVirtualIPs(tx, services, entMeta) + addrs, err := getTermGatewayVirtualIPs(tx, idx, services, entMeta) if err != nil { return err } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index ee3a9e487..b7632f295 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -303,7 +303,12 @@ func updateKindServiceNamesIndex(tx WriteTxn, idx uint64, kind structs.ServiceKi func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) { peer := structs.LocalPeerKeyword if psn.Peer != "" { - peer = psn.Peer + // This prefix is unusual but necessary for reads which want + // to isolate peered resources. + // This allows you to prefix query for "peer:": + // internal/name + // peer:peername/name + peer = "peer:" + psn.Peer } var b indexBuilder diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 36d15b954..2603e85b7 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -700,6 +700,21 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase { source: obj, expected: []byte("internal\x00foo\x00"), }, + prefix: []indexValue{ + { + source: Query{ + Value: "foo", + }, + expected: []byte("internal\x00foo\x00"), + }, + { + source: Query{ + Value: "foo", + PeerName: "*", // test wildcard PeerName + }, + expected: []byte("peer:"), + }, + }, extra: []indexerTestCase{ { read: indexValue{ @@ -709,11 +724,11 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase { }, Peer: "Billing", }, - expected: []byte("billing\x00foo\x00"), + expected: []byte("peer:billing\x00foo\x00"), }, write: indexValue{ source: peeredObj, - expected: []byte("billing\x00foo\x00"), + expected: []byte("peer:billing\x00foo\x00"), }, }, }, diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index d77487c4a..5d0fe438c 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -607,6 +607,8 @@ func (q NodeCheckQuery) PartitionOrDefault() string { type ServiceVirtualIP struct { Service structs.PeeredServiceName IP net.IP + + structs.RaftIndex } // FreeVirtualIP is used to store a virtual IP freed up by a service deregistration. @@ -631,9 +633,11 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: indexerSingle[structs.PeeredServiceName, ServiceVirtualIP]{ + Indexer: indexerSingleWithPrefix[structs.PeeredServiceName, ServiceVirtualIP, Query]{ readIndex: indexFromPeeredServiceName, writeIndex: indexFromServiceVirtualIP, + // Read all peers in a cluster / partition + prefixIndex: prefixIndexFromQueryWithPeerWildcardable, }, }, }, diff --git a/agent/consul/state/query_oss.go b/agent/consul/state/query_oss.go index e36850fe0..d68a9b258 100644 --- a/agent/consul/state/query_oss.go +++ b/agent/consul/state/query_oss.go @@ -52,6 +52,29 @@ func prefixIndexFromQueryWithPeer(arg any) ([]byte, error) { return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg) } +// prefixIndexFromQueryWithPeerWildcardable allows for a wildcard "*" peerName +// to query for all peers _excluding_ structs.LocalPeerKeyword. +// Assumes that non-local peers are prefixed with "peer:". +func prefixIndexFromQueryWithPeerWildcardable(v Query) ([]byte, error) { + var b indexBuilder + + peername := v.PeerOrEmpty() + if peername == "" { + b.String(strings.ToLower(structs.LocalPeerKeyword)) + } else if peername == "*" { + // use b.Raw so we don't add null terminator to prefix + b.Raw([]byte("peer:")) + return b.Bytes(), nil + } else { + b.String(strings.ToLower("peer:" + peername)) + } + + if v.Value != "" { + b.String(strings.ToLower(v.Value)) + } + return b.Bytes(), nil +} + func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) { return prefixIndexFromQuery(arg) } diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index c047da48f..9a85c633f 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -83,6 +83,12 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate { return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName} } +// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface +// by sourcing data from the agent cache. +func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams { + return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName} +} + // CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by // sourcing data from the agent cache. func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index 5e2da0ad2..a55f592d3 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -69,6 +69,8 @@ type DataSources struct { // notification channel. LeafCertificate LeafCertificate + PeeredUpstreams PeeredUpstreams + // PreparedQuery provides updates about the results of a prepared query. PreparedQuery PreparedQuery @@ -170,6 +172,12 @@ type LeafCertificate interface { Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error } +// PeeredUpstreams is the interface used to consume updates about upstreams +// for all peered targets in a given partition. +type PeeredUpstreams interface { + Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- UpdateEvent) error +} + // PreparedQuery is the interface used to consume updates about the results of // a prepared query. type PreparedQuery interface { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 4039b547b..275bf4c18 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -682,6 +682,30 @@ func (r *ServiceDumpRequest) CacheMinIndex() uint64 { return r.QueryOptions.MinQueryIndex } +// PartitionSpecificRequest is used to query about a specific partition. +type PartitionSpecificRequest struct { + Datacenter string + + acl.EnterpriseMeta + QueryOptions +} + +func (r *PartitionSpecificRequest) RequestDatacenter() string { + return r.Datacenter +} + +func (r *PartitionSpecificRequest) CacheInfo() cache.RequestInfo { + return cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + Key: r.EnterpriseMeta.PartitionOrDefault(), + } +} + // ServiceSpecificRequest is used to query about a specific service type ServiceSpecificRequest struct { Datacenter string @@ -2211,6 +2235,11 @@ type IndexedServiceList struct { QueryMeta } +type IndexedPeeredServiceList struct { + Services []PeeredServiceName + QueryMeta +} + type IndexedServiceNodes struct { ServiceNodes ServiceNodes QueryMeta