Watch the singular service resolver instead of the list + filtering to 1 (#13012)

* Watch the singular service resolver instead of the list + filtering to 1

* Rename the ConfigEntries cache type to ConfigEntryList
This commit is contained in:
Matt Keeler 2022-05-12 16:34:17 -04:00 committed by GitHub
parent b6594144ee
commit 42aec5caf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 119 additions and 141 deletions

3
.changelog/13012.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
proxycfg: Fixed a minor bug that would cause configuring a terminating gateway to watch too many service resolvers and waste resources doing filtering.
```

View File

@ -4045,7 +4045,7 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a}) a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a})
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{RPC: a}) a.cache.RegisterType(cachetype.ConfigEntryListName, &cachetype.ConfigEntryList{RPC: a})
a.cache.RegisterType(cachetype.ConfigEntryName, &cachetype.ConfigEntry{RPC: a}) a.cache.RegisterType(cachetype.ConfigEntryName, &cachetype.ConfigEntry{RPC: a})

View File

@ -9,17 +9,17 @@ import (
// Recommended name for registration. // Recommended name for registration.
const ( const (
ConfigEntriesName = "config-entries" ConfigEntryListName = "config-entries"
ConfigEntryName = "config-entry" ConfigEntryName = "config-entry"
) )
// ConfigEntries supports fetching discovering configuration entries // ConfigEntryList supports fetching discovering configuration entries
type ConfigEntries struct { type ConfigEntryList struct {
RegisterOptionsBlockingRefresh RegisterOptionsBlockingRefresh
RPC RPC RPC RPC
} }
func (c *ConfigEntries) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { func (c *ConfigEntryList) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult var result cache.FetchResult
// The request should be a ConfigEntryQuery. // The request should be a ConfigEntryQuery.

View File

@ -12,7 +12,7 @@ import (
func TestConfigEntries(t *testing.T) { func TestConfigEntries(t *testing.T) {
rpc := TestRPC(t) rpc := TestRPC(t)
typ := &ConfigEntries{RPC: rpc} typ := &ConfigEntryList{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value // Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments. // since that is return-by-pointer in the arguments.
@ -99,7 +99,7 @@ func TestConfigEntry(t *testing.T) {
func TestConfigEntries_badReqType(t *testing.T) { func TestConfigEntries_badReqType(t *testing.T) {
rpc := TestRPC(t) rpc := TestRPC(t)
typ := &ConfigEntries{RPC: rpc} typ := &ConfigEntryList{RPC: rpc}
// Fetch // Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(

View File

@ -47,7 +47,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
} }
// Watch service-resolvers so we can setup service subset clusters // Watch service-resolvers so we can setup service subset clusters
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{ err = s.cache.Notify(ctx, cachetype.ConfigEntryListName, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,

View File

@ -221,7 +221,7 @@ func genVerifyLeafWatch(expectedService string, expectedDatacenter string) verif
func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind string) verifyWatchRequest { func genVerifyResolverWatch(expectedService, expectedDatacenter, expectedKind string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) { return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.ConfigEntriesName, cacheType) require.Equal(t, cachetype.ConfigEntryName, cacheType)
reqReal, ok := request.(*structs.ConfigEntryQuery) reqReal, ok := request.(*structs.ConfigEntryQuery)
require.True(t, ok) require.True(t, ok)
@ -718,10 +718,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
} }
dbResolver := &structs.IndexedConfigEntries{ dbResolver := &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Name: "db", Name: "db",
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Redirect: &structs.ServiceResolverRedirect{ Redirect: &structs.ServiceResolverRedirect{
@ -729,7 +727,6 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Datacenter: "dc2", Datacenter: "dc2",
}, },
}, },
},
} }
cases := map[string]testCase{ cases := map[string]testCase{
@ -1641,7 +1638,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.TerminatingGateway.ServiceResolversSet[db]) require.True(t, snap.TerminatingGateway.ServiceResolversSet[db])
require.Len(t, snap.TerminatingGateway.ServiceResolvers, 1) require.Len(t, snap.TerminatingGateway.ServiceResolvers, 1)
require.Equal(t, dbResolver.Entries[0], snap.TerminatingGateway.ServiceResolvers[db]) require.Equal(t, dbResolver.Entry, snap.TerminatingGateway.ServiceResolvers[db])
}, },
}, },
{ {

View File

@ -216,7 +216,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.Up
// These are used to create clusters and endpoints for the service subsets // These are used to create clusters and endpoints for the service subsets
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok { if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
err := s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{ err := s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -340,17 +340,16 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.Up
snap.TerminatingGateway.ServiceConfigs[sn] = serviceConfig snap.TerminatingGateway.ServiceConfigs[sn] = serviceConfig
case strings.HasPrefix(u.CorrelationID, serviceResolverIDPrefix): case strings.HasPrefix(u.CorrelationID, serviceResolverIDPrefix):
configEntries, ok := u.Result.(*structs.IndexedConfigEntries) resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok { if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result) return fmt.Errorf("invalid type for response: %T", u.Result)
} }
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceResolverIDPrefix)) sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceResolverIDPrefix))
// There should only ever be one entry for a service resolver within a namespace // There should only ever be one entry for a service resolver within a namespace
if len(configEntries.Entries) == 1 { if resolver, ok := resp.Entry.(*structs.ServiceResolverConfigEntry); ok {
if resolver, ok := configEntries.Entries[0].(*structs.ServiceResolverConfigEntry); ok {
snap.TerminatingGateway.ServiceResolvers[sn] = resolver snap.TerminatingGateway.ServiceResolvers[sn] = resolver
} }
}
snap.TerminatingGateway.ServiceResolversSet[sn] = true snap.TerminatingGateway.ServiceResolversSet[sn] = true
case strings.HasPrefix(u.CorrelationID, serviceIntentionsIDPrefix): case strings.HasPrefix(u.CorrelationID, serviceIntentionsIDPrefix):

View File

@ -297,30 +297,34 @@ func TestConfigSnapshotTerminatingGateway(
// ======== // ========
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Entries: nil, },
}, },
}, },
{ {
CorrelationID: serviceResolverIDPrefix + api.String(), CorrelationID: serviceResolverIDPrefix + api.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Entries: nil, },
}, },
}, },
{ {
CorrelationID: serviceResolverIDPrefix + db.String(), CorrelationID: serviceResolverIDPrefix + db.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Entries: nil, },
}, },
}, },
{ {
CorrelationID: serviceResolverIDPrefix + cache.String(), CorrelationID: serviceResolverIDPrefix + cache.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Entries: nil, },
}, },
}, },
}) })
@ -355,10 +359,8 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
events := []agentcache.UpdateEvent{ events := []agentcache.UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "web", Name: "web",
Subsets: map[string]structs.ServiceResolverSubset{ Subsets: map[string]structs.ServiceResolverSubset{
@ -373,7 +375,6 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
}, },
}, },
}, },
},
{ {
CorrelationID: serviceConfigIDPrefix + web.String(), CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
@ -386,10 +387,8 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
events = testSpliceEvents(events, []agentcache.UpdateEvent{ events = testSpliceEvents(events, []agentcache.UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + cache.String(), CorrelationID: serviceResolverIDPrefix + cache.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "cache", Name: "cache",
Subsets: map[string]structs.ServiceResolverSubset{ Subsets: map[string]structs.ServiceResolverSubset{
@ -400,7 +399,6 @@ func testConfigSnapshotTerminatingGatewayServiceSubsets(t testing.T, alsoAdjustC
}, },
}, },
}, },
},
{ {
CorrelationID: serviceConfigIDPrefix + web.String(), CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
@ -419,10 +417,8 @@ func TestConfigSnapshotTerminatingGatewayDefaultServiceSubset(t testing.T) *Conf
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "web", Name: "web",
DefaultSubset: "v2", DefaultSubset: "v2",
@ -438,7 +434,6 @@ func TestConfigSnapshotTerminatingGatewayDefaultServiceSubset(t testing.T) *Conf
}, },
}, },
}, },
},
// { // {
// CorrelationID: serviceConfigIDPrefix + web.String(), // CorrelationID: serviceConfigIDPrefix + web.String(),
// Result: &structs.ServiceConfigResponse{ // Result: &structs.ServiceConfigResponse{
@ -512,9 +507,8 @@ func testConfigSnapshotTerminatingGatewayLBConfig(t testing.T, variant string) *
}, },
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: entry,
Entries: []structs.ConfigEntry{entry},
}, },
}, },
{ {
@ -559,10 +553,8 @@ func TestConfigSnapshotTerminatingGatewayHostnameSubsets(t testing.T) *ConfigSna
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + api.String(), CorrelationID: serviceResolverIDPrefix + api.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "api", Name: "api",
Subsets: map[string]structs.ServiceResolverSubset{ Subsets: map[string]structs.ServiceResolverSubset{
@ -573,13 +565,10 @@ func TestConfigSnapshotTerminatingGatewayHostnameSubsets(t testing.T) *ConfigSna
}, },
}, },
}, },
},
{ {
CorrelationID: serviceResolverIDPrefix + cache.String(), CorrelationID: serviceResolverIDPrefix + cache.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "cache", Name: "cache",
Subsets: map[string]structs.ServiceResolverSubset{ Subsets: map[string]structs.ServiceResolverSubset{
@ -590,7 +579,6 @@ func TestConfigSnapshotTerminatingGatewayHostnameSubsets(t testing.T) *ConfigSna
}, },
}, },
}, },
},
{ {
CorrelationID: serviceConfigIDPrefix + api.String(), CorrelationID: serviceConfigIDPrefix + api.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
@ -615,10 +603,8 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{
{ {
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "web", Name: "web",
DefaultSubset: "v2", DefaultSubset: "v2",
@ -634,13 +620,10 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
}, },
}, },
}, },
},
{ {
CorrelationID: serviceResolverIDPrefix + notfound.String(), CorrelationID: serviceResolverIDPrefix + notfound.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "notfound", Name: "notfound",
DefaultSubset: "v2", DefaultSubset: "v2",
@ -656,7 +639,6 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
}, },
}, },
}, },
},
{ {
CorrelationID: serviceConfigIDPrefix + web.String(), CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{ Result: &structs.ServiceConfigResponse{
@ -689,10 +671,8 @@ func TestConfigSnapshotTerminatingGatewayWithLambdaServiceAndServiceResolvers(t
return TestConfigSnapshotTerminatingGatewayWithLambdaService(t, return TestConfigSnapshotTerminatingGatewayWithLambdaService(t,
agentcache.UpdateEvent{ agentcache.UpdateEvent{
CorrelationID: serviceResolverIDPrefix + web.String(), CorrelationID: serviceResolverIDPrefix + web.String(),
Result: &structs.IndexedConfigEntries{ Result: &structs.ConfigEntryResponse{
Kind: structs.ServiceResolver, Entry: &structs.ServiceResolverConfigEntry{
Entries: []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: web.String(), Name: web.String(),
Subsets: map[string]structs.ServiceResolverSubset{ Subsets: map[string]structs.ServiceResolverSubset{
@ -701,6 +681,5 @@ func TestConfigSnapshotTerminatingGatewayWithLambdaServiceAndServiceResolvers(t
}, },
}, },
}, },
},
}) })
} }