xds: properly merge central config for "agentless" services (#14962)

Dan Upton, 2022-10-13 12:04:59 +01:00 (committed by GitHub)
parent 36a3d00f0d
commit de7f380385
3 changed files with 57 additions and 8 deletions

.changelog/14962.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
xds: Central service configuration (proxy-defaults and service-defaults) is now correctly applied to Consul Dataplane proxies
```
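To make the note concrete: in the new test below, proxy-defaults supply `max_inbound_connections` while the proxy's own registration supplies `local_connect_timeout_ms`, and the fix makes the registered proxy see both. A minimal, self-contained sketch of that merge follows; the `mergeProxyConfig` helper and the registration-wins precedence are illustrative assumptions, not the real `MergeNodeServiceWithCentralConfig`.

```go
package main

import "fmt"

// mergeProxyConfig is a hypothetical stand-in for the central-config merge:
// defaults come from proxy-defaults/service-defaults, the rest from the
// proxy's own registration.
func mergeProxyConfig(defaults, registration map[string]any) map[string]any {
	merged := make(map[string]any, len(defaults)+len(registration))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range registration {
		merged[k] = v // registration-level keys win on conflict (assumption)
	}
	return merged
}

func main() {
	defaults := map[string]any{"max_inbound_connections": 321}      // from proxy-defaults
	registration := map[string]any{"local_connect_timeout_ms": 123} // from the service registration
	fmt.Println(mergeProxyConfig(defaults, registration))
	// Output: map[local_connect_timeout_ms:123 max_inbound_connections:321]
}
```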


@@ -9,6 +9,7 @@ import (
     "github.com/hashicorp/go-memdb"

     "github.com/hashicorp/consul/acl"
+    "github.com/hashicorp/consul/agent/configentry"
     "github.com/hashicorp/consul/agent/local"
     "github.com/hashicorp/consul/agent/proxycfg"
     "github.com/hashicorp/consul/agent/structs"
@@ -117,7 +118,6 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
         ws.Add(store.AbandonCh())
         _, ns, err := store.NodeService(ws, proxyID.NodeName, proxyID.ID, &proxyID.EnterpriseMeta, structs.DefaultPeerKeyword)
         switch {
         case err != nil:
             logger.Error("failed to read service from state store", "error", err.Error())
@@ -130,14 +130,29 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
             err := errors.New("service must be a sidecar proxy or gateway")
             logger.Error(err.Error())
             return nil, err
-        default:
-            err := m.Manager.Register(proxyID, ns, source, proxyID.Token, false)
-            if err != nil {
-                logger.Error("failed to register service", "error", err.Error())
-                return nil, err
-            }
-            return ws, nil
         }
+
+        _, ns, err = configentry.MergeNodeServiceWithCentralConfig(
+            ws,
+            store,
+            // TODO(agentless): it doesn't seem like we actually *need* any of the
+            // values on this request struct - we should try to remove the parameter
+            // in case that changes in the future as this call-site isn't passing them.
+            &structs.ServiceSpecificRequest{},
+            ns,
+            logger,
+        )
+        if err != nil {
+            logger.Error("failed to merge with central config", "error", err.Error())
+            return nil, err
+        }
+
+        if err = m.Manager.Register(proxyID, ns, source, proxyID.Token, false); err != nil {
+            logger.Error("failed to register service", "error", err.Error())
+            return nil, err
+        }
+
+        return ws, nil
     }

     syncLoop := func(ws memdb.WatchSet) {
@@ -257,6 +272,7 @@ type ConfigManager interface {
 type Store interface {
     NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error)
+    ReadResolvedServiceConfigEntries(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, upstreamIDs []structs.ServiceID, proxyMode structs.ProxyMode) (uint64, *configentry.ResolvedServiceConfigSet, error)
     AbandonCh() <-chan struct{}
 }
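The widened `Store` interface means any implementation, including test doubles outside the real state store, now has to expose resolved config entries as well. A hypothetical stub satisfying the interface exactly as declared above; the package and type names are illustrative, not part of the commit.

```go
package catalog // illustrative package name; not taken from the commit

import (
	"github.com/hashicorp/go-memdb"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/configentry"
	"github.com/hashicorp/consul/agent/structs"
)

// fakeStore is a hypothetical test double returning canned data for the
// three methods the Store interface now requires.
type fakeStore struct {
	ns        *structs.NodeService
	resolved  *configentry.ResolvedServiceConfigSet
	abandonCh chan struct{}
}

func (f *fakeStore) NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error) {
	return 1, f.ns, nil
}

// ReadResolvedServiceConfigEntries returns the proxy-defaults/service-defaults
// set the caller is expected to merge into the service.
func (f *fakeStore) ReadResolvedServiceConfigEntries(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, upstreamIDs []structs.ServiceID, proxyMode structs.ProxyMode) (uint64, *configentry.ResolvedServiceConfigSet, error) {
	return 1, f.resolved, nil
}

func (f *fakeStore) AbandonCh() <-chan struct{} { return f.abandonCh }
```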


@@ -32,6 +32,11 @@ func TestConfigSource_Success(t *testing.T) {
             Service: "web-sidecar-proxy",
             Port: 9999,
             Kind: structs.ServiceKindConnectProxy,
+            Proxy: structs.ConnectProxyConfig{
+                Config: map[string]any{
+                    "local_connect_timeout_ms": 123,
+                },
+            },
         },
     }))
@@ -70,6 +75,11 @@ func TestConfigSource_Success(t *testing.T) {
             Service: "web-sidecar-proxy",
             Port: 8888,
             Kind: structs.ServiceKindConnectProxy,
+            Proxy: structs.ConnectProxyConfig{
+                Config: map[string]any{
+                    "local_connect_timeout_ms": 123,
+                },
+            },
         },
     }))
@@ -81,6 +91,25 @@ func TestConfigSource_Success(t *testing.T) {
         t.Fatal("timeout waiting for snapshot")
     }

+    // Update proxy-defaults.
+    require.NoError(t, store.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
+        Name: structs.ProxyConfigGlobal,
+        Config: map[string]any{
+            "max_inbound_connections": 321,
+        },
+    }))
+
+    // Expect Register to have been called again with the new merged config.
+    select {
+    case snap := <-snapCh:
+        require.Equal(t, map[string]any{
+            "local_connect_timeout_ms": 123,
+            "max_inbound_connections": 321,
+        }, snap.Proxy.Config)
+    case <-time.After(100 * time.Millisecond):
+        t.Fatal("timeout waiting for snapshot")
+    }
+
     // Start another watch.
     _, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
     require.NoError(t, err)
@@ -238,6 +267,7 @@ func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName strin
             snapCh <- &proxycfg.ConfigSnapshot{
                 ProxyID: id,
                 Port: ns.Port,
+                Proxy: ns.Proxy,
             }
         }).
         Return(nil)
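Finally, a hypothetical sketch of exercising the same merge path that the commit wires into `startSync`, driven by the stub `Store` from the earlier sketch. The argument list simply mirrors the added call site; the no-op `hclog` logger, the `mergeForProxy` wrapper, and the assumption that the helper accepts any store exposing `ReadResolvedServiceConfigEntries` are all illustrative, not taken from the commit.

```go
// Assumes the earlier sketch's package and imports, plus "github.com/hashicorp/go-hclog".
// Illustrative only; mirrors the call added to startSync rather than reproducing it.
func mergeForProxy(store *fakeStore, nodeName, serviceID string) (*structs.NodeService, error) {
	ws := memdb.NewWatchSet()
	ws.Add(store.AbandonCh())

	_, ns, err := store.NodeService(ws, nodeName, serviceID, nil, structs.DefaultPeerKeyword)
	if err != nil || ns == nil {
		return nil, err
	}

	// Same shape as the new call site: an empty request struct, the raw
	// NodeService, and a logger (a no-op logger is assumed here).
	_, merged, err := configentry.MergeNodeServiceWithCentralConfig(
		ws,
		store,
		&structs.ServiceSpecificRequest{},
		ns,
		hclog.NewNullLogger(),
	)
	return merged, err
}
```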