From 3804677570b375916b8b09a2d2ec36f39a41b3fa Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Fri, 25 Feb 2022 15:46:34 -0600
Subject: [PATCH] server: suppress spurious blocking query returns where multiple config entries are involved (#12362)

Starting from and extending the mechanism introduced in #12110, we can specially handle the three main Consul RPC endpoints in Connect that react to many config entries in a single blocking query:

- `DiscoveryChain.Get`
- `ConfigEntry.ResolveServiceConfig`
- `Intentions.Match`

All of these internally watch many config entries, and at least one of those entries will likely not be found in any given query. Because these are blends of multiple reads, the exact solution from #12110 isn't a perfect fit, but we can tweak the approach slightly and regain the utility of that mechanism.

### No Config Entries Found

In this case, despite looking for many config entries, none may be found at all. Unlike #12110, in this scenario we do not return an empty reply to the caller, but instead synthesize a struct from default values to return. This can be handled nearly identically to #12110, with the first 1-2 replies being non-empty payloads followed by the standard spurious wakeup suppression mechanism from #12110.

### No Change Since Last Wakeup

Once a blocking query loop on the server has completed and slept at least once, there is a further optimization we can make: detect whether the config entries that were present at specific versions during the prior execution of the loop are identical for the loop we just woke up for. In that scenario we can return a slightly different internal sentinel error and handle it externally in much the same way as #12110. This means that even if 20 discovery-chain read RPC goroutines wake up due to the creation of an unrelated config entry, the only ones that terminate and reply with a blob of data are those that genuinely have new data to report.
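For illustration, here is a minimal stand-alone sketch of the suppression bookkeeping used by the endpoints below: the `priorHash`/`ranOnce` pair, the `hashstructure/v2` content hash, and the `errNotFound`/`errNotChanged` sentinels. The `resolveQuery` and `fetch` names and the string-slice payload are placeholders for this sketch only; in the real code the closure is passed to `Server.blockingQuery` and reads config entries from the memdb state store.

```go
package main

import (
	"errors"
	"fmt"

	hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
)

var (
	errNotFound   = errors.New("no data found for query")
	errNotChanged = errors.New("data did not change for query")
)

// resolveQuery wraps a fetch function with the wakeup-suppression pattern:
// it remembers a hash of the result produced on the previous wakeup and
// signals via sentinel errors when the new result is empty or unchanged.
func resolveQuery(fetch func() ([]string, error)) func() ([]string, error) {
	var (
		priorHash uint64
		ranOnce   bool
	)
	return func() ([]string, error) {
		entries, err := fetch()
		if err != nil {
			return nil, err
		}

		// Hash the content driving this response so an identical result on a
		// later wakeup can be suppressed instead of returned to the caller.
		newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil)
		if err != nil {
			return nil, fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
		}

		if ranOnce && priorHash == newHash {
			priorHash = newHash
			return entries, errNotChanged // identical to the prior wakeup: keep blocking
		}
		priorHash = newHash
		ranOnce = true

		if len(entries) == 0 {
			return entries, errNotFound // synthesized/default reply: also keep blocking
		}
		return entries, nil
	}
}

func main() {
	data := []string{}
	query := resolveQuery(func() ([]string, error) { return data, nil })

	if _, err := query(); errors.Is(err, errNotFound) {
		fmt.Println("first wakeup: nothing found, caller keeps blocking")
	}
	data = []string{"service-defaults/web"}
	if _, err := query(); err == nil {
		fmt.Println("second wakeup: new data, reply returned")
	}
	if _, err := query(); errors.Is(err, errNotChanged) {
		fmt.Println("third wakeup: identical data, suppressed")
	}
}
```

`blockingQuery` treats both sentinels as "keep blocking unless the index moved", which is why a spurious wakeup with unchanged data never surfaces to the caller.
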
### Extra Endpoints Since this pattern is pretty reusable, other key config-entry-adjacent endpoints used by `agent/proxycfg` also were updated: - `ConfigEntry.List` - `Internal.IntentionUpstreams` (tproxy) --- .changelog/12362.txt | 3 + agent/configentry/config_entry.go | 4 + agent/configentry/service_config.go | 57 ++ agent/consul/config_endpoint.go | 497 ++++++++++-------- agent/consul/config_endpoint_test.go | 354 +++++++++++-- agent/consul/discovery_chain_endpoint.go | 28 + agent/consul/discovery_chain_endpoint_test.go | 115 ++++ agent/consul/intention_endpoint.go | 34 ++ agent/consul/intention_endpoint_test.go | 119 +++++ agent/consul/internal_endpoint.go | 22 + agent/consul/internal_endpoint_test.go | 113 ++++ agent/consul/rpc.go | 33 +- agent/consul/rpc_test.go | 21 + agent/consul/state/config_entry.go | 114 ++++ go.mod | 1 + go.sum | 2 + 16 files changed, 1245 insertions(+), 272 deletions(-) create mode 100644 .changelog/12362.txt create mode 100644 agent/configentry/service_config.go diff --git a/.changelog/12362.txt b/.changelog/12362.txt new file mode 100644 index 000000000..e29bcb66d --- /dev/null +++ b/.changelog/12362.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: suppress spurious blocking query returns where multiple config entries are involved +``` diff --git a/agent/configentry/config_entry.go b/agent/configentry/config_entry.go index 0b5751492..7ede09358 100644 --- a/agent/configentry/config_entry.go +++ b/agent/configentry/config_entry.go @@ -32,3 +32,7 @@ func NewKindName(kind, name string, entMeta *structs.EnterpriseMeta) KindName { ret.Normalize() return ret } + +func NewKindNameForEntry(entry structs.ConfigEntry) KindName { + return NewKindName(entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta()) +} diff --git a/agent/configentry/service_config.go b/agent/configentry/service_config.go new file mode 100644 index 000000000..5cc6e740a --- /dev/null +++ b/agent/configentry/service_config.go @@ -0,0 +1,57 @@ +package configentry + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +// ResolvedServiceConfigSet is a wrapped set of raw cross-referenced config +// entries necessary for the ConfigEntry.ResolveServiceConfig RPC process. +// +// None of these are defaulted. 
+type ResolvedServiceConfigSet struct { + ServiceDefaults map[structs.ServiceID]*structs.ServiceConfigEntry + ProxyDefaults map[string]*structs.ProxyConfigEntry +} + +func (r *ResolvedServiceConfigSet) IsEmpty() bool { + return len(r.ServiceDefaults) == 0 && len(r.ProxyDefaults) == 0 +} + +func (r *ResolvedServiceConfigSet) GetServiceDefaults(sid structs.ServiceID) *structs.ServiceConfigEntry { + if r.ServiceDefaults == nil { + return nil + } + return r.ServiceDefaults[sid] +} + +func (r *ResolvedServiceConfigSet) GetProxyDefaults(partition string) *structs.ProxyConfigEntry { + if r.ProxyDefaults == nil { + return nil + } + return r.ProxyDefaults[partition] +} + +func (r *ResolvedServiceConfigSet) AddServiceDefaults(entry *structs.ServiceConfigEntry) { + if entry == nil { + return + } + + if r.ServiceDefaults == nil { + r.ServiceDefaults = make(map[structs.ServiceID]*structs.ServiceConfigEntry) + } + + sid := structs.NewServiceID(entry.Name, &entry.EnterpriseMeta) + r.ServiceDefaults[sid] = entry +} + +func (r *ResolvedServiceConfigSet) AddProxyDefaults(entry *structs.ProxyConfigEntry) { + if entry == nil { + return + } + + if r.ProxyDefaults == nil { + r.ProxyDefaults = make(map[string]*structs.ProxyConfigEntry) + } + + r.ProxyDefaults[entry.PartitionOrDefault()] = entry +} diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 00316c082..3cb93c4d2 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -10,8 +10,10 @@ import ( "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/mitchellh/copystructure" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" ) @@ -236,6 +238,10 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe } } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -258,6 +264,26 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe reply.Kind = args.Kind reply.Index = index reply.Entries = filteredEntries + + // Generate a hash of the content driving this response. Use it to + // determine if the response is identical to a prior wakeup. + newHash, err := hashstructure_v2.Hash(filteredEntries, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + + if len(reply.Entries) == 0 { + return errNotFound + } + return nil }) } @@ -417,115 +443,18 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return acl.ErrPermissionDenied } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - var thisReply structs.ServiceConfigResponse - - thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault - // TODO(freddy) Refactor this into smaller set of state store functions - // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the - // blocking query, this function will be rerun and these state store lookups will both be current. 
- // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. - _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, &args.EnterpriseMeta) - if err != nil { - return err - } - var ( - proxyConf *structs.ProxyConfigEntry - proxyConfGlobalProtocol string - ok bool + upstreamIDs = args.UpstreamIDs + legacyUpstreams = false ) - if proxyEntry != nil { - proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) - if !ok { - return fmt.Errorf("invalid proxy config type %T", proxyEntry) - } - // Apply the proxy defaults to the sidecar's proxy config - mapCopy, err := copystructure.Copy(proxyConf.Config) - if err != nil { - return fmt.Errorf("failed to copy global proxy-defaults: %v", err) - } - thisReply.ProxyConfig = mapCopy.(map[string]interface{}) - thisReply.Mode = proxyConf.Mode - thisReply.TransparentProxy = proxyConf.TransparentProxy - thisReply.MeshGateway = proxyConf.MeshGateway - thisReply.Expose = proxyConf.Expose - - // Extract the global protocol from proxyConf for upstream configs. - rawProtocol := proxyConf.Config["protocol"] - if rawProtocol != nil { - proxyConfGlobalProtocol, ok = rawProtocol.(string) - if !ok { - return fmt.Errorf("invalid protocol type %T", rawProtocol) - } - } - } - - index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta) - if err != nil { - return err - } - thisReply.Index = index - - var serviceConf *structs.ServiceConfigEntry - if serviceEntry != nil { - serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", serviceEntry) - } - if serviceConf.Expose.Checks { - thisReply.Expose.Checks = true - } - if len(serviceConf.Expose.Paths) >= 1 { - thisReply.Expose.Paths = serviceConf.Expose.Paths - } - if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode - } - if serviceConf.Protocol != "" { - if thisReply.ProxyConfig == nil { - thisReply.ProxyConfig = make(map[string]interface{}) - } - thisReply.ProxyConfig["protocol"] = serviceConf.Protocol - } - if serviceConf.TransparentProxy.OutboundListenerPort != 0 { - thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort - } - if serviceConf.TransparentProxy.DialedDirectly { - thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly - } - if serviceConf.Mode != structs.ProxyModeDefault { - thisReply.Mode = serviceConf.Mode - } - } - - // First collect all upstreams into a set of seen upstreams. - // Upstreams can come from: - // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint - // - Implicitly from centralized upstream config in service-defaults - seenUpstreams := map[structs.ServiceID]struct{}{} - - upstreamIDs := args.UpstreamIDs - legacyUpstreams := false - - var ( - noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 - - // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode - // will never be transparent because the service config request does not use the resolved value. - tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent - ) - - // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. 
- // If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode. - // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. - if noUpstreamArgs && !tproxy { - *reply = thisReply - return nil - } // The request is considered legacy if the deprecated args.Upstream was used if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 { @@ -533,136 +462,274 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r upstreamIDs = make([]structs.ServiceID, 0) for _, upstream := range args.Upstreams { - // Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace. - // Because of this we attach the enterprise meta of the request, which will just be the default namespace. + // Before Consul namespaces were released, the Upstreams + // provided to the endpoint did not contain the namespace. + // Because of this we attach the enterprise meta of the + // request, which will just be the default namespace. sid := structs.NewServiceID(upstream, &args.EnterpriseMeta) upstreamIDs = append(upstreamIDs, sid) } } - // First store all upstreams that were provided in the request - for _, sid := range upstreamIDs { - if _, ok := seenUpstreams[sid]; !ok { - seenUpstreams[sid] = struct{}{} - } - } + // Fetch all relevant config entries. - // Then store upstreams inferred from service-defaults and mapify the overrides. - var ( - upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) - upstreamDefaults *structs.UpstreamConfig - // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. - usConfigs = make(map[structs.ServiceID]map[string]interface{}) + index, entries, err := state.ReadResolvedServiceConfigEntries( + ws, + args.Name, + &args.EnterpriseMeta, + upstreamIDs, + args.Mode, ) - if serviceConf != nil && serviceConf.UpstreamConfig != nil { - for i, override := range serviceConf.UpstreamConfig.Overrides { - if override.Name == "" { - c.logger.Warn( - "Skipping UpstreamConfig.Overrides entry without a required name field", - "entryIndex", i, - "kind", serviceConf.GetKind(), - "name", serviceConf.GetName(), - "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), - ) - continue // skip this impossible condition - } - seenUpstreams[override.ServiceID()] = struct{}{} - upstreamConfigs[override.ServiceID()] = override - } - if serviceConf.UpstreamConfig.Defaults != nil { - upstreamDefaults = serviceConf.UpstreamConfig.Defaults - - // Store the upstream defaults under a wildcard key so that they can be applied to - // upstreams that are inferred from intentions and do not have explicit upstream configuration. - cfgMap := make(map[string]interface{}) - upstreamDefaults.MergeInto(cfgMap) - - wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) - usConfigs[wildcard] = cfgMap - } + if err != nil { + return err } - for upstream := range seenUpstreams { - resolvedCfg := make(map[string]interface{}) - - // The protocol of an upstream is resolved in this order: - // 1. Default protocol from proxy-defaults (how all services should be addressed) - // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) - // 3. 
Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream - // (how the downstream wants to address it) - protocol := proxyConfGlobalProtocol - - _, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) - if err != nil { - return err - } - if upstreamSvcDefaults != nil { - cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults) - } - if cfg.Protocol != "" { - protocol = cfg.Protocol - } - } - if protocol != "" { - resolvedCfg["protocol"] = protocol - } - - // Merge centralized defaults for all upstreams before configuration for specific upstreams - if upstreamDefaults != nil { - upstreamDefaults.MergeInto(resolvedCfg) - } - - // The MeshGateway value from the proxy registration overrides the one from upstream_defaults - // because it is specific to the proxy instance. - // - // The goal is to flatten the mesh gateway mode in this order: - // 0. Value from centralized upstream_defaults - // 1. Value from local proxy registration - // 2. Value from centralized upstream_config - // 3. Value from local upstream definition. This last step is done in the client's service manager. - if !args.MeshGateway.IsZero() { - resolvedCfg["mesh_gateway"] = args.MeshGateway - } - - if upstreamConfigs[upstream] != nil { - upstreamConfigs[upstream].MergeInto(resolvedCfg) - } - - if len(resolvedCfg) > 0 { - usConfigs[upstream] = resolvedCfg - } + // Generate a hash of the config entry content driving this + // response. Use it to determine if the response is identical to a + // prior wakeup. + newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) } - // don't allocate the slices just to not fill them - if len(usConfigs) == 0 { - *reply = thisReply - return nil - } - - if legacyUpstreams { - // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces - thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) - - for us, conf := range usConfigs { - thisReply.UpstreamConfigs[us.ID] = conf - } - + if ranOnce && priorHash == newHash { + priorHash = newHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which + // is desirable + return errNotChanged } else { - thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) - - for us, conf := range usConfigs { - thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, - structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) - } + priorHash = newHash + ranOnce = true + } + + thisReply, err := c.computeResolvedServiceConfig( + args, + upstreamIDs, + legacyUpstreams, + entries, + ) + if err != nil { + return err + } + thisReply.Index = index + + *reply = *thisReply + if entries.IsEmpty() { + // No config entries factored into this reply; it's a default. 
+ return errNotFound } - *reply = thisReply return nil }) } +func (c *ConfigEntry) computeResolvedServiceConfig( + args *structs.ServiceConfigRequest, + upstreamIDs []structs.ServiceID, + legacyUpstreams bool, + entries *configentry.ResolvedServiceConfigSet, +) (*structs.ServiceConfigResponse, error) { + var thisReply structs.ServiceConfigResponse + + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault + + // TODO(freddy) Refactor this into smaller set of state store functions + // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the + // blocking query, this function will be rerun and these state store lookups will both be current. + // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. + var proxyConfGlobalProtocol string + proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault()) + if proxyConf != nil { + // Apply the proxy defaults to the sidecar's proxy config + mapCopy, err := copystructure.Copy(proxyConf.Config) + if err != nil { + return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err) + } + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.Mode = proxyConf.Mode + thisReply.TransparentProxy = proxyConf.TransparentProxy + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose + + // Extract the global protocol from proxyConf for upstream configs. + rawProtocol := proxyConf.Config["protocol"] + if rawProtocol != nil { + var ok bool + proxyConfGlobalProtocol, ok = rawProtocol.(string) + if !ok { + return nil, fmt.Errorf("invalid protocol type %T", rawProtocol) + } + } + } + + serviceConf := entries.GetServiceDefaults( + structs.NewServiceID(args.Name, &args.EnterpriseMeta), + ) + if serviceConf != nil { + if serviceConf.Expose.Checks { + thisReply.Expose.Checks = true + } + if len(serviceConf.Expose.Paths) >= 1 { + thisReply.Expose.Paths = serviceConf.Expose.Paths + } + if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + } + if serviceConf.Protocol != "" { + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) + } + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol + } + if serviceConf.TransparentProxy.OutboundListenerPort != 0 { + thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort + } + if serviceConf.TransparentProxy.DialedDirectly { + thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly + } + if serviceConf.Mode != structs.ProxyModeDefault { + thisReply.Mode = serviceConf.Mode + } + } + + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} + + var ( + noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 + + // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode + // will never be transparent because the service config request does not use the resolved value. + tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent + ) + + // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. 
+ // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. + // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. + if noUpstreamArgs && !tproxy { + return &thisReply, nil + } + + // First store all upstreams that were provided in the request + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + // Then store upstreams inferred from service-defaults and mapify the overrides. + var ( + upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) + upstreamDefaults *structs.UpstreamConfig + // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. + usConfigs = make(map[structs.ServiceID]map[string]interface{}) + ) + if serviceConf != nil && serviceConf.UpstreamConfig != nil { + for i, override := range serviceConf.UpstreamConfig.Overrides { + if override.Name == "" { + c.logger.Warn( + "Skipping UpstreamConfig.Overrides entry without a required name field", + "entryIndex", i, + "kind", serviceConf.GetKind(), + "name", serviceConf.GetName(), + "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), + ) + continue // skip this impossible condition + } + seenUpstreams[override.ServiceID()] = struct{}{} + upstreamConfigs[override.ServiceID()] = override + } + if serviceConf.UpstreamConfig.Defaults != nil { + upstreamDefaults = serviceConf.UpstreamConfig.Defaults + + // Store the upstream defaults under a wildcard key so that they can be applied to + // upstreams that are inferred from intentions and do not have explicit upstream configuration. + cfgMap := make(map[string]interface{}) + upstreamDefaults.MergeInto(cfgMap) + + wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) + usConfigs[wildcard] = cfgMap + } + } + + for upstream := range seenUpstreams { + resolvedCfg := make(map[string]interface{}) + + // The protocol of an upstream is resolved in this order: + // 1. Default protocol from proxy-defaults (how all services should be addressed) + // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) + // 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream + // (how the downstream wants to address it) + protocol := proxyConfGlobalProtocol + + upstreamSvcDefaults := entries.GetServiceDefaults( + structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta), + ) + if upstreamSvcDefaults != nil { + if upstreamSvcDefaults.Protocol != "" { + protocol = upstreamSvcDefaults.Protocol + } + } + + if protocol != "" { + resolvedCfg["protocol"] = protocol + } + + // Merge centralized defaults for all upstreams before configuration for specific upstreams + if upstreamDefaults != nil { + upstreamDefaults.MergeInto(resolvedCfg) + } + + // The MeshGateway value from the proxy registration overrides the one from upstream_defaults + // because it is specific to the proxy instance. + // + // The goal is to flatten the mesh gateway mode in this order: + // 0. Value from centralized upstream_defaults + // 1. Value from local proxy registration + // 2. Value from centralized upstream_config + // 3. Value from local upstream definition. This last step is done in the client's service manager. 
+ if !args.MeshGateway.IsZero() { + resolvedCfg["mesh_gateway"] = args.MeshGateway + } + + if upstreamConfigs[upstream] != nil { + upstreamConfigs[upstream].MergeInto(resolvedCfg) + } + + if len(resolvedCfg) > 0 { + usConfigs[upstream] = resolvedCfg + } + } + + // don't allocate the slices just to not fill them + if len(usConfigs) == 0 { + return &thisReply, nil + } + + if legacyUpstreams { + // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) + + for us, conf := range usConfigs { + thisReply.UpstreamConfigs[us.ID] = conf + } + + } else { + thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) + + for us, conf := range usConfigs { + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, + structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) + } + } + + return &thisReply, nil +} + func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error { // ExportedServices entries are gated from interactions from secondary DCs // because non-default partitions cannot be created in secondaries diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 527ba0272..414886b87 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -9,10 +9,12 @@ import ( "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -310,63 +312,77 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { } _, s1 := testServerWithConfig(t) + codec := rpcClient(t, s1) - store := s1.fsm.State() + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) - entry := &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "alpha", - } - require.NoError(t, store.EnsureConfigEntry(1, entry)) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Name: "does-not-exist", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { - var out structs.ConfigEntryResponse - - err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out.Entry) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := uint64(0); i < 200; i++ { - time.Sleep(5 * time.Millisecond) - entry := &structs.ServiceConfigEntry{ + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, - Name: fmt.Sprintf("other%d", i), - } - if err := store.EnsureConfigEntry(i+2, entry); err != nil { - return err - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - // The test is a bit racy because of the timing of the two goroutines, so - // we relax the check for the count to be within a small range. 
- if count < 2 || count > 3 { - t.Fatalf("expected count to be 2 or 3, got %d", count) + Name: "alpha", + }, + }, &out)) + require.True(t, out) } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "does-not-exist", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.ConfigEntryResponse + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out.Entry) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf("other%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + }) } func TestConfigEntry_Get_ACLDeny(t *testing.T) { @@ -472,6 +488,102 @@ func TestConfigEntry_List(t *testing.T) { require.Equal(t, expected, out) } +func TestConfigEntry_List_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: "dc1", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedConfigEntries + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: fmt.Sprintf(dataPrefix+"%d", i), + ConnectTimeout: 33 * time.Second, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // Create some dummy services in the state store to look up. 
+ for _, entry := range []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + } { + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: entry, + }, &out)) + require.True(t, out) + } + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} + func TestConfigEntry_ListAll(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -2025,6 +2137,142 @@ func TestConfigEntry_ResolveServiceConfig_ProxyDefaultsProtocol_UsedForAllUpstre require.Equal(t, expected, out) } +func BenchmarkConfigEntry_ResolveServiceConfig_Hash(b *testing.B) { + res := &configentry.ResolvedServiceConfigSet{} + + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + }) + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "up1", + Protocol: "http", + }) + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "up2", + Protocol: "http", + }) + res.AddProxyDefaults(&structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := hashstructure_v2.Hash(res, hashstructure_v2.FormatV2, nil) + if err != nil { + b.Fatalf("error: %v", err) + } + } +} + +func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ServiceConfigRequest{ + Name: "foo", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + }, + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.ServiceConfigResponse + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf(dataPrefix+"%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + { // create one unrelated entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "unrelated", + }, + }, &out)) + 
require.True(t, out) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "grpc", + }, + }, &out)) + require.True(t, out) + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} + func TestConfigEntry_ResolveServiceConfigNoConfig(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index 3e91dd957..79ccbe49d 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -6,6 +6,7 @@ import ( metrics "github.com/armon/go-metrics" memdb "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/discoverychain" @@ -48,6 +49,10 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs evalDC = c.srv.config.Datacenter } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -66,9 +71,32 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return err } + // Generate a hash of the config entry content driving this + // response. Use it to determine if the response is identical to a + // prior wakeup. + newHash, err := hashstructure_v2.Hash(chain, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which + // is desirable + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + reply.Index = index reply.Chain = chain + if chain.IsDefault() { + return errNotFound + } + return nil }) } diff --git a/agent/consul/discovery_chain_endpoint_test.go b/agent/consul/discovery_chain_endpoint_test.go index 66c0aeb65..d876940a6 100644 --- a/agent/consul/discovery_chain_endpoint_test.go +++ b/agent/consul/discovery_chain_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "os" "testing" @@ -8,6 +9,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -242,3 +244,116 @@ func TestDiscoveryChainEndpoint_Get(t *testing.T) { require.Equal(t, expect, resp) } } + +func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + { // create one unrelated entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "unrelated", + ConnectTimeout: 
33 * time.Second, + }, + }, &out)) + require.True(t, out) + } + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.DiscoveryChainRequest{ + Name: "web", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + EvaluateInPartition: "default", + Datacenter: "dc1", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.DiscoveryChainResponse + err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out) + if err != nil { + return fmt.Errorf("error getting discovery chain: %w", err) + } + if !out.Chain.IsDefault() { + return fmt.Errorf("expected default chain") + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Chain) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf(dataPrefix+"%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "grpc", + }, + }, &out)) + require.True(t, out) + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index 1fb894662..b31e98ceb 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -617,6 +618,10 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In } } + var ( + priorHash uint64 + ranOnce bool + ) return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -628,6 +633,35 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In reply.Index = index reply.Matches = matches + + // Generate a hash of the intentions content driving this response. + // Use it to determine if the response is identical to a prior + // wakeup. 
+ newHash, err := hashstructure_v2.Hash(matches, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + + hasData := false + for _, match := range matches { + if len(match) > 0 { + hasData = true + break + } + } + + if !hasData { + return errNotFound + } + return nil }, ) diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 455fd45e0..0e73613da 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "os" "testing" @@ -8,6 +9,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -1742,6 +1744,123 @@ func TestIntentionMatch_good(t *testing.T) { require.Equal(t, expected, actual) } +func TestIntentionMatch_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + + run := func(t *testing.T, dataPrefix string, expectMatches int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.IntentionQueryRequest{ + Datacenter: "dc1", + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + {Name: "bar"}, + }, + }, + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedIntentionMatches + + err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out) + if err != nil { + return fmt.Errorf("error getting intentions: %w", err) + } + if len(out.Matches) != 1 { + return fmt.Errorf("expected 1 match got %d", len(out.Matches)) + } + if len(out.Matches[0]) != expectMatches { + return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0]) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out string + err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + // {"default", "*", "default", "baz"}, // shouldn't match + SourceNS: "default", + SourceName: "*", + DestinationNS: "default", + DestinationName: fmt.Sprintf(dataPrefix+"%d", i), + Action: structs.IntentionActionAllow, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other", 0) + }) + + // Create some records + { + insert := [][]string{ + {"default", "*", "default", "*"}, + {"default", "*", "default", "bar"}, + {"default", "*", 
"default", "baz"}, // shouldn't match + } + + for _, v := range insert { + var out string + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceNS: v[0], + SourceName: v[1], + DestinationNS: v[2], + DestinationName: v[3], + Action: structs.IntentionActionAllow, + }, + }, &out)) + } + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other", 2) + }) +} + // Test matching with ACLs func TestIntentionMatch_acl(t *testing.T) { if testing.Short() { diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 0f7740a39..a89ea5f54 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/serf" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -210,6 +211,10 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl return err } + var ( + priorHash uint64 + ranOnce bool + ) return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -224,6 +229,23 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl reply.Index, reply.Services = index, services m.srv.filterACLWithAuthorizer(authz, reply) + + // Generate a hash of the intentions content driving this response. + // Use it to determine if the response is identical to a prior + // wakeup. + newHash, err := hashstructure_v2.Hash(services, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + return nil }) } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 946e099a5..c6f63e7f2 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1,14 +1,18 @@ package consul import ( + "context" "encoding/base64" + "fmt" "os" "strings" "testing" + "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -2317,6 +2321,115 @@ func TestInternal_IntentionUpstreams(t *testing.T) { }) } +func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + + { // ensure it's default deny to start + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "*", + Sources: []*structs.SourceIntention{ + { + Name: "*", + Action: structs.IntentionActionDeny, + }, + }, + }, + }, &out)) + require.True(t, out) + } + + run := func(t *testing.T, dataPrefix string, expectServices int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count 
int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedServiceList + + err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out) + if err != nil { + return fmt.Errorf("error getting upstreams: %w", err) + } + + if len(out.Services) != expectServices { + return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Services) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out string + err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceName: fmt.Sprintf(dataPrefix+"-src-%d", i), + DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i), + Action: structs.IntentionActionAllow, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other", 0) + }) + + // Services: + // api and api-proxy on node foo + // web and web-proxy on node foo + // + // Intentions + // * -> * (deny) intention + // web -> api (allow) + registerIntentionUpstreamEntries(t, codec, "") + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other", 1) + }) +} + func TestInternal_IntentionUpstreams_ACL(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index d0977e975..979569690 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -954,6 +954,19 @@ type blockingQueryResponseMeta interface { // a previous result. errNotFound will never be returned to the caller, it is // converted to nil before returning. // +// The query function can return errNotChanged, which is a sentinel error. This +// can only be returned on calls AFTER the first call, as it would not be +// possible to detect the absence of a change on the first call. Returning +// errNotChanged indicates that the query results are identical to the prior +// results which allows blockingQuery to keep blocking until the query returns +// a real changed result. +// +// The query function must take care to ensure the actual result of the query +// is either left unmodified or explicitly left in a good state before +// returning, otherwise when blockingQuery times out it may return an +// incomplete or unexpected result. errNotChanged will never be returned to the +// caller, it is converted to nil before returning. +// // If query function returns any other error, the error is returned to the caller // immediately. // @@ -993,7 +1006,7 @@ func (s *Server) blockingQuery( var ws memdb.WatchSet err := query(ws, s.fsm.State()) s.setQueryMeta(responseMeta, opts.GetToken()) - if errors.Is(err, errNotFound) { + if errors.Is(err, errNotFound) || errors.Is(err, errNotChanged) { return nil } return err @@ -1008,7 +1021,10 @@ func (s *Server) blockingQuery( // decrement the count when the function returns. 
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) - var notFound bool + var ( + notFound bool + ranOnce bool + ) for { if opts.GetRequireConsistent() { @@ -1029,17 +1045,23 @@ func (s *Server) blockingQuery( err := query(ws, state) s.setQueryMeta(responseMeta, opts.GetToken()) + switch { case errors.Is(err, errNotFound): if notFound { // query result has not changed minQueryIndex = responseMeta.GetIndex() } - notFound = true + case errors.Is(err, errNotChanged): + if ranOnce { + // query result has not changed + minQueryIndex = responseMeta.GetIndex() + } case err != nil: return err } + ranOnce = true if responseMeta.GetIndex() > minQueryIndex { return nil @@ -1060,7 +1082,10 @@ func (s *Server) blockingQuery( } } -var errNotFound = fmt.Errorf("no data found for query") +var ( + errNotFound = fmt.Errorf("no data found for query") + errNotChanged = fmt.Errorf("data did not change for query") +) // setQueryMeta is used to populate the QueryMeta data for an RPC call // diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index df08627be..d26b83faa 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1681,3 +1681,24 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc } return event, nil } + +// assertBlockingQueryWakeupCount is used to assist in assertions for +// blockingQuery RPC tests involving the two sentinel errors errNotFound and +// errNotChanged. +// +// Those tests are a bit racy because of the timing of the two goroutines, so +// we relax the check for the count to be within a small range. +// +// The blocking query is going to wake up every interval, so use the elapsed test +// time with that known timing value to gauge how many legit wakeups should +// happen and then pad it out a smidge. +func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) { + t.Helper() + + const buffer = 2 + expectedQueries := int(time.Since(start)/interval) + buffer + + if gotCount < 2 || gotCount > expectedQueries { + t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount) + } +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index e75ed7f8b..7f5277561 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -818,6 +818,120 @@ func (s *Store) serviceDiscoveryChainTxn( return index, chain, nil } +func (s *Store) ReadResolvedServiceConfigEntries( + ws memdb.WatchSet, + serviceName string, + entMeta *structs.EnterpriseMeta, + upstreamIDs []structs.ServiceID, + proxyMode structs.ProxyMode, +) (uint64, *configentry.ResolvedServiceConfigSet, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + var res configentry.ResolvedServiceConfigSet + + // The caller will likely calculate this again, but we need to do it here + // to determine if we are going to traverse into implicit upstream + // definitions. 
+ var inferredProxyMode structs.ProxyMode + + index, proxyEntry, err := configEntryTxn(tx, ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, entMeta) + if err != nil { + return 0, nil, err + } + maxIndex := index + + if proxyEntry != nil { + var ok bool + proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid proxy config type %T", proxyEntry) + } + res.AddProxyDefaults(proxyConf) + + inferredProxyMode = proxyConf.Mode + } + + index, serviceEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, serviceName, entMeta) + if err != nil { + return 0, nil, err + } + + if index > maxIndex { + maxIndex = index + } + + var serviceConf *structs.ServiceConfigEntry + if serviceEntry != nil { + var ok bool + serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid service config type %T", serviceEntry) + } + res.AddServiceDefaults(serviceConf) + + if serviceConf.Mode != structs.ProxyModeDefault { + inferredProxyMode = serviceConf.Mode + } + } + + var ( + noUpstreamArgs = len(upstreamIDs) == 0 + + // Check the args and the resolved value. If it was exclusively set via a config entry, then proxyMode + // will never be transparent because the service config request does not use the resolved value. + tproxy = proxyMode == structs.ProxyModeTransparent || inferredProxyMode == structs.ProxyModeTransparent + ) + + // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. + // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. + // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. + if noUpstreamArgs && !tproxy { + return maxIndex, &res, nil + } + + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} + + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + if serviceConf != nil && serviceConf.UpstreamConfig != nil { + for _, override := range serviceConf.UpstreamConfig.Overrides { + if override.Name == "" { + continue // skip this impossible condition + } + seenUpstreams[override.ServiceID()] = struct{}{} + } + } + + for upstream := range seenUpstreams { + index, rawEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + if index > maxIndex { + maxIndex = index + } + + if rawEntry != nil { + entry, ok := rawEntry.(*structs.ServiceConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid service config type %T", rawEntry) + } + res.AddServiceDefaults(entry) + } + } + + return maxIndex, &res, nil +} + // ReadDiscoveryChainConfigEntries will query for the full discovery chain for // the provided service name. All relevant config entries will be recursively // fetched and included in the result. 
diff --git a/go.mod b/go.mod index ef83a1af0..e0d8bf77a 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/mitchellh/copystructure v1.0.0 github.com/mitchellh/go-testing-interface v1.14.0 github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 + github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/mapstructure v1.4.1 github.com/mitchellh/pointerstructure v1.2.1 github.com/mitchellh/reflectwalk v1.0.1 diff --git a/go.sum b/go.sum index f0c2628ad..2617fbb0d 100644 --- a/go.sum +++ b/go.sum @@ -396,6 +396,8 @@ github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 h1:hOY53G+kBFhbYFpRVxHl5eS7laP6B1+Cq+Z9Dry1iMU= github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=