diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 7d42fb44d..233d9e2eb 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -502,8 +502,6 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
 		return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
 	}
 
-	// TODO(rb): do we have to do onlypassing filters here?
-
 	m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
 	if !ok {
 		m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
@@ -608,16 +606,8 @@ func (s *state) resetWatchesFromChain(
 	}
 
 	s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
 
-	// snap.WatchedUpstreams[name]
-
-	// delete(snap.WatchedUpstreams[name], target)
-	// delete(snap.WatchedUpstreamEndpoint[name], target)
-
-	// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly
-	// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
-	// TODO(rb): handle subset.onlypassing
 	var subset structs.ServiceResolverSubset
 	if target.ServiceSubset != "" {
 		var ok bool
@@ -649,24 +639,12 @@ func (s *state) resetWatchesFromChain(
 		meshGateway = structs.MeshGatewayModeNone
 	}
 
-	filterExp := subset.Filter
-	if subset.OnlyPassing {
-		if filterExp != "" {
-			// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
-			// once the syntax is supported
-			filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
-		} else {
-			filterExp = "not Checks.Status != passing"
-		}
-	}
-
-	// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
 	err = s.watchConnectProxyService(
 		ctx,
 		"upstream-target:"+string(encodedTarget)+":"+id,
 		target.Service,
 		target.Datacenter,
-		filterExp,
+		subset.Filter,
 		meshGateway,
 	)
 	if err != nil {
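The deleted block above built bexpr expressions like "not Checks.Status != passing", which matches an instance only when none of its checks has a non-passing status. For reference, a minimal sketch of how such an expression is evaluated with go-bexpr, mirroring the call that survives in the mesh-gateway path further down (assumes an endpoints value of type structs.CheckServiceNodes; not part of the patch):

    // Evaluate the old "only passing" expression against a set of instances.
    filter, err := bexpr.CreateFilter("not Checks.Status != passing", nil, endpoints)
    if err != nil {
        return nil, err
    }
    raw, err := filter.Execute(endpoints)
    if err != nil {
        return nil, err
    }
    endpoints = raw.(structs.CheckServiceNodes)

With this patch that per-watch filtering is dropped: OnlyPassing rides along in the compiled chain and is applied as Envoy endpoint health in agent/xds/endpoints.go instead.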
diff --git a/agent/structs/discovery_chain.go b/agent/structs/discovery_chain.go
index cf95314a3..6565aabf7 100644
--- a/agent/structs/discovery_chain.go
+++ b/agent/structs/discovery_chain.go
@@ -32,7 +32,7 @@ type CompiledDiscoveryChain struct {
 	// GroupResolverNodes respresents all unique service instance groups that
 	// need to be represented. For envoy these render as Clusters.
 	//
-	// Omitted from JSON because DiscoveryTarget is not a encoding.TextMarshaler.
+	// Omitted from JSON because these already show up under the Node field.
 	GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"`
 
 	// TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS
@@ -54,6 +54,22 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
 		c.Node.GroupResolver.Default
 }
 
+// SubsetDefinitionForTarget is a convenience function to fetch the subset
+// definition for the service subset defined by the provided target. If the
+// subset is not defined, an empty definition is returned.
+func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset {
+	if t.ServiceSubset == "" {
+		return ServiceResolverSubset{}
+	}
+
+	resolver, ok := c.Resolvers[t.Service]
+	if !ok {
+		return ServiceResolverSubset{}
+	}
+
+	return resolver.Subsets[t.ServiceSubset]
+}
+
 const (
 	DiscoveryGraphNodeTypeRouter   = "router"
 	DiscoveryGraphNodeTypeSplitter = "splitter"
diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go
index 8f99abf45..db946bef1 100644
--- a/agent/xds/endpoints.go
+++ b/agent/xds/endpoints.go
@@ -57,7 +57,9 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
 		la := makeLoadAssignment(
 			sni,
 			0,
-			[]structs.CheckServiceNodes{endpoints},
+			[]loadAssignmentEndpointGroup{
+				{Endpoints: endpoints},
+			},
 			cfgSnap.Datacenter,
 		)
 		resources = append(resources, la)
@@ -68,7 +70,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
 
 		chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
 		if !ok {
-			continue // TODO(rb): whaaaa?
+			continue // skip the upstream (should not happen)
 		}
 
 		for target, node := range chain.GroupResolverNodes {
@@ -77,18 +79,23 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
 
 			endpoints, ok := chainEndpointMap[target]
 			if !ok {
-				continue // TODO(rb): whaaaa?
+				continue // skip the cluster (should not happen)
 			}
 
 			var (
-				priorityEndpoints      []structs.CheckServiceNodes
+				endpointGroups         []loadAssignmentEndpointGroup
 				overprovisioningFactor int
 			)
 
-			if failover != nil && len(failover.Targets) > 0 {
-				priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1)
+			primaryGroup := loadAssignmentEndpointGroup{
+				Endpoints:   endpoints,
+				OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing,
+			}
 
-				priorityEndpoints = append(priorityEndpoints, endpoints)
+			if failover != nil && len(failover.Targets) > 0 {
+				endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
+
+				endpointGroups = append(endpointGroups, primaryGroup)
 
 				if failover.Definition.OverprovisioningFactor > 0 {
 					overprovisioningFactor = failover.Definition.OverprovisioningFactor
@@ -101,14 +108,17 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
 				for _, failTarget := range failover.Targets {
 					failEndpoints, ok := chainEndpointMap[failTarget]
-					if ok {
-						priorityEndpoints = append(priorityEndpoints, failEndpoints)
+					if !ok {
+						continue // skip the failover target (should not happen)
 					}
+
+					endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
+						Endpoints:   failEndpoints,
+						OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing,
+					})
 				}
 			} else {
-				priorityEndpoints = []structs.CheckServiceNodes{
-					endpoints,
-				}
+				endpointGroups = append(endpointGroups, primaryGroup)
 			}
 
 			sni := TargetSNI(target, cfgSnap)
@@ -116,7 +126,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
 			la := makeLoadAssignment(
 				sni,
 				overprovisioningFactor,
-				priorityEndpoints,
+				endpointGroups,
 				cfgSnap.Datacenter,
 			)
 			resources = append(resources, la)
@@ -136,8 +146,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
 		la := makeLoadAssignment(
 			clusterName,
 			0,
-			[]structs.CheckServiceNodes{
-				endpoints,
+			[]loadAssignmentEndpointGroup{
+				{Endpoints: endpoints},
 			},
 			cfgSnap.Datacenter,
 		)
@@ -150,8 +160,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
 		la := makeLoadAssignment(
 			clusterName,
 			0,
-			[]structs.CheckServiceNodes{
-				endpoints,
+			[]loadAssignmentEndpointGroup{
+				{Endpoints: endpoints},
 			},
 			cfgSnap.Datacenter,
 		)
@@ -166,20 +176,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
 			endpoints := cfgSnap.MeshGateway.ServiceGroups[svc]
 
 			// locally execute the subsets filter
-			filterExp := subset.Filter
-			if subset.OnlyPassing {
-				// we could do another filter pass without bexpr but this simplifies things a bit
-				if filterExp != "" {
-					// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
-					// once the syntax is supported
-					filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
-				} else {
-					filterExp = "not Checks.Status != passing"
-				}
-			}
-
-			if filterExp != "" {
-				filter, err := bexpr.CreateFilter(filterExp, nil, endpoints)
+			if subset.Filter != "" {
+				filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
 				if err != nil {
 					return nil, err
 				}
@@ -194,8 +192,11 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
 			la := makeLoadAssignment(
 				clusterName,
 				0,
-				[]structs.CheckServiceNodes{
-					endpoints,
+				[]loadAssignmentEndpointGroup{
+					{
+						Endpoints:   endpoints,
+						OnlyPassing: subset.OnlyPassing,
+					},
 				},
 				cfgSnap.Datacenter,
 			)
@@ -216,15 +217,20 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
 	}
 }
 
+type loadAssignmentEndpointGroup struct {
+	Endpoints   structs.CheckServiceNodes
+	OnlyPassing bool
+}
+
 func makeLoadAssignment(
 	clusterName string,
 	overprovisioningFactor int,
-	priorityEndpoints []structs.CheckServiceNodes,
+	endpointGroups []loadAssignmentEndpointGroup,
 	localDatacenter string,
 ) *envoy.ClusterLoadAssignment {
 	cla := &envoy.ClusterLoadAssignment{
 		ClusterName: clusterName,
-		Endpoints:   make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)),
+		Endpoints:   make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
 	}
 	if overprovisioningFactor > 0 {
 		cla.Policy = &envoy.ClusterLoadAssignment_Policy{
@@ -232,7 +238,8 @@ func makeLoadAssignment(
 		}
 	}
 
-	for priority, endpoints := range priorityEndpoints {
+	for priority, endpointGroup := range endpointGroups {
+		endpoints := endpointGroup.Endpoints
 		es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
 
 		for _, ep := range endpoints {
@@ -246,8 +253,9 @@ func makeLoadAssignment(
 
 			for _, chk := range ep.Checks {
 				if chk.Status == api.HealthCritical {
-					// This can't actually happen now because health always filters critical
-					// but in the future it may not so set this correctly!
+					healthStatus = envoycore.HealthStatus_UNHEALTHY
+				}
+				if endpointGroup.OnlyPassing && chk.Status != api.HealthPassing {
 					healthStatus = envoycore.HealthStatus_UNHEALTHY
 				}
 				if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
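Note on the shape this produces: each loadAssignmentEndpointGroup becomes one priority level in the ClusterLoadAssignment, the primary target at priority 0 and failover targets after it. A rough sketch for one primary plus one failover group (illustrative only; the Priority assignment is assumed from the loop index above, and the cluster name is an SNI-style placeholder):

    // Two endpoint groups -> two LocalityLbEndpoints entries, one per priority.
    cla := &envoy.ClusterLoadAssignment{
        ClusterName: "test.s2.default.dc1.internal.<trust-domain>.consul",
        Endpoints: []envoyendpoint.LocalityLbEndpoints{
            {Priority: 0, LbEndpoints: primaryLbEndpoints},  // preferred group
            {Priority: 1, LbEndpoints: failoverLbEndpoints}, // used as priority 0 degrades
        },
    }

Envoy shifts traffic to higher-numbered priorities based on the fraction of healthy hosts at priority 0 and the overprovisioning factor, which is why marking instances UNHEALTHY (rather than omitting them) matters for the failover math.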
diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go
index 1077a739a..72e0fed27 100644
--- a/agent/xds/endpoints_test.go
+++ b/agent/xds/endpoints_test.go
@@ -94,18 +94,19 @@ func Test_makeLoadAssignment(t *testing.T) {
 	testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
 	testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
 
+	// TODO(rb): test onlypassing
 	tests := []struct {
 		name                   string
 		clusterName            string
 		overprovisioningFactor int
-		endpoints              []structs.CheckServiceNodes
+		endpoints              []loadAssignmentEndpointGroup
 		want                   *envoy.ClusterLoadAssignment
 	}{
 		{
 			name:        "no instances",
 			clusterName: "service:test",
-			endpoints: []structs.CheckServiceNodes{
-				{},
+			endpoints: []loadAssignmentEndpointGroup{
+				{Endpoints: nil},
 			},
 			want: &envoy.ClusterLoadAssignment{
 				ClusterName: "service:test",
@@ -117,8 +118,8 @@ func Test_makeLoadAssignment(t *testing.T) {
 		{
 			name:        "instances, no weights",
 			clusterName: "service:test",
-			endpoints: []structs.CheckServiceNodes{
-				testCheckServiceNodes,
+			endpoints: []loadAssignmentEndpointGroup{
+				{Endpoints: testCheckServiceNodes},
 			},
 			want: &envoy.ClusterLoadAssignment{
 				ClusterName: "service:test",
@@ -147,8 +148,8 @@ func Test_makeLoadAssignment(t *testing.T) {
 		{
 			name:        "instances, healthy weights",
 			clusterName: "service:test",
-			endpoints: []structs.CheckServiceNodes{
-				testWeightedCheckServiceNodes,
+			endpoints: []loadAssignmentEndpointGroup{
+				{Endpoints: testWeightedCheckServiceNodes},
 			},
 			want: &envoy.ClusterLoadAssignment{
 				ClusterName: "service:test",
@@ -177,8 +178,8 @@ func Test_makeLoadAssignment(t *testing.T) {
 		{
 			name:        "instances, warning weights",
 			clusterName: "service:test",
-			endpoints: []structs.CheckServiceNodes{
-				testWarningCheckServiceNodes,
+			endpoints: []loadAssignmentEndpointGroup{
+				{Endpoints: testWarningCheckServiceNodes},
 			},
 			want: &envoy.ClusterLoadAssignment{
 				ClusterName: "service:test",
@@ -207,7 +208,12 @@ func Test_makeLoadAssignment(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1")
+			got := makeLoadAssignment(
+				tt.clusterName,
+				tt.overprovisioningFactor,
+				tt.endpoints,
+				"dc1",
+			)
 			require.Equal(t, tt.want, got)
 		})
 	}
diff --git a/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden b/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden
index 91e407914..74c59f1a5 100644
--- a/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden
+++ b/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden
@@ -246,6 +246,18 @@
         },
         "healthStatus": "HEALTHY",
         "loadBalancingWeight": 1
+      },
+      {
+        "endpoint": {
+          "address": {
+            "socketAddress": {
+              "address": "172.16.1.9",
+              "portValue": 2222
+            }
+          }
+        },
+        "healthStatus": "UNHEALTHY",
+        "loadBalancingWeight": 1
       }
     ]
   }
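On the "TODO(rb): test onlypassing" left in endpoints_test.go above: a sketch of the missing table entry, reusing the warning-state fixtures already defined in that test (the want payload is described rather than spelled out, since it mirrors the "instances, warning weights" case):

    {
        name:        "instances, warning checks, onlypassing",
        clusterName: "service:test",
        endpoints: []loadAssignmentEndpointGroup{
            {
                Endpoints:   testWarningCheckServiceNodes,
                OnlyPassing: true,
            },
        },
        // want: the same two endpoints as the "instances, warning weights"
        // case, but with healthStatus UNHEALTHY on each, since OnlyPassing
        // treats any non-passing check as unhealthy.
    },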
diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl
new file mode 100644
index 000000000..7e4a3cd91
--- /dev/null
+++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl
@@ -0,0 +1,25 @@
+enable_central_service_config = true
+
+config_entries {
+  bootstrap {
+    kind = "proxy-defaults"
+    name = "global"
+
+    config {
+      protocol = "http"
+    }
+  }
+
+  bootstrap {
+    kind = "service-resolver"
+    name = "s2"
+
+    default_subset = "test"
+
+    subsets = {
+      "test" = {
+        only_passing = true
+      }
+    }
+  }
+}
diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl
new file mode 100644
index 000000000..a2f6423e0
--- /dev/null
+++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl
@@ -0,0 +1,22 @@
+services {
+  id   = "s2-v1"
+  name = "s2"
+  port = 8182
+
+  meta {
+    version = "v1"
+  }
+
+  checks = [
+    {
+      name = "main"
+      ttl  = "30m"
+    },
+  ]
+
+  connect {
+    sidecar_service {
+      port = 21011
+    }
+  }
+}
diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh
new file mode 100644
index 000000000..5cdf0540d
--- /dev/null
+++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -euo pipefail
+
+# wait for service registration
+wait_for_agent_service_register s1
+wait_for_agent_service_register s2
+wait_for_agent_service_register s2-v1
+
+# force s2-v1 into a warning state
+set_ttl_check_state service:s2-v1 warn
+
+# wait for bootstrap to apply config entries
+wait_for_config_entry proxy-defaults global
+wait_for_config_entry service-resolver s2
+
+gen_envoy_bootstrap s1 19000
+gen_envoy_bootstrap s2 19001
+gen_envoy_bootstrap s2-v1 19002
+
+export REQUIRED_SERVICES="
+s1 s1-sidecar-proxy
+s2 s2-sidecar-proxy
+s2-v1 s2-v1-sidecar-proxy
+"
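Before the bats assertions below, Envoy's own view can be spot-checked by hand. Assuming the stock Envoy admin API on s1's sidecar (port 19000 per the bootstrap above; not part of the test suite):

    # Dump s1's computed endpoint health for the s2 "test" subset cluster.
    # With only_passing = true the warning s2-v1 instance should show the
    # /failed_eds_health flag rather than disappear from the assignment.
    curl -s localhost:19000/clusters | grep 'test.s2'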
diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats
new file mode 100644
index 000000000..10c6136aa
--- /dev/null
+++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats
@@ -0,0 +1,66 @@
+#!/usr/bin/env bats
+
+load helpers
+
+@test "s1 proxy admin is up on :19000" {
+  retry_default curl -f -s localhost:19000/stats -o /dev/null
+}
+
+@test "s2 proxy admin is up on :19001" {
+  retry_default curl -f -s localhost:19001/stats -o /dev/null
+}
+
+@test "s2-v1 proxy admin is up on :19002" {
+  retry_default curl -f -s localhost:19002/stats -o /dev/null
+}
+
+@test "s1 proxy listener should be up and have right cert" {
+  assert_proxy_presents_cert_uri localhost:21000 s1
+}
+
+@test "s2 proxy listener should be up and have right cert" {
+  assert_proxy_presents_cert_uri localhost:21001 s2
+}
+
+@test "s2-v1 proxy listener should be up and have right cert" {
+  assert_proxy_presents_cert_uri localhost:21011 s2
+}
+
+###########################
+## with onlypassing=true
+
+@test "only one s2 proxy is healthy" {
+  assert_service_has_healthy_instances s2 1
+}
+
+@test "s1 upstream should have 1 healthy endpoint for test.s2" {
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 HEALTHY 1
+}
+
+@test "s1 upstream should have 1 unhealthy endpoint for test.s2" {
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 UNHEALTHY 1
+}
+
+@test "s1 upstream should be able to connect to s2" {
+  assert_expected_fortio_name s2
+}
+
+###########################
+## with onlypassing=false
+
+@test "switch back to OnlyPassing=false by deleting the config" {
+  delete_config_entry service-resolver s2
+}
+
+@test "only one s2 proxy is healthy (OnlyPassing=false)" {
+  assert_service_has_healthy_instances s2 1
+}
+
+@test "s1 upstream should have 2 healthy endpoints for s2 (OnlyPassing=false)" {
+  # NOTE: the subset is erased, so we use the bare name now
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 HEALTHY 2
+}
+
+@test "s1 upstream should have 0 unhealthy endpoints for s2 (OnlyPassing=false)" {
+  assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 UNHEALTHY 0
+}
diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash
index c1c6f368c..7ea1792f2 100755
--- a/test/integration/connect/envoy/helpers.bash
+++ b/test/integration/connect/envoy/helpers.bash
@@ -191,6 +191,10 @@ function docker_wget {
   docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@
 }
 
+function docker_curl {
+  docker run -ti --rm --network container:envoy_consul_1 --entrypoint curl consul-dev $@
+}
+
 function get_envoy_pid {
   local BOOTSTRAP_NAME=$1
   run ps aux
@@ -293,6 +297,36 @@ function wait_for_config_entry {
   retry_default read_config_entry $KIND $NAME >/dev/null
 }
 
+function delete_config_entry {
+  local KIND=$1
+  local NAME=$2
+  retry_default curl -sL -XDELETE "http://127.0.0.1:8500/v1/config/${KIND}/${NAME}"
+}
+
+function wait_for_agent_service_register {
+  local SERVICE_ID=$1
+  retry_default docker_curl -sLf "http://127.0.0.1:8500/v1/agent/service/${SERVICE_ID}" >/dev/null
+}
+
+function set_ttl_check_state {
+  local CHECK_ID=$1
+  local CHECK_STATE=$2
+
+  case "$CHECK_STATE" in
+    pass)
+      ;;
+    warn)
+      ;;
+    fail)
+      ;;
+    *)
+      echo "invalid ttl check state '${CHECK_STATE}'" >&2
+      return 1
+  esac
+
+  retry_default docker_curl -sL -XPUT "http://localhost:8500/v1/agent/check/${CHECK_STATE}/${CHECK_ID}"
+}
+
 function get_upstream_fortio_name {
   run retry_default curl -v -s -f localhost:5000/debug?env=dump
   [ "$status" == 0 ]
diff --git a/website/source/docs/agent/config-entries/service-resolver.html.md b/website/source/docs/agent/config-entries/service-resolver.html.md
index d0a6a782d..b37f781c1 100644
--- a/website/source/docs/agent/config-entries/service-resolver.html.md
+++ b/website/source/docs/agent/config-entries/service-resolver.html.md
@@ -94,10 +94,10 @@ name = "web"
    returned.
 
 - `OnlyPassing` `(bool: false)` - Specifies the behavior of the resolver's
-  health check filtering. If this is set to false, the results will include
-  instances with checks in the passing as well as the warning states. If this
-  is set to true, only instances with checks in the passing state will be
-  returned.
+  health check interpretation. If this is set to false, instances with checks
+  in either the passing or warning state will be considered healthy. If this
+  is set to true, only instances with checks in the passing state will be
+  considered healthy.
 
 - `Redirect` `(ServiceResolverRedirect: <optional>)` - When configured, all attempts
   to resolve the service this resolver defines will be substituted for