xds: ensure that dependent xDS resources are reconfigured during primary type warming (#10381)

Updates to a cluster will clear the associated endpoints, and updates to
a listener will clear the associated routes. Update the incremental xDS
logic to account for this implicit cleanup so that we can finish warming
the clusters and listeners.

Fixes #10379
This commit is contained in:
R.B. Boyer 2021-06-14 17:20:27 -05:00 committed by GitHub
parent f399fd2add
commit 8d5f81b460
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 560 additions and 33 deletions

3
.changelog/10381.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: (beta-only) ensure that dependent xDS resources are reconfigured during primary type warming
```

View File

@ -120,6 +120,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
EndpointType: newDeltaType(generator, stream, EndpointType, nil), EndpointType: newDeltaType(generator, stream, EndpointType, nil),
} }
// Endpoints are stored within a Cluster (and Routes
// are stored within a Listener) so whenever the
// enclosing resource is updated the inner resource
// list is cleared implicitly.
//
// When this happens we should update our local
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[ListenerType].childType = handlers[RouteType]
handlers[ClusterType].childType = handlers[EndpointType]
var authTimer <-chan time.Time var authTimer <-chan time.Time
extendAuthTimer := func() { extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency) authTimer = time.After(s.AuthCheckFrequency)
@ -177,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// index and hash the xDS structures // index and hash the xDS structures
newResourceMap := indexResources(generator.Logger, newRes) newResourceMap := indexResources(generator.Logger, newRes)
if err := populateChildIndexMap(newResourceMap); err != nil {
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
}
newVersions, err := computeResourceVersions(newResourceMap) newVersions, err := computeResourceVersions(newResourceMap)
if err != nil { if err != nil {
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err) return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
@ -352,6 +368,11 @@ type xDSDeltaType struct {
typeURL string typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool allowEmptyFn func(kind structs.ServiceKind) bool
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// registered indicates if this type has been requested at least once by // registered indicates if this type has been requested at least once by
// the proxy // the proxy
registered bool registered bool
@ -373,8 +394,13 @@ type xDSDeltaType struct {
// map. Once we get an ACK from envoy we'll update the resourceVersions map // map. Once we get an ACK from envoy we'll update the resourceVersions map
// and strike the entry from this map. // and strike the entry from this map.
// //
// nonce -> name -> version // nonce -> name -> {version}
pendingUpdates map[string]map[string]string pendingUpdates map[string]map[string]PendingUpdate
}
type PendingUpdate struct {
Version string
ChildResources []string // optional
} }
func newDeltaType( func newDeltaType(
@ -389,7 +415,7 @@ func newDeltaType(
typeURL: typeUrl, typeURL: typeUrl,
allowEmptyFn: allowEmptyFn, allowEmptyFn: allowEmptyFn,
resourceVersions: make(map[string]string), resourceVersions: make(map[string]string),
pendingUpdates: make(map[string]map[string]string), pendingUpdates: make(map[string]map[string]PendingUpdate),
} }
} }
@ -511,11 +537,28 @@ func (t *xDSDeltaType) ack(nonce string) {
return return
} }
for name, version := range pending { for name, obj := range pending {
if version == "" { if obj.Version == "" {
delete(t.resourceVersions, name) delete(t.resourceVersions, name)
} else { } else {
t.resourceVersions[name] = version t.resourceVersions[name] = obj.Version
}
if t.childType != nil && obj.Version != "" {
// This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; exist {
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", name,
"childTypeUrl", t.childType.typeURL,
"childResource", childName,
)
// Basically manifest this as a re-subscribe
t.childType.resourceVersions[childName] = ""
}
}
} }
} }
t.sentToEnvoyOnce = true t.sentToEnvoyOnce = true
@ -529,7 +572,7 @@ func (t *xDSDeltaType) nack(nonce string) {
func (t *xDSDeltaType) SendIfNew( func (t *xDSDeltaType) SendIfNew(
kind structs.ServiceKind, kind structs.ServiceKind,
currentVersions map[string]string, // type => name => version (as consul knows right now) currentVersions map[string]string, // type => name => version (as consul knows right now)
resourceMap IndexedResources, resourceMap *IndexedResources,
nonce *uint64, nonce *uint64,
upsert, remove bool, upsert, remove bool,
) (error, bool) { ) (error, bool) {
@ -571,6 +614,17 @@ func (t *xDSDeltaType) SendIfNew(
} }
logger.Trace("sent response", "nonce", resp.Nonce) logger.Trace("sent response", "nonce", resp.Nonce)
if t.childType != nil {
// Capture the relevant child resource names on this pending update so
// we can properly clean up the linked children when this change is
// ACKed.
for name, obj := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
obj.ChildResources = children
updates[name] = obj
}
}
}
t.pendingUpdates[resp.Nonce] = updates t.pendingUpdates[resp.Nonce] = updates
return nil, true return nil, true
@ -578,13 +632,13 @@ func (t *xDSDeltaType) SendIfNew(
func (t *xDSDeltaType) createDeltaResponse( func (t *xDSDeltaType) createDeltaResponse(
currentVersions map[string]string, // name => version (as consul knows right now) currentVersions map[string]string, // name => version (as consul knows right now)
resourceMap IndexedResources, resourceMap *IndexedResources,
upsert, remove bool, upsert, remove bool,
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]string, error) { ) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
// compute difference // compute difference
var ( var (
hasRelevantUpdates = false hasRelevantUpdates = false
updates = make(map[string]string) updates = make(map[string]PendingUpdate)
) )
// First find things that need updating or deleting // First find things that need updating or deleting
for name, envoyVers := range t.resourceVersions { for name, envoyVers := range t.resourceVersions {
@ -593,12 +647,12 @@ func (t *xDSDeltaType) createDeltaResponse(
if remove { if remove {
hasRelevantUpdates = true hasRelevantUpdates = true
} }
updates[name] = "" updates[name] = PendingUpdate{Version: ""}
} else if currVers != envoyVers { } else if currVers != envoyVers {
if upsert { if upsert {
hasRelevantUpdates = true hasRelevantUpdates = true
} }
updates[name] = currVers updates[name] = PendingUpdate{Version: currVers}
} }
} }
@ -606,7 +660,7 @@ func (t *xDSDeltaType) createDeltaResponse(
if t.wildcard { if t.wildcard {
for name, currVers := range currentVersions { for name, currVers := range currentVersions {
if _, ok := t.resourceVersions[name]; !ok { if _, ok := t.resourceVersions[name]; !ok {
updates[name] = currVers updates[name] = PendingUpdate{Version: currVers}
if upsert { if upsert {
hasRelevantUpdates = true hasRelevantUpdates = true
} }
@ -623,15 +677,15 @@ func (t *xDSDeltaType) createDeltaResponse(
// TODO(rb): consider putting something in SystemVersionInfo? // TODO(rb): consider putting something in SystemVersionInfo?
TypeUrl: t.typeURL, TypeUrl: t.typeURL,
} }
realUpdates := make(map[string]string) realUpdates := make(map[string]PendingUpdate)
for name, vers := range updates { for name, obj := range updates {
if vers == "" { if obj.Version == "" {
if remove { if remove {
resp.RemovedResources = append(resp.RemovedResources, name) resp.RemovedResources = append(resp.RemovedResources, name)
realUpdates[name] = "" realUpdates[name] = PendingUpdate{Version: ""}
} }
} else if upsert { } else if upsert {
resources, ok := resourceMap[t.typeURL] resources, ok := resourceMap.Index[t.typeURL]
if !ok { if !ok {
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL) return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
} }
@ -647,18 +701,18 @@ func (t *xDSDeltaType) createDeltaResponse(
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{ resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
Name: name, Name: name,
Resource: any, Resource: any,
Version: vers, Version: obj.Version,
}) })
realUpdates[name] = vers realUpdates[name] = obj
} }
} }
return resp, realUpdates, nil return resp, realUpdates, nil
} }
func computeResourceVersions(resourceMap IndexedResources) (map[string]map[string]string, error) { func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string) out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap { for typeUrl, resources := range resourceMap.Index {
m, err := hashResourceMap(resources) m, err := hashResourceMap(resources)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err) return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
@ -668,18 +722,51 @@ func computeResourceVersions(resourceMap IndexedResources) (map[string]map[strin
return out, nil return out, nil
} }
type IndexedResources map[string]map[string]proto.Message type IndexedResources struct {
// Index is a map of typeURL => resourceName => resource
Index map[string]map[string]proto.Message
func emptyIndexedResources() IndexedResources { // ChildIndex is a map of typeURL => parentResourceName => list of
return map[string]map[string]proto.Message{ // childResourceNames. This only applies if the child and parent do not
ListenerType: make(map[string]proto.Message), // share a name.
RouteType: make(map[string]proto.Message), ChildIndex map[string]map[string][]string
ClusterType: make(map[string]proto.Message), }
EndpointType: make(map[string]proto.Message),
func emptyIndexedResources() *IndexedResources {
return &IndexedResources{
Index: map[string]map[string]proto.Message{
ListenerType: make(map[string]proto.Message),
RouteType: make(map[string]proto.Message),
ClusterType: make(map[string]proto.Message),
EndpointType: make(map[string]proto.Message),
},
ChildIndex: map[string]map[string][]string{
ListenerType: make(map[string][]string),
ClusterType: make(map[string][]string),
},
} }
} }
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) IndexedResources { func populateChildIndexMap(resourceMap *IndexedResources) error {
// LDS and RDS have a more complicated relationship.
for name, res := range resourceMap.Index[ListenerType] {
listener := res.(*envoy_listener_v3.Listener)
rdsRouteNames, err := extractRdsResourceNames(listener)
if err != nil {
return err
}
resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames
}
// CDS and EDS share exact names.
for name := range resourceMap.Index[ClusterType] {
resourceMap.ChildIndex[ClusterType][name] = []string{name}
}
return nil
}
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
data := emptyIndexedResources() data := emptyIndexedResources()
for typeURL, typeRes := range resources { for typeURL, typeRes := range resources {
@ -688,7 +775,7 @@ func indexResources(logger hclog.Logger, resources map[string][]proto.Message) I
if name == "" { if name == "" {
logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL) logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL)
} else { } else {
data[typeURL][name] = res data.Index[typeURL][name] = res
} }
} }
} }

View File

@ -327,6 +327,333 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
} }
} }
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
})
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
// Update the snapshot in a way that causes a single cluster update.
snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
ConnectTimeout: 1337 * time.Second,
})
mgr.DeliverConfig(t, sid, snap)
// The cluster is updated
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(4),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db:timeout"),
// SAME makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil)
// And we re-send the endpoints for the updated cluster after getting the
// ACK for the cluster.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) {
// Send initial cluster discover (empty payload)
envoy.SendDeltaReq(t, ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Deliver a new snapshot (tcp with one http upstream with no-op disco chain)
snap = newTestSnapshot(t, nil, "http2", &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
Protocol: "http2",
}, &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "db",
Routes: nil,
})
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "http2:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "http2:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "http2:db:rds"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends routes request
envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db",
},
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: RouteType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestRoute(t, "http2:db"),
),
})
envoy.SendDeltaReqACK(t, RouteType, 4, true, nil)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
runStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) {
// Update the snapshot in a way that causes a single listener update.
//
// Downgrade from http2 to http
snap = newTestSnapshot(t, snap, "http", &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
Protocol: "http",
}, &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "db",
Routes: nil,
})
mgr.DeliverConfig(t, sid, snap)
// db cluster is refreshed (unrelated to the test scenario other than it's required)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "http:db"),
),
})
// the listener is updated
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(6),
Resources: makeTestResources(t,
makeTestListener(t, snap, "http:db:rds"),
),
})
envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil)
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
// triggers here. It is not explicitly under test, but we have to get past
// this exchange to get to the part we care about.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(7),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "http:db"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil)
// THE ACTUAL THING WE CARE ABOUT: replaced route config
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: RouteType,
Nonce: hexString(8),
Resources: makeTestResources(t,
makeTestRoute(t, "http2:db"),
),
})
envoy.SendDeltaReqACK(t, RouteType, 8, true, nil)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -663,6 +663,51 @@ const (
httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager" httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager"
) )
func extractRdsResourceNames(listener *envoy_listener_v3.Listener) ([]string, error) {
var found []string
for chainIdx, chain := range listener.FilterChains {
for filterIdx, filter := range chain.Filters {
if filter.Name != httpConnectionManagerNewName {
continue
}
tc, ok := filter.ConfigType.(*envoy_listener_v3.Filter_TypedConfig)
if !ok {
return nil, fmt.Errorf(
"filter chain %d has a %q filter %d with an unsupported config type: %T",
chainIdx,
filter.Name,
filterIdx,
filter.ConfigType,
)
}
var hcm envoy_http_v3.HttpConnectionManager
if err := ptypes.UnmarshalAny(tc.TypedConfig, &hcm); err != nil {
return nil, err
}
if hcm.RouteSpecifier == nil {
continue
}
rds, ok := hcm.RouteSpecifier.(*envoy_http_v3.HttpConnectionManager_Rds)
if !ok {
continue
}
if rds.Rds == nil {
continue
}
found = append(found, rds.Rds.RouteConfigName)
}
}
return found, nil
}
// Locate the existing http connect manager L4 filter and inject our RBAC filter at the top. // Locate the existing http connect manager L4 filter and inject our RBAC filter at the top.
func injectHTTPFilterOnFilterChains( func injectHTTPFilterOnFilterChains(
listener *envoy_listener_v3.Listener, listener *envoy_listener_v3.Listener,

View File

@ -391,6 +391,24 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
ConnectTimeout: ptypes.DurationProto(5 * time.Second), ConnectTimeout: ptypes.DurationProto(5 * time.Second),
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
} }
case "tcp:db:timeout":
return &envoy_cluster_v3.Cluster{
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_EDS,
},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: xdsNewADSConfig(),
},
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
},
ConnectTimeout: ptypes.DurationProto(1337 * time.Second),
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
}
case "http2:db": case "http2:db":
return &envoy_cluster_v3.Cluster{ return &envoy_cluster_v3.Cluster{
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
@ -410,6 +428,25 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{}, Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
} }
case "http:db":
return &envoy_cluster_v3.Cluster{
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_EDS,
},
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: xdsNewADSConfig(),
},
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
},
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
}
case "tcp:geo-cache": case "tcp:geo-cache":
return &envoy_cluster_v3.Cluster{ return &envoy_cluster_v3.Cluster{
Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -444,7 +481,7 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
}, },
}, },
} }
case "http2:db": case "http2:db", "http:db":
return &envoy_endpoint_v3.ClusterLoadAssignment{ return &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{ Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
@ -570,6 +607,34 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
}, },
}, },
} }
case "http:db:rds":
return &envoy_listener_v3.Listener{
Name: "db:127.0.0.1:9191",
Address: makeAddress("127.0.0.1", 9191),
TrafficDirection: envoy_core_v3.TrafficDirection_OUTBOUND,
FilterChains: []*envoy_listener_v3.FilterChain{
{
Filters: []*envoy_listener_v3.Filter{
xdsNewFilter(t, "envoy.filters.network.http_connection_manager", &envoy_http_v3.HttpConnectionManager{
HttpFilters: []*envoy_http_v3.HttpFilter{
{Name: "envoy.filters.http.router"},
},
RouteSpecifier: &envoy_http_v3.HttpConnectionManager_Rds{
Rds: &envoy_http_v3.Rds{
RouteConfigName: "db",
ConfigSource: xdsNewADSConfig(),
},
},
StatPrefix: "upstream.db.default.dc1",
Tracing: &envoy_http_v3.HttpConnectionManager_Tracing{
RandomSampling: &envoy_type_v3.Percent{Value: 0},
},
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
}),
},
},
},
}
case "tcp:geo-cache": case "tcp:geo-cache":
return &envoy_listener_v3.Listener{ return &envoy_listener_v3.Listener{
Name: "prepared_query:geo-cache:127.10.10.10:8181", Name: "prepared_query:geo-cache:127.10.10.10:8181",
@ -596,7 +661,7 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration { func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration {
switch fixtureName { switch fixtureName {
case "http2:db": case "http2:db", "http:db":
return &envoy_route_v3.RouteConfiguration{ return &envoy_route_v3.RouteConfiguration{
Name: "db", Name: "db",
ValidateClusters: makeBoolValue(true), ValidateClusters: makeBoolValue(true),