add order by locality failover to Consul enterprise (#16791)

This commit is contained in:
Eric Haberkorn 2023-03-30 10:08:38 -04:00 committed by GitHub
parent 899c5b11a5
commit b97a3a17d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1339 additions and 1287 deletions

3
.changelog/_4734.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
connect: **(Consul Enterprise only)** Implement order-by-locality failover.
```

View File

@ -20,7 +20,7 @@ func TestCompiledDiscoveryChain(t *testing.T) {
typ := &CompiledDiscoveryChain{RPC: rpc} typ := &CompiledDiscoveryChain{RPC: rpc}
// just do the default chain // just do the default chain
chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", nil) chain := discoverychain.TestCompileConfigEntries(t, "web", "default", "default", "dc1", "trustdomain.consul", nil, nil)
// Expect the proper RPC call. This also sets the expected value // Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments. // since that is return-by-pointer in the arguments.

View File

@ -17,11 +17,11 @@ func TestCompileConfigEntries(t testing.T,
evaluateInPartition string, evaluateInPartition string,
evaluateInDatacenter string, evaluateInDatacenter string,
evaluateInTrustDomain string, evaluateInTrustDomain string,
setup func(req *CompileRequest), entries ...structs.ConfigEntry) *structs.CompiledDiscoveryChain { setup func(req *CompileRequest),
set := configentry.NewDiscoveryChainSet() set *configentry.DiscoveryChainSet) *structs.CompiledDiscoveryChain {
if set == nil {
set.AddEntries(entries...) set = configentry.NewDiscoveryChainSet()
}
req := CompileRequest{ req := CompileRequest{
ServiceName: serviceName, ServiceName: serviceName,
EvaluateInNamespace: evaluateInNamespace, EvaluateInNamespace: evaluateInNamespace,

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch" "github.com/hashicorp/consul/agent/proxycfg/internal/watch"
@ -57,21 +58,20 @@ func TestManager_BasicLifecycle(t *testing.T) {
roots, leaf := TestCerts(t) roots, leaf := TestCerts(t)
dbDefaultChain := func() *structs.CompiledDiscoveryChain { dbDefaultChain := func() *structs.CompiledDiscoveryChain {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
})
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", func(req *discoverychain.CompileRequest) { return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config // This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts. // to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second req.OverrideConnectTimeout = 1 * time.Second
}, &structs.ServiceResolverConfigEntry{ }, set)
Kind: structs.ServiceResolver,
Name: "db",
})
} }
dbSplitChain := func() *structs.CompiledDiscoveryChain { dbSplitChain := func() *structs.CompiledDiscoveryChain {
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) { set := configentry.NewDiscoveryChainSet()
// This is because structs.TestUpstreams uses an opaque config set.AddEntries(&structs.ProxyConfigEntry{
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
}, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults, Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal, Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{ Config: map[string]interface{}{
@ -96,6 +96,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
{Weight: 40, ServiceSubset: "v2"}, {Weight: 40, ServiceSubset: "v2"},
}, },
}) })
return discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) {
// This is because structs.TestUpstreams uses an opaque config
// to override connect timeouts.
req.OverrideConnectTimeout = 1 * time.Second
}, set)
} }
upstreams := structs.TestUpstreams(t, false) upstreams := structs.TestUpstreams(t, false)

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
apimod "github.com/hashicorp/consul/api" apimod "github.com/hashicorp/consul/api"
@ -437,6 +438,12 @@ func upstreamIDForDC2(uid UpstreamID) UpstreamID {
return uid return uid
} }
func discoChainSetWithEntries(entries ...structs.ConfigEntry) *configentry.DiscoveryChainSet {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
return set
}
// This test is meant to exercise the various parts of the cache watching done by the state as // This test is meant to exercise the various parts of the cache watching done by the state as
// well as its management of the ConfigSnapshot // well as its management of the ConfigSnapshot
// //
@ -680,7 +687,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}), }, nil),
}, },
Err: nil, Err: nil,
}, },
@ -690,7 +697,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeRemote req.OverrideMeshGateway.Mode = structs.MeshGatewayModeRemote
}), }, nil),
}, },
Err: nil, Err: nil,
}, },
@ -700,7 +707,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}), }, nil),
}, },
Err: nil, Err: nil,
}, },
@ -710,7 +717,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeNone req.OverrideMeshGateway.Mode = structs.MeshGatewayModeNone
}), }, nil),
}, },
Err: nil, Err: nil,
}, },
@ -720,14 +727,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{ }, discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "api-dc2", Name: "api-dc2",
Redirect: &structs.ServiceResolverRedirect{ Redirect: &structs.ServiceResolverRedirect{
Service: "api", Service: "api",
Datacenter: "dc2", Datacenter: "dc2",
}, },
}), })),
}, },
Err: nil, Err: nil,
}, },
@ -737,7 +744,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-to-peer", "default", "default", "dc1", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-to-peer", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue req.OverrideMeshGateway.Mode = meshGatewayProxyConfigValue
}, &structs.ServiceResolverConfigEntry{ }, discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "api-failover-to-peer", Name: "api-failover-to-peer",
Failover: map[string]structs.ServiceResolverFailover{ Failover: map[string]structs.ServiceResolverFailover{
@ -747,7 +754,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
}, },
}), })),
}, },
Err: nil, Err: nil,
}, },
@ -1509,7 +1516,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{ {
CorrelationID: "discovery-chain:" + apiUID.String(), CorrelationID: "discovery-chain:" + apiUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil), Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil, nil),
}, },
Err: nil, Err: nil,
}, },
@ -2390,13 +2397,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries( Chain: discoverychain.TestCompileConfigEntries(
t, "db", "default", "default", "dc1", "trustdomain.consul", nil, t, "db", "default", "default", "dc1", "trustdomain.consul", nil,
&structs.ServiceConfigEntry{ discoChainSetWithEntries(&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,
Name: "db", Name: "db",
TransparentProxy: structs.TransparentProxyConfig{ TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true, DialedDirectly: true,
}, },
}, }),
), ),
}, },
Err: nil, Err: nil,
@ -2569,13 +2576,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{ {
CorrelationID: "discovery-chain:" + dbUID.String(), CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil,
discoChainSetWithEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "db", Name: "db",
Redirect: &structs.ServiceResolverRedirect{ Redirect: &structs.ServiceResolverRedirect{
Service: "mysql", Service: "mysql",
}, },
}), })),
}, },
Err: nil, Err: nil,
}, },
@ -3100,7 +3108,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul", Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) { func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}), }, nil),
}, },
Err: nil, Err: nil,
}, },
@ -3544,7 +3552,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{ {
CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()), CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil), Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil, nil),
}, },
Err: nil, Err: nil,
}, },
@ -3692,7 +3700,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
{ {
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()), CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil), Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
}, },
Err: nil, Err: nil,
}, },

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -86,12 +87,15 @@ func TestConfigSnapshotAPIGateway(
}, },
} }
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
// Add a discovery chain watch event for each service. // Add a discovery chain watch event for each service.
for _, serviceName := range route.GetServiceNames() { for _, serviceName := range route.GetServiceNames() {
discoChain := UpdateEvent{ discoChain := UpdateEvent{
CorrelationID: fmt.Sprintf("discovery-chain:%s", UpstreamIDString("", "", serviceName.Name, &serviceName.EnterpriseMeta, "")), CorrelationID: fmt.Sprintf("discovery-chain:%s", UpstreamIDString("", "", serviceName.Name, &serviceName.EnterpriseMeta, "")),
Result: &structs.DiscoveryChainResponse{ Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, serviceName.Name, "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...), Chain: discoverychain.TestCompileConfigEntries(t, serviceName.Name, "default", "default", "dc1", connect.TestClusterID+".consul", nil, set),
}, },
} }
baseEvents = append(baseEvents, discoChain) baseEvents = append(baseEvents, discoChain)

View File

@ -23,7 +23,7 @@ func TestConfigSnapshot(t testing.T, nsFn func(ns *structs.NodeService), extraUp
roots, leaf := TestCerts(t) roots, leaf := TestCerts(t)
// no entries implies we'll get a default chain // no entries implies we'll get a default chain
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil) dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
assert.True(t, dbChain.Default) assert.True(t, dbChain.Default)
var ( var (
@ -309,7 +309,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
var ( var (
collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil) collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
collectorUID = NewUpstreamIDFromServiceName(collector) collectorUID = NewUpstreamIDFromServiceName(collector)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil) collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -156,6 +157,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con
"dc1", "dc1",
connect.TestClusterID+".consul", connect.TestClusterID+".consul",
nil, nil,
nil,
) )
insecureUID := UpstreamIDFromString("insecure") insecureUID := UpstreamIDFromString("insecure")
@ -167,6 +169,7 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayLevel_MixedTLS(t testing.T) *Con
"dc1", "dc1",
connect.TestClusterID+".consul", connect.TestClusterID+".consul",
nil, nil,
nil,
) )
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -276,15 +279,16 @@ func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel(t testing.T) *C
} }
func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel_HTTP(t testing.T) *ConfigSnapshot { func TestConfigSnapshotIngressGatewaySDS_GatewayAndListenerLevel_HTTP(t testing.T) *ConfigSnapshot {
var ( set := configentry.NewDiscoveryChainSet()
http = structs.NewServiceName("http", nil) set.AddEntries(&structs.ServiceConfigEntry{
httpUID = NewUpstreamIDFromServiceName(http)
httpChain = discoverychain.TestCompileConfigEntries(t, "http", "default", "default", "dc1", connect.TestClusterID+".consul", nil,
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,
Name: "http", Name: "http",
Protocol: "http", Protocol: "http",
}) })
var (
http = structs.NewServiceName("http", nil)
httpUID = NewUpstreamIDFromServiceName(http)
httpChain = discoverychain.TestCompileConfigEntries(t, "http", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
) )
return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -345,11 +349,11 @@ func TestConfigSnapshotIngressGatewaySDS_ServiceLevel(t testing.T) *ConfigSnapsh
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -436,11 +440,11 @@ func TestConfigSnapshotIngressGatewaySDS_ListenerAndServiceLevel(t testing.T) *C
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -526,11 +530,11 @@ func TestConfigSnapshotIngressGatewaySDS_MixedNoTLS(t testing.T) *ConfigSnapshot
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -611,11 +615,11 @@ func TestConfigSnapshotIngressGateway_MixedListeners(t testing.T) *ConfigSnapsho
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -713,22 +717,25 @@ func TestConfigSnapshotIngress_HTTPMultipleServices(t testing.T) *ConfigSnapshot
}, },
} }
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var ( var (
foo = structs.NewServiceName("foo", nil) foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo) fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
bar = structs.NewServiceName("bar", nil) bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar) barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
baz = structs.NewServiceName("baz", nil) baz = structs.NewServiceName("baz", nil)
bazUID = NewUpstreamIDFromServiceName(baz) bazUID = NewUpstreamIDFromServiceName(baz)
bazChain = discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) bazChain = discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
qux = structs.NewServiceName("qux", nil) qux = structs.NewServiceName("qux", nil)
quxUID = NewUpstreamIDFromServiceName(qux) quxUID = NewUpstreamIDFromServiceName(qux)
quxChain = discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) quxChain = discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
) )
require.False(t, fooChain.Default) require.False(t, fooChain.Default)
@ -870,14 +877,17 @@ func TestConfigSnapshotIngress_GRPCMultipleServices(t testing.T) *ConfigSnapshot
}, },
} }
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var ( var (
foo = structs.NewServiceName("foo", nil) foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo) fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
bar = structs.NewServiceName("bar", nil) bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar) barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
) )
require.False(t, fooChain.Default) require.False(t, fooChain.Default)
@ -955,11 +965,11 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C
var ( var (
foo = structs.NewServiceName("foo", nil) foo = structs.NewServiceName("foo", nil)
fooUID = NewUpstreamIDFromServiceName(foo) fooUID = NewUpstreamIDFromServiceName(foo)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
bar = structs.NewServiceName("bar", nil) bar = structs.NewServiceName("bar", nil)
barUID = NewUpstreamIDFromServiceName(bar) barUID = NewUpstreamIDFromServiceName(bar)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil) barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) { return TestConfigSnapshotIngressGateway(t, false, "http", "default", nil, func(entry *structs.IngressGatewayConfigEntry) {
@ -1231,14 +1241,17 @@ func TestConfigSnapshotIngressGatewayWithChain(
}, },
} }
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
webChain := discoverychain.TestCompileConfigEntries(t, "web", webChain := discoverychain.TestCompileConfigEntries(t, "web",
webEntMeta.NamespaceOrDefault(), webEntMeta.NamespaceOrDefault(),
webEntMeta.PartitionOrDefault(), "dc1", webEntMeta.PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...) connect.TestClusterID+".consul", nil, set)
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", fooChain := discoverychain.TestCompileConfigEntries(t, "foo",
fooEntMeta.NamespaceOrDefault(), fooEntMeta.NamespaceOrDefault(),
fooEntMeta.PartitionOrDefault(), "dc1", fooEntMeta.PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...) connect.TestClusterID+".consul", nil, set)
updates = []UpdateEvent{ updates = []UpdateEvent{
{ {
@ -1294,19 +1307,19 @@ func TestConfigSnapshotIngressGateway_TLSMinVersionListenersGatewayDefaults(t te
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s3 = structs.NewServiceName("s3", nil) s3 = structs.NewServiceName("s3", nil)
s3UID = NewUpstreamIDFromServiceName(s3) s3UID = NewUpstreamIDFromServiceName(s3)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s4 = structs.NewServiceName("s4", nil) s4 = structs.NewServiceName("s4", nil)
s4UID = NewUpstreamIDFromServiceName(s4) s4UID = NewUpstreamIDFromServiceName(s4)
s4Chain = discoverychain.TestCompileConfigEntries(t, "s4", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s4Chain = discoverychain.TestCompileConfigEntries(t, "s4", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil,
@ -1460,11 +1473,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener(t testing.T) *ConfigSnap
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil, return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) { func(entry *structs.IngressGatewayConfigEntry) {
@ -1539,11 +1552,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener_GRPC(t testing.T) *Confi
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "grpc", "simple", nil, return TestConfigSnapshotIngressGateway(t, true, "grpc", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) { func(entry *structs.IngressGatewayConfigEntry) {
@ -1618,11 +1631,11 @@ func TestConfigSnapshotIngressGateway_SingleTLSListener_HTTP2(t testing.T) *Conf
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "http2", "simple", nil, return TestConfigSnapshotIngressGateway(t, true, "http2", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) { func(entry *structs.IngressGatewayConfigEntry) {
@ -1697,11 +1710,11 @@ func TestConfigSnapshotIngressGateway_MultiTLSListener_MixedHTTP2gRPC(t testing.
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil, return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) { func(entry *structs.IngressGatewayConfigEntry) {
@ -1780,11 +1793,11 @@ func TestConfigSnapshotIngressGateway_GWTLSListener_MixedHTTP2gRPC(t testing.T)
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil, return TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil,
func(entry *structs.IngressGatewayConfigEntry) { func(entry *structs.IngressGatewayConfigEntry) {
@ -1859,15 +1872,15 @@ func TestConfigSnapshotIngressGateway_TLSMixedMinVersionListeners(t testing.T) *
var ( var (
s1 = structs.NewServiceName("s1", nil) s1 = structs.NewServiceName("s1", nil)
s1UID = NewUpstreamIDFromServiceName(s1) s1UID = NewUpstreamIDFromServiceName(s1)
s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s1Chain = discoverychain.TestCompileConfigEntries(t, "s1", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s2 = structs.NewServiceName("s2", nil) s2 = structs.NewServiceName("s2", nil)
s2UID = NewUpstreamIDFromServiceName(s2) s2UID = NewUpstreamIDFromServiceName(s2)
s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s2Chain = discoverychain.TestCompileConfigEntries(t, "s2", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
s3 = structs.NewServiceName("s3", nil) s3 = structs.NewServiceName("s3", nil)
s3UID = NewUpstreamIDFromServiceName(s3) s3UID = NewUpstreamIDFromServiceName(s3)
s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil) s3Chain = discoverychain.TestCompileConfigEntries(t, "s3", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, return TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil,

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -594,14 +595,16 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
entries = append(entries, proxyDefaults) entries = append(entries, proxyDefaults)
fallthrough // to-case: "default-services-tcp" fallthrough // to-case: "default-services-tcp"
case "default-services-tcp": case "default-services-tcp":
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var ( var (
fooSN = structs.NewServiceName("foo", nil) fooSN = structs.NewServiceName("foo", nil)
barSN = structs.NewServiceName("bar", nil) barSN = structs.NewServiceName("bar", nil)
girSN = structs.NewServiceName("gir", nil) girSN = structs.NewServiceName("gir", nil)
fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
) )
assert.True(t, fooChain.Default) assert.True(t, fooChain.Default)
@ -745,11 +748,14 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
require.NoError(t, entry.Validate()) require.NoError(t, entry.Validate())
} }
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
var ( var (
dbSN = structs.NewServiceName("db", nil) dbSN = structs.NewServiceName("db", nil)
altSN = structs.NewServiceName("alt", nil) altSN = structs.NewServiceName("alt", nil)
dbChain = discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) dbChain = discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
) )
needPeerA = true needPeerA = true

View File

@ -10,6 +10,7 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -21,11 +22,11 @@ func TestConfigSnapshotTransparentProxy(t testing.T) *ConfigSnapshot {
var ( var (
google = structs.NewServiceName("google", nil) google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google) googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
noEndpoints = structs.NewServiceName("no-endpoints", nil) noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints) noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
) )
@ -129,18 +130,19 @@ func TestConfigSnapshotTransparentProxyHTTPUpstream(t testing.T, additionalEntri
}, },
}) })
set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
// DiscoveryChain without an UpstreamConfig should yield a // DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode // filter chain when in transparent proxy mode
var ( var (
google = structs.NewServiceName("google", nil) google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google) googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
entries...,
)
noEndpoints = structs.NewServiceName("no-endpoints", nil) noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints) noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
nodes = []structs.CheckServiceNode{ nodes = []structs.CheckServiceNode{
@ -253,11 +255,11 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con
var ( var (
google = structs.NewServiceName("google", nil) google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google) googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
noEndpoints = structs.NewServiceName("no-endpoints", nil) noEndpoints = structs.NewServiceName("no-endpoints", nil)
noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints) noEndpointsUID = NewUpstreamIDFromServiceName(noEndpoints)
noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) noEndpointsChain = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
) )
@ -334,20 +336,23 @@ func TestConfigSnapshotTransparentProxyCatalogDestinationsOnly(t testing.T) *Con
} }
func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot { func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot {
set := configentry.NewDiscoveryChainSet()
set.AddEntries(&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "mongo",
ConnectTimeout: 33 * time.Second,
})
// DiscoveryChain without an UpstreamConfig should yield a // DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode // filter chain when in transparent proxy mode
var ( var (
kafka = structs.NewServiceName("kafka", nil) kafka = structs.NewServiceName("kafka", nil)
kafkaUID = NewUpstreamIDFromServiceName(kafka) kafkaUID = NewUpstreamIDFromServiceName(kafka)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
mongo = structs.NewServiceName("mongo", nil) mongo = structs.NewServiceName("mongo", nil)
mongoUID = NewUpstreamIDFromServiceName(mongo) mongoUID = NewUpstreamIDFromServiceName(mongo)
mongoChain = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, &structs.ServiceResolverConfigEntry{ mongoChain = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
Kind: structs.ServiceResolver,
Name: "mongo",
ConnectTimeout: 33 * time.Second,
})
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
) )
@ -456,23 +461,23 @@ func TestConfigSnapshotTransparentProxyDialDirectly(t testing.T) *ConfigSnapshot
} }
func TestConfigSnapshotTransparentProxyResolverRedirectUpstream(t testing.T) *ConfigSnapshot { func TestConfigSnapshotTransparentProxyResolverRedirectUpstream(t testing.T) *ConfigSnapshot {
// Service-Resolver redirect with explicit upstream should spawn an outbound listener. set := configentry.NewDiscoveryChainSet()
var ( set.AddEntries(&structs.ServiceResolverConfigEntry{
db = structs.NewServiceName("db-redir", nil)
dbUID = NewUpstreamIDFromServiceName(db)
dbChain = discoverychain.TestCompileConfigEntries(t, "db-redir", "default", "default", "dc1", connect.TestClusterID+".consul", nil,
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
Name: "db-redir", Name: "db-redir",
Redirect: &structs.ServiceResolverRedirect{ Redirect: &structs.ServiceResolverRedirect{
Service: "db", Service: "db",
}, },
}, })
) // Service-Resolver redirect with explicit upstream should spawn an outbound listener.
var (
db = structs.NewServiceName("db-redir", nil)
dbUID = NewUpstreamIDFromServiceName(db)
dbChain = discoverychain.TestCompileConfigEntries(t, "db-redir", "default", "default", "dc1", connect.TestClusterID+".consul", nil, set)
google = structs.NewServiceName("google", nil) google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google) googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
) )
return TestConfigSnapshot(t, func(ns *structs.NodeService) { return TestConfigSnapshot(t, func(ns *structs.NodeService) {
@ -530,11 +535,11 @@ func TestConfigSnapshotTransparentProxyTerminatingGatewayCatalogDestinationsOnly
var ( var (
google = structs.NewServiceName("google", nil) google = structs.NewServiceName("google", nil)
googleUID = NewUpstreamIDFromServiceName(google) googleUID = NewUpstreamIDFromServiceName(google)
googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) googleChain = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
kafka = structs.NewServiceName("kafka", nil) kafka = structs.NewServiceName("kafka", nil)
kafkaUID = NewUpstreamIDFromServiceName(kafka) kafkaUID = NewUpstreamIDFromServiceName(kafka)
kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) kafkaChain = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
db = structs.NewServiceName("db", nil) db = structs.NewServiceName("db", nil)
) )

View File

@ -9,9 +9,11 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbpeering" "github.com/hashicorp/consul/proto/private/pbpeering"
) )
@ -97,7 +99,66 @@ func setupTestVariationConfigEntriesAndSnapshot(
Nodes: TestGatewayNodesDC2(t), Nodes: TestGatewayNodesDC2(t),
}, },
}) })
case "order-by-locality-failover":
cluster1UID := UpstreamID{
Name: "db",
Peer: "cluster-01",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
cluster2UID := UpstreamID{
Name: "db",
Peer: "cluster-02",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
chainID := makeChainID(structs.DiscoveryTargetOpts{Service: "db-v2"})
events = append(events,
UpdateEvent{
CorrelationID: "upstream-target:" + chainID + ":" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesAlternate(t),
},
},
UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-01",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: &pbpeering.PeeringTrustBundle{
PeerName: "peer1",
TrustDomain: "peer1.domain",
ExportedPartition: "peer1ap",
RootPEMs: []string{"peer1-root-1"},
},
},
},
UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-02",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: &pbpeering.PeeringTrustBundle{
PeerName: "peer2",
TrustDomain: "peer2.domain",
ExportedPartition: "peer2ap",
RootPEMs: []string{"peer2-root-2"},
},
},
},
UpdateEvent{
CorrelationID: "upstream-peer:" + cluster1UID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc1", "cluster-01", "10.40.1.1", false, cluster1UID.EnterpriseMeta)},
},
},
UpdateEvent{
CorrelationID: "upstream-peer:" + cluster2UID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc2", "cluster-02", "10.40.1.2", false, cluster2UID.EnterpriseMeta)},
},
},
)
case "failover-to-cluster-peer": case "failover-to-cluster-peer":
uid := UpstreamID{
Name: "db",
Peer: "cluster-01",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""),
}
events = append(events, UpdateEvent{ events = append(events, UpdateEvent{
CorrelationID: "peer-trust-bundle:cluster-01", CorrelationID: "peer-trust-bundle:cluster-01",
Result: &pbpeering.TrustBundleReadResponse{ Result: &pbpeering.TrustBundleReadResponse{
@ -109,10 +170,6 @@ func setupTestVariationConfigEntriesAndSnapshot(
}, },
}, },
}) })
uid := UpstreamID{
Name: "db",
Peer: "cluster-01",
}
if enterprise { if enterprise {
uid.EnterpriseMeta = acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), "ns9") uid.EnterpriseMeta = acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), "ns9")
} }
@ -270,6 +327,7 @@ func setupTestVariationDiscoveryChain(
) *structs.CompiledDiscoveryChain { ) *structs.CompiledDiscoveryChain {
// Compile a chain. // Compile a chain.
var ( var (
peers []*pbpeering.Peering
entries []structs.ConfigEntry entries []structs.ConfigEntry
compileSetup func(req *discoverychain.CompileRequest) compileSetup func(req *discoverychain.CompileRequest)
) )
@ -374,6 +432,49 @@ func setupTestVariationDiscoveryChain(
}, },
}, },
) )
case "order-by-locality-failover":
peers = append(peers,
&pbpeering.Peering{
Name: "cluster-01",
Remote: &pbpeering.RemoteInfo{
Locality: &pbcommon.Locality{Region: "us-west-1"},
},
},
&pbpeering.Peering{
Name: "cluster-02",
Remote: &pbpeering.RemoteInfo{
Locality: &pbcommon.Locality{Region: "us-west-2"},
},
})
cluster1Target := structs.ServiceResolverFailoverTarget{
Peer: "cluster-01",
}
cluster2Target := structs.ServiceResolverFailoverTarget{
Peer: "cluster-02",
}
entries = append(entries,
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "db",
EnterpriseMeta: entMeta,
ConnectTimeout: 33 * time.Second,
RequestTimeout: 33 * time.Second,
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Policy: &structs.ServiceResolverFailoverPolicy{
Mode: "order-by-locality",
Regions: []string{"us-west-2", "us-west-1"},
},
Targets: []structs.ServiceResolverFailoverTarget{
cluster1Target,
cluster2Target,
{Service: "db-v2"},
},
},
},
},
)
case "redirect-to-cluster-peer": case "redirect-to-cluster-peer":
redirect := &structs.ServiceResolverRedirect{ redirect := &structs.ServiceResolverRedirect{
Peer: "cluster-01", Peer: "cluster-01",
@ -930,7 +1031,12 @@ func setupTestVariationDiscoveryChain(
entries = append(entries, additionalEntries...) entries = append(entries, additionalEntries...)
} }
return discoverychain.TestCompileConfigEntries(t, "db", entMeta.NamespaceOrDefault(), entMeta.PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", compileSetup, entries...) set := configentry.NewDiscoveryChainSet()
set.AddEntries(entries...)
set.AddPeers(peers...)
return discoverychain.TestCompileConfigEntries(t, "db", entMeta.NamespaceOrDefault(), entMeta.PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", compileSetup, set)
} }
func httpMatch(http *structs.ServiceRouteHTTPMatch) *structs.ServiceRouteMatch { func httpMatch(http *structs.ServiceRouteHTTPMatch) *structs.ServiceRouteMatch {

View File

@ -1114,7 +1114,7 @@ func (e *ServiceResolverConfigEntry) Validate() error {
} }
if !f.Policy.isValid() { if !f.Policy.isValid() {
return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'default', or 'order-by-locality'", subset) return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'sequential', or 'order-by-locality'", subset)
} }
if f.ServiceSubset != "" { if f.ServiceSubset != "" {
@ -1437,8 +1437,9 @@ type ServiceResolverFailover struct {
type ServiceResolverFailoverPolicy struct { type ServiceResolverFailoverPolicy struct {
// Mode specifies the type of failover that will be performed. Valid values are // Mode specifies the type of failover that will be performed. Valid values are
// "default", "" (equivalent to "default") and "order-by-locality". // "sequential", "" (equivalent to "sequential") and "order-by-locality".
Mode string `json:",omitempty"` Mode string `json:",omitempty"`
Regions []string `json:",omitempty"`
} }
func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts { func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts {
@ -1465,7 +1466,7 @@ func (fp *ServiceResolverFailoverPolicy) isValid() bool {
switch fp.Mode { switch fp.Mode {
case "": case "":
case "default": case "sequential":
case "order-by-locality": case "order-by-locality":
default: default:
return false return false

View File

@ -183,6 +183,7 @@ type DiscoverySplit struct {
type DiscoveryFailover struct { type DiscoveryFailover struct {
Targets []string `json:",omitempty"` Targets []string `json:",omitempty"`
Policy *ServiceResolverFailoverPolicy `json:",omitempty"` Policy *ServiceResolverFailoverPolicy `json:",omitempty"`
Regions []string `json:",omitempty"`
} }
// DiscoveryTarget represents all of the inputs necessary to use a resolver // DiscoveryTarget represents all of the inputs necessary to use a resolver

View File

@ -184,6 +184,14 @@ func (o *DiscoveryFailover) DeepCopy() *DiscoveryFailover {
if o.Policy != nil { if o.Policy != nil {
cp.Policy = new(ServiceResolverFailoverPolicy) cp.Policy = new(ServiceResolverFailoverPolicy)
*cp.Policy = *o.Policy *cp.Policy = *o.Policy
if o.Policy.Regions != nil {
cp.Policy.Regions = make([]string, len(o.Policy.Regions))
copy(cp.Policy.Regions, o.Policy.Regions)
}
}
if o.Regions != nil {
cp.Regions = make([]string, len(o.Regions))
copy(cp.Regions, o.Regions)
} }
return &cp return &cp
} }
@ -917,6 +925,10 @@ func (o *ServiceResolverFailover) DeepCopy() *ServiceResolverFailover {
if o.Policy != nil { if o.Policy != nil {
cp.Policy = new(ServiceResolverFailoverPolicy) cp.Policy = new(ServiceResolverFailoverPolicy)
*cp.Policy = *o.Policy *cp.Policy = *o.Policy
if o.Policy.Regions != nil {
cp.Policy.Regions = make([]string, len(o.Policy.Regions))
copy(cp.Policy.Regions, o.Policy.Regions)
}
} }
return &cp return &cp
} }

View File

@ -3058,3 +3058,10 @@ func (l *Locality) ToAPI() *api.Locality {
Zone: l.Zone, Zone: l.Zone,
} }
} }
func (l *Locality) GetRegion() string {
if l == nil {
return ""
}
return l.Region
}

View File

@ -1268,9 +1268,14 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
return nil, err return nil, err
} }
targetGroups, err := mappedTargets.groupedTargets()
if err != nil {
return nil, err
}
var failoverClusterNames []string var failoverClusterNames []string
if mappedTargets.failover { if mappedTargets.failover {
for _, targetGroup := range mappedTargets.groupedTargets() { for _, targetGroup := range targetGroups {
failoverClusterNames = append(failoverClusterNames, targetGroup.ClusterName) failoverClusterNames = append(failoverClusterNames, targetGroup.ClusterName)
} }
@ -1298,7 +1303,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
} }
// Construct the target clusters. // Construct the target clusters.
for _, groupedTarget := range mappedTargets.groupedTargets() { for _, groupedTarget := range targetGroups {
s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName) s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)
c := &envoy_cluster_v3.Cluster{ c := &envoy_cluster_v3.Cluster{
Name: groupedTarget.ClusterName, Name: groupedTarget.ClusterName,
@ -1881,124 +1886,3 @@ func (s *ResourceGenerator) getTargetClusterName(upstreamsSnapshot *proxycfg.Con
} }
return clusterName return clusterName
} }
type discoChainTargets struct {
baseClusterName string
targets []targetInfo
failover bool
}
type targetInfo struct {
TargetID string
TLSContext *envoy_tls_v3.UpstreamTlsContext
}
type discoChainTargetGroup struct {
Targets []targetInfo
ClusterName string
}
func (ft discoChainTargets) groupedTargets() []discoChainTargetGroup {
var targetGroups []discoChainTargetGroup
if !ft.failover {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: ft.baseClusterName,
Targets: ft.targets,
})
return targetGroups
}
for i, t := range ft.targets {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: fmt.Sprintf("%s%d~%s", failoverClusterNamePrefix, i, ft.baseClusterName),
Targets: []targetInfo{t},
})
}
return targetGroups
}
func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapshot, chain *structs.CompiledDiscoveryChain, node *structs.DiscoveryGraphNode, upstreamConfig structs.UpstreamConfig, forMeshGateway bool) (discoChainTargets, error) {
failoverTargets := discoChainTargets{}
if node.Resolver == nil {
return discoChainTargets{}, fmt.Errorf("impossible to process a non-resolver node")
}
primaryTargetID := node.Resolver.Target
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil && !forMeshGateway {
return discoChainTargets{}, err
}
failoverTargets.baseClusterName = s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway, false)
tids := []string{primaryTargetID}
failover := node.Resolver.Failover
if failover != nil && !forMeshGateway {
tids = append(tids, failover.Targets...)
failoverTargets.failover = true
}
for _, tid := range tids {
target := chain.Targets[tid]
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if !configureTLS {
failoverTargets.targets = append(failoverTargets.targets, ti)
continue
}
if targetUID.Peer != "" {
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
if !found {
s.Logger.Warn("failed to fetch upstream peering metadata", "target", targetUID)
continue
}
sni = peerMeta.PrimarySNI()
spiffeIDs = peerMeta.SpiffeID
} else {
sni = target.SNI
rootPEMs = cfgSnap.RootPEMs()
spiffeIDs = []string{connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Namespace: target.Namespace,
Partition: target.Partition,
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()}
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err := injectSANMatcher(commonTLSContext, spiffeIDs...)
if err != nil {
return failoverTargets, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
ti.TLSContext = tlsContext
failoverTargets.targets = append(failoverTargets.targets, ti)
}
return failoverTargets, nil
}

View File

@ -678,7 +678,12 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
return nil, err return nil, err
} }
for _, groupedTarget := range mappedTargets.groupedTargets() { targetGroups, err := mappedTargets.groupedTargets()
if err != nil {
return nil, err
}
for _, groupedTarget := range targetGroups {
clusterName := groupedTarget.ClusterName clusterName := groupedTarget.ClusterName
if escapeHatchCluster != nil { if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name clusterName = escapeHatchCluster.Name

View File

@ -1,41 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/hashicorp/consul/agent/structs"
)
func firstHealthyTarget(
targets map[string]*structs.DiscoveryTarget,
targetHealth map[string]structs.CheckServiceNodes,
primaryTarget string,
secondaryTargets []string,
) string {
series := make([]string, 0, len(secondaryTargets)+1)
series = append(series, primaryTarget)
series = append(series, secondaryTargets...)
for _, targetID := range series {
target, ok := targets[targetID]
if !ok {
continue
}
endpoints, ok := targetHealth[targetID]
if !ok {
continue
}
for _, ep := range endpoints {
healthStatus, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
if healthStatus == envoy_core_v3.HealthStatus_HEALTHY {
return targetID
}
}
}
return primaryTarget // if everything is broken just use the primary for now
}

View File

@ -1,145 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
func TestFirstHealthyTarget(t *testing.T) {
passing := proxycfg.TestUpstreamNodesInStatus(t, "passing")
warning := proxycfg.TestUpstreamNodesInStatus(t, "warning")
critical := proxycfg.TestUpstreamNodesInStatus(t, "critical")
warnOnlyPassingTarget := structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-warn",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
})
warnOnlyPassingTarget.Subset.OnlyPassing = true
failOnlyPassingTarget := structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-fail",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
})
failOnlyPassingTarget.Subset.OnlyPassing = true
targets := map[string]*structs.DiscoveryTarget{
"all-ok.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-ok",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-warn.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-warn",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-fail.default.default.dc1": structs.NewDiscoveryTarget(structs.DiscoveryTargetOpts{
Service: "all-fail",
Namespace: "default",
Partition: "default",
Datacenter: "dc1",
}),
"all-warn-onlypassing.default.dc1": warnOnlyPassingTarget,
"all-fail-onlypassing.default.dc1": failOnlyPassingTarget,
}
targetHealth := map[string]structs.CheckServiceNodes{
"all-ok.default.dc1": passing,
"all-warn.default.dc1": warning,
"all-fail.default.default.dc1": critical,
"all-warn-onlypassing.default.dc1": warning,
"all-fail-onlypassing.default.dc1": critical,
}
cases := []struct {
primary string
secondary []string
expect string
}{
{
primary: "all-ok.default.dc1",
expect: "all-ok.default.dc1",
},
{
primary: "all-warn.default.dc1",
expect: "all-warn.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
expect: "all-fail.default.default.dc1",
},
{
primary: "all-warn-onlypassing.default.dc1",
expect: "all-warn-onlypassing.default.dc1",
},
{
primary: "all-fail-onlypassing.default.dc1",
expect: "all-fail-onlypassing.default.dc1",
},
{
primary: "all-ok.default.dc1",
secondary: []string{
"all-warn.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-warn.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-warn.default.dc1",
},
{
primary: "all-warn-onlypassing.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail-onlypassing.default.dc1",
secondary: []string{
"all-ok.default.dc1",
},
expect: "all-ok.default.dc1",
},
{
primary: "all-fail.default.default.dc1",
secondary: []string{
"all-warn-onlypassing.default.dc1",
"all-warn.default.dc1",
"all-ok.default.dc1",
},
expect: "all-warn.default.dc1",
},
}
for _, tc := range cases {
tc := tc
name := fmt.Sprintf("%s and %v", tc.primary, tc.secondary)
t.Run(name, func(t *testing.T) {
targetID := firstHealthyTarget(targets, targetHealth, tc.primary, tc.secondary)
require.Equal(t, tc.expect, targetID)
})
}
}

View File

@ -0,0 +1,154 @@
package xds
import (
"fmt"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
type discoChainTargets struct {
baseClusterName string
targets []targetInfo
failover bool
failoverPolicy structs.ServiceResolverFailoverPolicy
}
type targetInfo struct {
TargetID string
TLSContext *envoy_tls_v3.UpstreamTlsContext
// Region is the region from the failover target's Locality. nil means the
// target is in the local Consul cluster.
Region *string
}
type discoChainTargetGroup struct {
Targets []targetInfo
ClusterName string
}
func (ft discoChainTargets) groupedTargets() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
if !ft.failover {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: ft.baseClusterName,
Targets: ft.targets,
})
return targetGroups, nil
}
switch ft.failoverPolicy.Mode {
case "sequential", "":
return ft.sequential()
case "order-by-locality":
return ft.orderByLocality()
default:
return targetGroups, fmt.Errorf("unexpected failover policy")
}
}
func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapshot, chain *structs.CompiledDiscoveryChain, node *structs.DiscoveryGraphNode, upstreamConfig structs.UpstreamConfig, forMeshGateway bool) (discoChainTargets, error) {
failoverTargets := discoChainTargets{}
if node.Resolver == nil {
return discoChainTargets{}, fmt.Errorf("impossible to process a non-resolver node")
}
primaryTargetID := node.Resolver.Target
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil && !forMeshGateway {
return discoChainTargets{}, err
}
failoverTargets.baseClusterName = s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway, false)
tids := []string{primaryTargetID}
failover := node.Resolver.Failover
if failover != nil && !forMeshGateway {
tids = append(tids, failover.Targets...)
failoverTargets.failover = true
if failover.Policy == nil {
failoverTargets.failoverPolicy = structs.ServiceResolverFailoverPolicy{}
} else {
failoverTargets.failoverPolicy = *failover.Policy
}
}
for _, tid := range tids {
target := chain.Targets[tid]
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if !configureTLS {
failoverTargets.targets = append(failoverTargets.targets, ti)
continue
}
if targetUID.Peer != "" {
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
if !found {
s.Logger.Warn("failed to fetch upstream peering metadata", "target", targetUID)
continue
}
sni = peerMeta.PrimarySNI()
spiffeIDs = peerMeta.SpiffeID
region := target.Locality.GetRegion()
ti.Region = &region
} else {
sni = target.SNI
rootPEMs = cfgSnap.RootPEMs()
spiffeIDs = []string{connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Namespace: target.Namespace,
Partition: target.Partition,
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()}
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err := injectSANMatcher(commonTLSContext, spiffeIDs...)
if err != nil {
return failoverTargets, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
ti.TLSContext = tlsContext
failoverTargets.targets = append(failoverTargets.targets, ti)
}
return failoverTargets, nil
}
func (ft discoChainTargets) sequential() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
for i, t := range ft.targets {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: fmt.Sprintf("%s%d~%s", failoverClusterNamePrefix, i, ft.baseClusterName),
Targets: []targetInfo{t},
})
}
return targetGroups, nil
}

View File

@ -0,0 +1,12 @@
//go:build !consulent
// +build !consulent
package xds
import (
"fmt"
)
func (ft discoChainTargets) orderByLocality() ([]discoChainTargetGroup, error) {
return nil, fmt.Errorf("order-by-locality is a Consul Enterprise feature")
}

View File

@ -332,7 +332,7 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
service := structs.NewServiceName("service", nil) service := structs.NewServiceName("service", nil)
serviceUID := proxycfg.NewUpstreamIDFromServiceName(service) serviceUID := proxycfg.NewUpstreamIDFromServiceName(service)
serviceChain := discoverychain.TestCompileConfigEntries(t, "service", "default", "default", "dc1", connect.TestClusterID+".consul", nil) serviceChain := discoverychain.TestCompileConfigEntries(t, "service", "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
return []goldenTestCase{ return []goldenTestCase{
{ {

View File

@ -1447,12 +1447,14 @@ func ServiceResolverFailoverPolicyToStructs(s *ServiceResolverFailoverPolicy, t
return return
} }
t.Mode = s.Mode t.Mode = s.Mode
t.Regions = s.Regions
} }
func ServiceResolverFailoverPolicyFromStructs(t *structs.ServiceResolverFailoverPolicy, s *ServiceResolverFailoverPolicy) { func ServiceResolverFailoverPolicyFromStructs(t *structs.ServiceResolverFailoverPolicy, s *ServiceResolverFailoverPolicy) {
if s == nil { if s == nil {
return return
} }
s.Mode = t.Mode s.Mode = t.Mode
s.Regions = t.Regions
} }
func ServiceResolverFailoverTargetToStructs(s *ServiceResolverFailoverTarget, t *structs.ServiceResolverFailoverTarget) { func ServiceResolverFailoverTargetToStructs(s *ServiceResolverFailoverTarget, t *structs.ServiceResolverFailoverTarget) {
if s == nil { if s == nil {

File diff suppressed because it is too large Load Diff

View File

@ -175,6 +175,7 @@ message ServiceResolverFailover {
// name=Structs // name=Structs
message ServiceResolverFailoverPolicy { message ServiceResolverFailoverPolicy {
string Mode = 1; string Mode = 1;
repeated string Regions = 2;
} }
// mog annotation: // mog annotation: