Connect: Fix Envoy getting stuck during load (#5499)

* Connect: Fix Envoy getting stuck during load

Also in this PR:
 - Enabled outlier detection on upstreams which will mark instances unhealthy after 5 failures (using Envoy's defaults)
 - Enable weighted load balancing where DNS weights are configured

* Fix empty load assignments in the right place

* Fix import names from review

* Move millisecond parse to a helper function
This commit is contained in:
Paul Banks 2019-03-22 19:37:14 +00:00 committed by GitHub
parent 6c75291aeb
commit cf5528734c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 357 additions and 8 deletions

View File

@ -4,10 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
@ -86,6 +88,25 @@ func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
return c, err
}
func parseTimeMillis(ms interface{}) (time.Duration, error) {
switch v := ms.(type) {
case string:
ms, err := strconv.Atoi(v)
if err != nil {
return 0, err
}
return time.Duration(ms) * time.Millisecond, nil
case float64: // This is what parsing from JSON results in
return time.Duration(v) * time.Millisecond, nil
// Not sure if this can ever really happen but just in case it does in
// some test code...
case int:
return time.Duration(v) * time.Millisecond, nil
}
return 0, errors.New("invalid type for millisecond duration")
}
func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
var c *envoy.Cluster
var err error
@ -101,9 +122,15 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
}
if c == nil {
conTimeout := 5 * time.Second
if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok {
if ms, err := parseTimeMillis(toRaw); err == nil {
conTimeout = ms
}
}
c = &envoy.Cluster{
Name: upstream.Identifier(),
ConnectTimeout: 5 * time.Second,
ConnectTimeout: conTimeout,
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
@ -112,6 +139,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
},
},
},
// Having an empty config enables outlier detection with default config.
OutlierDetection: &envoycluster.OutlierDetection{},
}
}

View File

@ -0,0 +1,76 @@
package xds
import (
"testing"
"time"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
func Test_makeUpstreamCluster(t *testing.T) {
tests := []struct {
name string
snap proxycfg.ConfigSnapshot
upstream structs.Upstream
want *envoy.Cluster
}{
{
name: "timeout override",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[0],
want: &envoy.Cluster{
Name: "service:db",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
{
name: "timeout default",
snap: proxycfg.ConfigSnapshot{},
upstream: structs.TestUpstreams(t)[1],
want: &envoy.Cluster{
Name: "prepared_query:geo-cache",
Type: envoy.Cluster_EDS,
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
EdsConfig: &envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
ConnectTimeout: 5 * time.Second, // Default
OutlierDetection: &cluster.OutlierDetection{},
TlsContext: &envoyauth.UpstreamTlsContext{
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
got, err := makeUpstreamCluster(tt.upstream, &tt.snap)
require.NoError(err)
require.Equal(tt.want, got)
})
}
}

View File

@ -4,11 +4,13 @@ import (
"errors"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/gogo/protobuf/proto"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
@ -19,9 +21,6 @@ func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pr
}
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
for id, endpoints := range cfgSnap.UpstreamEndpoints {
if len(endpoints) < 1 {
continue
}
la := makeLoadAssignment(id, endpoints)
resources = append(resources, la)
}
@ -43,10 +42,38 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes)
if addr == "" {
addr = ep.Node.Address
}
healthStatus := envoycore.HealthStatus_HEALTHY
weight := 1
if ep.Service.Weights != nil {
weight = ep.Service.Weights.Passing
}
for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical
// but in the future it may not so set this correctly!
healthStatus = envoycore.HealthStatus_UNHEALTHY
}
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
weight = ep.Service.Weights.Warning
}
}
// Make weights fit Envoy's limits. A zero weight means that either Warning
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
// this instance unhealthy and should not be sent traffic.
if weight < 1 {
healthStatus = envoycore.HealthStatus_UNHEALTHY
weight = 1
}
if weight > 128 {
weight = 128
}
es = append(es, envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr(addr, ep.Service.Port),
},
HealthStatus: healthStatus,
LoadBalancingWeight: makeUint32Value(weight),
})
}
return &envoy.ClusterLoadAssignment{

193
agent/xds/endpoints_test.go Normal file
View File

@ -0,0 +1,193 @@
package xds
import (
"testing"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/require"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/hashicorp/consul/agent/structs"
)
func Test_makeLoadAssignment(t *testing.T) {
testCheckServiceNodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node1-id",
Node: "node1",
Address: "10.10.10.10",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node1",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "node2-id",
Node: "node2",
Address: "10.10.10.20",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "web",
Port: 1234,
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node2",
CheckID: "serfHealth",
Status: "passing",
},
&structs.HealthCheck{
Node: "node2",
ServiceID: "web",
CheckID: "web:check",
Status: "passing",
},
},
},
}
testWeightedCheckServiceNodesRaw, err := copystructure.Copy(testCheckServiceNodes)
require.NoError(t, err)
testWeightedCheckServiceNodes := testWeightedCheckServiceNodesRaw.(structs.CheckServiceNodes)
testWeightedCheckServiceNodes[0].Service.Weights = &structs.Weights{
Passing: 10,
Warning: 1,
}
testWeightedCheckServiceNodes[1].Service.Weights = &structs.Weights{
Passing: 5,
Warning: 0,
}
testWarningCheckServiceNodesRaw, err := copystructure.Copy(testWeightedCheckServiceNodes)
require.NoError(t, err)
testWarningCheckServiceNodes := testWarningCheckServiceNodesRaw.(structs.CheckServiceNodes)
testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
tests := []struct {
name string
clusterName string
endpoints structs.CheckServiceNodes
want *envoy.ClusterLoadAssignment
}{
{
name: "no instances",
clusterName: "service:test",
endpoints: structs.CheckServiceNodes{},
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{},
}},
},
},
{
name: "instances, no weights",
clusterName: "service:test",
endpoints: testCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
{
name: "instances, healthy weights",
clusterName: "service:test",
endpoints: testWeightedCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(10),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(5),
},
},
}},
},
},
{
name: "instances, warning weights",
clusterName: "service:test",
endpoints: testWarningCheckServiceNodes,
want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test",
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
LbEndpoints: []envoyendpoint.LbEndpoint{
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.10", 1234),
},
HealthStatus: core.HealthStatus_HEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
envoyendpoint.LbEndpoint{
Endpoint: &envoyendpoint.Endpoint{
Address: makeAddressPtr("10.10.10.20", 1234),
},
HealthStatus: core.HealthStatus_UNHEALTHY,
LoadBalancingWeight: makeUint32Value(1),
},
},
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.endpoints)
require.Equal(t, tt.want, got)
})
}
}

View File

@ -252,6 +252,9 @@ func makeCommonTLSContext(cfgSnap *proxycfg.ConfigSnapshot) *envoyauth.CommonTls
// Concatenate all the root PEMs into one.
// TODO(banks): verify this actually works with Envoy (docs are not clear).
rootPEMS := ""
if cfgSnap.Roots == nil {
return nil
}
for _, root := range cfgSnap.Roots.Roots {
rootPEMS += root.RootCert
}

View File

@ -5,6 +5,7 @@ import (
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
prototypes "github.com/gogo/protobuf/types"
)
func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) {
@ -52,3 +53,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address {
a := makeAddress(ip, port)
return &a
}
func makeUint32Value(n int) *prototypes.UInt32Value {
return &prototypes.UInt32Value{Value: uint32(n)}
}

View File

@ -346,7 +346,13 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
if err != nil {
return err
}
if resources == nil || len(resources) == 0 {
// Zero length resource responses should be ignored and are the result of no
// data yet. Notice that this caused a bug originally where we had zero
// healthy endpoints for an upstream that would cause Envoy to hang waiting
// for the EDS response. This is fixed though by ensuring we send an explicit
// empty LoadAssignment resource for the cluster rather than allowing junky
// empty resources.
if len(resources) == 0 {
// Nothing to send yet
return nil
}

View File

@ -389,7 +389,10 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to
}
}
},
"connectTimeout": "5s",
"outlierDetection": {
},
"connectTimeout": "1s",
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
}`,
"prepared_query:geo-cache": `
@ -403,6 +406,9 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to
}
}
},
"outlierDetection": {
},
"connectTimeout": "5s",
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
@ -461,7 +467,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri
"portValue": 0
}
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
@ -471,7 +479,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri
"portValue": 0
}
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}