[API Gateway] Update simple test to leverage intentions and multiple listeners (#16228)

* [API Gateway] Add integration test for conflicted TCP listeners

* [API Gateway] Update simple test to leverage intentions and multiple listeners

* Fix broken unit test

* PR suggestions
This commit is contained in:
Andrew Stucki 2023-02-10 16:13:44 -05:00 committed by GitHub
parent d457e40038
commit 7dda5e8b1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 151 additions and 29 deletions

View File

@ -62,6 +62,9 @@ func (h *handlerAPIGateway) initialize(ctx context.Context) (ConfigSnapshot, err
snap.APIGateway.TCPRoutes = watch.NewMap[structs.ResourceReference, *structs.TCPRouteConfigEntry]() snap.APIGateway.TCPRoutes = watch.NewMap[structs.ResourceReference, *structs.TCPRouteConfigEntry]()
snap.APIGateway.Certificates = watch.NewMap[structs.ResourceReference, *structs.InlineCertificateConfigEntry]() snap.APIGateway.Certificates = watch.NewMap[structs.ResourceReference, *structs.InlineCertificateConfigEntry]()
snap.APIGateway.Upstreams = make(listenerRouteUpstreams)
snap.APIGateway.UpstreamsSet = make(routeUpstreamSet)
// These need to be initialized here but are set by handlerUpstreams // These need to be initialized here but are set by handlerUpstreams
snap.APIGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) snap.APIGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
snap.APIGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]() snap.APIGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
@ -192,6 +195,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd
// Unsubscribe from any config entries that are no longer attached // Unsubscribe from any config entries that are no longer attached
snap.APIGateway.HTTPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { snap.APIGateway.HTTPRoutes.ForEachKey(func(ref structs.ResourceReference) bool {
if _, ok := seenRefs[ref]; !ok { if _, ok := seenRefs[ref]; !ok {
snap.APIGateway.Upstreams.delete(ref)
snap.APIGateway.UpstreamsSet.delete(ref)
snap.APIGateway.HTTPRoutes.CancelWatch(ref) snap.APIGateway.HTTPRoutes.CancelWatch(ref)
} }
return true return true
@ -199,6 +204,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd
snap.APIGateway.TCPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { snap.APIGateway.TCPRoutes.ForEachKey(func(ref structs.ResourceReference) bool {
if _, ok := seenRefs[ref]; !ok { if _, ok := seenRefs[ref]; !ok {
snap.APIGateway.Upstreams.delete(ref)
snap.APIGateway.UpstreamsSet.delete(ref)
snap.APIGateway.TCPRoutes.CancelWatch(ref) snap.APIGateway.TCPRoutes.CancelWatch(ref)
} }
return true return true
@ -270,7 +277,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
EnterpriseMeta: *resp.Entry.GetEnterpriseMeta(), EnterpriseMeta: *resp.Entry.GetEnterpriseMeta(),
} }
seenUpstreamIDs := make(map[UpstreamID]struct{}) seenUpstreamIDs := make(upstreamIDSet)
upstreams := make(map[APIGatewayListenerKey]structs.Upstreams) upstreams := make(map[APIGatewayListenerKey]structs.Upstreams)
switch route := resp.Entry.(type) { switch route := resp.Entry.(type) {
@ -331,7 +338,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
for _, service := range route.Services { for _, service := range route.Services {
upstreamID := NewUpstreamIDFromServiceName(service.ServiceName()) upstreamID := NewUpstreamIDFromServiceName(service.ServiceName())
seenUpstreamIDs[upstreamID] = struct{}{} seenUpstreamIDs.add(upstreamID)
// For each listener, check if this route should bind and, if so, create an upstream. // For each listener, check if this route should bind and, if so, create an upstream.
for _, listener := range snap.APIGateway.Listeners { for _, listener := range snap.APIGateway.Listeners {
@ -351,7 +358,6 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
DestinationNamespace: service.NamespaceOrDefault(), DestinationNamespace: service.NamespaceOrDefault(),
DestinationPartition: service.PartitionOrDefault(), DestinationPartition: service.PartitionOrDefault(),
LocalBindPort: listener.Port, LocalBindPort: listener.Port,
//IngressHosts: g.Hosts,
// Pass the protocol that was configured on the ingress listener in order // Pass the protocol that was configured on the ingress listener in order
// to force that protocol on the Envoy listener. // to force that protocol on the Envoy listener.
Config: map[string]interface{}{ Config: map[string]interface{}{
@ -380,14 +386,16 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
return fmt.Errorf("invalid type for config entry: %T", resp.Entry) return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
} }
snap.APIGateway.Upstreams = upstreams for listener, set := range upstreams {
snap.APIGateway.UpstreamsSet = seenUpstreamIDs snap.APIGateway.Upstreams.set(ref, listener, set)
}
snap.APIGateway.UpstreamsSet.set(ref, seenUpstreamIDs)
//snap.APIGateway.Hosts = TODO //snap.APIGateway.Hosts = TODO
snap.APIGateway.AreHostsSet = true snap.APIGateway.AreHostsSet = true
// Stop watching any upstreams and discovery chains that have become irrelevant // Stop watching any upstreams and discovery chains that have become irrelevant
for upstreamID, cancelDiscoChain := range snap.APIGateway.WatchedDiscoveryChains { for upstreamID, cancelDiscoChain := range snap.APIGateway.WatchedDiscoveryChains {
if _, ok := seenUpstreamIDs[upstreamID]; ok { if snap.APIGateway.UpstreamsSet.hasUpstream(upstreamID) {
continue continue
} }

View File

@ -260,26 +260,40 @@ func (o *configSnapshotAPIGateway) DeepCopy() *configSnapshotAPIGateway {
copy(cp.Hosts, o.Hosts) copy(cp.Hosts, o.Hosts)
} }
if o.Upstreams != nil { if o.Upstreams != nil {
cp.Upstreams = make(map[IngressListenerKey]structs.Upstreams, len(o.Upstreams)) cp.Upstreams = make(map[structs.ResourceReference]listenerUpstreamMap, len(o.Upstreams))
for k2, v2 := range o.Upstreams { for k2, v2 := range o.Upstreams {
var cp_Upstreams_v2 structs.Upstreams var cp_Upstreams_v2 listenerUpstreamMap
if v2 != nil { if v2 != nil {
cp_Upstreams_v2 = make([]structs.Upstream, len(v2)) cp_Upstreams_v2 = make(map[IngressListenerKey]structs.Upstreams, len(v2))
copy(cp_Upstreams_v2, v2) for k3, v3 := range v2 {
for i3 := range v2 { var cp_Upstreams_v2_v3 structs.Upstreams
{ if v3 != nil {
retV := v2[i3].DeepCopy() cp_Upstreams_v2_v3 = make([]structs.Upstream, len(v3))
cp_Upstreams_v2[i3] = *retV copy(cp_Upstreams_v2_v3, v3)
for i4 := range v3 {
{
retV := v3[i4].DeepCopy()
cp_Upstreams_v2_v3[i4] = *retV
}
}
} }
cp_Upstreams_v2[k3] = cp_Upstreams_v2_v3
} }
} }
cp.Upstreams[k2] = cp_Upstreams_v2 cp.Upstreams[k2] = cp_Upstreams_v2
} }
} }
if o.UpstreamsSet != nil { if o.UpstreamsSet != nil {
cp.UpstreamsSet = make(map[UpstreamID]struct{}, len(o.UpstreamsSet)) cp.UpstreamsSet = make(map[structs.ResourceReference]upstreamIDSet, len(o.UpstreamsSet))
for k2, v2 := range o.UpstreamsSet { for k2, v2 := range o.UpstreamsSet {
cp.UpstreamsSet[k2] = v2 var cp_UpstreamsSet_v2 upstreamIDSet
if v2 != nil {
cp_UpstreamsSet_v2 = make(map[UpstreamID]struct{}, len(v2))
for k3, v3 := range v2 {
cp_UpstreamsSet_v2[k3] = v3
}
}
cp.UpstreamsSet[k2] = cp_UpstreamsSet_v2
} }
} }
cp.HTTPRoutes = o.HTTPRoutes.DeepCopy() cp.HTTPRoutes = o.HTTPRoutes.DeepCopy()

View File

@ -2,7 +2,6 @@ package proxycfg
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
@ -641,6 +640,55 @@ func (c *configSnapshotMeshGateway) isEmptyPeering() bool {
!c.PeeringTrustBundlesSet !c.PeeringTrustBundlesSet
} }
type upstreamIDSet map[UpstreamID]struct{}
func (u upstreamIDSet) add(uid UpstreamID) {
u[uid] = struct{}{}
}
type routeUpstreamSet map[structs.ResourceReference]upstreamIDSet
func (r routeUpstreamSet) hasUpstream(uid UpstreamID) bool {
for _, set := range r {
if _, ok := set[uid]; ok {
return true
}
}
return false
}
func (r routeUpstreamSet) set(route structs.ResourceReference, set upstreamIDSet) {
r[route] = set
}
func (r routeUpstreamSet) delete(route structs.ResourceReference) {
delete(r, route)
}
type listenerUpstreamMap map[APIGatewayListenerKey]structs.Upstreams
type listenerRouteUpstreams map[structs.ResourceReference]listenerUpstreamMap
func (l listenerRouteUpstreams) set(route structs.ResourceReference, listener APIGatewayListenerKey, upstreams structs.Upstreams) {
if _, ok := l[route]; !ok {
l[route] = make(listenerUpstreamMap)
}
l[route][listener] = upstreams
}
func (l listenerRouteUpstreams) delete(route structs.ResourceReference) {
delete(l, route)
}
func (l listenerRouteUpstreams) toUpstreams() map[IngressListenerKey]structs.Upstreams {
listeners := make(map[IngressListenerKey]structs.Upstreams, len(l))
for _, listenerMap := range l {
for listener, set := range listenerMap {
listeners[listener] = append(listeners[listener], set...)
}
}
return listeners
}
type configSnapshotAPIGateway struct { type configSnapshotAPIGateway struct {
ConfigSnapshotUpstreams ConfigSnapshotUpstreams
@ -669,10 +717,10 @@ type configSnapshotAPIGateway struct {
// the GatewayServices RPC to retrieve them. // the GatewayServices RPC to retrieve them.
// TODO Determine if this is updated "for free" or not. If not, we might need // TODO Determine if this is updated "for free" or not. If not, we might need
// to do some work to populate it in handlerAPIGateway // to do some work to populate it in handlerAPIGateway
Upstreams map[IngressListenerKey]structs.Upstreams Upstreams listenerRouteUpstreams
// UpstreamsSet is the unique set of UpstreamID the gateway routes to. // UpstreamsSet is the unique set of UpstreamID the gateway routes to.
UpstreamsSet map[UpstreamID]struct{} UpstreamsSet routeUpstreamSet
HTTPRoutes watch.Map[structs.ResourceReference, *structs.HTTPRouteConfigEntry] HTTPRoutes watch.Map[structs.ResourceReference, *structs.HTTPRouteConfigEntry]
TCPRoutes watch.Map[structs.ResourceReference, *structs.TCPRouteConfigEntry] TCPRoutes watch.Map[structs.ResourceReference, *structs.TCPRouteConfigEntry]
@ -733,17 +781,18 @@ func (c *configSnapshotAPIGateway) ToIngress(datacenter string) (configSnapshotI
} }
ingressListener.TLS = tls ingressListener.TLS = tls
ingressListeners[IngressListenerKey{ key := IngressListenerKey{
Port: listener.Port, Port: listener.Port,
Protocol: string(listener.Protocol), Protocol: string(listener.Protocol),
}] = ingressListener }
ingressListeners[key] = ingressListener
} }
upstreams := c.DeepCopy().ConfigSnapshotUpstreams snapshotUpstreams := c.DeepCopy().ConfigSnapshotUpstreams
upstreams.DiscoveryChain = synthesizedChains snapshotUpstreams.DiscoveryChain = synthesizedChains
return configSnapshotIngressGateway{ return configSnapshotIngressGateway{
Upstreams: c.Upstreams, Upstreams: c.Upstreams.toUpstreams(),
ConfigSnapshotUpstreams: upstreams, ConfigSnapshotUpstreams: snapshotUpstreams,
GatewayConfigLoaded: true, GatewayConfigLoaded: true,
Listeners: ingressListeners, Listeners: ingressListeners,
}, nil }, nil
@ -784,7 +833,7 @@ func (c *configSnapshotAPIGateway) synthesizeChains(datacenter string, protocol
} }
if len(chains) == 0 { if len(chains) == 0 {
return nil, nil, errors.New("could not synthesize discovery chain") return nil, nil, nil
} }
return synthesizer.Synthesize(chains...) return synthesizer.Synthesize(chains...)

View File

@ -73,6 +73,7 @@ func TestAPIGatewaySnapshotToIngressGatewaySnapshot(t *testing.T) {
}, },
Listeners: map[IngressListenerKey]structs.IngressListener{}, Listeners: map[IngressListenerKey]structs.IngressListener{},
Defaults: structs.IngressServiceConfig{}, Defaults: structs.IngressServiceConfig{},
Upstreams: map[IngressListenerKey]structs.Upstreams{},
}, },
}, },
} }

View File

@ -64,7 +64,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
switch snap.Kind { switch snap.Kind {
case structs.ServiceKindAPIGateway: case structs.ServiceKindAPIGateway:
if _, ok := snap.APIGateway.UpstreamsSet[uid]; !ok { if !snap.APIGateway.UpstreamsSet.hasUpstream(uid) {
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled. // The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, uid) delete(upstreamsSnapshot.DiscoveryChain, uid)

View File

@ -7,15 +7,21 @@ kind = "api-gateway"
name = "api-gateway" name = "api-gateway"
listeners = [ listeners = [
{ {
name = "listener-one"
port = 9999 port = 9999
protocol = "tcp" protocol = "tcp"
},
{
name = "listener-two"
port = 9998
protocol = "tcp"
} }
] ]
' '
upsert_config_entry primary ' upsert_config_entry primary '
kind = "tcp-route" kind = "tcp-route"
name = "api-gateway-route" name = "api-gateway-route-one"
services = [ services = [
{ {
name = "s1" name = "s1"
@ -23,12 +29,46 @@ services = [
] ]
parents = [ parents = [
{ {
kind = "api-gateway"
name = "api-gateway" name = "api-gateway"
sectionName = "listener-one"
} }
] ]
' '
upsert_config_entry primary '
kind = "tcp-route"
name = "api-gateway-route-two"
services = [
{
name = "s2"
}
]
parents = [
{
name = "api-gateway"
sectionName = "listener-two"
}
]
'
upsert_config_entry primary '
kind = "service-intentions"
name = "s1"
sources {
name = "api-gateway"
action = "allow"
}
'
upsert_config_entry primary '
kind = "service-intentions"
name = "s2"
sources {
name = "api-gateway"
action = "deny"
}
'
register_services primary register_services primary
gen_envoy_bootstrap api-gateway 20000 primary true gen_envoy_bootstrap api-gateway 20000 primary true

View File

@ -12,11 +12,21 @@ load helpers
} }
@test "api gateway should have healthy endpoints for s1" { @test "api gateway should have healthy endpoints for s1" {
assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-one
assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s1 HEALTHY 1 assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s1 HEALTHY 1
} }
@test "api gateway should have healthy endpoints for s2" {
assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-two
assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s2 HEALTHY 1
}
@test "api gateway should be able to connect to s1 via configured port" { @test "api gateway should be able to connect to s1 via configured port" {
run retry_default curl -s -f -d hello localhost:9999 run retry_default curl -s -f -d hello localhost:9999
[ "$status" -eq 0 ] [ "$status" -eq 0 ]
[[ "$output" == *"hello"* ]] [[ "$output" == *"hello"* ]]
}
@test "api gateway should get an intentions error connecting to s2 via configured port" {
run retry_default must_fail_tcp_connection localhost:9998
} }