From 0cd0d505fa12778e97ee1b10ecfd5a86b7acc1ec Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Tue, 8 Feb 2022 10:36:48 -0600 Subject: [PATCH] xds: allow only one outstanding delta request at a time (#12236) Fixes #11876 This enforces that multiple xDS mutations are not issued on the same ADS connection at once, so that we can 100% control the order that they are applied. The original code made assumptions about the way multiple in-flight mutations were applied on the Envoy side that was incorrect. --- .changelog/12236.txt | 3 +++ agent/xds/delta.go | 6 +----- agent/xds/delta_test.go | 28 ++++++++++++++-------------- 3 files changed, 18 insertions(+), 19 deletions(-) create mode 100644 .changelog/12236.txt diff --git a/.changelog/12236.txt b/.changelog/12236.txt new file mode 100644 index 000000000..1c59a4177 --- /dev/null +++ b/.changelog/12236.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: allow only one outstanding delta request at a time +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index b0a4ccafc..d952aa3e2 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -289,11 +289,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any") - sentType := make(map[string]struct{}) // use this to only do one kind of mutation per type per execution for _, op := range xDSUpdateOrder { - if _, sent := sentType[op.TypeUrl]; sent { - continue - } err, sent := handlers[op.TypeUrl].SendIfNew( cfgSnap.Kind, currentVersions[op.TypeUrl], @@ -309,7 +305,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove op.TypeUrl, err) } if sent { - sentType[op.TypeUrl] = struct{}{} + break // wait until we get an ACK to do more } } } diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index eca97e36e..6fa119002 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -895,20 +895,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - // the listener is updated - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, - Nonce: hexString(6), - Resources: makeTestResources(t, - makeTestListener(t, snap, "http:db:rds"), - ), - }) - envoy.SendDeltaReqACK(t, ClusterType, 5) - // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 6) - // The behaviors of Cluster updates triggering re-sends of Endpoint updates // tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints // triggers here. It is not explicitly under test, but we have to get past @@ -916,13 +904,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: EndpointType, - Nonce: hexString(7), + Nonce: hexString(6), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "http:db"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 7) + envoy.SendDeltaReqACK(t, EndpointType, 6) + + // the listener is updated + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(7), + Resources: makeTestResources(t, + makeTestListener(t, snap, "http:db:rds"), + ), + }) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 7) // THE ACTUAL THING WE CARE ABOUT: replaced route config assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{