From 3de0b32cc5e6c227252ed9c31329170f3cf487ac Mon Sep 17 00:00:00 2001 From: freddygv Date: Thu, 11 Mar 2021 10:49:43 -0700 Subject: [PATCH] Update service manager to store centrally configured upstreams --- agent/service_manager.go | 60 ++++++++++--- agent/service_manager_test.go | 119 ++++++++++++++++++++++++++ agent/structs/connect_proxy_config.go | 6 +- 3 files changed, 171 insertions(+), 14 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index b2df5825d..704dc8f80 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -383,38 +383,72 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct ns.Proxy.TransparentProxy = defaults.TransparentProxy } - // Merge upstream defaults if there were any returned + // seenUpstreams stores the upstreams seen from the local registration so that we can also add synthetic entries. + // for upstream configuration that was defined via service-defaults.UpstreamConfigs. In TransparentProxy mode + // ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly. + // So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with thosee values. + remoteUpstreams := make(map[structs.ServiceID]structs.Upstream) + + for _, us := range defaults.UpstreamIDConfigs { + parsed, err := structs.ParseUpstreamConfig(us.Config) + if err != nil { + return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err) + } + + // Delete the mesh gateway key since this is the only place it is read from an opaque map. + // Later reads use Proxy.MeshGateway. + delete(us.Config, "mesh_gateway") + + remoteUpstreams[us.Upstream] = structs.Upstream{ + DestinationNamespace: us.Upstream.NamespaceOrDefault(), + DestinationName: us.Upstream.ID, + Config: us.Config, + MeshGateway: parsed.MeshGateway, + CentrallyConfigured: true, + } + } + + localUpstreams := make(map[structs.ServiceID]struct{}) + + // Merge upstream defaults into the local registration for i := range ns.Proxy.Upstreams { // Get a pointer not a value copy of the upstream struct us := &ns.Proxy.Upstreams[i] if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { continue } + localUpstreams[us.DestinationID()] = struct{}{} - usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) + usCfg, ok := remoteUpstreams[us.DestinationID()] if !ok { // No config defaults to merge continue } - // MeshGateway mode is fetched separately since it is a first class field and not read from us.Config - parsed, err := structs.ParseUpstreamConfig(usCfg) - if err != nil { - return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Identifier(), err) - } - // The local upstream config mode has the highest precedence, so only overwrite when it's set to the default if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { - us.MeshGateway.Mode = parsed.MeshGateway.Mode + us.MeshGateway.Mode = usCfg.MeshGateway.Mode } - // Delete the mesh gateway key since this is the only place it is read from an opaque map. - delete(usCfg, "mesh_gateway") - // Merge in everything else that is read from the map - if err := mergo.Merge(&us.Config, usCfg); err != nil { + if err := mergo.Merge(&us.Config, usCfg.Config); err != nil { return nil, err } } + + // Ensure upstreams present in central config are represented in the local configuration. + // This does not apply outside of TransparentProxy mode because in that situation every upstream needs to be defined + // explicitly and locally with a local bind port. + if ns.Proxy.TransparentProxy { + for id, remote := range remoteUpstreams { + if _, ok := localUpstreams[id]; ok { + // Remote upstream is already present locally + continue + } + + ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote) + } + } + return ns, err } diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index 3d179377a..de3b4e3cc 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -922,6 +922,125 @@ func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) { }, }, }, + { + name: "remote upstream config expands local upstream list in tproxy mode", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + TransparentProxy: true, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + TransparentProxy: true, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + Config: map[string]interface{}{ + "protocol": "grpc", + }, + CentrallyConfigured: true, + }, + }, + }, + }, + }, + { + name: "remote upstream config not added to local upstream list outside of tproxy mode", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + TransparentProxy: false, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, { name: "upstream mode from remote defaults overrides local default", args: args{ diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index a2f3f774b..b8708fab1 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -265,6 +265,10 @@ type Upstream struct { // an ingress gateway. This cannot and should not be set by a user, it is // used internally to store the association of hosts to an upstream service. IngressHosts []string `json:"-" bexpr:"-"` + + // CentrallyConfigured indicates whether the upstream was defined in a proxy + // instance registration or whether it was generated from a config entry. + CentrallyConfigured bool } func (t *Upstream) UnmarshalJSON(data []byte) (err error) { @@ -321,7 +325,7 @@ func (u *Upstream) Validate() error { return fmt.Errorf("upstream destination name cannot be empty") } - if u.LocalBindPort == 0 { + if u.LocalBindPort == 0 && !u.CentrallyConfigured { return fmt.Errorf("upstream local bind port cannot be zero") } return nil