connect: rework how the service resolver subset OnlyPassing flag works (#6173)

The main change is that we no longer filter service instances by health,
preferring instead to render all results down into EDS endpoints in
envoy and merely label the endpoints as HEALTHY or UNHEALTHY.

When OnlyPassing is set to true we will force consul checks in a
'warning' state to render as UNHEALTHY in envoy.

Fixes #6171
This commit is contained in:
R.B. Boyer 2019-07-23 20:20:24 -05:00 committed by GitHub
parent e060748d3f
commit 2bfad66efa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 268 additions and 76 deletions

View file

@ -502,8 +502,6 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
}
// TODO(rb): do we have to do onlypassing filters here?
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok {
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
@ -608,16 +606,8 @@ func (s *state) resetWatchesFromChain(
}
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
// snap.WatchedUpstreams[name]
// delete(snap.WatchedUpstreams[name], target)
// delete(snap.WatchedUpstreamEndpoint[name], target)
// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly
// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
// TODO(rb): handle subset.onlypassing
var subset structs.ServiceResolverSubset
if target.ServiceSubset != "" {
var ok bool
@ -649,24 +639,12 @@ func (s *state) resetWatchesFromChain(
meshGateway = structs.MeshGatewayModeNone
}
filterExp := subset.Filter
if subset.OnlyPassing {
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}
// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
err = s.watchConnectProxyService(
ctx,
"upstream-target:"+string(encodedTarget)+":"+id,
target.Service,
target.Datacenter,
filterExp,
subset.Filter,
meshGateway,
)
if err != nil {

View file

@ -32,7 +32,7 @@ type CompiledDiscoveryChain struct {
// GroupResolverNodes respresents all unique service instance groups that
// need to be represented. For envoy these render as Clusters.
//
// Omitted from JSON because DiscoveryTarget is not a encoding.TextMarshaler.
// Omitted from JSON because these already show up under the Node field.
GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"`
// TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS
@ -54,6 +54,22 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
c.Node.GroupResolver.Default
}
// SubsetDefinitionForTarget is a convenience function to fetch the subset
// definition for the service subset defined by the provided target. If the
// subset is not defined an empty definition is returned.
func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset {
if t.ServiceSubset == "" {
return ServiceResolverSubset{}
}
resolver, ok := c.Resolvers[t.Service]
if !ok {
return ServiceResolverSubset{}
}
return resolver.Subsets[t.ServiceSubset]
}
const (
DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter"

View file

@ -57,7 +57,9 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment(
sni,
0,
[]structs.CheckServiceNodes{endpoints},
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
@ -68,7 +70,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
continue // TODO(rb): whaaaa?
continue // skip the upstream (should not happen)
}
for target, node := range chain.GroupResolverNodes {
@ -77,18 +79,23 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
endpoints, ok := chainEndpointMap[target]
if !ok {
continue // TODO(rb): whaaaa?
continue // skip the cluster (should not happen)
}
var (
priorityEndpoints []structs.CheckServiceNodes
endpointGroups []loadAssignmentEndpointGroup
overprovisioningFactor int
)
if failover != nil && len(failover.Targets) > 0 {
priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1)
primaryGroup := loadAssignmentEndpointGroup{
Endpoints: endpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing,
}
priorityEndpoints = append(priorityEndpoints, endpoints)
if failover != nil && len(failover.Targets) > 0 {
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
endpointGroups = append(endpointGroups, primaryGroup)
if failover.Definition.OverprovisioningFactor > 0 {
overprovisioningFactor = failover.Definition.OverprovisioningFactor
@ -101,14 +108,17 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
for _, failTarget := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTarget]
if ok {
priorityEndpoints = append(priorityEndpoints, failEndpoints)
if !ok {
continue // skip the failover target (should not happen)
}
endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
Endpoints: failEndpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing,
})
}
} else {
priorityEndpoints = []structs.CheckServiceNodes{
endpoints,
}
endpointGroups = append(endpointGroups, primaryGroup)
}
sni := TargetSNI(target, cfgSnap)
@ -116,7 +126,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment(
sni,
overprovisioningFactor,
priorityEndpoints,
endpointGroups,
cfgSnap.Datacenter,
)
resources = append(resources, la)
@ -136,8 +146,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
@ -150,8 +160,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
@ -166,20 +176,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
endpoints := cfgSnap.MeshGateway.ServiceGroups[svc]
// locally execute the subsets filter
filterExp := subset.Filter
if subset.OnlyPassing {
// we could do another filter pass without bexpr but this simplifies things a bit
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}
if filterExp != "" {
filter, err := bexpr.CreateFilter(filterExp, nil, endpoints)
if subset.Filter != "" {
filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
if err != nil {
return nil, err
}
@ -194,8 +192,11 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment(
clusterName,
0,
[]structs.CheckServiceNodes{
endpoints,
[]loadAssignmentEndpointGroup{
{
Endpoints: endpoints,
OnlyPassing: subset.OnlyPassing,
},
},
cfgSnap.Datacenter,
)
@ -216,15 +217,20 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
}
}
type loadAssignmentEndpointGroup struct {
Endpoints structs.CheckServiceNodes
OnlyPassing bool
}
func makeLoadAssignment(
clusterName string,
overprovisioningFactor int,
priorityEndpoints []structs.CheckServiceNodes,
endpointGroups []loadAssignmentEndpointGroup,
localDatacenter string,
) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)),
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
}
if overprovisioningFactor > 0 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{
@ -232,7 +238,8 @@ func makeLoadAssignment(
}
}
for priority, endpoints := range priorityEndpoints {
for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints {
@ -246,8 +253,9 @@ func makeLoadAssignment(
for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if endpointGroup.OnlyPassing && chk.Status != api.HealthPassing {
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {

View file

@ -94,18 +94,19 @@ func Test_makeLoadAssignment(t *testing.T) {
testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
// TODO(rb): test onlypassing
tests := []struct {
name string
clusterName string
overprovisioningFactor int
endpoints []structs.CheckServiceNodes
endpoints []loadAssignmentEndpointGroup
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
{},
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: nil},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
@ -117,8 +118,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
@ -147,8 +148,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testWeightedCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testWeightedCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
@ -177,8 +178,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{
testWarningCheckServiceNodes,
endpoints: []loadAssignmentEndpointGroup{
{Endpoints: testWarningCheckServiceNodes},
},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
@ -207,7 +208,12 @@ func Test_makeLoadAssignment(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1")
got := makeLoadAssignment(
tt.clusterName,
tt.overprovisioningFactor,
tt.endpoints,
"dc1",
)
require.Equal(t, tt.want, got)
})
}

View file

@ -246,6 +246,18 @@
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "172.16.1.9",
"portValue": 2222
}
}
},
"healthStatus": "UNHEALTHY",
"loadBalancingWeight": 1
}
]
}

View file

@ -0,0 +1,25 @@
enable_central_service_config = true
config_entries {
bootstrap {
kind = "proxy-defaults"
name = "global"
config {
protocol = "http"
}
}
bootstrap {
kind = "service-resolver"
name = "s2"
default_subset = "test"
subsets = {
"test" = {
only_passing = true
}
}
}
}

View file

@ -0,0 +1,22 @@
services {
id = "s2-v1"
name = "s2"
port = 8182
meta {
version = "v1"
}
checks = [
{
name = "main"
ttl = "30m"
},
]
connect {
sidecar_service {
port = 21011
}
}
}

View file

@ -0,0 +1,25 @@
#!/bin/bash
set -euo pipefail
# wait for service registration
wait_for_agent_service_register s1
wait_for_agent_service_register s2
wait_for_agent_service_register s2-v1
# force s2-v1 into a warning state
set_ttl_check_state service:s2-v1 warn
# wait for bootstrap to apply config entries
wait_for_config_entry proxy-defaults global
wait_for_config_entry service-resolver s2
gen_envoy_bootstrap s1 19000
gen_envoy_bootstrap s2 19001
gen_envoy_bootstrap s2-v1 19002
export REQUIRED_SERVICES="
s1 s1-sidecar-proxy
s2 s2-sidecar-proxy
s2-v1 s2-v1-sidecar-proxy
"

View file

@ -0,0 +1,66 @@
#!/usr/bin/env bats
load helpers
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "s2 proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "s2-v1 proxy admin is up on :19002" {
retry_default curl -f -s localhost:19002/stats -o /dev/null
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "s2 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21001 s2
}
@test "s2-v1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21011 s2
}
###########################
## with onlypassing=true
@test "only one s2 proxy is healthy" {
assert_service_has_healthy_instances s2 1
}
@test "s1 upstream should have 1 healthy endpoint for test.s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 HEALTHY 1
}
@test "s1 upstream should have 1 unhealthy endpoints for test.s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 UNHEALTHY 1
}
@test "s1 upstream should be able to connect to s2" {
assert_expected_fortio_name s2
}
###########################
## with onlypassing=false
@test "switch back to OnlyPassing=false by deleting the config" {
delete_config_entry service-resolver s2
}
@test "only one s2 proxy is healthy (OnlyPassing=false)" {
assert_service_has_healthy_instances s2 1
}
@test "s1 upstream should have 2 healthy endpoints for test.s2 (OnlyPassing=false)" {
# NOTE: the subset is erased, so we use the bare name now
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 HEALTHY 2
}
@test "s1 upstream should have 0 unhealthy endpoints for test.s2 (OnlyPassing=false)" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 UNHEALTHY 0
}

View file

@ -191,6 +191,10 @@ function docker_wget {
docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@
}
function docker_curl {
docker run -ti --rm --network container:envoy_consul_1 --entrypoint curl consul-dev $@
}
function get_envoy_pid {
local BOOTSTRAP_NAME=$1
run ps aux
@ -293,6 +297,36 @@ function wait_for_config_entry {
retry_default read_config_entry $KIND $NAME >/dev/null
}
function delete_config_entry {
local KIND=$1
local NAME=$2
retry_default curl -sL -XDELETE "http://127.0.0.1:8500/v1/config/${KIND}/${NAME}"
}
function wait_for_agent_service_register {
local SERVICE_ID=$1
retry_default docker_curl -sLf "http://127.0.0.1:8500/v1/agent/service/${SERVICE_ID}" >/dev/null
}
function set_ttl_check_state {
local CHECK_ID=$1
local CHECK_STATE=$2
case "$CHECK_STATE" in
pass)
;;
warn)
;;
fail)
;;
*)
echo "invalid ttl check state '${CHECK_STATE}'" >&2
return 1
esac
retry_default docker_curl -sL -XPUT "http://localhost:8500/v1/agent/check/warn/${CHECK_ID}"
}
function get_upstream_fortio_name {
run retry_default curl -v -s -f localhost:5000/debug?env=dump
[ "$status" == 0 ]

View file

@ -94,10 +94,10 @@ name = "web"
returned.
- `OnlyPassing` `(bool: false)` - Specifies the behavior of the resolver's
health check filtering. If this is set to false, the results will include
instances with checks in the passing as well as the warning states. If this
is set to true, only instances with checks in the passing state will be
returned.
health check interpretation. If this is set to false, instances with checks
in the passing as well as the warning states will be considered healthy. If
this is set to true, only instances with checks in the passing state will
be considered healthy.
- `Redirect` `(ServiceResolverRedirect: <optional>)` - When configured, all
attempts to resolve the service this resolver defines will be substituted for