xds: allow only one outstanding delta request at a time (#12236)
Fixes #11876 This enforces that multiple xDS mutations are not issued on the same ADS connection at once, so that we can 100% control the order that they are applied. The original code made assumptions about the way multiple in-flight mutations were applied on the Envoy side that was incorrect.
This commit is contained in:
parent
ed719d58cf
commit
0cd0d505fa
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
xds: allow only one outstanding delta request at a time
|
||||||
|
```
|
|
@ -289,11 +289,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
||||||
|
|
||||||
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
||||||
|
|
||||||
sentType := make(map[string]struct{}) // use this to only do one kind of mutation per type per execution
|
|
||||||
for _, op := range xDSUpdateOrder {
|
for _, op := range xDSUpdateOrder {
|
||||||
if _, sent := sentType[op.TypeUrl]; sent {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
err, sent := handlers[op.TypeUrl].SendIfNew(
|
err, sent := handlers[op.TypeUrl].SendIfNew(
|
||||||
cfgSnap.Kind,
|
cfgSnap.Kind,
|
||||||
currentVersions[op.TypeUrl],
|
currentVersions[op.TypeUrl],
|
||||||
|
@ -309,7 +305,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
||||||
op.TypeUrl, err)
|
op.TypeUrl, err)
|
||||||
}
|
}
|
||||||
if sent {
|
if sent {
|
||||||
sentType[op.TypeUrl] = struct{}{}
|
break // wait until we get an ACK to do more
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -895,20 +895,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
|
|
||||||
// the listener is updated
|
|
||||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
|
||||||
TypeUrl: ListenerType,
|
|
||||||
Nonce: hexString(6),
|
|
||||||
Resources: makeTestResources(t,
|
|
||||||
makeTestListener(t, snap, "http:db:rds"),
|
|
||||||
),
|
|
||||||
})
|
|
||||||
|
|
||||||
envoy.SendDeltaReqACK(t, ClusterType, 5)
|
envoy.SendDeltaReqACK(t, ClusterType, 5)
|
||||||
|
|
||||||
// ACKs the listener
|
|
||||||
envoy.SendDeltaReqACK(t, ListenerType, 6)
|
|
||||||
|
|
||||||
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
|
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
|
||||||
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
|
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
|
||||||
// triggers here. It is not explicitly under test, but we have to get past
|
// triggers here. It is not explicitly under test, but we have to get past
|
||||||
|
@ -916,13 +904,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
||||||
|
|
||||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
TypeUrl: EndpointType,
|
TypeUrl: EndpointType,
|
||||||
Nonce: hexString(7),
|
Nonce: hexString(6),
|
||||||
Resources: makeTestResources(t,
|
Resources: makeTestResources(t,
|
||||||
makeTestEndpoints(t, snap, "http:db"),
|
makeTestEndpoints(t, snap, "http:db"),
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
|
|
||||||
envoy.SendDeltaReqACK(t, EndpointType, 7)
|
envoy.SendDeltaReqACK(t, EndpointType, 6)
|
||||||
|
|
||||||
|
// the listener is updated
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(7),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestListener(t, snap, "http:db:rds"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// ACKs the listener
|
||||||
|
envoy.SendDeltaReqACK(t, ListenerType, 7)
|
||||||
|
|
||||||
// THE ACTUAL THING WE CARE ABOUT: replaced route config
|
// THE ACTUAL THING WE CARE ABOUT: replaced route config
|
||||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
|
Loading…
Reference in New Issue