diff --git a/.changelog/10987.txt b/.changelog/10987.txt new file mode 100644 index 000000000..9c1a5f505 --- /dev/null +++ b/.changelog/10987.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fixed a bug where Envoy sidecars could enter a state where they failed to receive xds updates from Consul +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index b2564dacf..4009e9de9 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -189,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // index and hash the xDS structures newResourceMap := indexResources(generator.Logger, newRes) + if s.ResourceMapMutateFn != nil { + s.ResourceMapMutateFn(newResourceMap) + } + if err := populateChildIndexMap(newResourceMap); err != nil { return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err) } @@ -384,6 +388,10 @@ type xDSDeltaType struct { // sentToEnvoyOnce is true after we've sent one response to envoy. sentToEnvoyOnce bool + // subscriptions is the set of currently subscribed envoy resources. + // If wildcard == true, this will be empty. + subscriptions map[string]struct{} + // resourceVersions is the current view of CONFIRMED/ACKed updates to // envoy's view of the loaded resources. // @@ -398,7 +406,16 @@ type xDSDeltaType struct { pendingUpdates map[string]map[string]PendingUpdate } +func (t *xDSDeltaType) subscribed(name string) bool { + if t.wildcard { + return true + } + _, subscribed := t.subscriptions[name] + return subscribed +} + type PendingUpdate struct { + Remove bool Version string ChildResources []string // optional } @@ -414,6 +431,7 @@ func newDeltaType( stream: stream, typeURL: typeUrl, allowEmptyFn: allowEmptyFn, + subscriptions: make(map[string]struct{}), resourceVersions: make(map[string]string), pendingUpdates: make(map[string]map[string]PendingUpdate), } @@ -484,6 +502,11 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool logger.Trace("setting initial resource versions for stream", "resources", req.InitialResourceVersions) t.resourceVersions = req.InitialResourceVersions + if !t.wildcard { + for k := range req.InitialResourceVersions { + t.subscriptions[k] = struct{}{} + } + } } if !t.wildcard { @@ -509,8 +532,13 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool // // We handle that here by ALWAYS wiping the version so the diff // decides to send the value. - _, alreadySubscribed := t.resourceVersions[name] - t.resourceVersions[name] = "" // start with no version + _, alreadySubscribed := t.subscriptions[name] + t.subscriptions[name] = struct{}{} + + // Reset the tracked version so we force a reply. + if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked { + t.resourceVersions[name] = "" + } if alreadySubscribed { logger.Trace("re-subscribing resource for stream", "resource", name) @@ -520,11 +548,12 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool } for _, name := range req.ResourceNamesUnsubscribe { - if _, ok := t.resourceVersions[name]; !ok { + if _, ok := t.subscriptions[name]; !ok { continue } - delete(t.resourceVersions, name) + delete(t.subscriptions, name) logger.Trace("unsubscribing resource for stream", "resource", name) + // NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions } } @@ -538,26 +567,31 @@ func (t *xDSDeltaType) ack(nonce string) { } for name, obj := range pending { - if obj.Version == "" { + if obj.Remove { delete(t.resourceVersions, name) - } else { - t.resourceVersions[name] = obj.Version + continue } - if t.childType != nil && obj.Version != "" { + + t.resourceVersions[name] = obj.Version + if t.childType != nil { // This branch only matters on UPDATE, since we already have // mechanisms to clean up orphaned resources. for _, childName := range obj.ChildResources { - if _, exist := t.childType.resourceVersions[childName]; exist { - t.generator.Logger.Trace( - "triggering implicit update of resource", - "typeUrl", t.typeURL, - "resource", name, - "childTypeUrl", t.childType.typeURL, - "childResource", childName, - ) - // Basically manifest this as a re-subscribe - t.childType.resourceVersions[childName] = "" + if _, exist := t.childType.resourceVersions[childName]; !exist { + continue } + if !t.subscribed(childName) { + continue + } + t.generator.Logger.Trace( + "triggering implicit update of resource", + "typeUrl", t.typeURL, + "resource", name, + "childTypeUrl", t.childType.typeURL, + "childResource", childName, + ) + // Basically manifest this as a re-subscribe/re-sync + t.childType.resourceVersions[childName] = "" } } } @@ -640,26 +674,57 @@ func (t *xDSDeltaType) createDeltaResponse( hasRelevantUpdates = false updates = make(map[string]PendingUpdate) ) - // First find things that need updating or deleting - for name, envoyVers := range t.resourceVersions { - currVers, ok := currentVersions[name] - if !ok { - if remove { - hasRelevantUpdates = true + + if t.wildcard { + // First find things that need updating or deleting + for name, envoyVers := range t.resourceVersions { + currVers, ok := currentVersions[name] + if !ok { + if remove { + hasRelevantUpdates = true + } + updates[name] = PendingUpdate{Remove: true} + } else if currVers != envoyVers { + if upsert { + hasRelevantUpdates = true + } + updates[name] = PendingUpdate{Version: currVers} + } + } + + // Now find new things + for name, currVers := range currentVersions { + if _, known := t.resourceVersions[name]; known { + continue } - updates[name] = PendingUpdate{Version: ""} - } else if currVers != envoyVers { if upsert { hasRelevantUpdates = true } updates[name] = PendingUpdate{Version: currVers} } - } + } else { + // First find things that need updating or deleting - // Now find new things - if t.wildcard { - for name, currVers := range currentVersions { - if _, ok := t.resourceVersions[name]; !ok { + // Walk the list of things currently stored in envoy + for name, envoyVers := range t.resourceVersions { + if t.subscribed(name) { + if currVers, ok := currentVersions[name]; ok { + if currVers != envoyVers { + if upsert { + hasRelevantUpdates = true + } + updates[name] = PendingUpdate{Version: currVers} + } + } + } + } + + // Now find new things not in envoy yet + for name := range t.subscriptions { + if _, known := t.resourceVersions[name]; known { + continue + } + if currVers, ok := currentVersions[name]; ok { updates[name] = PendingUpdate{Version: currVers} if upsert { hasRelevantUpdates = true @@ -679,10 +744,10 @@ func (t *xDSDeltaType) createDeltaResponse( } realUpdates := make(map[string]PendingUpdate) for name, obj := range updates { - if obj.Version == "" { + if obj.Remove { if remove { resp.RemovedResources = append(resp.RemovedResources, name) - realUpdates[name] = PendingUpdate{Version: ""} + realUpdates[name] = PendingUpdate{Remove: true} } } else if upsert { resources, ok := resourceMap.Index[t.typeURL] diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index c00293c4a..7e1746812 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -2,6 +2,7 @@ package xds import ( "errors" + "strings" "sync/atomic" "testing" "time" @@ -100,6 +101,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), + // SAME_AS_INITIAL_VERSION: "fake-endpoints", ), }) @@ -123,24 +125,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { ), }) - // cleanup unused resources now that we've created/updated relevant things - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(4), - RemovedResources: []string{ - "fake-endpoints", // correcting the errant subscription - }, - }) - // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener envoy.SendDeltaReqACK(t, ListenerType, 3) - // ACK the endpoint removal - envoy.SendDeltaReqACK(t, EndpointType, 4) - // If we re-subscribe to something even if there are no changes we get a // fresh copy. envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ @@ -151,13 +141,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: EndpointType, - Nonce: hexString(5), + Nonce: hexString(4), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:geo-cache"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 5) + envoy.SendDeltaReqACK(t, EndpointType, 4) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -202,13 +192,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { }) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: EndpointType, - Nonce: hexString(6), + Nonce: hexString(5), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 6) + envoy.SendDeltaReqACK(t, EndpointType, 5) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -220,6 +210,17 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { mgr.DeliverConfig(t, sid, snap) // Send envoy an EDS update. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(6), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db[0]"), + ), + }) + + envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{}) + + // Send it again. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ TypeUrl: EndpointType, Nonce: hexString(7), @@ -228,18 +229,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { ), }) - envoy.SendDeltaReqNACK(t, EndpointType, 7, &rpcstatus.Status{}) - - // Send it again. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, - Nonce: hexString(8), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db[0]"), - ), - }) - - envoy.SendDeltaReqACK(t, EndpointType, 8) + envoy.SendDeltaReqACK(t, EndpointType, 7) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -421,6 +411,149 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { } } +func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) { + // This illustrates a scenario related to https://github.com/hashicorp/consul/issues/10563 + + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy + + // This mutateFn causes any endpoint with a name containing "geo-cache" to be + // omitted from the response while the hack is active. + var slowHackDisabled uint32 + server.ResourceMapMutateFn = func(resourceMap *IndexedResources) { + if atomic.LoadUint32(&slowHackDisabled) == 1 { + return + } + if em, ok := resourceMap.Index[EndpointType]; ok { + for k := range em { + if strings.Contains(k, "geo-cache") { + delete(em, k) + } + } + } + } + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + runStep(t, "get into initial state", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, ClusterType, 1) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + // + // NOTE: we do NOT return back geo-cache yet + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + // makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 2) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 3) + }) + + // Disable hack. Need to wait for one more event to wake up the loop. + atomic.StoreUint32(&slowHackDisabled, 1) + + runStep(t, "delayed endpoint update finally comes in", func(t *testing.T) { + // Trigger the xds.Server select{} to wake up and notice our hack is disabled. + // The actual contents of this change are irrelevant. + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(4), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 4) + + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { // Allow all diff --git a/agent/xds/server.go b/agent/xds/server.go index 1763448cd..1cb3e5949 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -152,6 +152,9 @@ type Server struct { DisableV2Protocol bool + // ResourceMapMutateFn exclusively exists for testing purposes. + ResourceMapMutateFn func(resourceMap *IndexedResources) + activeStreams *activeStreamCounters } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 3a567115f..8f10147e1 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -177,7 +177,9 @@ func newTestServerScenarioInner( nil, /*checkFetcher HTTPCheckFetcher*/ nil, /*cfgFetcher ConfigFetcher*/ ) - s.AuthCheckFrequency = authCheckFrequency + if authCheckFrequency > 0 { + s.AuthCheckFrequency = authCheckFrequency + } errCh := make(chan error, 1) go func() {