diff --git a/.changelog/10239.txt b/.changelog/10239.txt new file mode 100644 index 000000000..a6e943dc7 --- /dev/null +++ b/.changelog/10239.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: ensure that central service config flattening properly resets the state each time +``` diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index c569799ec..0ab771e5c 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -323,8 +323,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - reply.Reset() - reply.MeshGateway.Mode = structs.MeshGatewayModeDefault + var thisReply structs.ServiceConfigResponse + + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault // TODO(freddy) Refactor this into smaller set of state store functions // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the // blocking query, this function will be rerun and these state store lookups will both be current. @@ -349,11 +350,11 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return fmt.Errorf("failed to copy global proxy-defaults: %v", err) } - reply.ProxyConfig = mapCopy.(map[string]interface{}) - reply.Mode = proxyConf.Mode - reply.TransparentProxy = proxyConf.TransparentProxy - reply.MeshGateway = proxyConf.MeshGateway - reply.Expose = proxyConf.Expose + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.Mode = proxyConf.Mode + thisReply.TransparentProxy = proxyConf.TransparentProxy + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose // Extract the global protocol from proxyConf for upstream configs. rawProtocol := proxyConf.Config["protocol"] @@ -369,7 +370,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return err } - reply.Index = index + thisReply.Index = index var serviceConf *structs.ServiceConfigEntry if serviceEntry != nil { @@ -378,25 +379,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return fmt.Errorf("invalid service config type %T", serviceEntry) } if serviceConf.Expose.Checks { - reply.Expose.Checks = true + thisReply.Expose.Checks = true } if len(serviceConf.Expose.Paths) >= 1 { - reply.Expose.Paths = serviceConf.Expose.Paths + thisReply.Expose.Paths = serviceConf.Expose.Paths } if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode } if serviceConf.Protocol != "" { - if reply.ProxyConfig == nil { - reply.ProxyConfig = make(map[string]interface{}) + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) } - reply.ProxyConfig["protocol"] = serviceConf.Protocol + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol } if serviceConf.TransparentProxy.OutboundListenerPort != 0 { - reply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort + thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort } if serviceConf.Mode != structs.ProxyModeDefault { - reply.Mode = serviceConf.Mode + thisReply.Mode = serviceConf.Mode } } @@ -414,13 +415,14 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode // will never be transparent because the service config request does not use the resolved value. - tproxy = args.Mode == structs.ProxyModeTransparent || reply.Mode == structs.ProxyModeTransparent + tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent ) // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. // If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode. // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. if noUpstreamArgs && !tproxy { + *reply = thisReply return nil } @@ -534,25 +536,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // don't allocate the slices just to not fill them if len(usConfigs) == 0 { + *reply = thisReply return nil } if legacyUpstreams { // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces - reply.UpstreamConfigs = make(map[string]map[string]interface{}) + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) for us, conf := range usConfigs { - reply.UpstreamConfigs[us.ID] = conf + thisReply.UpstreamConfigs[us.ID] = conf } } else { - reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) + thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) for us, conf := range usConfigs { - reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) } } + + *reply = thisReply return nil }) } diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 2ce4de90f..527018ee0 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1420,6 +1420,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { // of the blocking query does NOT bleed over into the next run. Concretely // in this test the data present in the initial proxy-defaults should not // be present when we are woken up due to proxy-defaults being deleted. + // + // This test does not pertain to upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking state := s1.fsm.State() require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ @@ -1571,6 +1574,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { } } +func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // The main thing this should test is that information from one iteration + // of the blocking query does NOT bleed over into the next run. Concretely + // in this test the data present in the initial proxy-defaults should not + // be present when we are woken up due to proxy-defaults being deleted. + // + // This test is about fields in upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Blocking + + state := s1.fsm.State() + require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + })) + require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "http", + })) + + var index uint64 + + runStep(t, "foo and bar should be both http", func(t *testing.T) { + // Verify that we get the results of service-defaults for 'foo' and 'bar'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{ + { + Upstream: structs.NewServiceID("bar", nil), + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for bar. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "bar", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "foo should be http and bar should be unset", func(t *testing.T) { + // Verify that we get the results of service-defaults for just 'foo'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for foo. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "foo", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) +} + func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -1848,3 +2050,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) { require.True(t, ok) require.Equal(t, expose, proxyConf.Expose) } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 04917e942..44142ac8d 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -968,12 +968,6 @@ type ServiceConfigResponse struct { QueryMeta } -func (r *ServiceConfigResponse) Reset() { - r.ProxyConfig = nil - r.UpstreamConfigs = nil - r.MeshGateway = MeshGatewayConfig{} -} - // MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here // because we need custom decoding of the raw interface{} values. func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {