Add internal endpoint to fetch peered upstream candidates from VirtualIP table (#13642)

For initial cluster peering TProxy support we consider all imported services of a partition to be potential upstreams.

We leverage the VirtualIP table because it stores plain service names (e.g. "api", not "api-sidecar-proxy").
This commit is contained in:
Chris S. Kim 2022-06-29 16:34:58 -04:00 committed by GitHub
parent f3bba7c963
commit 25aec40e74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 457 additions and 15 deletions

View file

@ -647,6 +647,7 @@ func (a *Agent) Start(ctx context.Context) error {
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
PeeredUpstreams: proxycfgglue.CachePeeredUpstreams(a.cache),
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),
ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache),
ServiceList: proxycfgglue.CacheServiceList(a.cache),

View file

@ -10,7 +10,7 @@ import (
// Recommended name for registration.
const IntentionUpstreamsName = "intention-upstreams"
// GatewayUpstreams supports fetching upstreams for a given gateway name.
// IntentionUpstreams supports fetching upstreams for a given service name.
type IntentionUpstreams struct {
RegisterOptionsBlockingRefresh
RPC RPC

View file

@ -0,0 +1,51 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const PeeredUpstreamsName = "peered-upstreams"
// PeeredUpstreams supports fetching imported upstream candidates of a given partition.
type PeeredUpstreams struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
func (i *PeeredUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
reqReal, ok := req.(*structs.PartitionSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedPeeredServiceList
if err := i.RPC.RPC("Internal.PeeredUpstreams", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}

View file

@ -0,0 +1,60 @@
package cachetype
import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestPeeredUpstreams(t *testing.T) {
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &PeeredUpstreams{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedPeeredServiceList
rpc.On("RPC", "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.PartitionSpecificRequest)
require.Equal(t, uint64(24), req.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedPeeredServiceList)
reply.Index = 48
resp = reply
})
// Fetch
result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.PartitionSpecificRequest{
Datacenter: "dc1",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}
func TestPeeredUpstreams_badReqType(t *testing.T) {
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &PeeredUpstreams{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
}

View file

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
@ -1209,6 +1210,104 @@ func registerTestRoutingConfigTopologyEntries(t *testing.T, codec rpc.ClientCode
}
}
func registerLocalAndRemoteServicesVIPEnabled(t *testing.T, state *state.Store) {
t.Helper()
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
require.NoError(t, err)
require.NotNil(t, entry)
require.Equal(t, "true", entry.Value)
// Register a local connect-native service
require.NoError(t, state.EnsureRegistration(10, &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "api",
Connect: structs.ServiceConnect{
Native: true,
},
},
}))
// Should be assigned VIP
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)}
vip, err := state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "240.0.0.1", vip)
// Register an imported service and its proxy
require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
Service: "web",
ID: "web-1",
},
PeerName: "peer-a",
}))
require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
},
LocallyRegisteredAsSidecar: true,
},
PeerName: "peer-a",
}))
// Should be assigned one VIP for the real service name
psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "240.0.0.2", vip)
// web-proxy should not have a VIP
psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web-proxy", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Empty(t, vip)
// Register an imported service and its proxy from another peer
require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{
Node: "gir",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
Service: "web",
ID: "web-1",
},
PeerName: "peer-b",
}))
require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{
Node: "gir",
Address: "127.0.0.3",
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
},
LocallyRegisteredAsSidecar: true,
},
PeerName: "peer-b",
}))
// Should be assigned one VIP for the real service name
psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, "240.0.0.3", vip)
// web-proxy should not have a VIP
psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web-proxy", nil)}
vip, err = state.VirtualIPForService(psn)
require.NoError(t, err)
require.Empty(t, vip)
}
func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) {
t.Helper()

View file

@ -292,7 +292,7 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *
})
}
// IntentionUpstreams returns the upstreams of a service. Upstreams are inferred from intentions.
// IntentionUpstreams returns a service's upstreams which are inferred from intentions.
// If intentions allow a connection from the target to some candidate service, the candidate service is considered
// an upstream of the target.
func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
@ -309,9 +309,9 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
return m.internalUpstreams(args, reply, structs.IntentionTargetService)
}
// IntentionUpstreamsDestination returns the upstreams of a service. Upstreams are inferred from intentions.
// IntentionUpstreamsDestination returns a service's upstreams which are inferred from intentions.
// If intentions allow a connection from the target to some candidate destination, the candidate destination is considered
// an upstream of the target.this is performs the same logic as IntentionUpstreams endpoint but for destination upstreams only.
// an upstream of the target. This performs the same logic as IntentionUpstreams endpoint but for destination upstreams only.
func (m *Internal) IntentionUpstreamsDestination(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
// Exit early if Connect hasn't been enabled.
if !m.srv.config.ConnectEnabled {
@ -571,6 +571,49 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply
})
}
// PeeredUpstreams returns all imported services as upstreams for any service in a given partition.
// Cluster peering does not replicate intentions so all imported services are considered potential upstreams.
func (m *Internal) PeeredUpstreams(args *structs.PartitionSpecificRequest, reply *structs.IndexedPeeredServiceList) error {
// Exit early if Connect hasn't been enabled.
if !m.srv.config.ConnectEnabled {
return ErrConnectNotEnabled
}
if done, err := m.srv.ForwardRPC("Internal.PeeredUpstreams", args, reply); done {
return err
}
// TODO(peering): ACL for filtering
// authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
// if err != nil {
// return err
// }
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, vips, err := state.VirtualIPsForAllImportedServices(ws, args.EnterpriseMeta)
if err != nil {
return err
}
result := make([]structs.PeeredServiceName, 0, len(vips))
for _, vip := range vips {
result = append(result, vip.Service)
}
reply.Index, reply.Services = index, result
// TODO(peering): low priority: consider ACL filtering
// m.srv.filterACLWithAuthorizer(authz, reply)
return nil
})
}
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.

View file

@ -2776,3 +2776,40 @@ func TestInternal_CatalogOverview_ACLDeny(t *testing.T) {
arg.Token = opReadToken.SecretID
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out))
}
func TestInternal_PeeredUpstreams(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s1 := testServerWithConfig(t)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
state := s1.fsm.State()
// Services
// api local
// web peer: peer-a
// web-proxy peer: peer-a
// web peer: peer-b
// web-proxy peer: peer-b
registerLocalAndRemoteServicesVIPEnabled(t, state)
codec := rpcClient(t, s1)
args := structs.PartitionSpecificRequest{
Datacenter: "dc1",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}
var out structs.IndexedPeeredServiceList
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.PeeredUpstreams", &args, &out))
require.Len(t, out.Services, 2)
expect := []structs.PeeredServiceName{
{Peer: "peer-a", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())},
{Peer: "peer-b", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())},
}
require.Equal(t, expect, out.Services)
}

View file

@ -117,7 +117,13 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
}
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
return s.tx.Insert(tableServiceVirtualIPs, req)
if err := s.tx.Insert(tableServiceVirtualIPs, req); err != nil {
return err
}
if err := updateVirtualIPMaxIndexes(s.tx, req.ModifyIndex, req.Service.ServiceName.PartitionOrDefault(), req.Service.Peer); err != nil {
return err
}
return nil
}
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
@ -898,7 +904,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
vip, err := assignServiceVirtualIP(tx, psn)
vip, err := assignServiceVirtualIP(tx, idx, psn)
if err != nil {
return fmt.Errorf("failed updating virtual IP: %s", err)
}
@ -923,7 +929,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
}
if conf != nil {
termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry)
addrs, err := getTermGatewayVirtualIPs(tx, termGatewayConf.Services, &svc.EnterpriseMeta)
addrs, err := getTermGatewayVirtualIPs(tx, idx, termGatewayConf.Services, &svc.EnterpriseMeta)
if err != nil {
return err
}
@ -978,7 +984,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
// the global virtual IP counter if necessary.
func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, error) {
func assignServiceVirtualIP(tx WriteTxn, idx uint64, psn structs.PeeredServiceName) (string, error) {
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
@ -1052,10 +1058,17 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string,
assignedVIP := ServiceVirtualIP{
Service: psn,
IP: newEntry.IP,
RaftIndex: structs.RaftIndex{
ModifyIndex: idx,
CreateIndex: idx,
},
}
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return "", err
}
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
if err != nil {
@ -1064,6 +1077,20 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string,
return result.String(), nil
}
func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
// update per-partition max index
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
return fmt.Errorf("failed while updating partitioned index: %w", err)
}
if peerName != "" {
// track a separate max index for imported services
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", partition)); err != nil {
return fmt.Errorf("failed while updating partitioned index for imported services: %w", err)
}
}
return nil
}
func addIPOffset(a, b net.IP) (net.IP, error) {
a4 := a.To4()
b4 := b.To4()
@ -2899,6 +2926,35 @@ func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, erro
return result.String(), nil
}
// VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all
// VirtualIP-assignable services that have been imported by the partition represented in entMeta.
// Namespace is ignored.
func (s *Store) VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []ServiceVirtualIP, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
q := Query{
EnterpriseMeta: entMeta,
// Wildcard peername is used by prefix index to fetch all remote peers for a partition.
PeerName: "*",
}
iter, err := tx.Get(tableServiceVirtualIPs, indexID+"_prefix", q)
if err != nil {
return 0, nil, fmt.Errorf("failed service virtual IP lookup: %s", err)
}
ws.Add(iter.WatchCh())
idx := maxIndexTxn(tx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", entMeta.PartitionOrDefault()))
var vips []ServiceVirtualIP
for raw := iter.Next(); raw != nil; raw = iter.Next() {
vip := raw.(ServiceVirtualIP)
vips = append(vips, vip)
}
return idx, vips, nil
}
func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -3333,13 +3389,18 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
return nil
}
func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, entMeta *acl.EnterpriseMeta) (map[string]structs.ServiceAddress, error) {
func getTermGatewayVirtualIPs(
tx WriteTxn,
idx uint64,
services []structs.LinkedService,
entMeta *acl.EnterpriseMeta,
) (map[string]structs.ServiceAddress, error) {
addrs := make(map[string]structs.ServiceAddress, len(services))
for _, s := range services {
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
// Terminating Gateways cannot route to services in peered clusters
psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword}
vip, err := assignServiceVirtualIP(tx, psn)
vip, err := assignServiceVirtualIP(tx, idx, psn)
if err != nil {
return nil, err
}
@ -3353,7 +3414,7 @@ func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, ent
func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *acl.EnterpriseMeta) error {
// Build the current map of services with virtual IPs for this gateway
services := conf.Services
addrs, err := getTermGatewayVirtualIPs(tx, services, entMeta)
addrs, err := getTermGatewayVirtualIPs(tx, idx, services, entMeta)
if err != nil {
return err
}

View file

@ -303,7 +303,12 @@ func updateKindServiceNamesIndex(tx WriteTxn, idx uint64, kind structs.ServiceKi
func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) {
peer := structs.LocalPeerKeyword
if psn.Peer != "" {
peer = psn.Peer
// This prefix is unusual but necessary for reads which want
// to isolate peered resources.
// This allows you to prefix query for "peer:":
// internal/name
// peer:peername/name
peer = "peer:" + psn.Peer
}
var b indexBuilder

View file

@ -700,6 +700,21 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
source: obj,
expected: []byte("internal\x00foo\x00"),
},
prefix: []indexValue{
{
source: Query{
Value: "foo",
},
expected: []byte("internal\x00foo\x00"),
},
{
source: Query{
Value: "foo",
PeerName: "*", // test wildcard PeerName
},
expected: []byte("peer:"),
},
},
extra: []indexerTestCase{
{
read: indexValue{
@ -709,11 +724,11 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
},
Peer: "Billing",
},
expected: []byte("billing\x00foo\x00"),
expected: []byte("peer:billing\x00foo\x00"),
},
write: indexValue{
source: peeredObj,
expected: []byte("billing\x00foo\x00"),
expected: []byte("peer:billing\x00foo\x00"),
},
},
},

View file

@ -607,6 +607,8 @@ func (q NodeCheckQuery) PartitionOrDefault() string {
type ServiceVirtualIP struct {
Service structs.PeeredServiceName
IP net.IP
structs.RaftIndex
}
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
@ -631,9 +633,11 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: indexerSingle[structs.PeeredServiceName, ServiceVirtualIP]{
Indexer: indexerSingleWithPrefix[structs.PeeredServiceName, ServiceVirtualIP, Query]{
readIndex: indexFromPeeredServiceName,
writeIndex: indexFromServiceVirtualIP,
// Read all peers in a cluster / partition
prefixIndex: prefixIndexFromQueryWithPeerWildcardable,
},
},
},

View file

@ -52,6 +52,29 @@ func prefixIndexFromQueryWithPeer(arg any) ([]byte, error) {
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
}
// prefixIndexFromQueryWithPeerWildcardable allows for a wildcard "*" peerName
// to query for all peers _excluding_ structs.LocalPeerKeyword.
// Assumes that non-local peers are prefixed with "peer:".
func prefixIndexFromQueryWithPeerWildcardable(v Query) ([]byte, error) {
var b indexBuilder
peername := v.PeerOrEmpty()
if peername == "" {
b.String(strings.ToLower(structs.LocalPeerKeyword))
} else if peername == "*" {
// use b.Raw so we don't add null terminator to prefix
b.Raw([]byte("peer:"))
return b.Bytes(), nil
} else {
b.String(strings.ToLower("peer:" + peername))
}
if v.Value != "" {
b.String(strings.ToLower(v.Value))
}
return b.Bytes(), nil
}
func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
return prefixIndexFromQuery(arg)
}

View file

@ -83,6 +83,12 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
}
// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface
// by sourcing data from the agent cache.
func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams {
return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName}
}
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
// sourcing data from the agent cache.
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {

View file

@ -69,6 +69,8 @@ type DataSources struct {
// notification channel.
LeafCertificate LeafCertificate
PeeredUpstreams PeeredUpstreams
// PreparedQuery provides updates about the results of a prepared query.
PreparedQuery PreparedQuery
@ -170,6 +172,12 @@ type LeafCertificate interface {
Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
}
// PeeredUpstreams is the interface used to consume updates about upstreams
// for all peered targets in a given partition.
type PeeredUpstreams interface {
Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// PreparedQuery is the interface used to consume updates about the results of
// a prepared query.
type PreparedQuery interface {

View file

@ -682,6 +682,30 @@ func (r *ServiceDumpRequest) CacheMinIndex() uint64 {
return r.QueryOptions.MinQueryIndex
}
// PartitionSpecificRequest is used to query about a specific partition.
type PartitionSpecificRequest struct {
Datacenter string
acl.EnterpriseMeta
QueryOptions
}
func (r *PartitionSpecificRequest) RequestDatacenter() string {
return r.Datacenter
}
func (r *PartitionSpecificRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Token: r.Token,
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
Key: r.EnterpriseMeta.PartitionOrDefault(),
}
}
// ServiceSpecificRequest is used to query about a specific service
type ServiceSpecificRequest struct {
Datacenter string
@ -2211,6 +2235,11 @@ type IndexedServiceList struct {
QueryMeta
}
type IndexedPeeredServiceList struct {
Services []PeeredServiceName
QueryMeta
}
type IndexedServiceNodes struct {
ServiceNodes ServiceNodes
QueryMeta