From 68e191dd5088e7618a382bdeabfe935501a505ab Mon Sep 17 00:00:00 2001
From: hc-github-team-consul-core
Date: Thu, 15 Jun 2023 14:06:09 -0400
Subject: [PATCH] Backport of Fix issue with streaming service health watches.
 into release/1.16.x (#17776)

* backport of commit 92bb96727fe781df8926f38e02db1d489d9163f4

* backport of commit 3ea67c04a6a9993b859d7e338c996b72a9793fb4

---------

Co-authored-by: Derek Menteer
---
 .changelog/17775.txt                        |   3 +
 agent/agent.go                              |   6 +-
 agent/consul/watch/server_local.go          |  14 +-
 agent/proxycfg-glue/health_blocking.go      | 164 ++++++++++++++++++
 agent/proxycfg-glue/health_blocking_test.go | 183 ++++++++++++++++++++
 5 files changed, 366 insertions(+), 4 deletions(-)
 create mode 100644 .changelog/17775.txt
 create mode 100644 agent/proxycfg-glue/health_blocking.go
 create mode 100644 agent/proxycfg-glue/health_blocking_test.go

diff --git a/.changelog/17775.txt b/.changelog/17775.txt
new file mode 100644
index 000000000..8060cfa12
--- /dev/null
+++ b/.changelog/17775.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+connect: Fix issue where changes to service exports were not reflected in proxies.
+```
diff --git a/agent/agent.go b/agent/agent.go
index fcf6ce209..90bfffc1a 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -4555,7 +4555,11 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
 		sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
 		sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
 		sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
-		sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
+		// We do not use the streaming health backend here due to a bug in the way
+		// service exports interact with ACLs and streaming. See the comments on
+		// `proxycfgglue.ServerHealthBlocking` for more details.
+		// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
+		sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
 		sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
 		sources.Intentions = proxycfgglue.ServerIntentions(deps)
 		sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
diff --git a/agent/consul/watch/server_local.go b/agent/consul/watch/server_local.go
index f407d2c16..5937ba1c6 100644
--- a/agent/consul/watch/server_local.go
+++ b/agent/consul/watch/server_local.go
@@ -16,8 +16,9 @@ import (
 )
 
 var (
-	ErrorNotFound   = errors.New("no data found for query")
-	ErrorNotChanged = errors.New("data did not change for query")
+	ErrorNotFound     = errors.New("no data found for query")
+	ErrorNotChanged   = errors.New("data did not change for query")
+	ErrorACLResetData = errors.New("an acl update forced a state reset")
 
 	errNilContext  = errors.New("cannot call ServerLocalNotify with a nil context")
 	errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
@@ -320,8 +321,15 @@ func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
 			return
 		}
 
+		// An ACL reset error can be raised so that the index greater-than check is
+		// bypassed. We should not propagate it to the caller.
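+		// (The blocking health data source in proxycfg-glue returns ErrorACLResetData
+		// together with an empty result when ACL access is revoked, so proxies receive
+		// the cleared data instead of holding onto stale nodes.)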
+		forceReset := errors.Is(err, ErrorACLResetData)
+		if forceReset {
+			err = nil
+		}
+
 		// Check the index to see if we should call notify
-		if minIndex == 0 || minIndex < index {
+		if minIndex == 0 || minIndex < index || forceReset {
 			notify(ctx, correlationID, result, err)
 			minIndex = index
 		}
diff --git a/agent/proxycfg-glue/health_blocking.go b/agent/proxycfg-glue/health_blocking.go
new file mode 100644
index 000000000..0a47a920d
--- /dev/null
+++ b/agent/proxycfg-glue/health_blocking.go
@@ -0,0 +1,164 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package proxycfgglue
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/hashicorp/go-bexpr"
+	"github.com/hashicorp/go-memdb"
+
+	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/agent/consul/state"
+	"github.com/hashicorp/consul/agent/consul/watch"
+	"github.com/hashicorp/consul/agent/proxycfg"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/agent/structs/aclfilter"
+)
+
+// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs.
+// Whenever an exported-services config entry is modified, this is effectively an ACL change.
+// Assume the following situation:
+// - no services are exported
+// - an upstream watch to service X is spawned
+// - the streaming backend filters out data for service X (because it's not exported yet)
+// - service X is finally exported
+//
+// In this situation, the streaming backend does not trigger a refresh of its data.
+// This means that any events that were supposed to have been received prior to the export are NOT backfilled,
+// and the watches never see service X spawning.
+//
+// We have currently decided not to trigger a stream refresh in this situation, because touching exports
+// could cause a re-fetch of all watches for that partition and create a thundering-herd effect.
+// Therefore, this local blocking-query approach exists for agentless.
+//
+// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful,
+// because proxycfg has a `req.Source.Node != ""` check which prevents the `streamingEnabled` check from passing.
+// This means that while agents should technically have this same issue, they don't experience it with mesh health
+// watches.
+func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
+	return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
+}
+
+type serverHealthBlocking struct {
+	deps         ServerDataSourceDeps
+	remoteSource proxycfg.Health
+	state        *state.Store
+	watchTimeout time.Duration
+}
+
+// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks.
+// Most notably, some query features unnecessary for mesh have been stripped out.
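+// A timer channel registered on each watch set (see timeoutCh below) forces the query to
+// re-run every watchTimeout, which is how export and ACL changes that never touch the
+// watched tables are eventually observed.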
+func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
+	if args.Datacenter != h.deps.Datacenter {
+		return h.remoteSource.Notify(ctx, args, correlationID, ch)
+	}
+
+	// Verify the arguments
+	if args.ServiceName == "" {
+		return fmt.Errorf("Must provide service name")
+	}
+	if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName {
+		return fmt.Errorf("Wildcards are not allowed in the partition field")
+	}
+
+	// Determine the function we'll call
+	var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
+	switch {
+	case args.Connect:
+		f = serviceNodesConnect
+	case args.Ingress:
+		f = serviceNodesIngress
+	default:
+		f = serviceNodesDefault
+	}
+
+	filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{})
+	if err != nil {
+		return err
+	}
+
+	hadResults := false
+	return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore,
+		func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
+			// This is necessary so that service export changes are eventually picked up, since
+			// they won't trigger the watch themselves.
+			timeoutCh := make(chan struct{})
+			time.AfterFunc(h.watchTimeout, func() {
+				close(timeoutCh)
+			})
+			ws.Add(timeoutCh)
+
+			authzContext := acl.AuthorizerContext{
+				Peer: args.PeerName,
+			}
+			authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
+			if err != nil {
+				return 0, nil, err
+			}
+			// If we're doing a connect or ingress query, we need read access to the service
+			// we're trying to find proxies for, so check that.
+			if args.Connect || args.Ingress {
+				if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
+					// If access was somehow revoked (via token deletion or unexporting), then we clear the
+					// last-known results before triggering an error. This way, the proxies will actually update
+					// their data, rather than holding onto the last-known list of healthy nodes indefinitely.
+					if hadResults {
+						hadResults = false
+						return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData
+					}
+					return 0, nil, acl.ErrPermissionDenied
+				}
+			}
+
+			var thisReply structs.IndexedCheckServiceNodes
+			thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
+			if err != nil {
+				return 0, nil, err
+			}
+
+			raw, err := filter.Execute(thisReply.Nodes)
+			if err != nil {
+				return 0, nil, err
+			}
+			thisReply.Nodes = raw.(structs.CheckServiceNodes)
+
+			// Note: we filter the results with ACLs *after* applying the user-supplied
+			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
+			// results that would be filtered out even if the user did have permission.
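+			// The aclfilter pass below mutates thisReply in place, dropping any nodes the
+			// token cannot read and recording in QueryMeta that results were filtered.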
+			if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil {
+				return 0, nil, err
+			}
+
+			hadResults = true
+			return thisReply.Index, &thisReply, nil
+		},
+		dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
+	)
+}
+
+func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error {
+	// Get the ACL from the token
+	var entMeta acl.EnterpriseMeta
+	authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz)
+	if err != nil {
+		return err
+	}
+	aclfilter.New(authorizer, h.deps.Logger).Filter(subj)
+	return nil
+}
+
+func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
+	return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
+}
+
+func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
+	return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
+}
+
+func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
+	return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
+}
diff --git a/agent/proxycfg-glue/health_blocking_test.go b/agent/proxycfg-glue/health_blocking_test.go
new file mode 100644
index 000000000..3dcdaf17d
--- /dev/null
+++ b/agent/proxycfg-glue/health_blocking_test.go
@@ -0,0 +1,183 @@
+package proxycfgglue
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/acl"
+	"github.com/hashicorp/consul/agent/consul/state"
+	"github.com/hashicorp/consul/agent/proxycfg"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/sdk/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestServerHealthBlocking(t *testing.T) {
+	t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
+		var (
+			ctx           = context.Background()
+			req           = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
+			correlationID = "correlation-id"
+			ch            = make(chan<- proxycfg.UpdateEvent)
+			result        = errors.New("KABOOM")
+		)
+
+		remoteSource := newMockHealth(t)
+		remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
+
+		store := state.NewStateStore(nil)
+		dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
+		err := dataSource.Notify(ctx, req, correlationID, ch)
+		require.Equal(t, result, err)
+	})
+
+	t.Run("services notify correctly", func(t *testing.T) {
+		const (
+			datacenter  = "dc1"
+			serviceName = "web"
+		)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+
+		store := state.NewStateStore(nil)
+		aclResolver := newStaticResolver(acl.ManageAll())
+		dataSource := ServerHealthBlocking(ServerDataSourceDeps{
+			GetStore:    func() Store { return store },
+			Datacenter:  datacenter,
+			ACLResolver: aclResolver,
+			Logger:      testutil.Logger(t),
+		}, nil, store)
+		dataSource.watchTimeout = 1 * time.Second
+
+		// Watch for all events
+		eventCh := make(chan proxycfg.UpdateEvent)
+		require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
+			Datacenter:  datacenter,
+			ServiceName: serviceName,
+		}, "", eventCh))
+
+		// Watch for a subset of events
+		filteredCh := make(chan proxycfg.UpdateEvent)
+		require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
+			Datacenter:  datacenter,
+			ServiceName: serviceName,
+			QueryOptions: structs.QueryOptions{
+				Filter: "Service.ID == \"web1\"",
+			},
+		}, "", filteredCh))
+
+		testutil.RunStep(t, "initial state", func(t *testing.T) {
+			result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
+			require.Empty(t, result.Nodes)
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
+			require.Empty(t, result.Nodes)
+		})
+
+		testutil.RunStep(t, "register services", func(t *testing.T) {
+			require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
+				Datacenter: "dc1",
+				Node:       "foo",
+				Address:    "127.0.0.1",
+				Service: &structs.NodeService{
+					ID:      serviceName + "1",
+					Service: serviceName,
+					Port:    80,
+				},
+			}))
+			result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
+			require.Len(t, result.Nodes, 1)
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
+			require.Len(t, result.Nodes, 1)
+
+			require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
+				Datacenter: "dc1",
+				Node:       "foo",
+				Address:    "127.0.0.1",
+				Service: &structs.NodeService{
+					ID:      serviceName + "2",
+					Service: serviceName,
+					Port:    81,
+				},
+			}))
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
+			require.Len(t, result.Nodes, 2)
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
+			require.Len(t, result.Nodes, 1)
+			require.Equal(t, "web1", result.Nodes[0].Service.ID)
+		})
+
+		testutil.RunStep(t, "deregister service", func(t *testing.T) {
+			require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
+			result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
+			require.Len(t, result.Nodes, 1)
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
+			require.Len(t, result.Nodes, 0)
+		})
+
+		testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
+			require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
+				Datacenter: "dc1",
+				Node:       "foo",
+				Address:    "127.0.0.1",
+				Service: &structs.NodeService{
+					Service: serviceName + "-sidecar-proxy",
+					Kind:    structs.ServiceKindConnectProxy,
+					Proxy: structs.ConnectProxyConfig{
+						DestinationServiceName: serviceName,
+					},
+				},
+			}))
+
+			authzDeny := policyAuthorizer(t, ``)
+			authzAllow := policyAuthorizer(t, `
+				node_prefix "" { policy = "read" }
+				service_prefix "web" { policy = "read" }
+			`)
+
+			// Start a stream with insufficient permissions and expect a denial
+			aclDenyCh := make(chan proxycfg.UpdateEvent)
+			aclResolver.SwapAuthorizer(authzDeny)
+			require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
+				Connect:     true,
+				Datacenter:  datacenter,
+				ServiceName: serviceName,
+			}, "", aclDenyCh))
+			require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")
+
+			// Adding ACL permissions will send valid data
+			aclResolver.SwapAuthorizer(authzAllow)
+			time.Sleep(dataSource.watchTimeout)
+			result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
+			require.Len(t, result.Nodes, 1)
+			require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
+
+			// Start a stream with sufficient permissions and expect data
+			aclAllowCh := make(chan proxycfg.UpdateEvent)
+			aclResolver.SwapAuthorizer(authzAllow)
+			require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
+				Connect:     true,
+				Datacenter:  datacenter,
+				ServiceName: serviceName,
+			}, "", aclAllowCh))
+			result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
+			require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service) + + // Removing ACL permissions will send empty data + aclResolver.SwapAuthorizer(authzDeny) + time.Sleep(dataSource.watchTimeout) + result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh) + require.Len(t, result.Nodes, 0) + + // Adding ACL permissions will send valid data + aclResolver.SwapAuthorizer(authzAllow) + time.Sleep(dataSource.watchTimeout) + result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh) + require.Len(t, result.Nodes, 1) + require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service) + }) + }) +}