Backport of Fix issue with streaming service health watches. into release/1.16.x (#17776)

* backport of commit 92bb96727fe781df8926f38e02db1d489d9163f4

* backport of commit 3ea67c04a6a9993b859d7e338c996b72a9793fb4

---------

Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
This commit is contained in:
hc-github-team-consul-core 2023-06-15 14:06:09 -04:00 committed by GitHub
parent 7ee59e8dd9
commit 68e191dd50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 366 additions and 4 deletions

3
.changelog/17775.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where changes to service exports were not reflected in proxies.
```

View File

@ -4555,7 +4555,11 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
// We do not use this health check currently due to a bug with the way that service exports
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
// for more details.
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)

View File

@ -16,8 +16,9 @@ import (
)
var (
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorNotFound = errors.New("no data found for query")
ErrorNotChanged = errors.New("data did not change for query")
ErrorACLResetData = errors.New("an acl update forced a state reset")
errNilContext = errors.New("cannot call ServerLocalNotify with a nil context")
errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
@ -320,8 +321,15 @@ func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
return
}
// An ACL reset error can be raised so that the index greater-than check is
// bypassed. We should not propagate it to the caller.
forceReset := errors.Is(err, ErrorACLResetData)
if forceReset {
err = nil
}
// Check the index to see if we should call notify
if minIndex == 0 || minIndex < index {
if minIndex == 0 || minIndex < index || forceReset {
notify(ctx, correlationID, result, err)
minIndex = index
}

View File

@ -0,0 +1,164 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package proxycfgglue
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)
// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs.
// Whenever an exported-services config entry is modified, this is effectively an ACL change.
// Assume the following situation:
// - no services are exported
// - an upstream watch to service X is spawned
// - the streaming backend filters out data for service X (because it's not exported yet)
// - service X is finally exported
//
// In this situation, the streaming backend does not trigger a refresh of its data.
// This means that any events that were supposed to have been received prior to the export are NOT backfilled,
// and the watches never see service X spawning.
//
// We currently have decided to not trigger a stream refresh in this situation due to the potential for a
// thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially).
// Therefore, this local blocking-query approach exists for agentless.
//
// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful,
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
// This means that while agents should technically have this same issue, they don't experience it with mesh health
// watches.
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
}
type serverHealthBlocking struct {
deps ServerDataSourceDeps
remoteSource proxycfg.Health
state *state.Store
watchTimeout time.Duration
}
// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks.
// Most notably, some query features unnecessary for mesh have been stripped out.
func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if args.Datacenter != h.deps.Datacenter {
return h.remoteSource.Notify(ctx, args, correlationID, ch)
}
// Verify the arguments
if args.ServiceName == "" {
return fmt.Errorf("Must provide service name")
}
if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName {
return fmt.Errorf("Wildcards are not allowed in the partition field")
}
// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
switch {
case args.Connect:
f = serviceNodesConnect
case args.Ingress:
f = serviceNodesIngress
default:
f = serviceNodesDefault
}
filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{})
if err != nil {
return err
}
var hadResults bool = false
return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
// This is necessary so that service export changes are eventually picked up, since
// they won't trigger the watch themselves.
timeoutCh := make(chan struct{})
time.AfterFunc(h.watchTimeout, func() {
close(timeoutCh)
})
ws.Add(timeoutCh)
authzContext := acl.AuthorizerContext{
Peer: args.PeerName,
}
authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return 0, nil, err
}
// If we're doing a connect or ingress query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect || args.Ingress {
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// If access was somehow revoked (via token deletion or unexporting), then we clear the
// last-known results before triggering an error. This way, the proxies will actually update
// their data, rather than holding onto the last-known list of healthy nodes indefinitely.
if hadResults {
hadResults = false
return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData
}
return 0, nil, acl.ErrPermissionDenied
}
}
var thisReply structs.IndexedCheckServiceNodes
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
if err != nil {
return 0, nil, err
}
raw, err := filter.Execute(thisReply.Nodes)
if err != nil {
return 0, nil, err
}
thisReply.Nodes = raw.(structs.CheckServiceNodes)
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil {
return 0, nil, err
}
hadResults = true
return thisReply.Index, &thisReply, nil
},
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
)
}
func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error {
// Get the ACL from the token
var entMeta acl.EnterpriseMeta
authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz)
if err != nil {
return err
}
aclfilter.New(authorizer, h.deps.Logger).Filter(subj)
return nil
}
func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}
func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}

View File

@ -0,0 +1,183 @@
package proxycfgglue
import (
"context"
"errors"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)
func TestServerHealthBlocking(t *testing.T) {
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
var (
ctx = context.Background()
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
correlationID = "correlation-id"
ch = make(chan<- proxycfg.UpdateEvent)
result = errors.New("KABOOM")
)
remoteSource := newMockHealth(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
store := state.NewStateStore(nil)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})
t.Run("services notify correctly", func(t *testing.T) {
const (
datacenter = "dc1"
serviceName = "web"
)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store := state.NewStateStore(nil)
aclResolver := newStaticResolver(acl.ManageAll())
dataSource := ServerHealthBlocking(ServerDataSourceDeps{
GetStore: func() Store { return store },
Datacenter: datacenter,
ACLResolver: aclResolver,
Logger: testutil.Logger(t),
}, nil, store)
dataSource.watchTimeout = 1 * time.Second
// Watch for all events
eventCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
}, "", eventCh))
// Watch for a subset of events
filteredCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Datacenter: datacenter,
ServiceName: serviceName,
QueryOptions: structs.QueryOptions{
Filter: "Service.ID == \"web1\"",
},
}, "", filteredCh))
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Empty(t, result.Nodes)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Empty(t, result.Nodes)
})
testutil.RunStep(t, "register services", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "1",
Service: serviceName,
Port: 80,
},
}))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: serviceName + "2",
Service: serviceName,
Port: 81,
},
}))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 2)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web1", result.Nodes[0].Service.ID)
})
testutil.RunStep(t, "deregister service", func(t *testing.T) {
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
require.Len(t, result.Nodes, 1)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
require.Len(t, result.Nodes, 0)
})
testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: serviceName + "-sidecar-proxy",
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: serviceName,
},
},
}))
authzDeny := policyAuthorizer(t, ``)
authzAllow := policyAuthorizer(t, `
node_prefix "" { policy = "read" }
service_prefix "web" { policy = "read" }
`)
// Start a stream where insufficient permissions are denied
aclDenyCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzDeny)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclDenyCh))
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")
// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
// Start a stream where sufficient permissions are allowed
aclAllowCh := make(chan proxycfg.UpdateEvent)
aclResolver.SwapAuthorizer(authzAllow)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
Connect: true,
Datacenter: datacenter,
ServiceName: serviceName,
}, "", aclAllowCh))
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
// Removing ACL permissions will send empty data
aclResolver.SwapAuthorizer(authzDeny)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 0)
// Adding ACL permissions will send valid data
aclResolver.SwapAuthorizer(authzAllow)
time.Sleep(dataSource.watchTimeout)
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
require.Len(t, result.Nodes, 1)
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
})
})
}