Avoid blocking child type updates on parent ack (#15083)

This commit is contained in:
Freddy 2022-11-07 18:10:42 -07:00 committed by GitHub
parent 4672d8bd3c
commit eee0fb1035
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 166 additions and 123 deletions

3
.changelog/15083.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect: fixed bug where endpoint updates for new xDS clusters could block for 15s before being sent to Envoy.
```

View File

@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
@ -158,8 +157,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType]
handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType]
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.RouteType],
childrenNames: make(map[string][]string),
}
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.EndpointType],
childrenNames: make(map[string][]string),
}
var authTimer <-chan time.Time
extendAuthTimer := func() {
@ -346,22 +351,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
continue
}
var pendingTypes []string
for typeUrl, handler := range handlers {
if !handler.registered {
continue
}
if len(handler.pendingUpdates) > 0 {
pendingTypes = append(pendingTypes, typeUrl)
}
}
if len(pendingTypes) > 0 {
sort.Strings(pendingTypes)
generator.Logger.Trace("Skipping delta computation because there are responses in flight",
"pendingTypeUrls", pendingTypes)
continue
}
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
streamStartOnce.Do(func() {
@ -369,7 +358,25 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
})
for _, op := range xDSUpdateOrder {
err, sent := handlers[op.TypeUrl].SendIfNew(
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)
// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)
// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
}
err, _ := handlers[op.TypeUrl].SendIfNew(
cfgSnap.Kind,
currentVersions[op.TypeUrl],
resourceMap,
@ -383,9 +390,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
op.errorLogNameReplyPrefix(),
op.TypeUrl, err)
}
if sent {
break // wait until we get an ACK to do more
}
}
}
}
@ -435,16 +439,26 @@ func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
}
}
type xDSDeltaChild struct {
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// childrenNames is map of parent resource names to a list of associated child resource
// names.
childrenNames map[string][]string
}
type xDSDeltaType struct {
generator *ResourceGenerator
stream ADSDeltaStream
typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// deltaChild contains data for an xDS child type if there is one.
// For example, endpoints are a child type of clusters.
deltaChild *xDSDeltaChild
// registered indicates if this type has been requested at least once by
// the proxy
@ -486,7 +500,6 @@ func (t *xDSDeltaType) subscribed(name string) bool {
type PendingUpdate struct {
Remove bool
Version string
ChildResources []string // optional
}
func newDeltaType(
@ -610,6 +623,15 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
t.resourceVersions[name] = ""
}
// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
// data is gone.
if t.deltaChild != nil && t.deltaChild.childType.registered {
for _, childName := range t.deltaChild.childrenNames[name] {
t.ensureChildResend(name, childName)
}
}
if alreadySubscribed {
logger.Trace("re-subscribing resource for stream", "resource", name)
} else {
@ -646,27 +668,6 @@ func (t *xDSDeltaType) ack(nonce string) {
}
t.resourceVersions[name] = obj.Version
if t.childType != nil {
// This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; !exist {
continue
}
if !t.subscribed(childName) {
continue
}
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", name,
"childTypeUrl", t.childType.typeURL,
"childResource", childName,
)
// Basically manifest this as a re-subscribe/re-sync
t.childType.resourceVersions[childName] = ""
}
}
}
t.sentToEnvoyOnce = true
delete(t.pendingUpdates, nonce)
@ -686,6 +687,12 @@ func (t *xDSDeltaType) SendIfNew(
if t == nil || !t.registered {
return nil, false
}
// Wait for Envoy to catch up with this delta type before sending something new.
if len(t.pendingUpdates) > 0 {
return nil, false
}
logger := t.generator.Logger.With("typeUrl", t.typeURL)
allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
@ -721,14 +728,23 @@ func (t *xDSDeltaType) SendIfNew(
}
logger.Trace("sent response", "nonce", resp.Nonce)
if t.childType != nil {
// Capture the relevant child resource names on this pending update so
// we can properly clean up the linked children when this change is
// ACKed.
for name, obj := range updates {
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
// receiving updates for the parent, so we need to handle this ourselves.
//
// Note that we do not check whether the deltaChild.childType is registered here, since we send
// parent types before child types, meaning that it's expected on first send of a parent that
// there are no subscriptions for the child type.
if t.deltaChild != nil {
for name := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
obj.ChildResources = children
updates[name] = obj
// Capture the relevant child resource names on this pending update so
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
t.deltaChild.childrenNames[name] = children
for _, childName := range children {
t.ensureChildResend(name, childName)
}
}
}
}
@ -848,6 +864,28 @@ func (t *xDSDeltaType) createDeltaResponse(
return resp, realUpdates, nil
}
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)
// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
}
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap.Index {

View File

@ -95,9 +95,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -112,7 +109,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the cluster
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -132,13 +132,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If we re-subscribe to something even if there are no changes we get a
// If Envoy re-subscribes to something even if there are no changes we send a
// fresh copy.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
@ -156,7 +156,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -174,24 +174,24 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
// We never send an EDS reply about this change because Envoy is not subscribed to db anymore.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
// Fix the snapshot
// Restore db's deleted endpoints by generating a new snapshot.
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
// We never send an EDS reply about this change because Envoy is still not subscribed to db.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// and fix the subscription
// When Envoy re-subscribes to db we send the endpoints for it.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
@ -215,9 +215,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// Trigger only an EDS update by deleting endpoints again.
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -267,7 +266,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
mgr.DeliverConfig(t, sid, snap)
})
testutil.RunStep(t, "first sync", func(t *testing.T) {
testutil.RunStep(t, "simulate Envoy NACKing initial listener", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
@ -286,9 +285,6 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -302,7 +298,10 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -323,13 +322,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// NACKs the listener update due to the bad public listener
// Envoy NACKs the listener update due to the bad public listener
envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{})
// Consul should not respond until a new snapshot is delivered
// because the current snapshot is known to be bad.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -411,9 +411,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -427,7 +424,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -447,13 +447,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -480,7 +480,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends routes request
@ -490,9 +490,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
},
})
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.RouteType,
@ -502,6 +499,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
),
})
// After receiving the routes, Envoy sends acks back for the listener and routes.
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -580,9 +579,6 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -598,7 +594,14 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the clusters.
// Envoy aims to wait to receive endpoints before ACKing clusters,
// but because it received an update for at least one of the clusters it cares about
// then it will ACK despite not having received an update for all clusters.
// This behavior was observed against Envoy v1.21 and v1.23.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -618,7 +621,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
@ -642,7 +645,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// It also (in parallel) issues the endpoint ACK
@ -705,9 +708,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -721,7 +721,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -741,7 +744,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
@ -768,8 +771,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
),
})
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
// And we re-send the endpoints for the updated cluster after getting the
// ACK for the cluster.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -780,9 +781,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// Envoy then ACK's the clusters and the endpoints.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -847,9 +851,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
@ -863,7 +864,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
// And no other response yet
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
@ -883,7 +887,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
// And no other response yet
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends routes request
@ -893,9 +897,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
},
})
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.RouteType,
@ -905,6 +906,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
// After receiving the routes, Envoy sends acks back for the listener and routes.
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -960,9 +963,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
// THE ACTUAL THING WE CARE ABOUT: replaced route config
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.RouteType,
@ -972,6 +972,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
// After receiving the routes, Envoy sends acks back for the listener and routes.
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

View File

@ -650,10 +650,6 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
primaryTargetID := node.Resolver.Target
failover := node.Resolver.Failover
type targetLoadAssignmentOption struct {
targetID string
clusterName string
}
var targetsClustersData []targetClusterData
var numFailoverTargets int

View File

@ -11,7 +11,7 @@ import (
const (
envoyEnvKey = "ENVOY_VERSION"
envoyLogLevel = "info"
envoyLogLevel = "debug"
envoyVersion = "1.23.1"
hashicorpDockerProxy = "docker.mirror.hashicorp.services"

View File

@ -64,7 +64,7 @@ func (c ConnectContainer) Terminate() error {
return err
}
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (Service, error) {
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) {
namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name)
containerName := utils.RandName(namePrefix)

View File

@ -41,10 +41,12 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
Name: "Connect Sidecar Listening",
TCP: fmt.Sprintf("%s:%d", serverConnectProxyIP, 20000),
Interval: "10s",
Status: api.HealthPassing,
},
&api.AgentServiceCheck{
Name: "Connect Sidecar Aliasing Static Server",
AliasService: "static-server",
Status: api.HealthPassing,
},
},
Proxy: &api.AgentServiceConnectProxyConfig{
@ -55,9 +57,10 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
},
},
Check: &api.AgentServiceCheck{
Name: "Connect Sidecar Listening",
Name: "Static Server Listening",
TCP: fmt.Sprintf("%s:%d", serverServiceIP, 8080),
Interval: "10s",
Status: api.HealthPassing,
},
}
@ -69,7 +72,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
return serverService, serverConnectProxy, nil
}
func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (Service, error) {
func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (*ConnectContainer, error) {
// Create a service and proxy instance
clientConnectProxy, err := NewConnectService(context.Background(), "static-client-sidecar", "static-client", 5000, node)
if err != nil {
@ -97,6 +100,7 @@ func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, l
Name: "Connect Sidecar Listening",
TCP: fmt.Sprintf("%s:%d", clientConnectProxyIP, 20000),
Interval: "10s",
Status: api.HealthPassing,
},
},
Proxy: &api.AgentServiceConnectProxyConfig{