Return mesh gateway addrs if peering through mgw
This commit is contained in:
parent
e30f4c3a8a
commit
17463472b7
|
@ -9,10 +9,12 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
||||
|
@ -57,9 +59,38 @@ func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) {
|
|||
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
|
||||
}
|
||||
|
||||
// GetServerAddresses looks up server node addresses from the state store.
|
||||
// GetServerAddresses looks up server or mesh gateway addresses from the state store.
|
||||
func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
|
||||
state := b.srv.fsm.State()
|
||||
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read mesh config entry: %w", err)
|
||||
}
|
||||
|
||||
meshConfig, ok := rawEntry.(*structs.MeshConfigEntry)
|
||||
if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways {
|
||||
return meshGatewayAdresses(b.srv.fsm.State())
|
||||
}
|
||||
return serverAddresses(b.srv.fsm.State())
|
||||
}
|
||||
|
||||
func meshGatewayAdresses(state *state.Store) ([]string, error) {
|
||||
_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to dump gateway addresses: %w", err)
|
||||
}
|
||||
|
||||
var addrs []string
|
||||
for _, node := range nodes {
|
||||
_, addr, port := node.BestAddress(true)
|
||||
addrs = append(addrs, ipaddr.FormatAddressPort(addr, port))
|
||||
}
|
||||
if len(addrs) == 0 {
|
||||
return nil, fmt.Errorf("servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
|
||||
}
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
func serverAddresses(state *state.Store) ([]string, error) {
|
||||
_, nodes, err := state.ServiceNodes(nil, "consul", structs.DefaultEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -2,10 +2,13 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/stretchr/testify/require"
|
||||
gogrpc "google.golang.org/grpc"
|
||||
|
||||
|
@ -17,7 +20,9 @@ import (
|
|||
)
|
||||
|
||||
func TestPeeringBackend_ForwardToLeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, conf1 := testServerConfig(t)
|
||||
server1, err := newServer(t, conf1)
|
||||
|
@ -60,6 +65,83 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestPeeringBackend_GetServerAddresses(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, cfg := testServerConfig(t)
|
||||
cfg.GRPCTLSPort = 8505
|
||||
|
||||
srv, err := newServer(t, cfg)
|
||||
require.NoError(t, err)
|
||||
testrpc.WaitForLeader(t, srv.RPC, "dc1")
|
||||
|
||||
backend := NewPeeringBackend(srv)
|
||||
|
||||
testutil.RunStep(t, "peer to servers", func(t *testing.T) {
|
||||
addrs, err := backend.GetServerAddresses()
|
||||
require.NoError(t, err)
|
||||
|
||||
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
|
||||
require.Equal(t, []string{expect}, addrs)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "existence of mesh config entry is not enough to peer through gateways", func(t *testing.T) {
|
||||
mesh := structs.MeshConfigEntry{
|
||||
// Enable unrelated config.
|
||||
TransparentProxy: structs.TransparentProxyMeshConfig{
|
||||
MeshDestinationsOnly: true,
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
|
||||
addrs, err := backend.GetServerAddresses()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Still expect server address because PeerThroughMeshGateways was not enabled.
|
||||
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
|
||||
require.Equal(t, []string{expect}, addrs)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "cannot peer through gateways without registered gateways", func(t *testing.T) {
|
||||
mesh := structs.MeshConfigEntry{
|
||||
Peering: &structs.PeeringMeshConfig{PeerThroughMeshGateways: true},
|
||||
}
|
||||
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
|
||||
|
||||
addrs, err := backend.GetServerAddresses()
|
||||
require.Nil(t, addrs)
|
||||
testutil.RequireErrorContains(t, err,
|
||||
"servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "peer through mesh gateways", func(t *testing.T) {
|
||||
reg := structs.RegisterRequest{
|
||||
ID: types.NodeID("b5489ca9-f5e9-4dba-a779-61fec4e8e364"),
|
||||
Node: "gw-node",
|
||||
Address: "1.2.3.4",
|
||||
TaggedAddresses: map[string]string{
|
||||
structs.TaggedAddressWAN: "172.217.22.14",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "mesh-gateway",
|
||||
Service: "mesh-gateway",
|
||||
Kind: structs.ServiceKindMeshGateway,
|
||||
Port: 443,
|
||||
TaggedAddresses: map[string]structs.ServiceAddress{
|
||||
structs.TaggedAddressWAN: {Address: "154.238.12.252", Port: 8443},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(t, srv.fsm.State().EnsureRegistration(2, ®))
|
||||
|
||||
addrs, err := backend.GetServerAddresses()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"154.238.12.252:8443"}, addrs)
|
||||
})
|
||||
}
|
||||
|
||||
func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
|
||||
return func(ctx context.Context, addr string) (net.Conn, error) {
|
||||
d := net.Dialer{}
|
||||
|
@ -79,7 +161,9 @@ func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn,
|
|||
}
|
||||
|
||||
func TestPeerStreamService_ForwardToLeader(t *testing.T) {
|
||||
t.Parallel()
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, conf1 := testServerConfig(t)
|
||||
server1, err := newServer(t, conf1)
|
||||
|
|
|
@ -117,7 +117,8 @@ type Backend interface {
|
|||
// GetAgentCACertificates returns the CA certificate to be returned in the peering token data
|
||||
GetAgentCACertificates() ([]string, error)
|
||||
|
||||
// GetServerAddresses returns the addresses used for establishing a peering connection
|
||||
// GetServerAddresses returns the addresses used for establishing a peering connection.
|
||||
// These may be server addresses or mesh gateway addresses if peering through mesh gateways.
|
||||
GetServerAddresses() ([]string, error)
|
||||
|
||||
// GetServerName returns the SNI to be returned in the peering token data which
|
||||
|
|
Loading…
Reference in New Issue