diff --git a/.changelog/12362.txt b/.changelog/12362.txt new file mode 100644 index 000000000..e29bcb66d --- /dev/null +++ b/.changelog/12362.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: suppress spurious blocking query returns where multiple config entries are involved +``` diff --git a/agent/configentry/config_entry.go b/agent/configentry/config_entry.go index 0b5751492..7ede09358 100644 --- a/agent/configentry/config_entry.go +++ b/agent/configentry/config_entry.go @@ -32,3 +32,7 @@ func NewKindName(kind, name string, entMeta *structs.EnterpriseMeta) KindName { ret.Normalize() return ret } + +func NewKindNameForEntry(entry structs.ConfigEntry) KindName { + return NewKindName(entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta()) +} diff --git a/agent/configentry/service_config.go b/agent/configentry/service_config.go new file mode 100644 index 000000000..5cc6e740a --- /dev/null +++ b/agent/configentry/service_config.go @@ -0,0 +1,57 @@ +package configentry + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +// ResolvedServiceConfigSet is a wrapped set of raw cross-referenced config +// entries necessary for the ConfigEntry.ResolveServiceConfig RPC process. +// +// None of these are defaulted. +type ResolvedServiceConfigSet struct { + ServiceDefaults map[structs.ServiceID]*structs.ServiceConfigEntry + ProxyDefaults map[string]*structs.ProxyConfigEntry +} + +func (r *ResolvedServiceConfigSet) IsEmpty() bool { + return len(r.ServiceDefaults) == 0 && len(r.ProxyDefaults) == 0 +} + +func (r *ResolvedServiceConfigSet) GetServiceDefaults(sid structs.ServiceID) *structs.ServiceConfigEntry { + if r.ServiceDefaults == nil { + return nil + } + return r.ServiceDefaults[sid] +} + +func (r *ResolvedServiceConfigSet) GetProxyDefaults(partition string) *structs.ProxyConfigEntry { + if r.ProxyDefaults == nil { + return nil + } + return r.ProxyDefaults[partition] +} + +func (r *ResolvedServiceConfigSet) AddServiceDefaults(entry *structs.ServiceConfigEntry) { + if entry == nil { + return + } + + if r.ServiceDefaults == nil { + r.ServiceDefaults = make(map[structs.ServiceID]*structs.ServiceConfigEntry) + } + + sid := structs.NewServiceID(entry.Name, &entry.EnterpriseMeta) + r.ServiceDefaults[sid] = entry +} + +func (r *ResolvedServiceConfigSet) AddProxyDefaults(entry *structs.ProxyConfigEntry) { + if entry == nil { + return + } + + if r.ProxyDefaults == nil { + r.ProxyDefaults = make(map[string]*structs.ProxyConfigEntry) + } + + r.ProxyDefaults[entry.PartitionOrDefault()] = entry +} diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 00316c082..3cb93c4d2 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -10,8 +10,10 @@ import ( "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/mitchellh/copystructure" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" ) @@ -236,6 +238,10 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe } } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -258,6 +264,26 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe reply.Kind = args.Kind reply.Index = index reply.Entries = filteredEntries + + // Generate a hash of the content driving this response. Use it to + // determine if the response is identical to a prior wakeup. + newHash, err := hashstructure_v2.Hash(filteredEntries, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + + if len(reply.Entries) == 0 { + return errNotFound + } + return nil }) } @@ -417,115 +443,18 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return acl.ErrPermissionDenied } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - var thisReply structs.ServiceConfigResponse - - thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault - // TODO(freddy) Refactor this into smaller set of state store functions - // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the - // blocking query, this function will be rerun and these state store lookups will both be current. - // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. - _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, &args.EnterpriseMeta) - if err != nil { - return err - } - var ( - proxyConf *structs.ProxyConfigEntry - proxyConfGlobalProtocol string - ok bool + upstreamIDs = args.UpstreamIDs + legacyUpstreams = false ) - if proxyEntry != nil { - proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) - if !ok { - return fmt.Errorf("invalid proxy config type %T", proxyEntry) - } - // Apply the proxy defaults to the sidecar's proxy config - mapCopy, err := copystructure.Copy(proxyConf.Config) - if err != nil { - return fmt.Errorf("failed to copy global proxy-defaults: %v", err) - } - thisReply.ProxyConfig = mapCopy.(map[string]interface{}) - thisReply.Mode = proxyConf.Mode - thisReply.TransparentProxy = proxyConf.TransparentProxy - thisReply.MeshGateway = proxyConf.MeshGateway - thisReply.Expose = proxyConf.Expose - - // Extract the global protocol from proxyConf for upstream configs. - rawProtocol := proxyConf.Config["protocol"] - if rawProtocol != nil { - proxyConfGlobalProtocol, ok = rawProtocol.(string) - if !ok { - return fmt.Errorf("invalid protocol type %T", rawProtocol) - } - } - } - - index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta) - if err != nil { - return err - } - thisReply.Index = index - - var serviceConf *structs.ServiceConfigEntry - if serviceEntry != nil { - serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", serviceEntry) - } - if serviceConf.Expose.Checks { - thisReply.Expose.Checks = true - } - if len(serviceConf.Expose.Paths) >= 1 { - thisReply.Expose.Paths = serviceConf.Expose.Paths - } - if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode - } - if serviceConf.Protocol != "" { - if thisReply.ProxyConfig == nil { - thisReply.ProxyConfig = make(map[string]interface{}) - } - thisReply.ProxyConfig["protocol"] = serviceConf.Protocol - } - if serviceConf.TransparentProxy.OutboundListenerPort != 0 { - thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort - } - if serviceConf.TransparentProxy.DialedDirectly { - thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly - } - if serviceConf.Mode != structs.ProxyModeDefault { - thisReply.Mode = serviceConf.Mode - } - } - - // First collect all upstreams into a set of seen upstreams. - // Upstreams can come from: - // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint - // - Implicitly from centralized upstream config in service-defaults - seenUpstreams := map[structs.ServiceID]struct{}{} - - upstreamIDs := args.UpstreamIDs - legacyUpstreams := false - - var ( - noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 - - // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode - // will never be transparent because the service config request does not use the resolved value. - tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent - ) - - // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. - // If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode. - // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. - if noUpstreamArgs && !tproxy { - *reply = thisReply - return nil - } // The request is considered legacy if the deprecated args.Upstream was used if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 { @@ -533,136 +462,274 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r upstreamIDs = make([]structs.ServiceID, 0) for _, upstream := range args.Upstreams { - // Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace. - // Because of this we attach the enterprise meta of the request, which will just be the default namespace. + // Before Consul namespaces were released, the Upstreams + // provided to the endpoint did not contain the namespace. + // Because of this we attach the enterprise meta of the + // request, which will just be the default namespace. sid := structs.NewServiceID(upstream, &args.EnterpriseMeta) upstreamIDs = append(upstreamIDs, sid) } } - // First store all upstreams that were provided in the request - for _, sid := range upstreamIDs { - if _, ok := seenUpstreams[sid]; !ok { - seenUpstreams[sid] = struct{}{} - } - } + // Fetch all relevant config entries. - // Then store upstreams inferred from service-defaults and mapify the overrides. - var ( - upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) - upstreamDefaults *structs.UpstreamConfig - // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. - usConfigs = make(map[structs.ServiceID]map[string]interface{}) + index, entries, err := state.ReadResolvedServiceConfigEntries( + ws, + args.Name, + &args.EnterpriseMeta, + upstreamIDs, + args.Mode, ) - if serviceConf != nil && serviceConf.UpstreamConfig != nil { - for i, override := range serviceConf.UpstreamConfig.Overrides { - if override.Name == "" { - c.logger.Warn( - "Skipping UpstreamConfig.Overrides entry without a required name field", - "entryIndex", i, - "kind", serviceConf.GetKind(), - "name", serviceConf.GetName(), - "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), - ) - continue // skip this impossible condition - } - seenUpstreams[override.ServiceID()] = struct{}{} - upstreamConfigs[override.ServiceID()] = override - } - if serviceConf.UpstreamConfig.Defaults != nil { - upstreamDefaults = serviceConf.UpstreamConfig.Defaults - - // Store the upstream defaults under a wildcard key so that they can be applied to - // upstreams that are inferred from intentions and do not have explicit upstream configuration. - cfgMap := make(map[string]interface{}) - upstreamDefaults.MergeInto(cfgMap) - - wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) - usConfigs[wildcard] = cfgMap - } + if err != nil { + return err } - for upstream := range seenUpstreams { - resolvedCfg := make(map[string]interface{}) - - // The protocol of an upstream is resolved in this order: - // 1. Default protocol from proxy-defaults (how all services should be addressed) - // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) - // 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream - // (how the downstream wants to address it) - protocol := proxyConfGlobalProtocol - - _, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) - if err != nil { - return err - } - if upstreamSvcDefaults != nil { - cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults) - } - if cfg.Protocol != "" { - protocol = cfg.Protocol - } - } - if protocol != "" { - resolvedCfg["protocol"] = protocol - } - - // Merge centralized defaults for all upstreams before configuration for specific upstreams - if upstreamDefaults != nil { - upstreamDefaults.MergeInto(resolvedCfg) - } - - // The MeshGateway value from the proxy registration overrides the one from upstream_defaults - // because it is specific to the proxy instance. - // - // The goal is to flatten the mesh gateway mode in this order: - // 0. Value from centralized upstream_defaults - // 1. Value from local proxy registration - // 2. Value from centralized upstream_config - // 3. Value from local upstream definition. This last step is done in the client's service manager. - if !args.MeshGateway.IsZero() { - resolvedCfg["mesh_gateway"] = args.MeshGateway - } - - if upstreamConfigs[upstream] != nil { - upstreamConfigs[upstream].MergeInto(resolvedCfg) - } - - if len(resolvedCfg) > 0 { - usConfigs[upstream] = resolvedCfg - } + // Generate a hash of the config entry content driving this + // response. Use it to determine if the response is identical to a + // prior wakeup. + newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) } - // don't allocate the slices just to not fill them - if len(usConfigs) == 0 { - *reply = thisReply - return nil - } - - if legacyUpstreams { - // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces - thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) - - for us, conf := range usConfigs { - thisReply.UpstreamConfigs[us.ID] = conf - } - + if ranOnce && priorHash == newHash { + priorHash = newHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which + // is desirable + return errNotChanged } else { - thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) - - for us, conf := range usConfigs { - thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, - structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) - } + priorHash = newHash + ranOnce = true + } + + thisReply, err := c.computeResolvedServiceConfig( + args, + upstreamIDs, + legacyUpstreams, + entries, + ) + if err != nil { + return err + } + thisReply.Index = index + + *reply = *thisReply + if entries.IsEmpty() { + // No config entries factored into this reply; it's a default. + return errNotFound } - *reply = thisReply return nil }) } +func (c *ConfigEntry) computeResolvedServiceConfig( + args *structs.ServiceConfigRequest, + upstreamIDs []structs.ServiceID, + legacyUpstreams bool, + entries *configentry.ResolvedServiceConfigSet, +) (*structs.ServiceConfigResponse, error) { + var thisReply structs.ServiceConfigResponse + + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault + + // TODO(freddy) Refactor this into smaller set of state store functions + // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the + // blocking query, this function will be rerun and these state store lookups will both be current. + // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. + var proxyConfGlobalProtocol string + proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault()) + if proxyConf != nil { + // Apply the proxy defaults to the sidecar's proxy config + mapCopy, err := copystructure.Copy(proxyConf.Config) + if err != nil { + return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err) + } + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.Mode = proxyConf.Mode + thisReply.TransparentProxy = proxyConf.TransparentProxy + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose + + // Extract the global protocol from proxyConf for upstream configs. + rawProtocol := proxyConf.Config["protocol"] + if rawProtocol != nil { + var ok bool + proxyConfGlobalProtocol, ok = rawProtocol.(string) + if !ok { + return nil, fmt.Errorf("invalid protocol type %T", rawProtocol) + } + } + } + + serviceConf := entries.GetServiceDefaults( + structs.NewServiceID(args.Name, &args.EnterpriseMeta), + ) + if serviceConf != nil { + if serviceConf.Expose.Checks { + thisReply.Expose.Checks = true + } + if len(serviceConf.Expose.Paths) >= 1 { + thisReply.Expose.Paths = serviceConf.Expose.Paths + } + if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + } + if serviceConf.Protocol != "" { + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) + } + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol + } + if serviceConf.TransparentProxy.OutboundListenerPort != 0 { + thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort + } + if serviceConf.TransparentProxy.DialedDirectly { + thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly + } + if serviceConf.Mode != structs.ProxyModeDefault { + thisReply.Mode = serviceConf.Mode + } + } + + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} + + var ( + noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 + + // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode + // will never be transparent because the service config request does not use the resolved value. + tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent + ) + + // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. + // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. + // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. + if noUpstreamArgs && !tproxy { + return &thisReply, nil + } + + // First store all upstreams that were provided in the request + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + // Then store upstreams inferred from service-defaults and mapify the overrides. + var ( + upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) + upstreamDefaults *structs.UpstreamConfig + // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. + usConfigs = make(map[structs.ServiceID]map[string]interface{}) + ) + if serviceConf != nil && serviceConf.UpstreamConfig != nil { + for i, override := range serviceConf.UpstreamConfig.Overrides { + if override.Name == "" { + c.logger.Warn( + "Skipping UpstreamConfig.Overrides entry without a required name field", + "entryIndex", i, + "kind", serviceConf.GetKind(), + "name", serviceConf.GetName(), + "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), + ) + continue // skip this impossible condition + } + seenUpstreams[override.ServiceID()] = struct{}{} + upstreamConfigs[override.ServiceID()] = override + } + if serviceConf.UpstreamConfig.Defaults != nil { + upstreamDefaults = serviceConf.UpstreamConfig.Defaults + + // Store the upstream defaults under a wildcard key so that they can be applied to + // upstreams that are inferred from intentions and do not have explicit upstream configuration. + cfgMap := make(map[string]interface{}) + upstreamDefaults.MergeInto(cfgMap) + + wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) + usConfigs[wildcard] = cfgMap + } + } + + for upstream := range seenUpstreams { + resolvedCfg := make(map[string]interface{}) + + // The protocol of an upstream is resolved in this order: + // 1. Default protocol from proxy-defaults (how all services should be addressed) + // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) + // 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream + // (how the downstream wants to address it) + protocol := proxyConfGlobalProtocol + + upstreamSvcDefaults := entries.GetServiceDefaults( + structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta), + ) + if upstreamSvcDefaults != nil { + if upstreamSvcDefaults.Protocol != "" { + protocol = upstreamSvcDefaults.Protocol + } + } + + if protocol != "" { + resolvedCfg["protocol"] = protocol + } + + // Merge centralized defaults for all upstreams before configuration for specific upstreams + if upstreamDefaults != nil { + upstreamDefaults.MergeInto(resolvedCfg) + } + + // The MeshGateway value from the proxy registration overrides the one from upstream_defaults + // because it is specific to the proxy instance. + // + // The goal is to flatten the mesh gateway mode in this order: + // 0. Value from centralized upstream_defaults + // 1. Value from local proxy registration + // 2. Value from centralized upstream_config + // 3. Value from local upstream definition. This last step is done in the client's service manager. + if !args.MeshGateway.IsZero() { + resolvedCfg["mesh_gateway"] = args.MeshGateway + } + + if upstreamConfigs[upstream] != nil { + upstreamConfigs[upstream].MergeInto(resolvedCfg) + } + + if len(resolvedCfg) > 0 { + usConfigs[upstream] = resolvedCfg + } + } + + // don't allocate the slices just to not fill them + if len(usConfigs) == 0 { + return &thisReply, nil + } + + if legacyUpstreams { + // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) + + for us, conf := range usConfigs { + thisReply.UpstreamConfigs[us.ID] = conf + } + + } else { + thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) + + for us, conf := range usConfigs { + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, + structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) + } + } + + return &thisReply, nil +} + func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error { // ExportedServices entries are gated from interactions from secondary DCs // because non-default partitions cannot be created in secondaries diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 527ba0272..414886b87 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -9,10 +9,12 @@ import ( "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -310,63 +312,77 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { } _, s1 := testServerWithConfig(t) + codec := rpcClient(t, s1) - store := s1.fsm.State() + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) - entry := &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "alpha", - } - require.NoError(t, store.EnsureConfigEntry(1, entry)) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Name: "does-not-exist", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { - var out structs.ConfigEntryResponse - - err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out.Entry) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := uint64(0); i < 200; i++ { - time.Sleep(5 * time.Millisecond) - entry := &structs.ServiceConfigEntry{ + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, - Name: fmt.Sprintf("other%d", i), - } - if err := store.EnsureConfigEntry(i+2, entry); err != nil { - return err - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - // The test is a bit racy because of the timing of the two goroutines, so - // we relax the check for the count to be within a small range. - if count < 2 || count > 3 { - t.Fatalf("expected count to be 2 or 3, got %d", count) + Name: "alpha", + }, + }, &out)) + require.True(t, out) } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "does-not-exist", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.ConfigEntryResponse + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out.Entry) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf("other%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + }) } func TestConfigEntry_Get_ACLDeny(t *testing.T) { @@ -472,6 +488,102 @@ func TestConfigEntry_List(t *testing.T) { require.Equal(t, expected, out) } +func TestConfigEntry_List_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: "dc1", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedConfigEntries + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: fmt.Sprintf(dataPrefix+"%d", i), + ConnectTimeout: 33 * time.Second, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // Create some dummy services in the state store to look up. + for _, entry := range []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + } { + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: entry, + }, &out)) + require.True(t, out) + } + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} + func TestConfigEntry_ListAll(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -2025,6 +2137,142 @@ func TestConfigEntry_ResolveServiceConfig_ProxyDefaultsProtocol_UsedForAllUpstre require.Equal(t, expected, out) } +func BenchmarkConfigEntry_ResolveServiceConfig_Hash(b *testing.B) { + res := &configentry.ResolvedServiceConfigSet{} + + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + }) + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "up1", + Protocol: "http", + }) + res.AddServiceDefaults(&structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "up2", + Protocol: "http", + }) + res.AddProxyDefaults(&structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := hashstructure_v2.Hash(res, hashstructure_v2.FormatV2, nil) + if err != nil { + b.Fatalf("error: %v", err) + } + } +} + +func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := structs.ServiceConfigRequest{ + Name: "foo", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + }, + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.ServiceConfigResponse + + err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out) + if err != nil { + return err + } + t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf(dataPrefix+"%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + { // create one unrelated entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "unrelated", + }, + }, &out)) + require.True(t, out) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "grpc", + }, + }, &out)) + require.True(t, out) + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} + func TestConfigEntry_ResolveServiceConfigNoConfig(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index 3e91dd957..79ccbe49d 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -6,6 +6,7 @@ import ( metrics "github.com/armon/go-metrics" memdb "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/discoverychain" @@ -48,6 +49,10 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs evalDC = c.srv.config.Datacenter } + var ( + priorHash uint64 + ranOnce bool + ) return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -66,9 +71,32 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return err } + // Generate a hash of the config entry content driving this + // response. Use it to determine if the response is identical to a + // prior wakeup. + newHash, err := hashstructure_v2.Hash(chain, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which + // is desirable + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + reply.Index = index reply.Chain = chain + if chain.IsDefault() { + return errNotFound + } + return nil }) } diff --git a/agent/consul/discovery_chain_endpoint_test.go b/agent/consul/discovery_chain_endpoint_test.go index 66c0aeb65..d876940a6 100644 --- a/agent/consul/discovery_chain_endpoint_test.go +++ b/agent/consul/discovery_chain_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "os" "testing" @@ -8,6 +9,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -242,3 +244,116 @@ func TestDiscoveryChainEndpoint_Get(t *testing.T) { require.Equal(t, expect, resp) } } + +func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + { // create one unrelated entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "unrelated", + ConnectTimeout: 33 * time.Second, + }, + }, &out)) + require.True(t, out) + } + + run := func(t *testing.T, dataPrefix string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.DiscoveryChainRequest{ + Name: "web", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + EvaluateInPartition: "default", + Datacenter: "dc1", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.DiscoveryChainResponse + err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out) + if err != nil { + return fmt.Errorf("error getting discovery chain: %w", err) + } + if !out.Chain.IsDefault() { + return fmt.Errorf("expected default chain") + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Chain) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out bool + err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf(dataPrefix+"%d", i), + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other") + }) + + { // create one relevant entry + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "grpc", + }, + }, &out)) + require.True(t, out) + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other") + }) +} diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index 1fb894662..b31e98ceb 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -617,6 +618,10 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In } } + var ( + priorHash uint64 + ranOnce bool + ) return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -628,6 +633,35 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In reply.Index = index reply.Matches = matches + + // Generate a hash of the intentions content driving this response. + // Use it to determine if the response is identical to a prior + // wakeup. + newHash, err := hashstructure_v2.Hash(matches, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + + hasData := false + for _, match := range matches { + if len(match) > 0 { + hasData = true + break + } + } + + if !hasData { + return errNotFound + } + return nil }, ) diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 455fd45e0..0e73613da 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "context" "fmt" "os" "testing" @@ -8,6 +9,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -1742,6 +1744,123 @@ func TestIntentionMatch_good(t *testing.T) { require.Equal(t, expected, actual) } +func TestIntentionMatch_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + + run := func(t *testing.T, dataPrefix string, expectMatches int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.IntentionQueryRequest{ + Datacenter: "dc1", + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + {Name: "bar"}, + }, + }, + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedIntentionMatches + + err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out) + if err != nil { + return fmt.Errorf("error getting intentions: %w", err) + } + if len(out.Matches) != 1 { + return fmt.Errorf("expected 1 match got %d", len(out.Matches)) + } + if len(out.Matches[0]) != expectMatches { + return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0]) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out string + err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + // {"default", "*", "default", "baz"}, // shouldn't match + SourceNS: "default", + SourceName: "*", + DestinationNS: "default", + DestinationName: fmt.Sprintf(dataPrefix+"%d", i), + Action: structs.IntentionActionAllow, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other", 0) + }) + + // Create some records + { + insert := [][]string{ + {"default", "*", "default", "*"}, + {"default", "*", "default", "bar"}, + {"default", "*", "default", "baz"}, // shouldn't match + } + + for _, v := range insert { + var out string + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceNS: v[0], + SourceName: v[1], + DestinationNS: v[2], + DestinationName: v[3], + Action: structs.IntentionActionAllow, + }, + }, &out)) + } + } + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other", 2) + }) +} + // Test matching with ACLs func TestIntentionMatch_acl(t *testing.T) { if testing.Short() { diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 0f7740a39..a89ea5f54 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/serf" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -210,6 +211,10 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl return err } + var ( + priorHash uint64 + ranOnce bool + ) return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -224,6 +229,23 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl reply.Index, reply.Services = index, services m.srv.filterACLWithAuthorizer(authz, reply) + + // Generate a hash of the intentions content driving this response. + // Use it to determine if the response is identical to a prior + // wakeup. + newHash, err := hashstructure_v2.Hash(services, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + + if ranOnce && priorHash == newHash { + priorHash = newHash + return errNotChanged + } else { + priorHash = newHash + ranOnce = true + } + return nil }) } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 946e099a5..c6f63e7f2 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1,14 +1,18 @@ package consul import ( + "context" "encoding/base64" + "fmt" "os" "strings" "testing" + "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -2317,6 +2321,115 @@ func TestInternal_IntentionUpstreams(t *testing.T) { }) } +func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t) + + codec := rpcClient(t, s1) + readerCodec := rpcClient(t, s1) + writerCodec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + + { // ensure it's default deny to start + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "*", + Sources: []*structs.SourceIntention{ + { + Name: "*", + Action: structs.IntentionActionDeny, + }, + }, + }, + }, &out)) + require.True(t, out) + } + + run := func(t *testing.T, dataPrefix string, expectServices int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var count int + + start := time.Now() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + args := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + args.QueryOptions.MaxQueryTime = time.Second + + for ctx.Err() == nil { + var out structs.IndexedServiceList + + err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out) + if err != nil { + return fmt.Errorf("error getting upstreams: %w", err) + } + + if len(out.Services) != expectServices { + return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) + } + + t.Log("blocking query index", out.QueryMeta.Index, out.Services) + count++ + args.QueryOptions.MinQueryIndex = out.QueryMeta.Index + } + return nil + }) + + g.Go(func() error { + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + var out string + err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceName: fmt.Sprintf(dataPrefix+"-src-%d", i), + DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i), + Action: structs.IntentionActionAllow, + }, + }, &out) + if err != nil { + return fmt.Errorf("[%d] unexpected error: %w", i, err) + } + } + cancel() + return nil + }) + + require.NoError(t, g.Wait()) + + assertBlockingQueryWakeupCount(t, time.Second, start, count) + } + + runStep(t, "test the errNotFound path", func(t *testing.T) { + run(t, "other", 0) + }) + + // Services: + // api and api-proxy on node foo + // web and web-proxy on node foo + // + // Intentions + // * -> * (deny) intention + // web -> api (allow) + registerIntentionUpstreamEntries(t, codec, "") + + runStep(t, "test the errNotChanged path", func(t *testing.T) { + run(t, "completely-different-other", 1) + }) +} + func TestInternal_IntentionUpstreams_ACL(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index d0977e975..979569690 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -954,6 +954,19 @@ type blockingQueryResponseMeta interface { // a previous result. errNotFound will never be returned to the caller, it is // converted to nil before returning. // +// The query function can return errNotChanged, which is a sentinel error. This +// can only be returned on calls AFTER the first call, as it would not be +// possible to detect the absence of a change on the first call. Returning +// errNotChanged indicates that the query results are identical to the prior +// results which allows blockingQuery to keep blocking until the query returns +// a real changed result. +// +// The query function must take care to ensure the actual result of the query +// is either left unmodified or explicitly left in a good state before +// returning, otherwise when blockingQuery times out it may return an +// incomplete or unexpected result. errNotChanged will never be returned to the +// caller, it is converted to nil before returning. +// // If query function returns any other error, the error is returned to the caller // immediately. // @@ -993,7 +1006,7 @@ func (s *Server) blockingQuery( var ws memdb.WatchSet err := query(ws, s.fsm.State()) s.setQueryMeta(responseMeta, opts.GetToken()) - if errors.Is(err, errNotFound) { + if errors.Is(err, errNotFound) || errors.Is(err, errNotChanged) { return nil } return err @@ -1008,7 +1021,10 @@ func (s *Server) blockingQuery( // decrement the count when the function returns. defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) - var notFound bool + var ( + notFound bool + ranOnce bool + ) for { if opts.GetRequireConsistent() { @@ -1029,17 +1045,23 @@ func (s *Server) blockingQuery( err := query(ws, state) s.setQueryMeta(responseMeta, opts.GetToken()) + switch { case errors.Is(err, errNotFound): if notFound { // query result has not changed minQueryIndex = responseMeta.GetIndex() } - notFound = true + case errors.Is(err, errNotChanged): + if ranOnce { + // query result has not changed + minQueryIndex = responseMeta.GetIndex() + } case err != nil: return err } + ranOnce = true if responseMeta.GetIndex() > minQueryIndex { return nil @@ -1060,7 +1082,10 @@ func (s *Server) blockingQuery( } } -var errNotFound = fmt.Errorf("no data found for query") +var ( + errNotFound = fmt.Errorf("no data found for query") + errNotChanged = fmt.Errorf("data did not change for query") +) // setQueryMeta is used to populate the QueryMeta data for an RPC call // diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index df08627be..d26b83faa 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1681,3 +1681,24 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc } return event, nil } + +// assertBlockingQueryWakeupCount is used to assist in assertions for +// blockingQuery RPC tests involving the two sentinel errors errNotFound and +// errNotChanged. +// +// Those tests are a bit racy because of the timing of the two goroutines, so +// we relax the check for the count to be within a small range. +// +// The blocking query is going to wake up every interval, so use the elapsed test +// time with that known timing value to gauge how many legit wakeups should +// happen and then pad it out a smidge. +func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) { + t.Helper() + + const buffer = 2 + expectedQueries := int(time.Since(start)/interval) + buffer + + if gotCount < 2 || gotCount > expectedQueries { + t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount) + } +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index e75ed7f8b..7f5277561 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -818,6 +818,120 @@ func (s *Store) serviceDiscoveryChainTxn( return index, chain, nil } +func (s *Store) ReadResolvedServiceConfigEntries( + ws memdb.WatchSet, + serviceName string, + entMeta *structs.EnterpriseMeta, + upstreamIDs []structs.ServiceID, + proxyMode structs.ProxyMode, +) (uint64, *configentry.ResolvedServiceConfigSet, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + var res configentry.ResolvedServiceConfigSet + + // The caller will likely calculate this again, but we need to do it here + // to determine if we are going to traverse into implicit upstream + // definitions. + var inferredProxyMode structs.ProxyMode + + index, proxyEntry, err := configEntryTxn(tx, ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, entMeta) + if err != nil { + return 0, nil, err + } + maxIndex := index + + if proxyEntry != nil { + var ok bool + proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid proxy config type %T", proxyEntry) + } + res.AddProxyDefaults(proxyConf) + + inferredProxyMode = proxyConf.Mode + } + + index, serviceEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, serviceName, entMeta) + if err != nil { + return 0, nil, err + } + + if index > maxIndex { + maxIndex = index + } + + var serviceConf *structs.ServiceConfigEntry + if serviceEntry != nil { + var ok bool + serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid service config type %T", serviceEntry) + } + res.AddServiceDefaults(serviceConf) + + if serviceConf.Mode != structs.ProxyModeDefault { + inferredProxyMode = serviceConf.Mode + } + } + + var ( + noUpstreamArgs = len(upstreamIDs) == 0 + + // Check the args and the resolved value. If it was exclusively set via a config entry, then proxyMode + // will never be transparent because the service config request does not use the resolved value. + tproxy = proxyMode == structs.ProxyModeTransparent || inferredProxyMode == structs.ProxyModeTransparent + ) + + // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. + // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. + // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. + if noUpstreamArgs && !tproxy { + return maxIndex, &res, nil + } + + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} + + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + if serviceConf != nil && serviceConf.UpstreamConfig != nil { + for _, override := range serviceConf.UpstreamConfig.Overrides { + if override.Name == "" { + continue // skip this impossible condition + } + seenUpstreams[override.ServiceID()] = struct{}{} + } + } + + for upstream := range seenUpstreams { + index, rawEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) + if err != nil { + return 0, nil, err + } + if index > maxIndex { + maxIndex = index + } + + if rawEntry != nil { + entry, ok := rawEntry.(*structs.ServiceConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("invalid service config type %T", rawEntry) + } + res.AddServiceDefaults(entry) + } + } + + return maxIndex, &res, nil +} + // ReadDiscoveryChainConfigEntries will query for the full discovery chain for // the provided service name. All relevant config entries will be recursively // fetched and included in the result. diff --git a/go.mod b/go.mod index ef83a1af0..e0d8bf77a 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/mitchellh/copystructure v1.0.0 github.com/mitchellh/go-testing-interface v1.14.0 github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 + github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/mapstructure v1.4.1 github.com/mitchellh/pointerstructure v1.2.1 github.com/mitchellh/reflectwalk v1.0.1 diff --git a/go.sum b/go.sum index f0c2628ad..2617fbb0d 100644 --- a/go.sum +++ b/go.sum @@ -396,6 +396,8 @@ github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 h1:hOY53G+kBFhbYFpRVxHl5eS7laP6B1+Cq+Z9Dry1iMU= github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ= +github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= +github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=