server: suppress spurious blocking query returns where multiple config entries are involved (#12362)

Starting from and extending the mechanism introduced in #12110 we can specially handle the 3 main special Consul RPC endpoints that react to many config entries in a single blocking query in Connect:

- `DiscoveryChain.Get`
- `ConfigEntry.ResolveServiceConfig`
- `Intentions.Match`

All of these will internally watch for many config entries, and at least one of those will likely be not found in any given query. Because these are blends of multiple reads the exact solution from #12110 isn't perfectly aligned, but we can tweak the approach slightly and regain the utility of that mechanism.

### No Config Entries Found

In this case, despite looking for many config entries none may be found at all. Unlike #12110 in this scenario we do not return an empty reply to the caller, but instead synthesize a struct from default values to return. This can be handled nearly identically to #12110 with the first 1-2 replies being non-empty payloads followed by the standard spurious wakeup suppression mechanism from #12110.

### No Change Since Last Wakeup

Once a blocking query loop on the server has completed and slept at least once, there is a further optimization we can make here to detect if any of the config entries that were present at specific versions for the prior execution of the loop are identical for the loop we just woke up for. In that scenario we can return a slightly different internal sentinel error and basically externally handle it similar to #12110.

This would mean that even if 20 discovery chain read RPC handling goroutines wakeup due to the creation of an unrelated config entry, the only ones that will terminate and reply with a blob of data are those that genuinely have new data to report.

### Extra Endpoints

Since this pattern is pretty reusable, other key config-entry-adjacent endpoints used by `agent/proxycfg` also were updated:

- `ConfigEntry.List`
- `Internal.IntentionUpstreams` (tproxy)
This commit is contained in:
R.B. Boyer 2022-02-25 15:46:34 -06:00 committed by GitHub
parent aea00f10ae
commit 3804677570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1245 additions and 272 deletions

3
.changelog/12362.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
server: suppress spurious blocking query returns where multiple config entries are involved
```

View File

@ -32,3 +32,7 @@ func NewKindName(kind, name string, entMeta *structs.EnterpriseMeta) KindName {
ret.Normalize()
return ret
}
func NewKindNameForEntry(entry structs.ConfigEntry) KindName {
return NewKindName(entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta())
}

View File

@ -0,0 +1,57 @@
package configentry
import (
"github.com/hashicorp/consul/agent/structs"
)
// ResolvedServiceConfigSet is a wrapped set of raw cross-referenced config
// entries necessary for the ConfigEntry.ResolveServiceConfig RPC process.
//
// None of these are defaulted.
type ResolvedServiceConfigSet struct {
ServiceDefaults map[structs.ServiceID]*structs.ServiceConfigEntry
ProxyDefaults map[string]*structs.ProxyConfigEntry
}
func (r *ResolvedServiceConfigSet) IsEmpty() bool {
return len(r.ServiceDefaults) == 0 && len(r.ProxyDefaults) == 0
}
func (r *ResolvedServiceConfigSet) GetServiceDefaults(sid structs.ServiceID) *structs.ServiceConfigEntry {
if r.ServiceDefaults == nil {
return nil
}
return r.ServiceDefaults[sid]
}
func (r *ResolvedServiceConfigSet) GetProxyDefaults(partition string) *structs.ProxyConfigEntry {
if r.ProxyDefaults == nil {
return nil
}
return r.ProxyDefaults[partition]
}
func (r *ResolvedServiceConfigSet) AddServiceDefaults(entry *structs.ServiceConfigEntry) {
if entry == nil {
return
}
if r.ServiceDefaults == nil {
r.ServiceDefaults = make(map[structs.ServiceID]*structs.ServiceConfigEntry)
}
sid := structs.NewServiceID(entry.Name, &entry.EnterpriseMeta)
r.ServiceDefaults[sid] = entry
}
func (r *ResolvedServiceConfigSet) AddProxyDefaults(entry *structs.ProxyConfigEntry) {
if entry == nil {
return
}
if r.ProxyDefaults == nil {
r.ProxyDefaults = make(map[string]*structs.ProxyConfigEntry)
}
r.ProxyDefaults[entry.PartitionOrDefault()] = entry
}

View File

@ -10,8 +10,10 @@ import (
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/mitchellh/copystructure"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
@ -236,6 +238,10 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
}
}
var (
priorHash uint64
ranOnce bool
)
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
@ -258,6 +264,26 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
reply.Kind = args.Kind
reply.Index = index
reply.Entries = filteredEntries
// Generate a hash of the content driving this response. Use it to
// determine if the response is identical to a prior wakeup.
newHash, err := hashstructure_v2.Hash(filteredEntries, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranOnce && priorHash == newHash {
priorHash = newHash
return errNotChanged
} else {
priorHash = newHash
ranOnce = true
}
if len(reply.Entries) == 0 {
return errNotFound
}
return nil
})
}
@ -417,115 +443,18 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return acl.ErrPermissionDenied
}
var (
priorHash uint64
ranOnce bool
)
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, &args.EnterpriseMeta)
if err != nil {
return err
}
var (
proxyConf *structs.ProxyConfigEntry
proxyConfGlobalProtocol string
ok bool
upstreamIDs = args.UpstreamIDs
legacyUpstreams = false
)
if proxyEntry != nil {
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
if !ok {
return fmt.Errorf("invalid proxy config type %T", proxyEntry)
}
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
thisReply.Index = index
var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
upstreamIDs := args.UpstreamIDs
legacyUpstreams := false
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
*reply = thisReply
return nil
}
// The request is considered legacy if the deprecated args.Upstream was used
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
@ -533,136 +462,274 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams {
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
// Before Consul namespaces were released, the Upstreams
// provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the
// request, which will just be the default namespace.
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
}
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Fetch all relevant config entries.
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
index, entries, err := state.ReadResolvedServiceConfigEntries(
ws,
args.Name,
&args.EnterpriseMeta,
upstreamIDs,
args.Mode,
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
c.logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
if err != nil {
return err
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
_, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
if err != nil {
return err
}
if upstreamSvcDefaults != nil {
cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults)
}
if cfg.Protocol != "" {
protocol = cfg.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
// Generate a hash of the config entry content driving this
// response. Use it to determine if the response is identical to a
// prior wakeup.
newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
*reply = thisReply
return nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
if ranOnce && priorHash == newHash {
priorHash = newHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which
// is desirable
return errNotChanged
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
priorHash = newHash
ranOnce = true
}
thisReply, err := c.computeResolvedServiceConfig(
args,
upstreamIDs,
legacyUpstreams,
entries,
)
if err != nil {
return err
}
thisReply.Index = index
*reply = *thisReply
if entries.IsEmpty() {
// No config entries factored into this reply; it's a default.
return errNotFound
}
*reply = thisReply
return nil
})
}
func (c *ConfigEntry) computeResolvedServiceConfig(
args *structs.ServiceConfigRequest,
upstreamIDs []structs.ServiceID,
legacyUpstreams bool,
entries *configentry.ResolvedServiceConfigSet,
) (*structs.ServiceConfigResponse, error) {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
var proxyConfGlobalProtocol string
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
if proxyConf != nil {
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
var ok bool
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
serviceConf := entries.GetServiceDefaults(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return &thisReply, nil
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
c.logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
return &thisReply, nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return &thisReply, nil
}
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
// ExportedServices entries are gated from interactions from secondary DCs
// because non-default partitions cannot be created in secondaries

View File

@ -9,10 +9,12 @@ import (
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
@ -310,63 +312,77 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
}
_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
store := s1.fsm.State()
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "alpha",
}
require.NoError(t, store.EnsureConfigEntry(1, entry))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "does-not-exist",
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.ConfigEntryResponse
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := uint64(0); i < 200; i++ {
time.Sleep(5 * time.Millisecond)
entry := &structs.ServiceConfigEntry{
{ // create one relevant entry
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("other%d", i),
}
if err := store.EnsureConfigEntry(i+2, entry); err != nil {
return err
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
// The test is a bit racy because of the timing of the two goroutines, so
// we relax the check for the count to be within a small range.
if count < 2 || count > 3 {
t.Fatalf("expected count to be 2 or 3, got %d", count)
Name: "alpha",
},
}, &out))
require.True(t, out)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "does-not-exist",
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.ConfigEntryResponse
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("other%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
})
}
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
@ -472,6 +488,102 @@ func TestConfigEntry_List(t *testing.T) {
require.Equal(t, expected, out)
}
func TestConfigEntry_List_BlockOnNoChange(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Datacenter: "dc1",
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.IndexedConfigEntries
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: fmt.Sprintf(dataPrefix+"%d", i),
ConnectTimeout: 33 * time.Second,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other")
})
{ // Create some dummy services in the state store to look up.
for _, entry := range []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
},
} {
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: entry,
}, &out))
require.True(t, out)
}
}
runStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other")
})
}
func TestConfigEntry_ListAll(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -2025,6 +2137,142 @@ func TestConfigEntry_ResolveServiceConfig_ProxyDefaultsProtocol_UsedForAllUpstre
require.Equal(t, expected, out)
}
func BenchmarkConfigEntry_ResolveServiceConfig_Hash(b *testing.B) {
res := &configentry.ResolvedServiceConfigSet{}
res.AddServiceDefaults(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
})
res.AddServiceDefaults(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "up1",
Protocol: "http",
})
res.AddServiceDefaults(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "up2",
Protocol: "http",
})
res.AddProxyDefaults(&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
})
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := hashstructure_v2.Hash(res, hashstructure_v2.FormatV2, nil)
if err != nil {
b.Fatalf("error: %v", err)
}
}
}
func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := structs.ServiceConfigRequest{
Name: "foo",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
},
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.ServiceConfigResponse
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf(dataPrefix+"%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}
{ // create one unrelated entry
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "unrelated",
},
}, &out))
require.True(t, out)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other")
})
{ // create one relevant entry
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
Protocol: "grpc",
},
}, &out))
require.True(t, out)
}
runStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other")
})
}
func TestConfigEntry_ResolveServiceConfigNoConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -6,6 +6,7 @@ import (
metrics "github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/discoverychain"
@ -48,6 +49,10 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
evalDC = c.srv.config.Datacenter
}
var (
priorHash uint64
ranOnce bool
)
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
@ -66,9 +71,32 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
return err
}
// Generate a hash of the config entry content driving this
// response. Use it to determine if the response is identical to a
// prior wakeup.
newHash, err := hashstructure_v2.Hash(chain, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranOnce && priorHash == newHash {
priorHash = newHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which
// is desirable
return errNotChanged
} else {
priorHash = newHash
ranOnce = true
}
reply.Index = index
reply.Chain = chain
if chain.IsDefault() {
return errNotFound
}
return nil
})
}

View File

@ -1,6 +1,7 @@
package consul
import (
"context"
"fmt"
"os"
"testing"
@ -8,6 +9,7 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
@ -242,3 +244,116 @@ func TestDiscoveryChainEndpoint_Get(t *testing.T) {
require.Equal(t, expect, resp)
}
}
func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{ // create one unrelated entry
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "unrelated",
ConnectTimeout: 33 * time.Second,
},
}, &out))
require.True(t, out)
}
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := &structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Datacenter: "dc1",
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.DiscoveryChainResponse
err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out)
if err != nil {
return fmt.Errorf("error getting discovery chain: %w", err)
}
if !out.Chain.IsDefault() {
return fmt.Errorf("expected default chain")
}
t.Log("blocking query index", out.QueryMeta.Index, out.Chain)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf(dataPrefix+"%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other")
})
{ // create one relevant entry
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "grpc",
},
}, &out))
require.True(t, out)
}
runStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other")
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@ -617,6 +618,10 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In
}
}
var (
priorHash uint64
ranOnce bool
)
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
@ -628,6 +633,35 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In
reply.Index = index
reply.Matches = matches
// Generate a hash of the intentions content driving this response.
// Use it to determine if the response is identical to a prior
// wakeup.
newHash, err := hashstructure_v2.Hash(matches, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranOnce && priorHash == newHash {
priorHash = newHash
return errNotChanged
} else {
priorHash = newHash
ranOnce = true
}
hasData := false
for _, match := range matches {
if len(match) > 0 {
hasData = true
break
}
}
if !hasData {
return errNotFound
}
return nil
},
)

View File

@ -1,6 +1,7 @@
package consul
import (
"context"
"fmt"
"os"
"testing"
@ -8,6 +9,7 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -1742,6 +1744,123 @@ func TestIntentionMatch_good(t *testing.T) {
require.Equal(t, expected, actual)
}
func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
run := func(t *testing.T, dataPrefix string, expectMatches int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := &structs.IntentionQueryRequest{
Datacenter: "dc1",
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{Name: "bar"},
},
},
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.IndexedIntentionMatches
err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out)
if err != nil {
return fmt.Errorf("error getting intentions: %w", err)
}
if len(out.Matches) != 1 {
return fmt.Errorf("expected 1 match got %d", len(out.Matches))
}
if len(out.Matches[0]) != expectMatches {
return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0]))
}
t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0])
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out string
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
// {"default", "*", "default", "baz"}, // shouldn't match
SourceNS: "default",
SourceName: "*",
DestinationNS: "default",
DestinationName: fmt.Sprintf(dataPrefix+"%d", i),
Action: structs.IntentionActionAllow,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other", 0)
})
// Create some records
{
insert := [][]string{
{"default", "*", "default", "*"},
{"default", "*", "default", "bar"},
{"default", "*", "default", "baz"}, // shouldn't match
}
for _, v := range insert {
var out string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Intention.Apply", &structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
SourceNS: v[0],
SourceName: v[1],
DestinationNS: v[2],
DestinationName: v[3],
Action: structs.IntentionActionAllow,
},
}, &out))
}
}
runStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other", 2)
})
}
// Test matching with ACLs
func TestIntentionMatch_acl(t *testing.T) {
if testing.Short() {

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/serf"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
@ -210,6 +211,10 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
return err
}
var (
priorHash uint64
ranOnce bool
)
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
@ -224,6 +229,23 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
reply.Index, reply.Services = index, services
m.srv.filterACLWithAuthorizer(authz, reply)
// Generate a hash of the intentions content driving this response.
// Use it to determine if the response is identical to a prior
// wakeup.
newHash, err := hashstructure_v2.Hash(services, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranOnce && priorHash == newHash {
priorHash = newHash
return errNotChanged
} else {
priorHash = newHash
ranOnce = true
}
return nil
})
}

View File

@ -1,14 +1,18 @@
package consul
import (
"context"
"encoding/base64"
"fmt"
"os"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -2317,6 +2321,115 @@ func TestInternal_IntentionUpstreams(t *testing.T) {
})
}
func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
{ // ensure it's default deny to start
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "*",
Sources: []*structs.SourceIntention{
{
Name: "*",
Action: structs.IntentionActionDeny,
},
},
},
}, &out))
require.True(t, out)
}
run := func(t *testing.T, dataPrefix string, expectServices int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
args.QueryOptions.MaxQueryTime = time.Second
for ctx.Err() == nil {
var out structs.IndexedServiceList
err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out)
if err != nil {
return fmt.Errorf("error getting upstreams: %w", err)
}
if len(out.Services) != expectServices {
return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services))
}
t.Log("blocking query index", out.QueryMeta.Index, out.Services)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
var out string
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
SourceName: fmt.Sprintf(dataPrefix+"-src-%d", i),
DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i),
Action: structs.IntentionActionAllow,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other", 0)
})
// Services:
// api and api-proxy on node foo
// web and web-proxy on node foo
//
// Intentions
// * -> * (deny) intention
// web -> api (allow)
registerIntentionUpstreamEntries(t, codec, "")
runStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other", 1)
})
}
func TestInternal_IntentionUpstreams_ACL(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -954,6 +954,19 @@ type blockingQueryResponseMeta interface {
// a previous result. errNotFound will never be returned to the caller, it is
// converted to nil before returning.
//
// The query function can return errNotChanged, which is a sentinel error. This
// can only be returned on calls AFTER the first call, as it would not be
// possible to detect the absence of a change on the first call. Returning
// errNotChanged indicates that the query results are identical to the prior
// results which allows blockingQuery to keep blocking until the query returns
// a real changed result.
//
// The query function must take care to ensure the actual result of the query
// is either left unmodified or explicitly left in a good state before
// returning, otherwise when blockingQuery times out it may return an
// incomplete or unexpected result. errNotChanged will never be returned to the
// caller, it is converted to nil before returning.
//
// If query function returns any other error, the error is returned to the caller
// immediately.
//
@ -993,7 +1006,7 @@ func (s *Server) blockingQuery(
var ws memdb.WatchSet
err := query(ws, s.fsm.State())
s.setQueryMeta(responseMeta, opts.GetToken())
if errors.Is(err, errNotFound) {
if errors.Is(err, errNotFound) || errors.Is(err, errNotChanged) {
return nil
}
return err
@ -1008,7 +1021,10 @@ func (s *Server) blockingQuery(
// decrement the count when the function returns.
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
var notFound bool
var (
notFound bool
ranOnce bool
)
for {
if opts.GetRequireConsistent() {
@ -1029,17 +1045,23 @@ func (s *Server) blockingQuery(
err := query(ws, state)
s.setQueryMeta(responseMeta, opts.GetToken())
switch {
case errors.Is(err, errNotFound):
if notFound {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
notFound = true
case errors.Is(err, errNotChanged):
if ranOnce {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}
case err != nil:
return err
}
ranOnce = true
if responseMeta.GetIndex() > minQueryIndex {
return nil
@ -1060,7 +1082,10 @@ func (s *Server) blockingQuery(
}
}
var errNotFound = fmt.Errorf("no data found for query")
var (
errNotFound = fmt.Errorf("no data found for query")
errNotChanged = fmt.Errorf("data did not change for query")
)
// setQueryMeta is used to populate the QueryMeta data for an RPC call
//

View File

@ -1681,3 +1681,24 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc
}
return event, nil
}
// assertBlockingQueryWakeupCount is used to assist in assertions for
// blockingQuery RPC tests involving the two sentinel errors errNotFound and
// errNotChanged.
//
// Those tests are a bit racy because of the timing of the two goroutines, so
// we relax the check for the count to be within a small range.
//
// The blocking query is going to wake up every interval, so use the elapsed test
// time with that known timing value to gauge how many legit wakeups should
// happen and then pad it out a smidge.
func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) {
t.Helper()
const buffer = 2
expectedQueries := int(time.Since(start)/interval) + buffer
if gotCount < 2 || gotCount > expectedQueries {
t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount)
}
}

View File

@ -818,6 +818,120 @@ func (s *Store) serviceDiscoveryChainTxn(
return index, chain, nil
}
func (s *Store) ReadResolvedServiceConfigEntries(
ws memdb.WatchSet,
serviceName string,
entMeta *structs.EnterpriseMeta,
upstreamIDs []structs.ServiceID,
proxyMode structs.ProxyMode,
) (uint64, *configentry.ResolvedServiceConfigSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
var res configentry.ResolvedServiceConfigSet
// The caller will likely calculate this again, but we need to do it here
// to determine if we are going to traverse into implicit upstream
// definitions.
var inferredProxyMode structs.ProxyMode
index, proxyEntry, err := configEntryTxn(tx, ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, entMeta)
if err != nil {
return 0, nil, err
}
maxIndex := index
if proxyEntry != nil {
var ok bool
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("invalid proxy config type %T", proxyEntry)
}
res.AddProxyDefaults(proxyConf)
inferredProxyMode = proxyConf.Mode
}
index, serviceEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, serviceName, entMeta)
if err != nil {
return 0, nil, err
}
if index > maxIndex {
maxIndex = index
}
var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
var ok bool
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("invalid service config type %T", serviceEntry)
}
res.AddServiceDefaults(serviceConf)
if serviceConf.Mode != structs.ProxyModeDefault {
inferredProxyMode = serviceConf.Mode
}
}
var (
noUpstreamArgs = len(upstreamIDs) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then proxyMode
// will never be transparent because the service config request does not use the resolved value.
tproxy = proxyMode == structs.ProxyModeTransparent || inferredProxyMode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return maxIndex, &res, nil
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for _, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
}
}
for upstream := range seenUpstreams {
index, rawEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if index > maxIndex {
maxIndex = index
}
if rawEntry != nil {
entry, ok := rawEntry.(*structs.ServiceConfigEntry)
if !ok {
return 0, nil, fmt.Errorf("invalid service config type %T", rawEntry)
}
res.AddServiceDefaults(entry)
}
}
return maxIndex, &res, nil
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
// the provided service name. All relevant config entries will be recursively
// fetched and included in the result.

1
go.mod
View File

@ -69,6 +69,7 @@ require (
github.com/mitchellh/copystructure v1.0.0
github.com/mitchellh/go-testing-interface v1.14.0
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/mitchellh/mapstructure v1.4.1
github.com/mitchellh/pointerstructure v1.2.1
github.com/mitchellh/reflectwalk v1.0.1

2
go.sum
View File

@ -396,6 +396,8 @@ github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 h1:hOY53G+kBFhbYFpRVxHl5eS7laP6B1+Cq+Z9Dry1iMU=
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=