From 7f69e279269bdff0360ca4341e2b11a48b480ab3 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Tue, 12 Jul 2022 11:43:42 +0100 Subject: [PATCH] proxycfg-glue: server-local implementation of `FederationStateListMeshGateways` This is the OSS portion of enterprise PR 2265. This PR provides a server-local implementation of the proxycfg.FederationStateListMeshGateways interface based on blocking queries. --- agent/agent.go | 1 + .../federation_state_list_mesh_gateways.go | 67 ++++++++++++ ...ederation_state_list_mesh_gateways_test.go | 103 ++++++++++++++++++ agent/proxycfg-glue/glue.go | 7 +- 4 files changed, 172 insertions(+), 6 deletions(-) create mode 100644 agent/proxycfg-glue/federation_state_list_mesh_gateways.go create mode 100644 agent/proxycfg-glue/federation_state_list_mesh_gateways_test.go diff --git a/agent/agent.go b/agent/agent.go index 59827e021..44157a91f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4247,6 +4247,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps) sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps) sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache)) + sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps) sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps) sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Intentions = proxycfgglue.ServerIntentions(deps) diff --git a/agent/proxycfg-glue/federation_state_list_mesh_gateways.go b/agent/proxycfg-glue/federation_state_list_mesh_gateways.go new file mode 100644 index 000000000..ea3640ad9 --- /dev/null +++ b/agent/proxycfg-glue/federation_state_list_mesh_gateways.go @@ -0,0 +1,67 @@ +package proxycfgglue + +import ( + "context" + + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul/watch" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/structs/aclfilter" +) + +// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways +// interface by sourcing data from the agent cache. +func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways { + return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName} +} + +// ServerFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways +// interface by sourcing data from a blocking query against the server's state +// store. +func ServerFederationStateListMeshGateways(deps ServerDataSourceDeps) proxycfg.FederationStateListMeshGateways { + return &serverFederationStateListMeshGateways{deps} +} + +type serverFederationStateListMeshGateways struct { + deps ServerDataSourceDeps +} + +func (s *serverFederationStateListMeshGateways) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { + return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore, + func(ws memdb.WatchSet, store Store) (uint64, *structs.DatacenterIndexedCheckServiceNodes, error) { + authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil) + if err != nil { + return 0, nil, err + } + + index, fedStates, err := store.FederationStateList(ws) + if err != nil { + return 0, nil, err + } + + results := make(map[string]structs.CheckServiceNodes) + for _, fs := range fedStates { + if gws := fs.MeshGateways; len(gws) != 0 { + // Shallow clone to prevent ACL filtering manipulating the slice in memdb. + results[fs.Datacenter] = gws.ShallowClone() + } + } + + rsp := &structs.DatacenterIndexedCheckServiceNodes{ + DatacenterNodes: results, + QueryMeta: structs.QueryMeta{ + Index: index, + Backend: structs.QueryBackendBlocking, + }, + } + aclfilter.New(authz, s.deps.Logger).Filter(rsp) + + return index, rsp, nil + }, + dispatchBlockingQueryUpdate[*structs.DatacenterIndexedCheckServiceNodes](ch), + ) +} diff --git a/agent/proxycfg-glue/federation_state_list_mesh_gateways_test.go b/agent/proxycfg-glue/federation_state_list_mesh_gateways_test.go new file mode 100644 index 000000000..5c716d24c --- /dev/null +++ b/agent/proxycfg-glue/federation_state_list_mesh_gateways_test.go @@ -0,0 +1,103 @@ +package proxycfgglue + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestServerFederationStateListMeshGateways(t *testing.T) { + const index uint64 = 123 + + store := state.NewStateStore(nil) + + authz := policyAuthorizer(t, ` + service_prefix "dc2-" { policy = "read" } + node_prefix "dc2-" { policy = "read" } + + service_prefix "dc3-" { policy = "read" } + node_prefix "dc3-" { policy = "read" } + `) + + require.NoError(t, store.FederationStateSet(index, &structs.FederationState{ + Datacenter: "dc2", + MeshGateways: structs.CheckServiceNodes{ + { + Service: &structs.NodeService{Service: "dc2-gw1"}, + Node: &structs.Node{Node: "dc2-gw1"}, + }, + }, + })) + + // No access to this DC, we shouldn't see it in results. + require.NoError(t, store.FederationStateSet(index, &structs.FederationState{ + Datacenter: "dc4", + MeshGateways: structs.CheckServiceNodes{ + { + Service: &structs.NodeService{Service: "dc4-gw1"}, + Node: &structs.Node{Node: "dc4-gw1"}, + }, + }, + })) + + dataSource := ServerFederationStateListMeshGateways(ServerDataSourceDeps{ + ACLResolver: newStaticResolver(authz), + GetStore: func() Store { return store }, + }) + + eventCh := make(chan proxycfg.UpdateEvent) + require.NoError(t, dataSource.Notify(context.Background(), &structs.DCSpecificRequest{Datacenter: "dc1"}, "", eventCh)) + + testutil.RunStep(t, "initial state", func(t *testing.T) { + result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh) + require.Equal(t, map[string]structs.CheckServiceNodes{ + "dc2": { + { + Service: &structs.NodeService{Service: "dc2-gw1"}, + Node: &structs.Node{Node: "dc2-gw1"}, + }, + }, + }, result.DatacenterNodes) + }) + + testutil.RunStep(t, "add new datacenter", func(t *testing.T) { + require.NoError(t, store.FederationStateSet(index+1, &structs.FederationState{ + Datacenter: "dc3", + MeshGateways: structs.CheckServiceNodes{ + { + Service: &structs.NodeService{Service: "dc3-gw1"}, + Node: &structs.Node{Node: "dc3-gw1"}, + }, + }, + })) + + result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh) + require.Equal(t, map[string]structs.CheckServiceNodes{ + "dc2": { + { + Service: &structs.NodeService{Service: "dc2-gw1"}, + Node: &structs.Node{Node: "dc2-gw1"}, + }, + }, + "dc3": { + { + Service: &structs.NodeService{Service: "dc3-gw1"}, + Node: &structs.Node{Node: "dc3-gw1"}, + }, + }, + }, result.DatacenterNodes) + }) + + testutil.RunStep(t, "delete datacenter", func(t *testing.T) { + require.NoError(t, store.FederationStateDelete(index+2, "dc3")) + + result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh) + require.NotContains(t, result.DatacenterNodes, "dc3") + }) +} diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 9fb064890..06798939b 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -21,6 +21,7 @@ import ( type Store interface { watch.StateStore + FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error) IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error) ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error) @@ -53,12 +54,6 @@ func CacheDatacenters(c *cache.Cache) proxycfg.Datacenters { return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName} } -// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways -// interface by sourcing data from the agent cache. -func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways { - return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName} -} - // CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing // data from the agent cache. func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {