server: ensure that central service config flattening properly resets the state each time (#10239)

The prior solution of calling reply.Reset() aged poorly: newer fields
were added to the reply but never added to Reset(), so serial
blocking query loops on the server blended one reply into the next.

This could manifest as a service-defaults protocol change from
default=>http not reverting back to default after the config entry
responsible was deleted.
R.B. Boyer 2021-05-14 10:21:44 -05:00 committed by GitHub
parent c42899eafa
commit b90877b440
4 changed files with 238 additions and 27 deletions
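
For illustration, a minimal sketch of the failure mode described above. The type, fields, and runBlockingQuery loop are hypothetical stand-ins, not Consul's actual blocking-query machinery: a partially maintained Reset() lets fields added later leak from one query iteration into the next, while building a fresh local reply and assigning it once at the end cannot go stale.

```go
// Sketch only: hypothetical names, not Consul's real blocking-query code.
package main

import "fmt"

type Response struct {
    ProxyConfig map[string]interface{}
    Mode        string // added after Reset() was written, so never cleared
}

func (r *Response) Reset() {
    r.ProxyConfig = nil // Mode is forgotten here
}

// runBlockingQuery stands in for the server loop that re-invokes fn against
// the same reply value every time the watch set fires.
func runBlockingQuery(reply *Response, fn func(*Response)) {
    for i := 0; i < 2; i++ {
        fn(reply)
    }
}

func main() {
    modes := []string{"transparent", ""} // second run: config entry was deleted

    var stale Response
    i := 0
    runBlockingQuery(&stale, func(reply *Response) {
        reply.Reset()
        if modes[i] != "" {
            reply.Mode = modes[i]
        }
        i++
    })
    fmt.Printf("with Reset(): %q\n", stale.Mode) // "transparent" leaks into run 2

    var fresh Response
    i = 0
    runBlockingQuery(&fresh, func(reply *Response) {
        var thisReply Response // every field starts zeroed, present or future
        if modes[i] != "" {
            thisReply.Mode = modes[i]
        }
        i++
        *reply = thisReply
    })
    fmt.Printf("with local reply: %q\n", fresh.Mode) // "" as expected
}
```

This is also why the last hunk below removes Reset() from ServiceConfigResponse outright instead of teaching it about the newer fields.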

.changelog/10239.txt Normal file

@ -0,0 +1,3 @@
```release-note:bug
server: ensure that central service config flattening properly resets the state each time
```


@ -323,8 +323,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
-   reply.Reset()
-   reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
+   var thisReply structs.ServiceConfigResponse
+
+   thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault

    // TODO(freddy) Refactor this into smaller set of state store functions
    // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
    // blocking query, this function will be rerun and these state store lookups will both be current.
@ -349,11 +350,11 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
    if err != nil {
        return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
    }
-   reply.ProxyConfig = mapCopy.(map[string]interface{})
-   reply.Mode = proxyConf.Mode
-   reply.TransparentProxy = proxyConf.TransparentProxy
-   reply.MeshGateway = proxyConf.MeshGateway
-   reply.Expose = proxyConf.Expose
+   thisReply.ProxyConfig = mapCopy.(map[string]interface{})
+   thisReply.Mode = proxyConf.Mode
+   thisReply.TransparentProxy = proxyConf.TransparentProxy
+   thisReply.MeshGateway = proxyConf.MeshGateway
+   thisReply.Expose = proxyConf.Expose

    // Extract the global protocol from proxyConf for upstream configs.
    rawProtocol := proxyConf.Config["protocol"]
@ -369,7 +370,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
    if err != nil {
        return err
    }
-   reply.Index = index
+   thisReply.Index = index

    var serviceConf *structs.ServiceConfigEntry
    if serviceEntry != nil {
@ -378,25 +379,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return fmt.Errorf("invalid service config type %T", serviceEntry) return fmt.Errorf("invalid service config type %T", serviceEntry)
} }
if serviceConf.Expose.Checks { if serviceConf.Expose.Checks {
reply.Expose.Checks = true thisReply.Expose.Checks = true
} }
if len(serviceConf.Expose.Paths) >= 1 { if len(serviceConf.Expose.Paths) >= 1 {
reply.Expose.Paths = serviceConf.Expose.Paths thisReply.Expose.Paths = serviceConf.Expose.Paths
} }
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
} }
if serviceConf.Protocol != "" { if serviceConf.Protocol != "" {
if reply.ProxyConfig == nil { if thisReply.ProxyConfig == nil {
reply.ProxyConfig = make(map[string]interface{}) thisReply.ProxyConfig = make(map[string]interface{})
} }
reply.ProxyConfig["protocol"] = serviceConf.Protocol thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
} }
if serviceConf.TransparentProxy.OutboundListenerPort != 0 { if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
reply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
} }
if serviceConf.Mode != structs.ProxyModeDefault { if serviceConf.Mode != structs.ProxyModeDefault {
reply.Mode = serviceConf.Mode thisReply.Mode = serviceConf.Mode
} }
} }
@ -414,13 +415,14 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
    // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
    // will never be transparent because the service config request does not use the resolved value.
-   tproxy = args.Mode == structs.ProxyModeTransparent || reply.Mode == structs.ProxyModeTransparent
+   tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
    )

    // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
    // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
    // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
    if noUpstreamArgs && !tproxy {
+       *reply = thisReply
        return nil
    }
@ -534,25 +536,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
    // don't allocate the slices just to not fill them
    if len(usConfigs) == 0 {
+       *reply = thisReply
        return nil
    }

    if legacyUpstreams {
        // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
-       reply.UpstreamConfigs = make(map[string]map[string]interface{})
+       thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
        for us, conf := range usConfigs {
-           reply.UpstreamConfigs[us.ID] = conf
+           thisReply.UpstreamConfigs[us.ID] = conf
        }
    } else {
-       reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
+       thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
        for us, conf := range usConfigs {
-           reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
+           thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
                structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
        }
    }

+   *reply = thisReply
    return nil
})
}
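
One detail worth noting in the hunks above: because results now accumulate in the local thisReply, every successful return path (including the two early `return nil` exits) copies it into the shared *reply; an early exit that skipped the copy would leave the caller holding data from a previous iteration. Below is a heavily condensed, hypothetical sketch of that control flow, with hard-coded values standing in for the real proxy-defaults/service-defaults lookups:

```go
// Sketch only: a condensed stand-in for the endpoint's control flow, not the
// real ResolveServiceConfig implementation.
package main

import "fmt"

type Response struct {
    ProxyConfig     map[string]interface{}
    UpstreamConfigs map[string]map[string]interface{}
}

func resolveServiceConfig(reply *Response, wantUpstreams bool) error {
    var thisReply Response

    // Global proxy-defaults are flattened in first...
    thisReply.ProxyConfig = map[string]interface{}{"protocol": "tcp"}
    // ...then service-defaults for the target service override fields.
    thisReply.ProxyConfig["protocol"] = "http"

    if !wantUpstreams {
        *reply = thisReply // early exit still publishes a fully rebuilt reply
        return nil
    }

    thisReply.UpstreamConfigs = map[string]map[string]interface{}{
        "bar": {"protocol": "http"},
    }

    *reply = thisReply
    return nil
}

func main() {
    var reply Response
    _ = resolveServiceConfig(&reply, true)
    fmt.Println(reply.ProxyConfig["protocol"], reply.UpstreamConfigs["bar"])
}
```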


@ -1420,6 +1420,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
// of the blocking query does NOT bleed over into the next run. Concretely
// in this test the data present in the initial proxy-defaults should not
// be present when we are woken up due to proxy-defaults being deleted.
+// 
+// This test does not pertain to upstreams, see:
+// TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking

state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
@ -1571,6 +1574,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
    }
}
func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// The main thing this should test is that information from one iteration
// of the blocking query does NOT bleed over into the next run. Concretely
// in this test the data present in the initial service-defaults should not
// be present when we are woken up due to those service-defaults being deleted.
//
// This test is about fields in upstreams, see:
// TestConfigEntry_ResolveServiceConfig_Blocking
state := s1.fsm.State()
require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}))
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
Protocol: "http",
}))
var index uint64
runStep(t, "foo and bar should be both http", func(t *testing.T) {
// Verify that we get the results of service-defaults for 'foo' and 'bar'.
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
},
&out,
))
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{
{
Upstream: structs.NewServiceID("bar", nil),
Config: map[string]interface{}{
"protocol": "http",
},
},
},
QueryMeta: out.QueryMeta, // don't care
}
require.Equal(t, expected, out)
index = out.Index
})
runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the
// service-defaults for bar.
// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
err := state.DeleteConfigEntry(index+1,
structs.ServiceDefaults,
"bar",
nil,
)
if err != nil {
t.Errorf("delete config entry failed: %v", err)
}
}()
// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))
// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
// Check the indexes
require.Equal(t, out.Index, index+1)
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
QueryMeta: out.QueryMeta, // don't care
}
require.Equal(t, expected, out)
index = out.Index
})
runStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
// Verify that we get the results of service-defaults for just 'foo'.
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
},
&out,
))
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
QueryMeta: out.QueryMeta, // don't care
}
require.Equal(t, expected, out)
index = out.Index
})
runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the
// service-defaults for foo.
// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
err := state.DeleteConfigEntry(index+1,
structs.ServiceDefaults,
"foo",
nil,
)
if err != nil {
t.Errorf("delete config entry failed: %v", err)
}
}()
// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))
// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
// Check the indexes
require.Equal(t, out.Index, index+1)
expected := structs.ServiceConfigResponse{
QueryMeta: out.QueryMeta, // don't care
}
require.Equal(t, expected, out)
index = out.Index
})
}
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
@ -1848,3 +2050,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
    require.True(t, ok)
    require.Equal(t, expose, proxyConf.Expose)
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
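
A note on the runStep helper added above: unlike a bare t.Run, it calls t.FailNow on the parent when a step fails, so the remaining steps are skipped. That matters here because the steps share the `index` variable, and continuing after an earlier failure would only produce confusing follow-on failures. A minimal usage sketch (hypothetical test; runStep is copied from the diff above so the snippet stands alone):

```go
// Sketch only: a hypothetical test showing the fail-fast step pattern.
package example

import "testing"

// runStep is copied from the diff above: a failed step aborts the parent test.
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
    t.Helper()
    if !t.Run(name, fn) {
        t.FailNow()
    }
}

func TestStepsShareState(t *testing.T) {
    var index uint64

    runStep(t, "first step produces an index", func(t *testing.T) {
        index = 7 // later steps depend on this value
    })

    // If the first step had failed, plain t.Run would still execute this
    // step against a zero index; runStep skips it instead.
    runStep(t, "second step consumes it", func(t *testing.T) {
        if index != 7 {
            t.Fatalf("unexpected index: %d", index)
        }
    })
}
```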


@ -968,12 +968,6 @@ type ServiceConfigResponse struct {
    QueryMeta
}

-func (r *ServiceConfigResponse) Reset() {
-   r.ProxyConfig = nil
-   r.UpstreamConfigs = nil
-   r.MeshGateway = MeshGatewayConfig{}
-}
-
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
// because we need custom decoding of the raw interface{} values.
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {