diff --git a/.changelog/12195.txt b/.changelog/12195.txt new file mode 100644 index 000000000..9defaf1b3 --- /dev/null +++ b/.changelog/12195.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: prevents tight loop where the Consul client agent would repeatedly re-send config that Envoy has rejected. +``` \ No newline at end of file diff --git a/agent/xds/delta.go b/agent/xds/delta.go index d952aa3e2..49360b8e3 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -26,6 +26,15 @@ import ( "github.com/hashicorp/consul/logging" ) +type deltaRecvResponse int + +const ( + deltaRecvResponseNack deltaRecvResponse = iota + deltaRecvResponseAck + deltaRecvNewSubscription + deltaRecvUnknownType +) + // ADSDeltaStream is a shorter way of referring to this thing... type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer @@ -175,8 +184,17 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } if handler, ok := handlers[req.TypeUrl]; ok { - if handler.Recv(req, generator.ProxyFeatures) { + switch handler.Recv(req, generator.ProxyFeatures) { + case deltaRecvNewSubscription: generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) + + case deltaRecvResponseNack: + generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl) + + // There is no reason to believe that generating new xDS resources from the same snapshot + // would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait + // for a new request or snapshot. + continue } } @@ -430,9 +448,9 @@ func newDeltaType( // Recv handles new discovery requests from envoy. // // Returns true the first time a type receives a request. 
-func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool { +func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) deltaRecvResponse { if t == nil { - return false // not something we care about + return deltaRecvUnknownType // not something we care about } logger := t.generator.Logger.With("typeUrl", t.typeURL) @@ -487,6 +505,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce, "error", status.ErrorProto(req.ErrorDetail)) t.nack(req.ResponseNonce) + return deltaRecvResponseNack } } @@ -557,7 +576,10 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su } } - return registeredThisTime + if registeredThisTime { + return deltaRecvNewSubscription + } + return deltaRecvResponseAck } func (t *xDSDeltaType) ack(nonce string) { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 6fa119002..fc0f05dca 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -203,37 +203,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) { - // Trigger only an EDS update. - snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") - mgr.DeliverConfig(t, sid, snap) - - // Send envoy an EDS update. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(6), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db[0]"), - ), - }) - - envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{}) - - // Send it again. 
- assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(7), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db[0]"), - ), - }) - - envoy.SendDeltaReqACK(t, EndpointType, 7) - - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - // NOTE: this has to be the last subtest since it kills the stream runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { // Force sends to fail @@ -250,11 +219,138 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { envoy.Close() select { case err := <-errCh: - // Envoy died. - expect := status.Errorf(codes.Unavailable, - "failed to send upsert reply for type %q: test error", - EndpointType) - require.EqualError(t, err, expect.Error()) + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + +func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + + runStep(t, "initial setup", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Plug in a bad port for the public listener + snap.Port = 1 + + // Send initial cluster discover. 
+ envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + }) + + runStep(t, "first sync", func(t *testing.T) { + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, ClusterType, 1) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 2) + + // And should get a response immediately. 
+ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + // Response contains public_listener with port that Envoy can't bind to + makeTestListener(t, snap, "tcp:bad_public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // NACKs the listener update due to the bad public listener + envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{}) + + // Consul should not respond until a new snapshot is delivered + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + runStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) { + // Correct the port and deliver a new snapshot + snap.Port = 9999 + mgr.DeliverConfig(t, sid, snap) + + // And should send a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(4), + Resources: makeTestResources(t, + // Send a public listener that Envoy will accept + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // New listener is acked now + envoy.SendDeltaReqACK(t, ListenerType, 4) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) case <-time.After(50 * time.Millisecond): t.Fatalf("timed out waiting for handler to finish") } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index c0be25680..54084fddf 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -533,6 +533,30 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str func 
makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_listener_v3.Listener { switch fixtureName { + case "tcp:bad_public_listener": + return &envoy_listener_v3.Listener{ + // Envoy can't bind to port 1 + Name: "public_listener:0.0.0.0:1", + Address: makeAddress("0.0.0.0", 1), + TrafficDirection: envoy_core_v3.TrafficDirection_INBOUND, + FilterChains: []*envoy_listener_v3.FilterChain{ + { + TransportSocket: xdsNewPublicTransportSocket(t, snap), + Filters: []*envoy_listener_v3.Filter{ + xdsNewFilter(t, "envoy.filters.network.rbac", &envoy_network_rbac_v3.RBAC{ + Rules: &envoy_rbac_v3.RBAC{}, + StatPrefix: "connect_authz", + }), + xdsNewFilter(t, "envoy.filters.network.tcp_proxy", &envoy_tcp_proxy_v3.TcpProxy{ + ClusterSpecifier: &envoy_tcp_proxy_v3.TcpProxy_Cluster{ + Cluster: "local_app", + }, + StatPrefix: "public_listener", + }), + }, + }, + }, + } case "tcp:public_listener": return &envoy_listener_v3.Listener{ Name: "public_listener:0.0.0.0:9999",