Default discovery chain when upstream targets a DestinationPeer (#12942)

This commit is contained in:
Chris S. Kim 2022-05-04 16:25:25 -04:00 committed by GitHub
parent 1497421b65
commit e55aac9d30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 5 deletions

View File

@ -7,6 +7,8 @@ import (
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -170,6 +172,31 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
fallthrough fallthrough
case "": case "":
// If DestinationPeer is not empty, insert a default discovery chain directly to the snapshot
if u.DestinationPeer != "" {
req := discoverychain.CompileRequest{
ServiceName: u.DestinationName,
EvaluateInNamespace: u.DestinationNamespace,
EvaluateInPartition: u.DestinationPartition,
EvaluateInDatacenter: dc,
EvaluateInTrustDomain: "trustdomain.consul", // TODO(peering): where to evaluate this?
Entries: configentry.NewDiscoveryChainSet(),
}
chain, err := discoverychain.Compile(req)
if err != nil {
return snap, fmt.Errorf("error while compiling default discovery chain: %w", err)
}
// Directly insert chain and empty function into the discovery chain maps
snap.ConnectProxy.ConfigSnapshotUpstreams.DiscoveryChain[uid] = chain
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedDiscoveryChains[uid] = func() {}
if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil {
return snap, fmt.Errorf("error while resetting watches from chain: %w", err)
}
return snap, nil
}
err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},

View File

@ -410,11 +410,15 @@ func TestState_WatchesAndUpdates(t *testing.T) {
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil) billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil) api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
apiUID = NewUpstreamIDFromServiceName(api) apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db) dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query") pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
) )
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
rootWatchEvent := func() cache.UpdateEvent { rootWatchEvent := func() cache.UpdateEvent {
return cache.UpdateEvent{ return cache.UpdateEvent{
@ -2495,6 +2499,109 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault), "connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal), "connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
"connect-proxy-with-peers": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Address: "10.0.1.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "api",
LocalBindPort: 10000,
},
structs.Upstream{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "api-a",
DestinationPeer: "peer-a",
LocalBindPort: 10001,
},
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
// First evaluate peered upstream
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Datacenter: "dc1",
}),
// No Peering watch
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "should not be valid")
require.True(t, snap.MeshGateway.isEmpty())
// Even though there were no events to trigger the watches,
// the peered upstream is written to the maps
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.NotNil(t, snap.ConnectProxy.DiscoveryChain[extApiUID])
require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 1, "%+v", snap.ConnectProxy.WatchedDiscoveryChains)
require.NotNil(t, snap.ConnectProxy.WatchedDiscoveryChains[extApiUID])
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
},
},
{
// This time add the events
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{},
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.True(t, snap.MeshGateway.isEmpty())
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)
require.Len(t, snap.ConnectProxy.DiscoveryChain, 2, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 2, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedGateways, 2, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 2, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
},
},
},
},
} }
for name, tc := range cases { for name, tc := range cases {
@ -2527,12 +2634,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
snap, err := state.handler.initialize(ctx) snap, err := state.handler.initialize(ctx)
require.NoError(t, err) require.NoError(t, err)
//-------------------------------------------------------------------- // --------------------------------------------------------------------
// //
// All the nested subtests here are to make failures easier to // All the nested subtests here are to make failures easier to
// correlate back with the test table // correlate back with the test table
// //
//-------------------------------------------------------------------- // --------------------------------------------------------------------
for idx, stage := range tc.stages { for idx, stage := range tc.stages {
require.True(t, t.Run(fmt.Sprintf("stage-%d", idx), func(t *testing.T) { require.True(t, t.Run(fmt.Sprintf("stage-%d", idx), func(t *testing.T) {

View File

@ -374,6 +374,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
PeerName: opts.upstreamID.Peer,
Datacenter: opts.datacenter, Datacenter: opts.datacenter,
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Token: s.token, Token: s.token,