From eee0fb103594ed9105cfb1ada41ee1d5a8c33c03 Mon Sep 17 00:00:00 2001 From: Freddy Date: Mon, 7 Nov 2022 18:10:42 -0700 Subject: [PATCH] Avoid blocking child type updates on parent ack (#15083) --- .changelog/15083.txt | 3 + agent/xds/delta.go | 154 +++++++++++------- agent/xds/delta_test.go | 116 ++++++------- agent/xds/endpoints.go | 4 - .../consul-container/libs/service/common.go | 2 +- .../consul-container/libs/service/connect.go | 2 +- .../consul-container/libs/service/helpers.go | 8 +- 7 files changed, 166 insertions(+), 123 deletions(-) create mode 100644 .changelog/15083.txt diff --git a/.changelog/15083.txt b/.changelog/15083.txt new file mode 100644 index 000000000..301f0f43b --- /dev/null +++ b/.changelog/15083.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: fixed bug where endpoint updates for new xDS clusters could block for 15s before being sent to Envoy. +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index dd3a13161..4c042eeb9 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -4,7 +4,6 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "sort" "sync" "sync/atomic" "time" @@ -158,8 +157,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // representation of envoy state to force an update. // // see: https://github.com/envoyproxy/envoy/issues/13009 - handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType] - handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType] + handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{ + childType: handlers[xdscommon.RouteType], + childrenNames: make(map[string][]string), + } + handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{ + childType: handlers[xdscommon.EndpointType], + childrenNames: make(map[string][]string), + } var authTimer <-chan time.Time extendAuthTimer := func() { @@ -346,22 +351,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove continue } - var pendingTypes []string - for typeUrl, handler := range handlers { - if !handler.registered { - continue - } - if len(handler.pendingUpdates) > 0 { - pendingTypes = append(pendingTypes, typeUrl) - } - } - if len(pendingTypes) > 0 { - sort.Strings(pendingTypes) - generator.Logger.Trace("Skipping delta computation because there are responses in flight", - "pendingTypeUrls", pendingTypes) - continue - } - generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any") streamStartOnce.Do(func() { @@ -369,7 +358,25 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove }) for _, op := range xDSUpdateOrder { - err, sent := handlers[op.TypeUrl].SendIfNew( + if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType { + if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 { + generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending", + "typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType) + + // Receiving an ACK from Envoy will unblock the select statement above, + // and re-trigger an attempt to send these skipped updates. + break + } + if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 { + generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending", + "typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType) + + // Receiving an ACK from Envoy will unblock the select statement above, + // and re-trigger an attempt to send these skipped updates. + break + } + } + err, _ := handlers[op.TypeUrl].SendIfNew( cfgSnap.Kind, currentVersions[op.TypeUrl], resourceMap, @@ -383,9 +390,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove op.errorLogNameReplyPrefix(), op.TypeUrl, err) } - if sent { - break // wait until we get an ACK to do more - } } } } @@ -435,16 +439,26 @@ func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string { } } +type xDSDeltaChild struct { + // childType is a type that in Envoy is actually stored within this type. + // Upserts of THIS type should potentially trigger dependent named + // resources within the child to be re-configured. + childType *xDSDeltaType + + // childrenNames is map of parent resource names to a list of associated child resource + // names. + childrenNames map[string][]string +} + type xDSDeltaType struct { generator *ResourceGenerator stream ADSDeltaStream typeURL string allowEmptyFn func(kind structs.ServiceKind) bool - // childType is a type that in Envoy is actually stored within this type. - // Upserts of THIS type should potentially trigger dependent named - // resources within the child to be re-configured. - childType *xDSDeltaType + // deltaChild contains data for an xDS child type if there is one. + // For example, endpoints are a child type of clusters. + deltaChild *xDSDeltaChild // registered indicates if this type has been requested at least once by // the proxy @@ -484,9 +498,8 @@ func (t *xDSDeltaType) subscribed(name string) bool { } type PendingUpdate struct { - Remove bool - Version string - ChildResources []string // optional + Remove bool + Version string } func newDeltaType( @@ -610,6 +623,15 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su t.resourceVersions[name] = "" } + // Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent. + // We MUST assume that if Envoy ever had data for the children of this parent, then the child's + // data is gone. + if t.deltaChild != nil && t.deltaChild.childType.registered { + for _, childName := range t.deltaChild.childrenNames[name] { + t.ensureChildResend(name, childName) + } + } + if alreadySubscribed { logger.Trace("re-subscribing resource for stream", "resource", name) } else { @@ -646,27 +668,6 @@ func (t *xDSDeltaType) ack(nonce string) { } t.resourceVersions[name] = obj.Version - if t.childType != nil { - // This branch only matters on UPDATE, since we already have - // mechanisms to clean up orphaned resources. - for _, childName := range obj.ChildResources { - if _, exist := t.childType.resourceVersions[childName]; !exist { - continue - } - if !t.subscribed(childName) { - continue - } - t.generator.Logger.Trace( - "triggering implicit update of resource", - "typeUrl", t.typeURL, - "resource", name, - "childTypeUrl", t.childType.typeURL, - "childResource", childName, - ) - // Basically manifest this as a re-subscribe/re-sync - t.childType.resourceVersions[childName] = "" - } - } } t.sentToEnvoyOnce = true delete(t.pendingUpdates, nonce) @@ -686,6 +687,12 @@ func (t *xDSDeltaType) SendIfNew( if t == nil || !t.registered { return nil, false } + + // Wait for Envoy to catch up with this delta type before sending something new. + if len(t.pendingUpdates) > 0 { + return nil, false + } + logger := t.generator.Logger.With("typeUrl", t.typeURL) allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind) @@ -721,14 +728,23 @@ func (t *xDSDeltaType) SendIfNew( } logger.Trace("sent response", "nonce", resp.Nonce) - if t.childType != nil { - // Capture the relevant child resource names on this pending update so - // we can properly clean up the linked children when this change is - // ACKed. - for name, obj := range updates { + // Certain xDS types are children of other types, meaning that if an update is pushed for a parent, + // we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon + // receiving updates for the parent, so we need to handle this ourselves. + // + // Note that we do not check whether the deltaChild.childType is registered here, since we send + // parent types before child types, meaning that it's expected on first send of a parent that + // there are no subscriptions for the child type. + if t.deltaChild != nil { + for name := range updates { if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok { - obj.ChildResources = children - updates[name] = obj + // Capture the relevant child resource names on this pending update so + // we can know the linked children if Envoy ever re-subscribes to the parent resource. + t.deltaChild.childrenNames[name] = children + + for _, childName := range children { + t.ensureChildResend(name, childName) + } } } } @@ -848,6 +864,28 @@ func (t *xDSDeltaType) createDeltaResponse( return resp, realUpdates, nil } +func (t *xDSDeltaType) ensureChildResend(parentName, childName string) { + if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist { + return + } + if !t.subscribed(childName) { + return + } + + t.generator.Logger.Trace( + "triggering implicit update of resource", + "typeUrl", t.typeURL, + "resource", parentName, + "childTypeUrl", t.deltaChild.childType.typeURL, + "childResource", childName, + ) + + // resourceVersions tracks the last known version for this childName that Envoy + // has ACKed. By setting this to empty it effectively tells us that Envoy does + // not have any data for that child, and we need to re-send. + t.deltaChild.childType.resourceVersions[childName] = "" +} + func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) { out := make(map[string]map[string]string) for typeUrl, resources := range resourceMap.Index { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 15da3bead..7e57e3dba 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -95,9 +95,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -112,7 +109,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the cluster + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -132,13 +132,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) - // If we re-subscribe to something even if there are no changes we get a + // If Envoy re-subscribes to something even if there are no changes we send a // fresh copy. envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ @@ -156,7 +156,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -174,24 +174,24 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB + // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db. snap = newTestSnapshot(t, snap, "") deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") mgr.DeliverConfig(t, sid, snap) - // We never send an EDS reply about this change. + // We never send an EDS reply about this change because Envoy is not subscribed to db anymore. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) { - // Fix the snapshot + // Restore db's deleted endpoints by generating a new snapshot. snap = newTestSnapshot(t, snap, "") mgr.DeliverConfig(t, sid, snap) - // We never send an EDS reply about this change. + // We never send an EDS reply about this change because Envoy is still not subscribed to db. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - // and fix the subscription + // When Envoy re-subscribes to db we send the endpoints for it. envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", @@ -215,9 +215,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Force sends to fail envoy.SetSendErr(errors.New("test error")) - // Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment) - snap = newTestSnapshot(t, snap, "") - mgr.DeliverConfig(t, sid, snap) + // Trigger only an EDS update by deleting endpoints again. + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") // We never send any replies about this change because we died. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -267,7 +266,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { mgr.DeliverConfig(t, sid, snap) }) - testutil.RunStep(t, "first sync", func(t *testing.T) { + testutil.RunStep(t, "simulate Envoy NACKing initial listener", func(t *testing.T) { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), @@ -286,9 +285,6 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -302,7 +298,10 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the clusters + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -323,13 +322,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - // NACKs the listener update due to the bad public listener + // Envoy NACKs the listener update due to the bad public listener envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{}) // Consul should not respond until a new snapshot is delivered + // because the current snapshot is known to be bad. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -411,9 +411,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -427,7 +424,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the clusters + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -447,13 +447,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -480,7 +480,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends routes request @@ -490,9 +490,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }, }) - // ACKs the listener - envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4) - // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: xdscommon.RouteType, @@ -502,6 +499,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { ), }) + // After receiving the routes, Envoy sends acks back for the listener and routes. + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4) envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -580,9 +579,6 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -598,7 +594,14 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the clusters. + // Envoy aims to wait to receive endpoints before ACKing clusters, + // but because it received an update for at least one of the clusters it cares about + // then it will ACK despite not having received an update for all clusters. + // This behavior was observed against Envoy v1.21 and v1.23. + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -618,7 +621,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener @@ -642,7 +645,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // It also (in parallel) issues the endpoint ACK @@ -705,9 +708,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -721,7 +721,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the clusters + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -741,7 +744,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener @@ -768,8 +771,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa ), }) - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4) - // And we re-send the endpoints for the updated cluster after getting the // ACK for the cluster. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -780,9 +781,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // SAME makeTestEndpoints(t, snap, "tcp:geo-cache"), ), }) + + // Envoy then ACK's the clusters and the endpoints. + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4) envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -847,9 +851,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }, }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we @@ -863,7 +864,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - // And no other response yet + // After receiving the endpoints Envoy sends an ACK for the clusters + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request @@ -883,7 +887,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - // And no other response yet + // We are caught up, so there should be nothing queued to send. assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends routes request @@ -893,9 +897,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }, }) - // ACKs the listener - envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) - // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: xdscommon.RouteType, @@ -905,6 +906,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) + // After receiving the routes, Envoy sends acks back for the listener and routes. + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -960,9 +963,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - // ACKs the listener - envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7) - // THE ACTUAL THING WE CARE ABOUT: replaced route config assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: xdscommon.RouteType, @@ -972,6 +972,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) + // After receiving the routes, Envoy sends acks back for the listener and routes. + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7) envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 2a699ed00..d4965e77a 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -650,10 +650,6 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( primaryTargetID := node.Resolver.Target failover := node.Resolver.Failover - type targetLoadAssignmentOption struct { - targetID string - clusterName string - } var targetsClustersData []targetClusterData var numFailoverTargets int diff --git a/test/integration/consul-container/libs/service/common.go b/test/integration/consul-container/libs/service/common.go index bf07e2d8b..ef114f58a 100644 --- a/test/integration/consul-container/libs/service/common.go +++ b/test/integration/consul-container/libs/service/common.go @@ -11,7 +11,7 @@ import ( const ( envoyEnvKey = "ENVOY_VERSION" - envoyLogLevel = "info" + envoyLogLevel = "debug" envoyVersion = "1.23.1" hashicorpDockerProxy = "docker.mirror.hashicorp.services" diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go index a930a6b51..cbdf8020c 100644 --- a/test/integration/consul-container/libs/service/connect.go +++ b/test/integration/consul-container/libs/service/connect.go @@ -64,7 +64,7 @@ func (c ConnectContainer) Terminate() error { return err } -func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (Service, error) { +func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) { namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name) containerName := utils.RandName(namePrefix) diff --git a/test/integration/consul-container/libs/service/helpers.go b/test/integration/consul-container/libs/service/helpers.go index eac3de13f..92e35079c 100644 --- a/test/integration/consul-container/libs/service/helpers.go +++ b/test/integration/consul-container/libs/service/helpers.go @@ -41,10 +41,12 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi Name: "Connect Sidecar Listening", TCP: fmt.Sprintf("%s:%d", serverConnectProxyIP, 20000), Interval: "10s", + Status: api.HealthPassing, }, &api.AgentServiceCheck{ Name: "Connect Sidecar Aliasing Static Server", AliasService: "static-server", + Status: api.HealthPassing, }, }, Proxy: &api.AgentServiceConnectProxyConfig{ @@ -55,9 +57,10 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi }, }, Check: &api.AgentServiceCheck{ - Name: "Connect Sidecar Listening", + Name: "Static Server Listening", TCP: fmt.Sprintf("%s:%d", serverServiceIP, 8080), Interval: "10s", + Status: api.HealthPassing, }, } @@ -69,7 +72,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi return serverService, serverConnectProxy, nil } -func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (Service, error) { +func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (*ConnectContainer, error) { // Create a service and proxy instance clientConnectProxy, err := NewConnectService(context.Background(), "static-client-sidecar", "static-client", 5000, node) if err != nil { @@ -97,6 +100,7 @@ func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, l Name: "Connect Sidecar Listening", TCP: fmt.Sprintf("%s:%d", clientConnectProxyIP, 20000), Interval: "10s", + Status: api.HealthPassing, }, }, Proxy: &api.AgentServiceConnectProxyConfig{