Fix resolution of service resolvers with subsets for external upstreams (#16499)
* Fix resolution of service resolvers with subsets for external upstreams * Add tests * Add changelog entry * Update view filter logic
This commit is contained in:
parent
9ef8ef9f3e
commit
ba667221a5
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
mesh: Fix resolution of service resolvers with subsets for external upstreams
|
||||
```
|
|
@ -52,6 +52,8 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
|
|||
return &HealthView{
|
||||
state: make(map[string]structs.CheckServiceNode),
|
||||
filter: fe,
|
||||
connect: req.Connect,
|
||||
kind: req.ServiceKind,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -61,6 +63,8 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
|
|||
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
||||
// involves re-sorting each time etc. though.
|
||||
type HealthView struct {
|
||||
connect bool
|
||||
kind structs.ServiceKind
|
||||
state map[string]structs.CheckServiceNode
|
||||
filter filterEvaluator
|
||||
}
|
||||
|
@ -84,6 +88,13 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
|
|||
if csn == nil {
|
||||
return errors.New("check service node was unexpectedly nil")
|
||||
}
|
||||
|
||||
// check if we intentionally need to skip the filter
|
||||
if s.skipFilter(csn) {
|
||||
s.state[id] = *csn
|
||||
continue
|
||||
}
|
||||
|
||||
passed, err := s.filter.Evaluate(*csn)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -100,6 +111,11 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool {
|
||||
// we only do this for connect-enabled services that need to be routed through a terminating gateway
|
||||
return s.kind == "" && s.connect && csn.Service.Kind == structs.ServiceKindTerminatingGateway
|
||||
}
|
||||
|
||||
type filterEvaluator interface {
|
||||
Evaluate(datum interface{}) (bool, error)
|
||||
}
|
||||
|
|
|
@ -941,3 +941,39 @@ func TestNewFilterEvaluator(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) {
|
||||
view, err := NewHealthView(structs.ServiceSpecificRequest{
|
||||
ServiceName: "name",
|
||||
Connect: true,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Filter: "Service.Meta.version == \"v1\"",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = view.Update([]*pbsubscribe.Event{{
|
||||
Index: 1,
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||
Service: &pbservice.NodeService{
|
||||
Kind: structs.TerminatingGateway,
|
||||
Service: "name",
|
||||
Address: "127.0.0.1",
|
||||
Port: 8443,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
|
||||
node, ok := (view.Result(1)).(*structs.IndexedCheckServiceNodes)
|
||||
require.True(t, ok)
|
||||
|
||||
require.Len(t, node.Nodes, 1)
|
||||
require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address)
|
||||
require.Equal(t, 8443, node.Nodes[0].Service.Port)
|
||||
}
|
||||
|
|
|
@ -2,3 +2,4 @@
|
|||
|
||||
snapshot_envoy_admin localhost:20000 terminating-gateway primary || true
|
||||
snapshot_envoy_admin localhost:19000 s1 primary || true
|
||||
snapshot_envoy_admin localhost:19001 s3 primary || true
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
services {
|
||||
id = "s3"
|
||||
name = "s3"
|
||||
port = 8184
|
||||
connect {
|
||||
sidecar_service {
|
||||
proxy {
|
||||
upstreams = [
|
||||
{
|
||||
destination_name = "s2"
|
||||
local_bind_port = 8185
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,4 +38,5 @@ register_services primary
|
|||
|
||||
# terminating gateway will act as s2's proxy
|
||||
gen_envoy_bootstrap s1 19000
|
||||
gen_envoy_bootstrap s3 19001
|
||||
gen_envoy_bootstrap terminating-gateway 20000 primary true
|
||||
|
|
|
@ -4,5 +4,6 @@
|
|||
export REQUIRED_SERVICES="
|
||||
s1 s1-sidecar-proxy
|
||||
s2-v1
|
||||
s3 s3-sidecar-proxy
|
||||
terminating-gateway-primary
|
||||
"
|
||||
|
|
|
@ -38,3 +38,7 @@ load helpers
|
|||
assert_envoy_metric_at_least 127.0.0.1:20000 "v1.s2.default.primary.*cx_total" 1
|
||||
}
|
||||
|
||||
@test "terminating-gateway is used for the upstream connection of the proxy" {
|
||||
# make sure we resolve the terminating gateway as endpoint for the upstream
|
||||
assert_upstream_has_endpoint_port 127.0.0.1:19001 "v1.s2" 8443
|
||||
}
|
||||
|
|
|
@ -361,6 +361,39 @@ function get_upstream_endpoint {
|
|||
| select(.name|startswith(\"${CLUSTER_NAME}\"))"
|
||||
}
|
||||
|
||||
function get_upstream_endpoint_port {
|
||||
local HOSTPORT=$1
|
||||
local CLUSTER_NAME=$2
|
||||
local PORT_VALUE=$3
|
||||
run curl -s -f "http://${HOSTPORT}/clusters?format=json"
|
||||
[ "$status" -eq 0 ]
|
||||
echo "$output" | jq --raw-output "
|
||||
.cluster_statuses[]
|
||||
| select(.name|startswith(\"${CLUSTER_NAME}\"))
|
||||
| [.host_statuses[].address.socket_address.port_value]
|
||||
| [select(.[] == ${PORT_VALUE})]
|
||||
| length"
|
||||
}
|
||||
|
||||
function assert_upstream_has_endpoint_port_once {
|
||||
local HOSTPORT=$1
|
||||
local CLUSTER_NAME=$2
|
||||
local PORT_VALUE=$3
|
||||
|
||||
GOT_COUNT=$(get_upstream_endpoint_port $HOSTPORT $CLUSTER_NAME $PORT_VALUE)
|
||||
|
||||
[ "$GOT_COUNT" -eq 1 ]
|
||||
}
|
||||
|
||||
function assert_upstream_has_endpoint_port {
|
||||
local HOSTPORT=$1
|
||||
local CLUSTER_NAME=$2
|
||||
local PORT_VALUE=$3
|
||||
|
||||
run retry_long assert_upstream_has_endpoint_port_once $HOSTPORT $CLUSTER_NAME $PORT_VALUE
|
||||
[ "$status" -eq 0 ]
|
||||
}
|
||||
|
||||
function get_upstream_endpoint_in_status_count {
|
||||
local HOSTPORT=$1
|
||||
local CLUSTER_NAME=$2
|
||||
|
|
Loading…
Reference in New Issue