Update server-side config resolution and client-side merging

This commit is contained in:
freddygv 2021-03-10 21:05:11 -07:00
parent 68148a1dae
commit b98abb6f09
9 changed files with 949 additions and 65 deletions

View File

@ -1948,6 +1948,7 @@ type addServiceLockedRequest struct {
// agent using Agent.AddService. // agent using Agent.AddService.
type AddServiceRequest struct { type AddServiceRequest struct {
Service *structs.NodeService Service *structs.NodeService
nodeName string
chkTypes []*structs.CheckType chkTypes []*structs.CheckType
persist bool persist bool
token string token string
@ -3107,6 +3108,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{ err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{ AddServiceRequest: AddServiceRequest{
Service: ns, Service: ns,
nodeName: a.config.NodeName,
chkTypes: chkTypes, chkTypes: chkTypes,
persist: false, // don't rewrite the file with the same data we just read persist: false, // don't rewrite the file with the same data we just read
token: service.Token, token: service.Token,
@ -3127,6 +3129,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{ err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{ AddServiceRequest: AddServiceRequest{
Service: sidecar, Service: sidecar,
nodeName: a.config.NodeName,
chkTypes: sidecarChecks, chkTypes: sidecarChecks,
persist: false, // don't rewrite the file with the same data we just read persist: false, // don't rewrite the file with the same data we just read
token: sidecarToken, token: sidecarToken,
@ -3225,6 +3228,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{ err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{ AddServiceRequest: AddServiceRequest{
Service: p.Service, Service: p.Service,
nodeName: a.config.NodeName,
chkTypes: nil, chkTypes: nil,
persist: false, // don't rewrite the file with the same data we just read persist: false, // don't rewrite the file with the same data we just read
token: p.Token, token: p.Token,

View File

@ -994,6 +994,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
addReq := AddServiceRequest{ addReq := AddServiceRequest{
Service: ns, Service: ns,
nodeName: s.agent.config.NodeName,
chkTypes: chkTypes, chkTypes: chkTypes,
persist: true, persist: true,
token: token, token: token,
@ -1007,6 +1008,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
if sidecar != nil { if sidecar != nil {
addReq := AddServiceRequest{ addReq := AddServiceRequest{
Service: sidecar, Service: sidecar,
nodeName: s.agent.config.NodeName,
chkTypes: sidecarChecks, chkTypes: sidecarChecks,
persist: true, persist: true,
token: sidecarToken, token: sidecarToken,

View File

@ -25,6 +25,8 @@ func TestResolvedServiceConfig(t *testing.T) {
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("foo", req.Name) require.Equal("foo", req.Name)
require.Equal("foo-1", req.ID)
require.Equal("foo-node", req.NodeName)
require.True(req.AllowStale) require.True(req.AllowStale)
reply := args.Get(2).(*structs.ServiceConfigResponse) reply := args.Get(2).(*structs.ServiceConfigResponse)
@ -48,6 +50,8 @@ func TestResolvedServiceConfig(t *testing.T) {
}, &structs.ServiceConfigRequest{ }, &structs.ServiceConfigRequest{
Datacenter: "dc1", Datacenter: "dc1",
Name: "foo", Name: "foo",
ID: "foo-1",
NodeName: "foo-node",
}) })
require.NoError(err) require.NoError(err)
require.Equal(cache.FetchResult{ require.Equal(cache.FetchResult{

View File

@ -329,31 +329,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
reply.Reset() reply.Reset()
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
// during the blocking query, this function will be rerun and these state store lookups
// will both be current.
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
var serviceConf *structs.ServiceConfigEntry
var ok bool
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
}
// Use the default enterprise meta to look up the global proxy defaults. In the future we may allow per-namespace proxy-defaults // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// but not yet. // blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because their are not namespaced.
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta()) _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
if err != nil { if err != nil {
return err return err
} }
var proxyConf *structs.ProxyConfigEntry
var (
proxyConf *structs.ProxyConfigEntry
proxyConfGlobalProtocol string
ok bool
)
if proxyEntry != nil { if proxyEntry != nil {
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
if !ok { if !ok {
@ -367,11 +357,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
reply.ProxyConfig = mapCopy.(map[string]interface{}) reply.ProxyConfig = mapCopy.(map[string]interface{})
reply.MeshGateway = proxyConf.MeshGateway reply.MeshGateway = proxyConf.MeshGateway
reply.Expose = proxyConf.Expose reply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
} }
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index = index reply.Index = index
if serviceConf != nil { var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
if serviceConf.Expose.Checks { if serviceConf.Expose.Checks {
reply.Expose.Checks = true reply.Expose.Checks = true
} }
@ -389,55 +397,121 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
} }
} }
// Extract the global protocol from proxyConf for upstream configs. // First collect all upstreams into a set of seen upstreams.
var proxyConfGlobalProtocol interface{} // Upstreams can come from:
if proxyConf != nil && proxyConf.Config != nil { // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
proxyConfGlobalProtocol = proxyConf.Config["protocol"] // - Implicitly from centralized upstream config in service-defaults
} seenUpstreams := map[structs.ServiceID]struct{}{}
// map the legacy request structure using only service names
// to the new ServiceID type.
upstreamIDs := args.UpstreamIDs upstreamIDs := args.UpstreamIDs
legacyUpstreams := false legacyUpstreams := false
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
if len(upstreamIDs) == 0 { if len(upstreamIDs) == 0 {
legacyUpstreams = true legacyUpstreams = true
upstreamIDs = make([]structs.ServiceID, 0) upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams { for _, upstream := range args.Upstreams {
upstreamIDs = append(upstreamIDs, structs.NewServiceID(upstream, &args.EnterpriseMeta)) sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
} }
} }
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults
if serviceConf != nil && serviceConf.Connect != nil {
for sid := range serviceConf.Connect.UpstreamConfigs {
seenUpstreams[structs.ServiceIDFromString(sid)] = struct{}{}
}
}
var (
upstreamDefaults *structs.UpstreamConfig
upstreamConfigs map[string]*structs.UpstreamConfig
)
if serviceConf != nil && serviceConf.Connect != nil {
if serviceConf.Connect.UpstreamDefaults != nil {
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
}
if serviceConf.Connect.UpstreamConfigs != nil {
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
}
}
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_configs
// 3. Value from local upstream definition. This last step is done in the client's service manager.
var registrationMGConfig structs.MeshGatewayConfig
if args.ID != "" && args.NodeName != "" {
index, registration, err := state.NodeServiceWatch(ws, args.NodeName, args.ID, &args.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to query service registration")
}
if index > reply.Index {
reply.Index = index
}
if registration != nil && !registration.Proxy.MeshGateway.IsZero() {
registrationMGConfig = registration.Proxy.MeshGateway
}
}
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{}) usConfigs := make(map[structs.ServiceID]map[string]interface{})
for _, upstream := range upstreamIDs { for upstream, _ := range seenUpstreams {
_, upstreamEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_defaults|upstream_configs) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
_, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
if err != nil { if err != nil {
return err return err
} }
var upstreamConf *structs.ServiceConfigEntry if upstreamSvcDefaults != nil {
var ok bool cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry)
if upstreamEntry != nil {
upstreamConf, ok = upstreamEntry.(*structs.ServiceConfigEntry)
if !ok { if !ok {
return fmt.Errorf("invalid service config type %T", upstreamEntry) return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults)
}
if cfg.Protocol != "" {
protocol = cfg.Protocol
} }
} }
if protocol != "" {
// Fallback to proxyConf global protocol. resolvedCfg["protocol"] = protocol
protocol := proxyConfGlobalProtocol
if upstreamConf != nil && upstreamConf.Protocol != "" {
protocol = upstreamConf.Protocol
} }
// Nothing to configure if a protocol hasn't been set. // Merge centralized defaults for all upstreams before configuration for specific upstreams
if protocol == nil { if upstreamDefaults != nil {
continue upstreamDefaults.MergeInto(resolvedCfg, args.ID == "")
}
// The value from the proxy registration overrides the one from upstream_defaults because
// it is specific to the proxy instance
if !registrationMGConfig.IsZero() {
resolvedCfg["mesh_gateway"] = registrationMGConfig
} }
usConfigs[upstream] = map[string]interface{}{ if upstreamConfigs[upstream.String()] != nil {
"protocol": protocol, upstreamConfigs[upstream.String()].MergeInto(resolvedCfg, args.ID == "")
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
} }
} }
@ -447,22 +521,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
} }
if legacyUpstreams { if legacyUpstreams {
if reply.UpstreamConfigs == nil { // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
reply.UpstreamConfigs = make(map[string]map[string]interface{}) reply.UpstreamConfigs = make(map[string]map[string]interface{})
}
for us, conf := range usConfigs { for us, conf := range usConfigs {
reply.UpstreamConfigs[us.ID] = conf reply.UpstreamConfigs[us.ID] = conf
} }
} else { } else {
if reply.UpstreamIDConfigs == nil { reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
}
for us, conf := range usConfigs { for us, conf := range usConfigs {
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf}) reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
} }
} }
return nil return nil
}) })
} }

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"os" "os"
"sort"
"testing" "testing"
"time" "time"
@ -892,6 +893,550 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config) require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config)
} }
func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
tt := []struct {
name string
entries []structs.ConfigEntry
request structs.ServiceConfigRequest
proxyCfg structs.ConnectProxyConfig
expect structs.ServiceConfigResponse
}{
{
name: "upstream config entries from Upstreams and service-defaults",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zip": {
Protocol: "http",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy-1",
NodeName: "foo-node",
Datacenter: "dc1",
Upstreams: []string{"zap"},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "grpc",
},
UpstreamConfigs: map[string]map[string]interface{}{
"zip": {
"protocol": "http",
},
"zap": {
"protocol": "grpc",
},
},
},
},
{
name: "upstream config entries from UpstreamIDs and service-defaults",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zip": {
Protocol: "http",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy-1",
NodeName: "foo-node",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{{ID: "zap"}},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "grpc",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
{
Upstream: structs.ServiceID{
ID: "zip",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
{
name: "proxy registration overrides upstream_defaults",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy-1",
NodeName: "foo-node",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
{ID: "zap"},
},
},
proxyCfg: structs.ConnectProxyConfig{
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "none",
},
},
},
},
},
},
{
name: "upstream_configs overrides all",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "udp",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "tcp",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
Protocol: "http",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
PassiveHealthCheck: structs.PassiveHealthCheck{
Interval: 10,
MaxFailures: 2,
},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zap": {
Protocol: "grpc",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy-1",
NodeName: "foo-node",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
{ID: "zap"},
},
},
proxyCfg: structs.ConnectProxyConfig{
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "udp",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
state := s1.fsm.State()
// Boostrap the config entries
idx := uint64(1)
for _, conf := range tc.entries {
require.NoError(t, state.EnsureConfigEntry(idx, conf))
idx++
}
// The config endpoints pulls the proxy registration if a proxy ID is provided.
if tc.request.ID != "" {
require.NoError(t, state.EnsureNode(4, &structs.Node{
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
Node: tc.request.NodeName,
}))
require.NoError(t, state.EnsureService(5, tc.request.NodeName, &structs.NodeService{
ID: tc.request.ID,
Service: tc.request.ID,
Proxy: tc.proxyCfg,
}))
}
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &tc.request, &out))
// Don't know what this is deterministically, so we grab it from the response
tc.expect.QueryMeta = out.QueryMeta
// Order of this slice is also not deterministic since it's populated from a map
sort.SliceStable(out.UpstreamIDConfigs, func(i, j int) bool {
return out.UpstreamIDConfigs[i].Upstream.String() < out.UpstreamIDConfigs[j].Upstream.String()
})
require.Equal(t, tc.expect, out)
})
}
}
func TestConfigEntry_ResolveServiceConfig_Upstreams_RegistrationBlocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
nodeName := "foo-node"
// Create a dummy proxy/service config in the state store to look up.
state := s1.fsm.State()
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"foo": 1,
},
}))
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}))
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
Protocol: "grpc",
}))
require.NoError(t, state.EnsureNode(4, &structs.Node{
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
Node: nodeName,
}))
args := structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy",
NodeName: nodeName,
Datacenter: s1.config.Datacenter,
Upstreams: []string{"bar", "baz"},
}
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
var index uint64
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
// This mesh gateway configuration is pulled from foo-proxy's registration
UpstreamConfigs: map[string]map[string]interface{}{
"bar": {
"protocol": "grpc",
},
},
// Don't know what this is deterministically
QueryMeta: out.QueryMeta,
}
require.Equal(t, expected, out)
index = out.Index
// Now setup a blocking query for 'foo' while we add the proxy registration for foo-proxy.
// Adding the foo proxy registration should cause the blocking query to fire because it is
// watched when the ID and NodeName are provided.
{
// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
require.NoError(t, state.EnsureService(index+1, nodeName, &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
}))
}()
// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy",
NodeName: nodeName,
Datacenter: "dc1",
Upstreams: []string{"bar", "baz"},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))
// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
// Check the indexes
require.Equal(t, out.Index, index+1)
// The mesh gateway config from the proxy registration should no longer be present
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
UpstreamConfigs: map[string]map[string]interface{}{
"bar": {
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
},
"baz": {
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
},
},
// Don't know what this is deterministically
QueryMeta: out.QueryMeta,
}
require.Equal(t, expected, out)
}
}
func TestConfigEntry_ResolveServiceConfig_Upstreams_DegistrationBlocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
nodeName := "foo-node"
// Create a dummy proxy/service config in the state store to look up.
state := s1.fsm.State()
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"foo": 1,
},
}))
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}))
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
Protocol: "grpc",
}))
require.NoError(t, state.EnsureNode(4, &structs.Node{
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
Node: nodeName,
}))
require.NoError(t, state.EnsureService(5, nodeName, &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
}))
args := structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy",
NodeName: nodeName,
Datacenter: s1.config.Datacenter,
Upstreams: []string{"bar", "baz"},
}
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
var index uint64
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
// This mesh gateway configuration is pulled from foo-proxy's registration
UpstreamConfigs: map[string]map[string]interface{}{
"bar": {
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
},
"baz": {
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
},
},
// Don't know what this is deterministically
QueryMeta: out.QueryMeta,
}
require.Equal(t, expected, out)
index = out.Index
// Now setup a blocking query for 'foo' while we erase the proxy registration for foo-proxy.
// Deleting the foo proxy registration should cause the blocking query to fire because it is
// watched when the ID and NodeName are provided.
{
// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
require.NoError(t, state.DeleteService(index+1, nodeName, "foo-proxy", nil))
}()
// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
ID: "foo-proxy",
NodeName: nodeName,
Datacenter: "dc1",
Upstreams: []string{"bar", "baz"},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))
// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
// Check the indexes
require.Equal(t, out.Index, index+1)
// The mesh gateway config from the proxy registration should no longer be present
expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"foo": int64(1),
"protocol": "http",
},
UpstreamConfigs: map[string]map[string]interface{}{
"bar": {
"protocol": "grpc",
},
},
// Don't know what this is deterministically
QueryMeta: out.QueryMeta,
}
require.Equal(t, expected, out)
}
}
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")

View File

@ -1136,6 +1136,24 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
return idx, service, nil return idx, service, nil
} }
// NodeServiceWatch is used to retrieve a specific service associated with the given
// node, and add it to the watch set.
func (s *Store) NodeServiceWatch(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.NodeService, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := catalogServicesMaxIndex(tx, entMeta)
// Query the service
service, err := getNodeServiceWatchTxn(tx, ws, nodeName, serviceID, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
return idx, service, nil
}
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// Query the service // Query the service
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
@ -1150,6 +1168,21 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
return nil, nil return nil, nil
} }
func getNodeServiceWatchTxn(tx ReadTxn, ws memdb.WatchSet, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
// Query the service
watchCh, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
if err != nil {
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
}
ws.Add(watchCh)
if service != nil {
return service.(*structs.ServiceNode).ToNodeService(), nil
}
return nil, nil
}
func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) { func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()

View File

@ -309,8 +309,13 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
} }
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
ns := addReq.Service var (
name := ns.Service ns = addReq.Service
name = ns.Service
id = ns.ID
node = addReq.nodeName
)
var upstreams []structs.ServiceID var upstreams []structs.ServiceID
// Note that only sidecar proxies should even make it here for now although // Note that only sidecar proxies should even make it here for now although
@ -333,6 +338,8 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
req := &structs.ServiceConfigRequest{ req := &structs.ServiceConfigRequest{
Name: name, Name: name,
ID: id,
NodeName: node,
Datacenter: bd.RuntimeConfig.Datacenter, Datacenter: bd.RuntimeConfig.Datacenter,
QueryOptions: structs.QueryOptions{Token: addReq.token}, QueryOptions: structs.QueryOptions{Token: addReq.token},
UpstreamIDs: upstreams, UpstreamIDs: upstreams,
@ -365,7 +372,6 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
return nil, err return nil, err
} }
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
return nil, err return nil, err
} }
@ -382,16 +388,27 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
continue continue
} }
// default the upstreams gateway mode if it didn't specify one
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode
}
usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID())
if !ok { if !ok {
// No config defaults to merge // No config defaults to merge
continue continue
} }
// MeshGateway mode is fetched separately since it is a first class field and not read from us.Config
parsed, err := structs.ParseUpstreamConfig(usCfg)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Identifier(), err)
}
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = parsed.MeshGateway.Mode
}
// Delete the mesh gateway key since this is the only place it is read from an opaque map.
delete(usCfg, "mesh_gateway")
// Merge in everything else that is read from the map
if err := mergo.Merge(&us.Config, usCfg); err != nil { if err := mergo.Merge(&us.Config, usCfg); err != nil {
return nil, err return nil, err
} }

View File

@ -8,11 +8,11 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestServiceManager_RegisterService(t *testing.T) { func TestServiceManager_RegisterService(t *testing.T) {
@ -848,3 +848,205 @@ func convertToMap(v interface{}) (map[string]interface{}, error) {
return raw, nil return raw, nil
} }
func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "new config fields",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"protocol": "grpc",
},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "upstream mode from remote defaults overrides local default",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "mode in local upstream config overrides all",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}

View File

@ -578,13 +578,17 @@ func (r *ConfigEntryListAllRequest) RequestDatacenter() string {
// for a service. // for a service.
type ServiceConfigRequest struct { type ServiceConfigRequest struct {
Name string Name string
ID string
NodeName string
Datacenter string Datacenter string
UpstreamIDs []ServiceID
// DEPRECATED // DEPRECATED
// Upstreams is a list of upstream service names to use for resolving the service config // Upstreams is a list of upstream service names to use for resolving the service config
// UpstreamIDs should be used instead which can encode more than just the name to // UpstreamIDs should be used instead which can encode more than just the name to
// uniquely identify a service. // uniquely identify a service.
Upstreams []string Upstreams []string
UpstreamIDs []ServiceID
EnterpriseMeta `hcl:",squash" mapstructure:",squash"` EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
QueryOptions QueryOptions