Merge pull request #12223 from hashicorp/proxycfg/passthrough-cleanup

This commit is contained in:
Freddy 2022-02-10 17:35:51 -07:00 committed by GitHub
commit f45bec7779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 502 additions and 167 deletions

3
.changelog/12223.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect: fixes bug where passthrough addressses for transparent proxies dialed directly weren't being cleaned up.
```

View File

@ -455,7 +455,7 @@ func retainGateways(full structs.CheckServiceNodes) structs.CheckServiceNodes {
func renderGatewayAddrs(gateways structs.CheckServiceNodes, wan bool) []string {
out := make([]string, 0, len(gateways))
for _, csn := range gateways {
addr, port := csn.BestAddress(wan)
_, addr, port := csn.BestAddress(wan)
completeAddr := ipaddr.FormatAddressPort(addr, port)
out = append(out, completeAddr)
}

View File

@ -27,7 +27,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs)
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
@ -326,6 +327,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
}
}
for uid := range snap.ConnectProxy.PassthroughUpstreams {
if _, ok := seenUpstreams[uid]; !ok {
delete(snap.ConnectProxy.PassthroughUpstreams, uid)
}
}
for addr, indexed := range snap.ConnectProxy.PassthroughIndices {
if _, ok := seenUpstreams[indexed.upstreamID]; !ok {
delete(snap.ConnectProxy.PassthroughIndices, addr)
}
}
// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
// There have been situations where a discovery watch was cancelled, then fired.
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up

View File

@ -234,7 +234,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
@ -292,7 +293,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},

View File

@ -45,6 +45,25 @@ func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID {
return id
}
func NewUpstreamIDFromTargetID(tid string) UpstreamID {
// Drop the leading subset if one is present in the target ID.
separators := strings.Count(tid, ".")
if separators > 3 {
prefix := tid[:strings.Index(tid, ".")+1]
tid = strings.TrimPrefix(tid, prefix)
}
split := strings.SplitN(tid, ".", 4)
id := UpstreamID{
Name: split[0],
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(split[2], split[1]),
Datacenter: split[3],
}
id.normalize()
return id
}
func (u *UpstreamID) normalize() {
if u.Type == structs.UpstreamDestTypeService {
u.Type = ""

View File

@ -8,6 +8,43 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
// TODO(freddy): Needs enterprise test
func TestUpstreamIDFromTargetID(t *testing.T) {
type testcase struct {
tid string
expect UpstreamID
}
run := func(t *testing.T, tc testcase) {
tc.expect.EnterpriseMeta.Normalize()
got := NewUpstreamIDFromTargetID(tc.tid)
require.Equal(t, tc.expect, got)
}
cases := map[string]testcase{
"with subset": {
tid: "v1.foo.default.default.dc2",
expect: UpstreamID{
Name: "foo",
Datacenter: "dc2",
},
},
"without subset": {
tid: "foo.default.default.dc2",
expect: UpstreamID{
Name: "foo",
Datacenter: "dc2",
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
func TestUpstreamIDFromString(t *testing.T) {
type testcase struct {
id string

View File

@ -9,7 +9,6 @@ import (
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
)
@ -52,8 +51,16 @@ type ConfigSnapshotUpstreams struct {
// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[UpstreamID]*structs.Upstream
// PassthroughEndpoints is a map of: UpstreamID -> ServicePassthroughAddrs.
PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs
// PassthroughEndpoints is a map of: UpstreamID -> (map of TargetID ->
// (set of IP addresses)). It contains the upstream endpoints that
// can be dialed directly by a transparent proxy.
PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{}
// PassthroughIndices is a map of: address -> indexedTarget.
// It is used to track the modify index associated with a passthrough address.
// Tracking this index helps break ties when a single address is shared by
// more than one upstream due to a race.
PassthroughIndices map[string]indexedTarget
// IntentionUpstreams is a set of upstreams inferred from intentions.
//
@ -61,6 +68,14 @@ type ConfigSnapshotUpstreams struct {
IntentionUpstreams map[UpstreamID]struct{}
}
// indexedTarget is used to associate the Raft modify index of a resource
// with the corresponding upstream target.
type indexedTarget struct {
upstreamID UpstreamID
targetID string
idx uint64
}
type GatewayKey struct {
Datacenter string
Partition string
@ -91,18 +106,6 @@ func gatewayKeyFromString(s string) GatewayKey {
return GatewayKey{Partition: split[0], Datacenter: split[1]}
}
// ServicePassthroughAddrs contains the LAN addrs
type ServicePassthroughAddrs struct {
// SNI is the Service SNI of the upstream.
SNI string
// SpiffeID is the SPIFFE ID to use for upstream SAN validation.
SpiffeID connect.SpiffeIDService
// Addrs is a set of the best LAN addresses for the instances of the upstream.
Addrs map[string]struct{}
}
type configSnapshotConnectProxy struct {
ConfigSnapshotUpstreams

View File

@ -412,7 +412,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C
)
for _, n := range nodes {
addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
_, addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
if net.ParseIP(addr) != nil {
hasIP = true
continue

View File

@ -12,7 +12,6 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
@ -1892,8 +1891,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
Datacenter: "dc1",
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
@ -1910,12 +1910,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
DialedDirectly: true,
},
},
RaftIndex: structs.RaftIndex{
ModifyIndex: 12,
},
},
},
{
Node: &structs.Node{
Node: "node2",
Address: "10.0.0.2",
Datacenter: "dc1",
Node: "node2",
Address: "10.0.0.2",
RaftIndex: structs.RaftIndex{
ModifyIndex: 21,
},
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
@ -1943,8 +1950,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
Datacenter: "dc1",
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
@ -1961,12 +1969,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
DialedDirectly: true,
},
},
RaftIndex: structs.RaftIndex{
ModifyIndex: 12,
},
},
},
{
Node: &structs.Node{
Node: "node2",
Address: "10.0.0.2",
Datacenter: "dc1",
Node: "node2",
Address: "10.0.0.2",
RaftIndex: structs.RaftIndex{
ModifyIndex: 21,
},
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
@ -1985,22 +2000,26 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// The LAN service address is used below because transparent proxying
// does not support querying service nodes in other DCs, and the WAN address
// should not be used in DC-local calls.
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
dbUID: {
SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
SpiffeID: connect.SpiffeIDService{
Host: snap.Roots.TrustDomain,
Namespace: db.NamespaceOrDefault(),
Partition: db.PartitionOrDefault(),
Datacenter: snap.Datacenter,
Service: "db",
},
Addrs: map[string]struct{}{
"db.default.default.dc1": map[string]struct{}{
"10.10.10.10": {},
"10.0.0.2": {},
},
},
})
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
"10.0.0.2": {
upstreamID: dbUID,
targetID: "db.default.default.dc1",
idx: 21,
},
"10.10.10.10": {
upstreamID: dbUID,
targetID: "db.default.default.dc1",
idx: 12,
},
})
},
},
// Discovery chain updates should be stored
@ -2041,8 +2060,194 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1")
},
},
// Empty list of upstreams should clean everything up
{
// Receive a new upstream target event without proxy1.
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
Address: "10.0.0.2",
RaftIndex: structs.RaftIndex{
ModifyIndex: 21,
},
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "db-sidecar-proxy2",
Service: "db-sidecar-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "db",
TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true,
},
},
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1")
// THe endpoint and passthrough address for proxy1 should be gone.
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
Address: "10.0.0.2",
RaftIndex: structs.RaftIndex{
ModifyIndex: 21,
},
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "db-sidecar-proxy2",
Service: "db-sidecar-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "db",
TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true,
},
},
},
},
},
)
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
dbUID: {
"db.default.default.dc1": map[string]struct{}{
"10.0.0.2": {},
},
},
})
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
"10.0.0.2": {
upstreamID: dbUID,
targetID: "db.default.default.dc1",
idx: 21,
},
})
},
},
{
// Receive a new upstream target event with a conflicting passthrough address
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-sidecar-proxy",
Service: "api-sidecar-proxy",
Address: "10.0.0.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true,
},
},
RaftIndex: structs.RaftIndex{
ModifyIndex: 32,
},
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
// THe endpoint and passthrough address for proxy1 should be gone.
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-sidecar-proxy",
Service: "api-sidecar-proxy",
Address: "10.0.0.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: structs.TransparentProxyConfig{
DialedDirectly: true,
},
},
RaftIndex: structs.RaftIndex{
ModifyIndex: 32,
},
},
},
},
)
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
apiUID: {
// This target has a higher index so the old passthrough address should be discarded.
"api.default.default.dc1": map[string]struct{}{
"10.0.0.2": {},
},
},
})
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
"10.0.0.2": {
upstreamID: apiUID,
targetID: "api.default.default.dc1",
idx: 32,
},
})
},
},
{
// Event with no nodes should clean up addrs
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
// The endpoint and passthrough address for proxy1 should be gone.
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"])
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[apiUID]["api.default.default.dc1"])
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
},
},
{
// Empty list of upstreams should clean up map keys
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
@ -2070,6 +2275,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
require.Empty(t, snap.ConnectProxy.IntentionUpstreams)
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams)
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
},
},
},

View File

@ -9,8 +9,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
@ -92,55 +90,53 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
}
upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes
var passthroughAddrs map[string]ServicePassthroughAddrs
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
return nil
}
// Clear out this target's existing passthrough upstreams and indices so that they can be repopulated below.
if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; ok {
for addr := range upstreamsSnapshot.PassthroughUpstreams[uid][targetID] {
if indexed := upstreamsSnapshot.PassthroughIndices[addr]; indexed.targetID == targetID && indexed.upstreamID == uid {
delete(upstreamsSnapshot.PassthroughIndices, addr)
}
}
upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
}
passthroughs := make(map[string]struct{})
for _, node := range resp.Nodes {
if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
if passthroughAddrs == nil {
passthroughAddrs = make(map[string]ServicePassthroughAddrs)
}
if !node.Service.Proxy.TransparentProxy.DialedDirectly {
continue
}
svc := node.Service.CompoundServiceName()
// Make sure to use an external address when crossing partition or DC boundaries.
isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
csnIdx, addr, _ := node.BestAddress(isRemote)
// Overwrite the name if it's a connect proxy (as opposed to Connect native).
// We don't reference the proxy name directly for things like SNI, but rather the name
// of the destination. The enterprise meta of a proxy will always be the same as that of
// the destination service, so that remains intact.
if node.Service.Kind == structs.ServiceKindConnectProxy {
dst := node.Service.Proxy.DestinationServiceName
if dst == "" {
dst = node.Service.Proxy.DestinationServiceID
}
svc.Name = dst
}
existing := upstreamsSnapshot.PassthroughIndices[addr]
if existing.idx > csnIdx {
// The last known instance with this address had a higher index so it takes precedence.
continue
}
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), snap.Datacenter, snap.Roots.TrustDomain)
// The current instance has a higher Raft index so we ensure the passthrough address is only
// associated with this upstream target. Older associations are cleaned up as needed.
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID], addr)
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID]) == 0 {
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID], existing.targetID)
}
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID]) == 0 {
delete(upstreamsSnapshot.PassthroughUpstreams, existing.upstreamID)
}
spiffeID := connect.SpiffeIDService{
Host: snap.Roots.TrustDomain,
Partition: svc.PartitionOrDefault(),
Namespace: svc.NamespaceOrDefault(),
Datacenter: snap.Datacenter,
Service: svc.Name,
}
svcUID := NewUpstreamIDFromServiceName(svc)
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok {
upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{
SNI: sni,
SpiffeID: spiffeID,
// Stored in a set because it's possible for these to be duplicated
// when the upstream-target is targeted by multiple discovery chains.
Addrs: make(map[string]struct{}),
}
}
// Make sure to use an external address when crossing partitions.
isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
addr, _ := node.BestAddress(isRemote)
upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{}
upstreamsSnapshot.PassthroughIndices[addr] = indexedTarget{idx: csnIdx, upstreamID: uid, targetID: targetID}
passthroughs[addr] = struct{}{}
}
if len(passthroughs) > 0 {
upstreamsSnapshot.PassthroughUpstreams[uid] = map[string]map[string]struct{}{
targetID: passthroughs,
}
}

View File

@ -1741,7 +1741,7 @@ type CheckServiceNode struct {
Checks HealthChecks
}
func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
func (csn *CheckServiceNode) BestAddress(wan bool) (uint64, string, int) {
// TODO (mesh-gateway) needs a test
// best address
// wan
@ -1754,12 +1754,14 @@ func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
// node addr
addr, port := csn.Service.BestAddress(wan)
idx := csn.Service.ModifyIndex
if addr == "" {
addr = csn.Node.BestAddress(wan)
idx = csn.Node.ModifyIndex
}
return addr, port
return idx, addr, port
}
func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision {

View File

@ -2105,14 +2105,18 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
input CheckServiceNode
lanAddr string
lanPort int
lanIdx uint64
wanAddr string
wanPort int
wanIdx uint64
}
nodeAddr := "10.1.2.3"
nodeWANAddr := "198.18.19.20"
nodeIdx := uint64(11)
serviceAddr := "10.2.3.4"
servicePort := 1234
serviceIdx := uint64(22)
serviceWANAddr := "198.19.20.21"
serviceWANPort := 987
@ -2121,15 +2125,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
input: CheckServiceNode{
Node: &Node{
Address: nodeAddr,
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Port: servicePort,
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: nodeAddr,
lanIdx: nodeIdx,
lanPort: servicePort,
wanAddr: nodeAddr,
wanIdx: nodeIdx,
wanPort: servicePort,
},
"node-wan-address": {
@ -2139,15 +2151,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
TaggedAddresses: map[string]string{
"wan": nodeWANAddr,
},
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Port: servicePort,
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: nodeAddr,
lanIdx: nodeIdx,
lanPort: servicePort,
wanAddr: nodeWANAddr,
wanIdx: nodeIdx,
wanPort: servicePort,
},
"service-address": {
@ -2158,16 +2178,24 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
TaggedAddresses: map[string]string{
"wan": nodeWANAddr,
},
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Address: serviceAddr,
Port: servicePort,
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: serviceAddr,
lanIdx: serviceIdx,
lanPort: servicePort,
wanAddr: serviceAddr,
wanIdx: serviceIdx,
wanPort: servicePort,
},
"service-wan-address": {
@ -2178,6 +2206,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
TaggedAddresses: map[string]string{
"wan": nodeWANAddr,
},
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Address: serviceAddr,
@ -2188,12 +2219,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
Port: serviceWANPort,
},
},
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: serviceAddr,
lanIdx: serviceIdx,
lanPort: servicePort,
wanAddr: serviceWANAddr,
wanIdx: serviceIdx,
wanPort: serviceWANPort,
},
"service-wan-address-default-port": {
@ -2204,6 +2240,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
TaggedAddresses: map[string]string{
"wan": nodeWANAddr,
},
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Address: serviceAddr,
@ -2214,12 +2253,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
Port: 0,
},
},
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: serviceAddr,
lanIdx: serviceIdx,
lanPort: servicePort,
wanAddr: serviceWANAddr,
wanIdx: serviceIdx,
wanPort: servicePort,
},
"service-wan-address-node-lan": {
@ -2230,6 +2274,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
TaggedAddresses: map[string]string{
"wan": nodeWANAddr,
},
RaftIndex: RaftIndex{
ModifyIndex: nodeIdx,
},
},
Service: &NodeService{
Port: servicePort,
@ -2239,12 +2286,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
Port: serviceWANPort,
},
},
RaftIndex: RaftIndex{
ModifyIndex: serviceIdx,
},
},
},
lanAddr: nodeAddr,
lanIdx: nodeIdx,
lanPort: servicePort,
wanAddr: serviceWANAddr,
wanIdx: serviceIdx,
wanPort: serviceWANPort,
},
}
@ -2254,13 +2306,15 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
tc := tc
t.Run(name, func(t *testing.T) {
addr, port := tc.input.BestAddress(false)
idx, addr, port := tc.input.BestAddress(false)
require.Equal(t, tc.lanAddr, addr)
require.Equal(t, tc.lanPort, port)
require.Equal(t, tc.lanIdx, idx)
addr, port = tc.input.BestAddress(true)
idx, addr, port = tc.input.BestAddress(true)
require.Equal(t, tc.wanAddr, addr)
require.Equal(t, tc.wanPort, port)
require.Equal(t, tc.wanIdx, idx)
})
}
}

View File

@ -171,36 +171,51 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message,
})
}
for _, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
name := "passthrough~" + passthrough.SNI
for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams {
for tid := range target {
uid := proxycfg.NewUpstreamIDFromTargetID(tid)
c := envoy_cluster_v3.Cluster{
Name: name,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
},
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
sni := connect.ServiceSNI(
uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
// TODO(tproxy) This should use the connection timeout configured on the upstream's config entry
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
}
// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
name := "passthrough~" + sni
commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf())
err := injectSANMatcher(commonTLSContext, passthrough.SpiffeID)
if err != nil {
return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", passthrough.SNI, err)
c := envoy_cluster_v3.Cluster{
Name: name,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
},
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
// TODO(tproxy) This should use the connection timeout configured on the upstream's config entry
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
}
spiffeID := connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Partition: uid.PartitionOrDefault(),
Namespace: uid.NamespaceOrDefault(),
Datacenter: cfgSnap.Datacenter,
Service: uid.Name,
}
commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf())
err := injectSANMatcher(commonTLSContext, spiffeID)
if err != nil {
return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext)
if err != nil {
return nil, err
}
c.TransportSocket = transportSocket
clusters = append(clusters, &c)
}
tlsContext := envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: passthrough.SNI,
}
transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext)
if err != nil {
return nil, err
}
c.TransportSocket = transportSocket
clusters = append(clusters, &c)
}
return clusters, nil
@ -892,7 +907,7 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
fallback *envoy_endpoint_v3.LbEndpoint
)
for i, e := range opts.hostnameEndpoints {
addr, port := e.BestAddress(opts.isRemote)
_, addr, port := e.BestAddress(opts.isRemote)
uniqueHostnames[addr] = true
health, weight := calculateEndpointHealthAndWeight(e, opts.onlyPassing)

View File

@ -678,28 +678,14 @@ func TestClustersFromSnapshot(t *testing.T) {
}
// We add a passthrough cluster for each upstream service name
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
kafkaUID: {
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
SpiffeID: connect.SpiffeIDService{
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Namespace: "default",
Datacenter: "dc1",
Service: "kafka",
},
Addrs: map[string]struct{}{
"kafka.default.default.dc1": map[string]struct{}{
"9.9.9.9": {},
},
},
mongoUID: {
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
SpiffeID: connect.SpiffeIDService{
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Namespace: "default",
Datacenter: "dc1",
Service: "mongo",
},
Addrs: map[string]struct{}{
"mongo.default.default.dc1": map[string]struct{}{
"10.10.10.10": {},
"10.10.10.12": {},
},

View File

@ -221,7 +221,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
addr, port := srv.BestAddress(false /*wan*/)
_, addr, port := srv.BestAddress(false /*wan*/)
lbEndpoint := &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
@ -512,7 +512,7 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
for _, ep := range endpoints {
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {

View File

@ -218,26 +218,27 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// as opposed to via a virtual IP.
var passthroughChains []*envoy_listener_v3.FilterChain
for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
u := structs.Upstream{
DestinationName: uid.Name,
DestinationNamespace: uid.NamespaceOrDefault(),
DestinationPartition: uid.PartitionOrDefault(),
for _, targets := range cfgSnap.ConnectProxy.PassthroughUpstreams {
for tid, addrs := range targets {
uid := proxycfg.NewUpstreamIDFromTargetID(tid)
sni := connect.ServiceSNI(
uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
filterName := fmt.Sprintf("%s.%s.%s.%s", uid.Name, uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter)
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
clusterName: "passthrough~" + sni,
filterName: filterName,
protocol: "tcp",
})
if err != nil {
return nil, err
}
filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(addrs)
passthroughChains = append(passthroughChains, filterChain)
}
filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter)
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
clusterName: "passthrough~" + passthrough.SNI,
filterName: filterName,
protocol: "tcp",
})
if err != nil {
return nil, err
}
filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(passthrough.Addrs)
passthroughChains = append(passthroughChains, filterChain)
}
outboundListener.FilterChains = append(outboundListener.FilterChains, passthroughChains...)

View File

@ -1211,16 +1211,14 @@ func TestListenersFromSnapshot(t *testing.T) {
// We add a filter chains for each passthrough service name.
// The filter chain will route to a cluster with the same SNI name.
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
kafkaUID: {
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Addrs: map[string]struct{}{
"kafka.default.default.dc1": map[string]struct{}{
"9.9.9.9": {},
},
},
mongoUID: {
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Addrs: map[string]struct{}{
"mongo.default.default.dc1": map[string]struct{}{
"10.10.10.10": {},
"10.10.10.12": {},
},

View File

@ -206,7 +206,7 @@
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
"name": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "ORIGINAL_DST",
"connectTimeout": "5s",
"lbPolicy": "CLUSTER_PROVIDED",
@ -234,18 +234,18 @@
},
"matchSubjectAltNames": [
{
"exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/kafka"
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/kafka"
}
]
}
},
"sni": "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
"sni": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
"name": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "ORIGINAL_DST",
"connectTimeout": "5s",
"lbPolicy": "CLUSTER_PROVIDED",
@ -273,12 +273,12 @@
},
"matchSubjectAltNames": [
{
"exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/mongo"
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
}
]
}
},
"sni": "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
"sni": "mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
}

View File

@ -55,7 +55,7 @@
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.mongo.default.default.dc1",
"cluster": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
"cluster": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
@ -95,7 +95,7 @@
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.kafka.default.default.dc1",
"cluster": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
"cluster": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]