diff --git a/agent/proxycfg/api_gateway.go b/agent/proxycfg/api_gateway.go index 79e9d4995..28a6c4230 100644 --- a/agent/proxycfg/api_gateway.go +++ b/agent/proxycfg/api_gateway.go @@ -62,6 +62,9 @@ func (h *handlerAPIGateway) initialize(ctx context.Context) (ConfigSnapshot, err snap.APIGateway.TCPRoutes = watch.NewMap[structs.ResourceReference, *structs.TCPRouteConfigEntry]() snap.APIGateway.Certificates = watch.NewMap[structs.ResourceReference, *structs.InlineCertificateConfigEntry]() + snap.APIGateway.Upstreams = make(listenerRouteUpstreams) + snap.APIGateway.UpstreamsSet = make(routeUpstreamSet) + // These need to be initialized here but are set by handlerUpstreams snap.APIGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) snap.APIGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]() @@ -192,6 +195,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd // Unsubscribe from any config entries that are no longer attached snap.APIGateway.HTTPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { if _, ok := seenRefs[ref]; !ok { + snap.APIGateway.Upstreams.delete(ref) + snap.APIGateway.UpstreamsSet.delete(ref) snap.APIGateway.HTTPRoutes.CancelWatch(ref) } return true @@ -199,6 +204,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd snap.APIGateway.TCPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { if _, ok := seenRefs[ref]; !ok { + snap.APIGateway.Upstreams.delete(ref) + snap.APIGateway.UpstreamsSet.delete(ref) snap.APIGateway.TCPRoutes.CancelWatch(ref) } return true @@ -270,7 +277,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat EnterpriseMeta: *resp.Entry.GetEnterpriseMeta(), } - seenUpstreamIDs := make(map[UpstreamID]struct{}) + seenUpstreamIDs := make(upstreamIDSet) upstreams := make(map[APIGatewayListenerKey]structs.Upstreams) switch route := resp.Entry.(type) { @@ -331,7 +338,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat for _, service := range route.Services { upstreamID := NewUpstreamIDFromServiceName(service.ServiceName()) - seenUpstreamIDs[upstreamID] = struct{}{} + seenUpstreamIDs.add(upstreamID) // For each listener, check if this route should bind and, if so, create an upstream. for _, listener := range snap.APIGateway.Listeners { @@ -351,7 +358,6 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat DestinationNamespace: service.NamespaceOrDefault(), DestinationPartition: service.PartitionOrDefault(), LocalBindPort: listener.Port, - //IngressHosts: g.Hosts, // Pass the protocol that was configured on the ingress listener in order // to force that protocol on the Envoy listener. Config: map[string]interface{}{ @@ -380,14 +386,16 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat return fmt.Errorf("invalid type for config entry: %T", resp.Entry) } - snap.APIGateway.Upstreams = upstreams - snap.APIGateway.UpstreamsSet = seenUpstreamIDs + for listener, set := range upstreams { + snap.APIGateway.Upstreams.set(ref, listener, set) + } + snap.APIGateway.UpstreamsSet.set(ref, seenUpstreamIDs) //snap.APIGateway.Hosts = TODO snap.APIGateway.AreHostsSet = true // Stop watching any upstreams and discovery chains that have become irrelevant for upstreamID, cancelDiscoChain := range snap.APIGateway.WatchedDiscoveryChains { - if _, ok := seenUpstreamIDs[upstreamID]; ok { + if snap.APIGateway.UpstreamsSet.hasUpstream(upstreamID) { continue } diff --git a/agent/proxycfg/proxycfg.deepcopy.go b/agent/proxycfg/proxycfg.deepcopy.go index 3f4804a1c..4c0318f67 100644 --- a/agent/proxycfg/proxycfg.deepcopy.go +++ b/agent/proxycfg/proxycfg.deepcopy.go @@ -260,26 +260,40 @@ func (o *configSnapshotAPIGateway) DeepCopy() *configSnapshotAPIGateway { copy(cp.Hosts, o.Hosts) } if o.Upstreams != nil { - cp.Upstreams = make(map[IngressListenerKey]structs.Upstreams, len(o.Upstreams)) + cp.Upstreams = make(map[structs.ResourceReference]listenerUpstreamMap, len(o.Upstreams)) for k2, v2 := range o.Upstreams { - var cp_Upstreams_v2 structs.Upstreams + var cp_Upstreams_v2 listenerUpstreamMap if v2 != nil { - cp_Upstreams_v2 = make([]structs.Upstream, len(v2)) - copy(cp_Upstreams_v2, v2) - for i3 := range v2 { - { - retV := v2[i3].DeepCopy() - cp_Upstreams_v2[i3] = *retV + cp_Upstreams_v2 = make(map[IngressListenerKey]structs.Upstreams, len(v2)) + for k3, v3 := range v2 { + var cp_Upstreams_v2_v3 structs.Upstreams + if v3 != nil { + cp_Upstreams_v2_v3 = make([]structs.Upstream, len(v3)) + copy(cp_Upstreams_v2_v3, v3) + for i4 := range v3 { + { + retV := v3[i4].DeepCopy() + cp_Upstreams_v2_v3[i4] = *retV + } + } } + cp_Upstreams_v2[k3] = cp_Upstreams_v2_v3 } } cp.Upstreams[k2] = cp_Upstreams_v2 } } if o.UpstreamsSet != nil { - cp.UpstreamsSet = make(map[UpstreamID]struct{}, len(o.UpstreamsSet)) + cp.UpstreamsSet = make(map[structs.ResourceReference]upstreamIDSet, len(o.UpstreamsSet)) for k2, v2 := range o.UpstreamsSet { - cp.UpstreamsSet[k2] = v2 + var cp_UpstreamsSet_v2 upstreamIDSet + if v2 != nil { + cp_UpstreamsSet_v2 = make(map[UpstreamID]struct{}, len(v2)) + for k3, v3 := range v2 { + cp_UpstreamsSet_v2[k3] = v3 + } + } + cp.UpstreamsSet[k2] = cp_UpstreamsSet_v2 } } cp.HTTPRoutes = o.HTTPRoutes.DeepCopy() diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 6e909f17d..899776386 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -2,7 +2,6 @@ package proxycfg import ( "context" - "errors" "fmt" "sort" "strings" @@ -641,6 +640,55 @@ func (c *configSnapshotMeshGateway) isEmptyPeering() bool { !c.PeeringTrustBundlesSet } +type upstreamIDSet map[UpstreamID]struct{} + +func (u upstreamIDSet) add(uid UpstreamID) { + u[uid] = struct{}{} +} + +type routeUpstreamSet map[structs.ResourceReference]upstreamIDSet + +func (r routeUpstreamSet) hasUpstream(uid UpstreamID) bool { + for _, set := range r { + if _, ok := set[uid]; ok { + return true + } + } + return false +} + +func (r routeUpstreamSet) set(route structs.ResourceReference, set upstreamIDSet) { + r[route] = set +} + +func (r routeUpstreamSet) delete(route structs.ResourceReference) { + delete(r, route) +} + +type listenerUpstreamMap map[APIGatewayListenerKey]structs.Upstreams +type listenerRouteUpstreams map[structs.ResourceReference]listenerUpstreamMap + +func (l listenerRouteUpstreams) set(route structs.ResourceReference, listener APIGatewayListenerKey, upstreams structs.Upstreams) { + if _, ok := l[route]; !ok { + l[route] = make(listenerUpstreamMap) + } + l[route][listener] = upstreams +} + +func (l listenerRouteUpstreams) delete(route structs.ResourceReference) { + delete(l, route) +} + +func (l listenerRouteUpstreams) toUpstreams() map[IngressListenerKey]structs.Upstreams { + listeners := make(map[IngressListenerKey]structs.Upstreams, len(l)) + for _, listenerMap := range l { + for listener, set := range listenerMap { + listeners[listener] = append(listeners[listener], set...) + } + } + return listeners +} + type configSnapshotAPIGateway struct { ConfigSnapshotUpstreams @@ -669,10 +717,10 @@ type configSnapshotAPIGateway struct { // the GatewayServices RPC to retrieve them. // TODO Determine if this is updated "for free" or not. If not, we might need // to do some work to populate it in handlerAPIGateway - Upstreams map[IngressListenerKey]structs.Upstreams + Upstreams listenerRouteUpstreams // UpstreamsSet is the unique set of UpstreamID the gateway routes to. - UpstreamsSet map[UpstreamID]struct{} + UpstreamsSet routeUpstreamSet HTTPRoutes watch.Map[structs.ResourceReference, *structs.HTTPRouteConfigEntry] TCPRoutes watch.Map[structs.ResourceReference, *structs.TCPRouteConfigEntry] @@ -733,17 +781,18 @@ func (c *configSnapshotAPIGateway) ToIngress(datacenter string) (configSnapshotI } ingressListener.TLS = tls - ingressListeners[IngressListenerKey{ + key := IngressListenerKey{ Port: listener.Port, Protocol: string(listener.Protocol), - }] = ingressListener + } + ingressListeners[key] = ingressListener } - upstreams := c.DeepCopy().ConfigSnapshotUpstreams - upstreams.DiscoveryChain = synthesizedChains + snapshotUpstreams := c.DeepCopy().ConfigSnapshotUpstreams + snapshotUpstreams.DiscoveryChain = synthesizedChains return configSnapshotIngressGateway{ - Upstreams: c.Upstreams, - ConfigSnapshotUpstreams: upstreams, + Upstreams: c.Upstreams.toUpstreams(), + ConfigSnapshotUpstreams: snapshotUpstreams, GatewayConfigLoaded: true, Listeners: ingressListeners, }, nil @@ -784,7 +833,7 @@ func (c *configSnapshotAPIGateway) synthesizeChains(datacenter string, protocol } if len(chains) == 0 { - return nil, nil, errors.New("could not synthesize discovery chain") + return nil, nil, nil } return synthesizer.Synthesize(chains...) diff --git a/agent/proxycfg/snapshot_test.go b/agent/proxycfg/snapshot_test.go index dda3e0349..8c4778cc7 100644 --- a/agent/proxycfg/snapshot_test.go +++ b/agent/proxycfg/snapshot_test.go @@ -73,6 +73,7 @@ func TestAPIGatewaySnapshotToIngressGatewaySnapshot(t *testing.T) { }, Listeners: map[IngressListenerKey]structs.IngressListener{}, Defaults: structs.IngressServiceConfig{}, + Upstreams: map[IngressListenerKey]structs.Upstreams{}, }, }, } diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index e2c6f137a..367eeda62 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -64,7 +64,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv switch snap.Kind { case structs.ServiceKindAPIGateway: - if _, ok := snap.APIGateway.UpstreamsSet[uid]; !ok { + if !snap.APIGateway.UpstreamsSet.hasUpstream(uid) { // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // The associated watch was likely cancelled. delete(upstreamsSnapshot.DiscoveryChain, uid) diff --git a/test/integration/connect/envoy/case-api-gateway-tcp-simple/setup.sh b/test/integration/connect/envoy/case-api-gateway-tcp-simple/setup.sh index 5cbb7a283..fd4f474ab 100644 --- a/test/integration/connect/envoy/case-api-gateway-tcp-simple/setup.sh +++ b/test/integration/connect/envoy/case-api-gateway-tcp-simple/setup.sh @@ -7,15 +7,21 @@ kind = "api-gateway" name = "api-gateway" listeners = [ { + name = "listener-one" port = 9999 protocol = "tcp" + }, + { + name = "listener-two" + port = 9998 + protocol = "tcp" } ] ' upsert_config_entry primary ' kind = "tcp-route" -name = "api-gateway-route" +name = "api-gateway-route-one" services = [ { name = "s1" @@ -23,12 +29,46 @@ services = [ ] parents = [ { - kind = "api-gateway" name = "api-gateway" + sectionName = "listener-one" } ] ' +upsert_config_entry primary ' +kind = "tcp-route" +name = "api-gateway-route-two" +services = [ + { + name = "s2" + } +] +parents = [ + { + name = "api-gateway" + sectionName = "listener-two" + } +] +' + +upsert_config_entry primary ' +kind = "service-intentions" +name = "s1" +sources { + name = "api-gateway" + action = "allow" +} +' + +upsert_config_entry primary ' +kind = "service-intentions" +name = "s2" +sources { + name = "api-gateway" + action = "deny" +} +' + register_services primary gen_envoy_bootstrap api-gateway 20000 primary true diff --git a/test/integration/connect/envoy/case-api-gateway-tcp-simple/verify.bats b/test/integration/connect/envoy/case-api-gateway-tcp-simple/verify.bats index 83ff3a6a5..51ed646bd 100644 --- a/test/integration/connect/envoy/case-api-gateway-tcp-simple/verify.bats +++ b/test/integration/connect/envoy/case-api-gateway-tcp-simple/verify.bats @@ -12,11 +12,21 @@ load helpers } @test "api gateway should have healthy endpoints for s1" { + assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-one assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s1 HEALTHY 1 } +@test "api gateway should have healthy endpoints for s2" { + assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-two + assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s2 HEALTHY 1 +} + @test "api gateway should be able to connect to s1 via configured port" { run retry_default curl -s -f -d hello localhost:9999 [ "$status" -eq 0 ] [[ "$output" == *"hello"* ]] +} + +@test "api gateway should get an intentions error connecting to s2 via configured port" { + run retry_default must_fail_tcp_connection localhost:9998 } \ No newline at end of file