From 1645b6aafe62c3d6ac4e06ac2654ac305ac825db Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Tue, 15 Jun 2021 15:21:07 -0500 Subject: [PATCH] xds: adding more delta protocol tests (#10398) Fixes #10125 --- agent/xds/delta_test.go | 352 +++++++++++++++---------- agent/xds/testing.go | 58 +++- agent/xds/xds_protocol_helpers_test.go | 11 + 3 files changed, 283 insertions(+), 138 deletions(-) diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 638e2ff43..288ea1a47 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -1,6 +1,7 @@ package xds import ( + "errors" "sync/atomic" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" + rpcstatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -17,16 +19,6 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -/* TODO: Test scenarios - -- initial resource versions -- removing resources -- nack -- unsubscribe -- error during handling causing retry - -*/ - // NOTE: For these tests, prefer not using xDS protobuf "factory" methods if // possible to avoid using them to test themselves. // @@ -45,132 +37,228 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Register the proxy to create state needed to Watch() on mgr.RegisterProxy(t, sid) - snap := newTestSnapshot(t, nil, "") + var snap *proxycfg.ConfigSnapshot - // Send initial cluster discover. We'll assume we are testing a partial - // reconnect and include some initial resource versions that will be - // cleaned up. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - InitialResourceVersions: mustMakeVersionMap(t, - makeTestCluster(t, snap, "tcp:geo-cache"), - ), + runStep(t, "initial setup", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. We'll assume we are testing a partial + // reconnect and include some initial resource versions that will be + // cleaned up. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) }) - // Check no response sent yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + runStep(t, "first sync", func(t *testing.T) { + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) - requireProtocolVersionGauge(t, scenario, "v3", 1) + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + // We'll assume we are testing a partial "reconnect" + InitialResourceVersions: mustMakeVersionMap(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + // + // Include "fake-endpoints" here to test subscribing to an unknown + // thing and have consul tell us there's no data for it. + "fake-endpoints", + }, + }) - // Deliver a new snapshot (tcp with one tcp upstream) - mgr.DeliverConfig(t, sid, snap) + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, ClusterType, 1) - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, - Nonce: hexString(1), - Resources: makeTestResources(t, - makeTestCluster(t, snap, "tcp:local_app"), - makeTestCluster(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), - ), + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 2) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // cleanup unused resources now that we've created/updated relevant things + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(4), + RemovedResources: []string{ + "fake-endpoints", // correcting the errant subscription + }, + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 3) + + // ACK the endpoint removal + envoy.SendDeltaReqACK(t, EndpointType, 4) + + // If we re-subscribe to something even if there are no changes we get a + // fresh copy. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + envoy.SendDeltaReqACK(t, EndpointType, 5) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - // We'll assume we are testing a partial "reconnect" - InitialResourceVersions: mustMakeVersionMap(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - ResourceNamesSubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - // - // Include "fake-endpoints" here to test subscribing to an unknown - // thing and have consul tell us there's no data for it. - "fake-endpoints", - }, + deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, svc, targetID string) { + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID] = + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID][0:1] + } + + runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesUnsubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB + snap = newTestSnapshot(t, snap, "") + deleteAllButOneEndpoint(snap, "db", "db.default.dc1") + mgr.DeliverConfig(t, sid, snap) + + // We never send an EDS reply about this change. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + runStep(t, "restore endpoint subscription", func(t *testing.T) { + // Fix the snapshot + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) - // We should get a response immediately since the config is already present in - // the server for endpoints. Note that this should not be racy if the server - // is behaving well since the Cluster send above should be blocked until we - // deliver a new config version. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(2), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), + // and fix the subscription + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(6), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + ), + }) + + envoy.SendDeltaReqACK(t, EndpointType, 6) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) { + // Trigger only an EDS update. + snap = newTestSnapshot(t, snap, "") + deleteAllButOneEndpoint(snap, "db", "db.default.dc1") + mgr.DeliverConfig(t, sid, snap) - // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + // Send envoy an EDS update. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(7), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db[0]"), + ), + }) - // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + envoy.SendDeltaReqNACK(t, EndpointType, 7, &rpcstatus.Status{}) - // And should get a response immediately. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, - Nonce: hexString(3), - Resources: makeTestResources(t, - makeTestListener(t, snap, "tcp:public_listener"), - makeTestListener(t, snap, "tcp:db"), - makeTestListener(t, snap, "tcp:geo-cache"), - ), + // Send it again. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(8), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db[0]"), + ), + }) + + envoy.SendDeltaReqACK(t, EndpointType, 8) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - // cleanup unused resources now that we've created/updated relevant things - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(4), - RemovedResources: []string{ - "fake-endpoints", // correcting the errant subscription - }, + // NOTE: this has to be the last subtest since it kills the stream + runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { + // Force sends to fail + envoy.SetSendErr(errors.New("test error")) + + // Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment) + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) }) - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) - - // ACK the endpoint removal - envoy.SendDeltaReqACK(t, EndpointType, 4, true, nil) - - // If we re-subscribe to something even if there are no changes we get a - // fresh copy. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesSubscribe: []string{ - "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - }, - }) - - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(5), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - }) - - envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil) - - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // TODO(rb): test NACK - envoy.Close() select { case err := <-errCh: - require.NoError(t, err) + // Envoy died. + expect := status.Errorf(codes.Unavailable, + "failed to send upsert reply for type %q: test error", + EndpointType) + require.EqualError(t, err, expect.Error()) case <-time.After(50 * time.Millisecond): t.Fatalf("timed out waiting for handler to finish") } @@ -223,7 +311,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server @@ -245,7 +333,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { envoy.SendDeltaReq(t, ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + envoy.SendDeltaReqACK(t, EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -262,7 +350,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) + envoy.SendDeltaReqACK(t, ListenerType, 3) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -302,7 +390,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 4, true, nil) + envoy.SendDeltaReqACK(t, ListenerType, 4) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -313,7 +401,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { ), }) - envoy.SendDeltaReqACK(t, RouteType, 5, true, nil) + envoy.SendDeltaReqACK(t, RouteType, 5) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -374,7 +462,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server @@ -396,7 +484,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa envoy.SendDeltaReq(t, ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + envoy.SendDeltaReqACK(t, EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -413,7 +501,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) + envoy.SendDeltaReqACK(t, ListenerType, 3) }) runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) { @@ -436,7 +524,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa ), }) - envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 4) // And we re-send the endpoints for the updated cluster after getting the // ACK for the cluster. @@ -448,7 +536,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // SAME makeTestEndpoints(t, snap, "tcp:geo-cache"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil) + envoy.SendDeltaReqACK(t, EndpointType, 5) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -516,7 +604,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server @@ -538,7 +626,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan envoy.SendDeltaReq(t, ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + envoy.SendDeltaReqACK(t, EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -562,7 +650,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) + envoy.SendDeltaReqACK(t, ListenerType, 3) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -573,7 +661,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - envoy.SendDeltaReqACK(t, RouteType, 4, true, nil) + envoy.SendDeltaReqACK(t, RouteType, 4) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -611,10 +699,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 5) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil) + envoy.SendDeltaReqACK(t, ListenerType, 6) // The behaviors of Cluster updates triggering re-sends of Endpoint updates // tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints @@ -629,7 +717,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil) + envoy.SendDeltaReqACK(t, EndpointType, 7) // THE ACTUAL THING WE CARE ABOUT: replaced route config assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ @@ -640,7 +728,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan ), }) - envoy.SendDeltaReqACK(t, RouteType, 8, true, nil) + envoy.SendDeltaReqACK(t, RouteType, 8) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -1010,7 +1098,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACK: clusters - envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + envoy.SendDeltaReqACK(t, ClusterType, 1) // REQ: listeners envoy.SendDeltaReq(t, ListenerType, nil) diff --git a/agent/xds/testing.go b/agent/xds/testing.go index c9c770e19..1ec7b8f65 100644 --- a/agent/xds/testing.go +++ b/agent/xds/testing.go @@ -30,6 +30,9 @@ type TestADSDeltaStream struct { stubGrpcServerStream sendCh chan *envoy_discovery_v3.DeltaDiscoveryResponse recvCh chan *envoy_discovery_v3.DeltaDiscoveryRequest + + mu sync.Mutex + sendErr error } var _ ADSDeltaStream = (*TestADSDeltaStream)(nil) @@ -45,10 +48,24 @@ func NewTestADSDeltaStream(t testing.T, ctx context.Context) *TestADSDeltaStream // Send implements ADSDeltaStream func (s *TestADSDeltaStream) Send(r *envoy_discovery_v3.DeltaDiscoveryResponse) error { + s.mu.Lock() + err := s.sendErr + s.mu.Unlock() + + if err != nil { + return err + } + s.sendCh <- r return nil } +func (s *TestADSDeltaStream) SetSendErr(err error) { + s.mu.Lock() + s.sendErr = err + s.mu.Unlock() +} + // Recv implements ADSDeltaStream func (s *TestADSDeltaStream) Recv() (*envoy_discovery_v3.DeltaDiscoveryRequest, error) { r := <-s.recvCh @@ -65,6 +82,9 @@ type TestADSStream struct { stubGrpcServerStream sendCh chan *envoy_api_v2.DiscoveryResponse recvCh chan *envoy_api_v2.DiscoveryRequest + + mu sync.Mutex + sendErr error } // NewTestADSStream makes a new TestADSStream @@ -79,10 +99,24 @@ func NewTestADSStream(t testing.T, ctx context.Context) *TestADSStream { // Send implements ADSStream func (s *TestADSStream) Send(r *envoy_api_v2.DiscoveryResponse) error { + s.mu.Lock() + err := s.sendErr + s.mu.Unlock() + + if err != nil { + return err + } + s.sendCh <- r return nil } +func (s *TestADSStream) SetSendErr(err error) { + s.mu.Lock() + s.sendErr = err + s.mu.Unlock() +} + // Recv implements ADSStream func (s *TestADSStream) Recv() (*envoy_api_v2.DiscoveryRequest, error) { r := <-s.recvCh @@ -197,6 +231,11 @@ func (e *TestEnvoy) SendReq(t testing.T, typeURL string, version, nonce uint64) } } +func (e *TestEnvoy) SetSendErr(err error) { + e.stream.SetSendErr(err) + e.deltaStream.SetSendErr(err) +} + // SendDeltaReq sends a delta request from the test server. // // NOTE: the input request is mutated before sending by injecting the node. @@ -207,19 +246,26 @@ func (e *TestEnvoy) SendDeltaReq( ) { e.sendDeltaReq(t, typeURL, nil, req) } + func (e *TestEnvoy) SendDeltaReqACK( t testing.T, typeURL string, nonce uint64, - ack bool, +) { + e.sendDeltaReq(t, typeURL, &nonce, nil) +} + +func (e *TestEnvoy) SendDeltaReqNACK( + t testing.T, + typeURL string, + nonce uint64, errorDetail *status.Status, ) { - req := &envoy_discovery_v3.DeltaDiscoveryRequest{} - if !ack { - req.ErrorDetail = errorDetail - } - e.sendDeltaReq(t, typeURL, &nonce, req) + e.sendDeltaReq(t, typeURL, &nonce, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ErrorDetail: errorDetail, + }) } + func (e *TestEnvoy) sendDeltaReq( t testing.T, typeURL string, diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index b518359a9..3837c930b 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -481,6 +481,17 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str }, }, } + case "tcp:db[0]": + return &envoy_endpoint_v3.ClusterLoadAssignment{ + ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{ + { + LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{ + xdsNewEndpointWithHealth("10.10.1.1", 8080, envoy_core_v3.HealthStatus_HEALTHY, 1), + }, + }, + }, + } case "http2:db", "http:db": return &envoy_endpoint_v3.ClusterLoadAssignment{ ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",