xds: fix representation of incremental xDS subscriptions (#10987)

Fixes #10563

The `resourceVersion` map was doing two jobs prior to this PR. The first job was
to track what version of every resource we know envoy currently has. The
second was to track subscriptions to those resources (by way of the empty
string for a version). This mostly works out fine, but occasionally leads to
consul removing a resource and accidentally (effectively) unsubscribing at the
same time.

The fix separates these two jobs. When all of the resources for a subscription
are removed we continue to track the subscription until envoy explicitly
unsubscribes
This commit is contained in:
R.B. Boyer 2021-09-21 09:58:56 -05:00 committed by GitHub
parent ae9e167096
commit 2773bd94d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 268 additions and 62 deletions

3
.changelog/10987.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: fixed a bug where Envoy sidecars could enter a state where they failed to receive xds updates from Consul
```

View File

@ -189,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// index and hash the xDS structures // index and hash the xDS structures
newResourceMap := indexResources(generator.Logger, newRes) newResourceMap := indexResources(generator.Logger, newRes)
if s.ResourceMapMutateFn != nil {
s.ResourceMapMutateFn(newResourceMap)
}
if err := populateChildIndexMap(newResourceMap); err != nil { if err := populateChildIndexMap(newResourceMap); err != nil {
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err) return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
} }
@ -384,6 +388,10 @@ type xDSDeltaType struct {
// sentToEnvoyOnce is true after we've sent one response to envoy. // sentToEnvoyOnce is true after we've sent one response to envoy.
sentToEnvoyOnce bool sentToEnvoyOnce bool
// subscriptions is the set of currently subscribed envoy resources.
// If wildcard == true, this will be empty.
subscriptions map[string]struct{}
// resourceVersions is the current view of CONFIRMED/ACKed updates to // resourceVersions is the current view of CONFIRMED/ACKed updates to
// envoy's view of the loaded resources. // envoy's view of the loaded resources.
// //
@ -398,7 +406,16 @@ type xDSDeltaType struct {
pendingUpdates map[string]map[string]PendingUpdate pendingUpdates map[string]map[string]PendingUpdate
} }
func (t *xDSDeltaType) subscribed(name string) bool {
if t.wildcard {
return true
}
_, subscribed := t.subscriptions[name]
return subscribed
}
type PendingUpdate struct { type PendingUpdate struct {
Remove bool
Version string Version string
ChildResources []string // optional ChildResources []string // optional
} }
@ -414,6 +431,7 @@ func newDeltaType(
stream: stream, stream: stream,
typeURL: typeUrl, typeURL: typeUrl,
allowEmptyFn: allowEmptyFn, allowEmptyFn: allowEmptyFn,
subscriptions: make(map[string]struct{}),
resourceVersions: make(map[string]string), resourceVersions: make(map[string]string),
pendingUpdates: make(map[string]map[string]PendingUpdate), pendingUpdates: make(map[string]map[string]PendingUpdate),
} }
@ -484,6 +502,11 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
logger.Trace("setting initial resource versions for stream", logger.Trace("setting initial resource versions for stream",
"resources", req.InitialResourceVersions) "resources", req.InitialResourceVersions)
t.resourceVersions = req.InitialResourceVersions t.resourceVersions = req.InitialResourceVersions
if !t.wildcard {
for k := range req.InitialResourceVersions {
t.subscriptions[k] = struct{}{}
}
}
} }
if !t.wildcard { if !t.wildcard {
@ -509,8 +532,13 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
// //
// We handle that here by ALWAYS wiping the version so the diff // We handle that here by ALWAYS wiping the version so the diff
// decides to send the value. // decides to send the value.
_, alreadySubscribed := t.resourceVersions[name] _, alreadySubscribed := t.subscriptions[name]
t.resourceVersions[name] = "" // start with no version t.subscriptions[name] = struct{}{}
// Reset the tracked version so we force a reply.
if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked {
t.resourceVersions[name] = ""
}
if alreadySubscribed { if alreadySubscribed {
logger.Trace("re-subscribing resource for stream", "resource", name) logger.Trace("re-subscribing resource for stream", "resource", name)
@ -520,11 +548,12 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
} }
for _, name := range req.ResourceNamesUnsubscribe { for _, name := range req.ResourceNamesUnsubscribe {
if _, ok := t.resourceVersions[name]; !ok { if _, ok := t.subscriptions[name]; !ok {
continue continue
} }
delete(t.resourceVersions, name) delete(t.subscriptions, name)
logger.Trace("unsubscribing resource for stream", "resource", name) logger.Trace("unsubscribing resource for stream", "resource", name)
// NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions
} }
} }
@ -538,16 +567,22 @@ func (t *xDSDeltaType) ack(nonce string) {
} }
for name, obj := range pending { for name, obj := range pending {
if obj.Version == "" { if obj.Remove {
delete(t.resourceVersions, name) delete(t.resourceVersions, name)
} else { continue
t.resourceVersions[name] = obj.Version
} }
if t.childType != nil && obj.Version != "" {
t.resourceVersions[name] = obj.Version
if t.childType != nil {
// This branch only matters on UPDATE, since we already have // This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources. // mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources { for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; exist { if _, exist := t.childType.resourceVersions[childName]; !exist {
continue
}
if !t.subscribed(childName) {
continue
}
t.generator.Logger.Trace( t.generator.Logger.Trace(
"triggering implicit update of resource", "triggering implicit update of resource",
"typeUrl", t.typeURL, "typeUrl", t.typeURL,
@ -555,12 +590,11 @@ func (t *xDSDeltaType) ack(nonce string) {
"childTypeUrl", t.childType.typeURL, "childTypeUrl", t.childType.typeURL,
"childResource", childName, "childResource", childName,
) )
// Basically manifest this as a re-subscribe // Basically manifest this as a re-subscribe/re-sync
t.childType.resourceVersions[childName] = "" t.childType.resourceVersions[childName] = ""
} }
} }
} }
}
t.sentToEnvoyOnce = true t.sentToEnvoyOnce = true
delete(t.pendingUpdates, nonce) delete(t.pendingUpdates, nonce)
} }
@ -640,6 +674,8 @@ func (t *xDSDeltaType) createDeltaResponse(
hasRelevantUpdates = false hasRelevantUpdates = false
updates = make(map[string]PendingUpdate) updates = make(map[string]PendingUpdate)
) )
if t.wildcard {
// First find things that need updating or deleting // First find things that need updating or deleting
for name, envoyVers := range t.resourceVersions { for name, envoyVers := range t.resourceVersions {
currVers, ok := currentVersions[name] currVers, ok := currentVersions[name]
@ -647,7 +683,7 @@ func (t *xDSDeltaType) createDeltaResponse(
if remove { if remove {
hasRelevantUpdates = true hasRelevantUpdates = true
} }
updates[name] = PendingUpdate{Version: ""} updates[name] = PendingUpdate{Remove: true}
} else if currVers != envoyVers { } else if currVers != envoyVers {
if upsert { if upsert {
hasRelevantUpdates = true hasRelevantUpdates = true
@ -657,9 +693,38 @@ func (t *xDSDeltaType) createDeltaResponse(
} }
// Now find new things // Now find new things
if t.wildcard {
for name, currVers := range currentVersions { for name, currVers := range currentVersions {
if _, ok := t.resourceVersions[name]; !ok { if _, known := t.resourceVersions[name]; known {
continue
}
if upsert {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Version: currVers}
}
} else {
// First find things that need updating or deleting
// Walk the list of things currently stored in envoy
for name, envoyVers := range t.resourceVersions {
if t.subscribed(name) {
if currVers, ok := currentVersions[name]; ok {
if currVers != envoyVers {
if upsert {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Version: currVers}
}
}
}
}
// Now find new things not in envoy yet
for name := range t.subscriptions {
if _, known := t.resourceVersions[name]; known {
continue
}
if currVers, ok := currentVersions[name]; ok {
updates[name] = PendingUpdate{Version: currVers} updates[name] = PendingUpdate{Version: currVers}
if upsert { if upsert {
hasRelevantUpdates = true hasRelevantUpdates = true
@ -679,10 +744,10 @@ func (t *xDSDeltaType) createDeltaResponse(
} }
realUpdates := make(map[string]PendingUpdate) realUpdates := make(map[string]PendingUpdate)
for name, obj := range updates { for name, obj := range updates {
if obj.Version == "" { if obj.Remove {
if remove { if remove {
resp.RemovedResources = append(resp.RemovedResources, name) resp.RemovedResources = append(resp.RemovedResources, name)
realUpdates[name] = PendingUpdate{Version: ""} realUpdates[name] = PendingUpdate{Remove: true}
} }
} else if upsert { } else if upsert {
resources, ok := resourceMap.Index[t.typeURL] resources, ok := resourceMap.Index[t.typeURL]

View File

@ -2,6 +2,7 @@ package xds
import ( import (
"errors" "errors"
"strings"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -100,6 +101,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
Resources: makeTestResources(t, Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"), makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
), ),
}) })
@ -123,24 +125,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
), ),
}) })
// cleanup unused resources now that we've created/updated relevant things
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(4),
RemovedResources: []string{
"fake-endpoints", // correcting the errant subscription
},
})
// And no other response yet // And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener // ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3) envoy.SendDeltaReqACK(t, ListenerType, 3)
// ACK the endpoint removal
envoy.SendDeltaReqACK(t, EndpointType, 4)
// If we re-subscribe to something even if there are no changes we get a // If we re-subscribe to something even if there are no changes we get a
// fresh copy. // fresh copy.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
@ -151,13 +141,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType, TypeUrl: EndpointType,
Nonce: hexString(5), Nonce: hexString(4),
Resources: makeTestResources(t, Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"), makeTestEndpoints(t, snap, "tcp:geo-cache"),
), ),
}) })
envoy.SendDeltaReqACK(t, EndpointType, 5) envoy.SendDeltaReqACK(t, EndpointType, 4)
// And no other response yet // And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -202,13 +192,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
}) })
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType, TypeUrl: EndpointType,
Nonce: hexString(6), Nonce: hexString(5),
Resources: makeTestResources(t, Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"), makeTestEndpoints(t, snap, "tcp:db"),
), ),
}) })
envoy.SendDeltaReqACK(t, EndpointType, 6) envoy.SendDeltaReqACK(t, EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
}) })
@ -220,6 +210,17 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
// Send envoy an EDS update. // Send envoy an EDS update.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(6),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{})
// Send it again.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType, TypeUrl: EndpointType,
Nonce: hexString(7), Nonce: hexString(7),
@ -228,18 +229,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
), ),
}) })
envoy.SendDeltaReqNACK(t, EndpointType, 7, &rpcstatus.Status{}) envoy.SendDeltaReqACK(t, EndpointType, 7)
// Send it again.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(8),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 8)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
}) })
@ -421,6 +411,149 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
} }
} }
func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) {
// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/10563
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
// omitted from the response while the hack is active.
var slowHackDisabled uint32
server.ResourceMapMutateFn = func(resourceMap *IndexedResources) {
if atomic.LoadUint32(&slowHackDisabled) == 1 {
return
}
if em, ok := resourceMap.Index[EndpointType]; ok {
for k := range em {
if strings.Contains(k, "geo-cache") {
delete(em, k)
}
}
}
}
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
//
// NOTE: we do NOT return back geo-cache yet
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
})
// Disable hack. Need to wait for one more event to wake up the loop.
atomic.StoreUint32(&slowHackDisabled, 1)
runStep(t, "delayed endpoint update finally comes in", func(t *testing.T) {
// Trigger the xds.Server select{} to wake up and notice our hack is disabled.
// The actual contents of this change are irrelevant.
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 4)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all // Allow all

View File

@ -152,6 +152,9 @@ type Server struct {
DisableV2Protocol bool DisableV2Protocol bool
// ResourceMapMutateFn exclusively exists for testing purposes.
ResourceMapMutateFn func(resourceMap *IndexedResources)
activeStreams *activeStreamCounters activeStreams *activeStreamCounters
} }

View File

@ -177,7 +177,9 @@ func newTestServerScenarioInner(
nil, /*checkFetcher HTTPCheckFetcher*/ nil, /*checkFetcher HTTPCheckFetcher*/
nil, /*cfgFetcher ConfigFetcher*/ nil, /*cfgFetcher ConfigFetcher*/
) )
if authCheckFrequency > 0 {
s.AuthCheckFrequency = authCheckFrequency s.AuthCheckFrequency = authCheckFrequency
}
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {