From b90877b44045e7b74d0ddeb60b20bcd3e1fc8ee5 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Fri, 14 May 2021 10:21:44 -0500 Subject: [PATCH] server: ensure that central service config flattening properly resets the state each time (#10239) The prior solution to call reply.Reset() aged poorly since newer fields were added to the reply, but not added to Reset() leading serial blocking query loops on the server to blend replies. This could manifest as a service-defaults protocol change from default=>http not reverting back to default after the config entry reponsible was deleted. --- .changelog/10239.txt | 3 + agent/consul/config_endpoint.go | 47 +++--- agent/consul/config_endpoint_test.go | 209 +++++++++++++++++++++++++++ agent/structs/config_entry.go | 6 - 4 files changed, 238 insertions(+), 27 deletions(-) create mode 100644 .changelog/10239.txt diff --git a/.changelog/10239.txt b/.changelog/10239.txt new file mode 100644 index 000000000..a6e943dc7 --- /dev/null +++ b/.changelog/10239.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: ensure that central service config flattening properly resets the state each time +``` diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index c569799ec..0ab771e5c 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -323,8 +323,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - reply.Reset() - reply.MeshGateway.Mode = structs.MeshGatewayModeDefault + var thisReply structs.ServiceConfigResponse + + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault // TODO(freddy) Refactor this into smaller set of state store functions // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the // blocking query, this function will be rerun and these state store lookups will both be current. @@ -349,11 +350,11 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return fmt.Errorf("failed to copy global proxy-defaults: %v", err) } - reply.ProxyConfig = mapCopy.(map[string]interface{}) - reply.Mode = proxyConf.Mode - reply.TransparentProxy = proxyConf.TransparentProxy - reply.MeshGateway = proxyConf.MeshGateway - reply.Expose = proxyConf.Expose + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.Mode = proxyConf.Mode + thisReply.TransparentProxy = proxyConf.TransparentProxy + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose // Extract the global protocol from proxyConf for upstream configs. rawProtocol := proxyConf.Config["protocol"] @@ -369,7 +370,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return err } - reply.Index = index + thisReply.Index = index var serviceConf *structs.ServiceConfigEntry if serviceEntry != nil { @@ -378,25 +379,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return fmt.Errorf("invalid service config type %T", serviceEntry) } if serviceConf.Expose.Checks { - reply.Expose.Checks = true + thisReply.Expose.Checks = true } if len(serviceConf.Expose.Paths) >= 1 { - reply.Expose.Paths = serviceConf.Expose.Paths + thisReply.Expose.Paths = serviceConf.Expose.Paths } if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode } if serviceConf.Protocol != "" { - if reply.ProxyConfig == nil { - reply.ProxyConfig = make(map[string]interface{}) + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) } - reply.ProxyConfig["protocol"] = serviceConf.Protocol + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol } if serviceConf.TransparentProxy.OutboundListenerPort != 0 { - reply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort + thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort } if serviceConf.Mode != structs.ProxyModeDefault { - reply.Mode = serviceConf.Mode + thisReply.Mode = serviceConf.Mode } } @@ -414,13 +415,14 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode // will never be transparent because the service config request does not use the resolved value. - tproxy = args.Mode == structs.ProxyModeTransparent || reply.Mode == structs.ProxyModeTransparent + tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent ) // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. // If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode. // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. if noUpstreamArgs && !tproxy { + *reply = thisReply return nil } @@ -534,25 +536,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // don't allocate the slices just to not fill them if len(usConfigs) == 0 { + *reply = thisReply return nil } if legacyUpstreams { // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces - reply.UpstreamConfigs = make(map[string]map[string]interface{}) + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) for us, conf := range usConfigs { - reply.UpstreamConfigs[us.ID] = conf + thisReply.UpstreamConfigs[us.ID] = conf } } else { - reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) + thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) for us, conf := range usConfigs { - reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) } } + + *reply = thisReply return nil }) } diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 2ce4de90f..527018ee0 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1420,6 +1420,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { // of the blocking query does NOT bleed over into the next run. Concretely // in this test the data present in the initial proxy-defaults should not // be present when we are woken up due to proxy-defaults being deleted. + // + // This test does not pertain to upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking state := s1.fsm.State() require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ @@ -1571,6 +1574,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { } } +func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // The main thing this should test is that information from one iteration + // of the blocking query does NOT bleed over into the next run. Concretely + // in this test the data present in the initial proxy-defaults should not + // be present when we are woken up due to proxy-defaults being deleted. + // + // This test is about fields in upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Blocking + + state := s1.fsm.State() + require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + })) + require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "http", + })) + + var index uint64 + + runStep(t, "foo and bar should be both http", func(t *testing.T) { + // Verify that we get the results of service-defaults for 'foo' and 'bar'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{ + { + Upstream: structs.NewServiceID("bar", nil), + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for bar. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "bar", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "foo should be http and bar should be unset", func(t *testing.T) { + // Verify that we get the results of service-defaults for just 'foo'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for foo. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "foo", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) +} + func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -1848,3 +2050,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) { require.True(t, ok) require.Equal(t, expose, proxyConf.Expose) } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 04917e942..44142ac8d 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -968,12 +968,6 @@ type ServiceConfigResponse struct { QueryMeta } -func (r *ServiceConfigResponse) Reset() { - r.ProxyConfig = nil - r.UpstreamConfigs = nil - r.MeshGateway = MeshGatewayConfig{} -} - // MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here // because we need custom decoding of the raw interface{} values. func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {