Prevent xDS tight loop on cfg errors (#12195)
This commit is contained in:
parent
5f84b8dfba
commit
bb129384b7
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
xds: prevents tight loop where the Consul client agent would repeatedly re-send config that Envoy has rejected.
|
||||||
|
```
|
|
@ -26,6 +26,15 @@ import (
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type deltaRecvResponse int
|
||||||
|
|
||||||
|
const (
|
||||||
|
deltaRecvResponseNack deltaRecvResponse = iota
|
||||||
|
deltaRecvResponseAck
|
||||||
|
deltaRecvNewSubscription
|
||||||
|
deltaRecvUnknownType
|
||||||
|
)
|
||||||
|
|
||||||
// ADSDeltaStream is a shorter way of referring to this thing...
|
// ADSDeltaStream is a shorter way of referring to this thing...
|
||||||
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
|
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
|
||||||
|
|
||||||
|
@ -175,8 +184,17 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler, ok := handlers[req.TypeUrl]; ok {
|
if handler, ok := handlers[req.TypeUrl]; ok {
|
||||||
if handler.Recv(req, generator.ProxyFeatures) {
|
switch handler.Recv(req, generator.ProxyFeatures) {
|
||||||
|
case deltaRecvNewSubscription:
|
||||||
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
|
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
|
||||||
|
|
||||||
|
case deltaRecvResponseNack:
|
||||||
|
generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)
|
||||||
|
|
||||||
|
// There is no reason to believe that generating new xDS resources from the same snapshot
|
||||||
|
// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
|
||||||
|
// for a new request or snapshot.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,9 +448,9 @@ func newDeltaType(
|
||||||
// Recv handles new discovery requests from envoy.
|
// Recv handles new discovery requests from envoy.
|
||||||
//
|
//
|
||||||
// Returns true the first time a type receives a request.
|
// Returns true the first time a type receives a request.
|
||||||
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
|
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) deltaRecvResponse {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return false // not something we care about
|
return deltaRecvUnknownType // not something we care about
|
||||||
}
|
}
|
||||||
logger := t.generator.Logger.With("typeUrl", t.typeURL)
|
logger := t.generator.Logger.With("typeUrl", t.typeURL)
|
||||||
|
|
||||||
|
@ -487,6 +505,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
|
||||||
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
|
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
|
||||||
"error", status.ErrorProto(req.ErrorDetail))
|
"error", status.ErrorProto(req.ErrorDetail))
|
||||||
t.nack(req.ResponseNonce)
|
t.nack(req.ResponseNonce)
|
||||||
|
return deltaRecvResponseNack
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -557,7 +576,10 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return registeredThisTime
|
if registeredThisTime {
|
||||||
|
return deltaRecvNewSubscription
|
||||||
|
}
|
||||||
|
return deltaRecvResponseAck
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *xDSDeltaType) ack(nonce string) {
|
func (t *xDSDeltaType) ack(nonce string) {
|
||||||
|
|
|
@ -203,37 +203,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
||||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) {
|
|
||||||
// Trigger only an EDS update.
|
|
||||||
snap = newTestSnapshot(t, snap, "")
|
|
||||||
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
|
|
||||||
mgr.DeliverConfig(t, sid, snap)
|
|
||||||
|
|
||||||
// Send envoy an EDS update.
|
|
||||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
|
||||||
TypeUrl: EndpointType,
|
|
||||||
Nonce: hexString(6),
|
|
||||||
Resources: makeTestResources(t,
|
|
||||||
makeTestEndpoints(t, snap, "tcp:db[0]"),
|
|
||||||
),
|
|
||||||
})
|
|
||||||
|
|
||||||
envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{})
|
|
||||||
|
|
||||||
// Send it again.
|
|
||||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
|
||||||
TypeUrl: EndpointType,
|
|
||||||
Nonce: hexString(7),
|
|
||||||
Resources: makeTestResources(t,
|
|
||||||
makeTestEndpoints(t, snap, "tcp:db[0]"),
|
|
||||||
),
|
|
||||||
})
|
|
||||||
|
|
||||||
envoy.SendDeltaReqACK(t, EndpointType, 7)
|
|
||||||
|
|
||||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
|
||||||
})
|
|
||||||
|
|
||||||
// NOTE: this has to be the last subtest since it kills the stream
|
// NOTE: this has to be the last subtest since it kills the stream
|
||||||
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
|
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
|
||||||
// Force sends to fail
|
// Force sends to fail
|
||||||
|
@ -250,11 +219,138 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
||||||
envoy.Close()
|
envoy.Close()
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
// Envoy died.
|
require.NoError(t, err)
|
||||||
expect := status.Errorf(codes.Unavailable,
|
case <-time.After(50 * time.Millisecond):
|
||||||
"failed to send upsert reply for type %q: test error",
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
EndpointType)
|
}
|
||||||
require.EqualError(t, err, expect.Error())
|
}
|
||||||
|
|
||||||
|
func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
// Allow all
|
||||||
|
return acl.RootAuthorizer("manage"), nil
|
||||||
|
}
|
||||||
|
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||||
|
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||||
|
|
||||||
|
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, sid)
|
||||||
|
|
||||||
|
var snap *proxycfg.ConfigSnapshot
|
||||||
|
|
||||||
|
runStep(t, "initial setup", func(t *testing.T) {
|
||||||
|
snap = newTestSnapshot(t, nil, "")
|
||||||
|
|
||||||
|
// Plug in a bad port for the public listener
|
||||||
|
snap.Port = 1
|
||||||
|
|
||||||
|
// Send initial cluster discover.
|
||||||
|
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||||
|
|
||||||
|
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "first sync", func(t *testing.T) {
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ClusterType,
|
||||||
|
Nonce: hexString(1),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, snap, "tcp:db"),
|
||||||
|
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Envoy then tries to discover endpoints for those clusters.
|
||||||
|
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||||
|
ResourceNamesSubscribe: []string{
|
||||||
|
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// It also (in parallel) issues the cluster ACK
|
||||||
|
envoy.SendDeltaReqACK(t, ClusterType, 1)
|
||||||
|
|
||||||
|
// We should get a response immediately since the config is already present in
|
||||||
|
// the server for endpoints. Note that this should not be racy if the server
|
||||||
|
// is behaving well since the Cluster send above should be blocked until we
|
||||||
|
// deliver a new config version.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: EndpointType,
|
||||||
|
Nonce: hexString(2),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "tcp:db"),
|
||||||
|
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// Envoy now sends listener request
|
||||||
|
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||||
|
|
||||||
|
// It also (in parallel) issues the endpoint ACK
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 2)
|
||||||
|
|
||||||
|
// And should get a response immediately.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(3),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
// Response contains public_listener with port that Envoy can't bind to
|
||||||
|
makeTestListener(t, snap, "tcp:bad_public_listener"),
|
||||||
|
makeTestListener(t, snap, "tcp:db"),
|
||||||
|
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// NACKs the listener update due to the bad public listener
|
||||||
|
envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{})
|
||||||
|
|
||||||
|
// Consul should not respond until a new snapshot is delivered
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) {
|
||||||
|
// Correct the port and deliver a new snapshot
|
||||||
|
snap.Port = 9999
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
// And should send a response immediately.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(4),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
// Send a public listener that Envoy will accept
|
||||||
|
makeTestListener(t, snap, "tcp:public_listener"),
|
||||||
|
makeTestListener(t, snap, "tcp:db"),
|
||||||
|
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// New listener is acked now
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 4)
|
||||||
|
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.Close()
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
case <-time.After(50 * time.Millisecond):
|
case <-time.After(50 * time.Millisecond):
|
||||||
t.Fatalf("timed out waiting for handler to finish")
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
}
|
}
|
||||||
|
|
|
@ -533,6 +533,30 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
|
||||||
|
|
||||||
func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_listener_v3.Listener {
|
func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_listener_v3.Listener {
|
||||||
switch fixtureName {
|
switch fixtureName {
|
||||||
|
case "tcp:bad_public_listener":
|
||||||
|
return &envoy_listener_v3.Listener{
|
||||||
|
// Envoy can't bind to port 1
|
||||||
|
Name: "public_listener:0.0.0.0:1",
|
||||||
|
Address: makeAddress("0.0.0.0", 1),
|
||||||
|
TrafficDirection: envoy_core_v3.TrafficDirection_INBOUND,
|
||||||
|
FilterChains: []*envoy_listener_v3.FilterChain{
|
||||||
|
{
|
||||||
|
TransportSocket: xdsNewPublicTransportSocket(t, snap),
|
||||||
|
Filters: []*envoy_listener_v3.Filter{
|
||||||
|
xdsNewFilter(t, "envoy.filters.network.rbac", &envoy_network_rbac_v3.RBAC{
|
||||||
|
Rules: &envoy_rbac_v3.RBAC{},
|
||||||
|
StatPrefix: "connect_authz",
|
||||||
|
}),
|
||||||
|
xdsNewFilter(t, "envoy.filters.network.tcp_proxy", &envoy_tcp_proxy_v3.TcpProxy{
|
||||||
|
ClusterSpecifier: &envoy_tcp_proxy_v3.TcpProxy_Cluster{
|
||||||
|
Cluster: "local_app",
|
||||||
|
},
|
||||||
|
StatPrefix: "public_listener",
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
case "tcp:public_listener":
|
case "tcp:public_listener":
|
||||||
return &envoy_listener_v3.Listener{
|
return &envoy_listener_v3.Listener{
|
||||||
Name: "public_listener:0.0.0.0:9999",
|
Name: "public_listener:0.0.0.0:9999",
|
||||||
|
|
Loading…
Reference in New Issue