peering, internal: support UIServices, UINodes, UINodeInfo (#13577)
This commit is contained in:
parent 5538ba212f
commit 4333312be9
@@ -69,18 +69,60 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
 		&args.QueryOptions,
 		&reply.QueryMeta,
 		func(ws memdb.WatchSet, state *state.Store) error {
-			index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, args.PeerName)
-			if err != nil {
-				return err
+			// we don't support calling this endpoint for a specific peer
+			if args.PeerName != "" {
+				return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
 			}
-			reply.Index, reply.Dump = index, dump
+
+			// this maxIndex will be the max of the NodeDump calls and the PeeringList call
+			var maxIndex uint64
+
+			// Get data for local nodes
+			index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
+			if err != nil {
+				return fmt.Errorf("could not get a node dump for local nodes: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+			reply.Dump = dump
+
+			// get a list of all peerings
+			index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
+			if err != nil {
+				return fmt.Errorf("could not list peers for node dump %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+
+			// get node dumps for all peerings
+			for _, p := range listedPeerings {
+				index, importedDump, err := state.NodeDump(ws, &args.EnterpriseMeta, p.Name)
+				if err != nil {
+					return fmt.Errorf("could not get a node dump for peer %q: %w", p.Name, err)
+				}
+				reply.ImportedDump = append(reply.ImportedDump, importedDump...)
+
+				if index > maxIndex {
+					maxIndex = index
+				}
+			}
+			reply.Index = maxIndex
 
 			raw, err := filter.Execute(reply.Dump)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not filter local node dump: %w", err)
 			}
 			reply.Dump = raw.(structs.NodeDump)
+
+			importedRaw, err := filter.Execute(reply.ImportedDump)
+			if err != nil {
+				return fmt.Errorf("could not filter peer node dump: %w", err)
+			}
+			reply.ImportedDump = importedRaw.(structs.NodeDump)
 
 			// Note: we filter the results with ACLs *after* applying the user-supplied
 			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
 			// results that would be filtered out even if the user did have permission.
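Note (illustrative, not part of the commit): the index bookkeeping above exists because Consul blocking queries re-fire only when the reply's Index advances. A reply assembled from several state-store reads must therefore report the maximum of all their indexes, or a watcher could miss an update from the source with the highest index. A minimal sketch of the merge rule the hunk applies inline after each read (the helper name is hypothetical):

func maxIndexOf(indexes ...uint64) uint64 {
	// Return the largest watch index among all contributing reads.
	var max uint64
	for _, idx := range indexes {
		if idx > max {
			max = idx
		}
	}
	return max
}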
@@ -111,13 +153,47 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
 		&args.QueryOptions,
 		&reply.QueryMeta,
 		func(ws memdb.WatchSet, state *state.Store) error {
-			// Get, store, and filter nodes
-			maxIdx, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, args.PeerName)
+			// we don't support calling this endpoint for a specific peer
+			if args.PeerName != "" {
+				return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
+			}
+
+			// this maxIndex will be the max of the ServiceDump calls and the PeeringList call
+			var maxIndex uint64
+
+			// get a local dump for services
+			index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not get a service dump for local nodes: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
 			}
 			reply.Nodes = nodes
+
+			// get a list of all peerings
+			index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
+			if err != nil {
+				return fmt.Errorf("could not list peers for service dump %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+
+			for _, p := range listedPeerings {
+				index, importedNodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, p.Name)
+				if err != nil {
+					return fmt.Errorf("could not get a service dump for peer %q: %w", p.Name, err)
+				}
+
+				if index > maxIndex {
+					maxIndex = index
+				}
+				reply.ImportedNodes = append(reply.ImportedNodes, importedNodes...)
+			}
 
 			// Get, store, and filter gateway services
 			idx, gatewayServices, err := state.DumpGatewayServices(ws)
 			if err != nil {
@@ -125,17 +201,23 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
 			}
 			reply.Gateways = gatewayServices
 
-			if idx > maxIdx {
-				maxIdx = idx
+			if idx > maxIndex {
+				maxIndex = idx
 			}
-			reply.Index = maxIdx
+			reply.Index = maxIndex
 
 			raw, err := filter.Execute(reply.Nodes)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not filter local service dump: %w", err)
 			}
 			reply.Nodes = raw.(structs.CheckServiceNodes)
+
+			importedRaw, err := filter.Execute(reply.ImportedNodes)
+			if err != nil {
+				return fmt.Errorf("could not filter peer service dump: %w", err)
+			}
+			reply.ImportedNodes = importedRaw.(structs.CheckServiceNodes)
 
 			// Note: we filter the results with ACLs *after* applying the user-supplied
 			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
 			// results that would be filtered out even if the user did have permission.
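Note (illustrative, not part of the commit): in both endpoints above, the same user-supplied go-bexpr expression is applied to the local and imported slices separately, so an expression like PeerName != "" can empty one result set without touching the other. A self-contained sketch of that behavior; the NodeInfo type and values are stand-ins, and it assumes the go-bexpr CreateFilter(expr, config, dataType) signature used elsewhere in Consul at this time:

package main

import (
	"fmt"

	bexpr "github.com/hashicorp/go-bexpr"
)

type NodeInfo struct {
	Node     string
	PeerName string
}

func main() {
	local := []NodeInfo{{Node: "foo"}, {Node: "bar"}}
	imported := []NodeInfo{{Node: "foo-peer", PeerName: "peer1"}}

	// The endpoint passes the user's ?filter= expression through unchanged.
	flt, err := bexpr.CreateFilter(`PeerName != ""`, nil, local)
	if err != nil {
		panic(err)
	}

	rawLocal, _ := flt.Execute(local)       // empty: local nodes have no PeerName
	rawImported, _ := flt.Execute(imported) // keeps foo-peer

	fmt.Println(rawLocal.([]NodeInfo), rawImported.([]NodeInfo))
}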
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/consul-net-rpc/net/rpc"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -17,6 +18,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/lib/stringslice"
+	"github.com/hashicorp/consul/proto/pbpeering"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
@@ -29,56 +31,79 @@ func TestInternal_NodeInfo(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
-	var out struct{}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
 	}
 
-	var out2 structs.IndexedNodeDump
-	req := structs.NodeSpecificRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-	}
-	if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	t.Run("get local node", func(t *testing.T) {
+		var out structs.IndexedNodeDump
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       "foo",
+		}
+		if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
 
-	nodes := out2.Dump
-	if len(nodes) != 1 {
-		t.Fatalf("Bad: %v", nodes)
-	}
-	if nodes[0].Node != "foo" {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
-	if !stringslice.Contains(nodes[0].Services[0].Tags, "primary") {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
-	if nodes[0].Checks[0].Status != api.HealthPassing {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
+		nodes := out.Dump
+		if len(nodes) != 1 {
+			t.Fatalf("Bad: %v", nodes)
+		}
+		if nodes[0].Node != "foo" {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+		if !stringslice.Contains(nodes[0].Services[0].Tags, "primary") {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+		if nodes[0].Checks[0].Status != api.HealthPassing {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+	})
+
+	t.Run("get peered node", func(t *testing.T) {
+		var out structs.IndexedNodeDump
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       "foo",
+			PeerName:   "peer1",
+		}
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out))
+
+		nodes := out.Dump
+		require.Equal(t, 1, len(nodes))
+		require.Equal(t, "foo", nodes[0].Node)
+		require.Equal(t, "peer1", nodes[0].PeerName)
+	})
 }
 
 func TestInternal_NodeDump(t *testing.T) {
@@ -87,53 +112,61 @@ func TestInternal_NodeDump(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "bar",
+			Address:    "127.0.0.2",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"replica"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthWarning,
+				ServiceID: "db",
+			},
+		},
+		{
+			Datacenter: "dc1",
+			Node:       "foo-peer",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
-	var out struct{}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
 
-	arg = structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "bar",
-		Address:    "127.0.0.2",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"replica"},
-		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthWarning,
-			ServiceID: "db",
-		},
-	}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
 	}
 
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
+
 	var out2 structs.IndexedNodeDump
 	req := structs.DCSpecificRequest{
 		Datacenter: "dc1",
@@ -175,6 +208,10 @@ func TestInternal_NodeDump(t *testing.T) {
 	if !foundFoo || !foundBar {
 		t.Fatalf("missing foo or bar")
 	}
+
+	require.Len(t, out2.ImportedDump, 1)
+	require.Equal(t, "peer1", out2.ImportedDump[0].PeerName)
+	require.Equal(t, "foo-peer", out2.ImportedDump[0].Node)
 }
 
 func TestInternal_NodeDump_Filter(t *testing.T) {
@@ -183,60 +220,107 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "bar",
+			Address:    "127.0.0.2",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"replica"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthWarning,
+				ServiceID: "db",
+			},
 		},
-	}
-	var out struct{}
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
-
-	arg = structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "bar",
-		Address:    "127.0.0.2",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"replica"},
-		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthWarning,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "foo-peer",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
 
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
+	}
 
-	var out2 structs.IndexedNodeDump
-	req := structs.DCSpecificRequest{
-		Datacenter:   "dc1",
-		QueryOptions: structs.QueryOptions{Filter: "primary in Services.Tags"},
-	}
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2))
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
 
-	nodes := out2.Dump
-	require.Len(t, nodes, 1)
-	require.Equal(t, "foo", nodes[0].Node)
+	t.Run("filter on the local node", func(t *testing.T) {
+		var out2 structs.IndexedNodeDump
+		req := structs.DCSpecificRequest{
+			Datacenter:   "dc1",
+			QueryOptions: structs.QueryOptions{Filter: "primary in Services.Tags"},
+		}
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2))
+
+		nodes := out2.Dump
+		require.Len(t, nodes, 1)
+		require.Equal(t, "foo", nodes[0].Node)
+	})
+
+	t.Run("filter on imported dump", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			Datacenter:   "dc1",
+			QueryOptions: structs.QueryOptions{Filter: "friend in PeerName"},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 0)
+	})
+
+	t.Run("filter look for peer nodes (non local nodes)", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: "PeerName != \"\""},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 1)
+	})
+
+	t.Run("filter look for a specific peer", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: "PeerName == peer1"},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 1)
+	})
 }
 
 func TestInternal_KeyringOperation(t *testing.T) {
@@ -1665,6 +1749,89 @@ func TestInternal_GatewayServiceDump_Ingress_ACL(t *testing.T) {
 	require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
 }
 
+func TestInternal_ServiceDump_Peering(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+
+	t.Parallel()
+	_, s1 := testServer(t)
+	codec := rpcClient(t, s1)
+
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+
+	// prep the cluster with some data we can use in our filters
+	registerTestCatalogEntries(t, codec)
+
+	doRequest := func(t *testing.T, filter string) structs.IndexedNodesWithGateways {
+		t.Helper()
+		args := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: filter},
+		}
+
+		var out structs.IndexedNodesWithGateways
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out))
+
+		return out
+	}
+
+	t.Run("No peerings", func(t *testing.T) {
+		nodes := doRequest(t, "")
+		// redis (3), web (3), critical (1), warning (1) and consul (1)
+		require.Len(t, nodes.Nodes, 9)
+		require.Len(t, nodes.ImportedNodes, 0)
+	})
+
+	addPeerService(t, codec)
+
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
+
+	t.Run("peerings", func(t *testing.T) {
+		nodes := doRequest(t, "")
+		// redis (3), web (3), critical (1), warning (1) and consul (1)
+		require.Len(t, nodes.Nodes, 9)
+		// service (1)
+		require.Len(t, nodes.ImportedNodes, 1)
+	})
+
+	t.Run("peerings w filter", func(t *testing.T) {
+		nodes := doRequest(t, "Node.PeerName == foo")
+		require.Len(t, nodes.Nodes, 0)
+		require.Len(t, nodes.ImportedNodes, 0)
+
+		nodes2 := doRequest(t, "Node.PeerName == peer1")
+		require.Len(t, nodes2.Nodes, 0)
+		require.Len(t, nodes2.ImportedNodes, 1)
+	})
+}
+
+func addPeerService(t *testing.T, codec rpc.ClientCodec) {
+	// prep the cluster with some data we can use in our filters
+	registrations := map[string]*structs.RegisterRequest{
+		"Peer node foo with peer service": {
+			Datacenter: "dc1",
+			Node:       "foo",
+			ID:         types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
+			Address:    "127.0.0.2",
+			PeerName:   "peer1",
+			Service: &structs.NodeService{
+				Kind:     structs.ServiceKindTypical,
+				ID:       "serviceID",
+				Service:  "service",
+				Port:     1235,
+				Address:  "198.18.1.2",
+				PeerName: "peer1",
+			},
+		},
+	}
+
+	registerTestCatalogEntriesMap(t, codec, registrations)
+}
+
 func TestInternal_GatewayIntentions(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
@@ -2239,8 +2239,9 @@ type IndexedCheckServiceNodes struct {
 }
 
 type IndexedNodesWithGateways struct {
-	Nodes    CheckServiceNodes
-	Gateways GatewayServices
+	ImportedNodes CheckServiceNodes
+	Nodes         CheckServiceNodes
+	Gateways      GatewayServices
 	QueryMeta
 }
 
@@ -2250,7 +2251,8 @@ type DatacenterIndexedCheckServiceNodes struct {
 }
 
 type IndexedNodeDump struct {
-	Dump NodeDump
+	ImportedDump NodeDump
+	Dump         NodeDump
 	QueryMeta
 }
 
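Note (illustrative, not part of the commit): both reply structs keep their existing fields and gain Imported* companions, so existing consumers keep decoding Nodes/Dump while peering-aware callers also read the new fields. A hypothetical consumer sketch with simplified stand-in types:

package main

import "fmt"

// Stand-ins for the structs.IndexedNodeDump shapes above; illustrative only.
type NodeInfo struct{ Node, PeerName string }

type IndexedNodeDump struct {
	Dump         []NodeInfo // local nodes
	ImportedDump []NodeInfo // nodes imported from peers
	Index        uint64
}

func main() {
	out := IndexedNodeDump{
		Dump:         []NodeInfo{{Node: "foo"}},
		ImportedDump: []NodeInfo{{Node: "foo-peer", PeerName: "peer1"}},
	}
	// Merge both sets into one list, the way the UI node-dump handler
	// later in this commit does with append(out.Dump, out.ImportedDump...).
	all := append(out.Dump, out.ImportedDump...)
	fmt.Println(len(all)) // 2
}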
@@ -37,6 +37,8 @@ type ServiceSummary struct {
 	transparentProxySet bool
 	ConnectNative       bool
 
+	PeerName string `json:",omitempty"`
+
 	acl.EnterpriseMeta
 }
 
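Note (illustrative, not part of the commit): the tag `json:",omitempty"` keeps the default field name but drops the key entirely when the value is empty, so summaries of local services are unchanged on the wire. A minimal sketch:

package main

import (
	"encoding/json"
	"fmt"
)

type ServiceSummary struct {
	Name     string
	PeerName string `json:",omitempty"` // key stays "PeerName"; omitted when ""
}

func main() {
	local, _ := json.Marshal(ServiceSummary{Name: "web"})
	peered, _ := json.Marshal(ServiceSummary{Name: "web", PeerName: "peer1"})
	fmt.Println(string(local))  // {"Name":"web"}
	fmt.Println(string(peered)) // {"Name":"web","PeerName":"peer1"}
}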
@@ -117,7 +119,18 @@ RPC:
 	if out.Dump == nil {
 		out.Dump = make(structs.NodeDump, 0)
 	}
-	return out.Dump, nil
+
+	// Use empty list instead of nil
+	for _, info := range out.ImportedDump {
+		if info.Services == nil {
+			info.Services = make([]*structs.NodeService, 0)
+		}
+		if info.Checks == nil {
+			info.Checks = make([]*structs.HealthCheck, 0)
+		}
+	}
+
+	return append(out.Dump, out.ImportedDump...), nil
 }
 
 // UINodeInfo is used to get info on a single node in a given datacenter. We return a
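Note (illustrative, not part of the commit): the nil-to-empty normalization above matters because encoding/json renders a nil slice as null while an empty slice renders as [], and UI code would otherwise have to special-case null. A tiny demonstration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var nilSlice []string
	emptySlice := make([]string, 0)

	a, _ := json.Marshal(nilSlice)   // -> null
	b, _ := json.Marshal(emptySlice) // -> []
	fmt.Println(string(a), string(b))
}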
@@ -139,6 +152,10 @@ func (s *HTTPHandlers) UINodeInfo(resp http.ResponseWriter, req *http.Request) (
 		return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
 	}
 
+	if peer := req.URL.Query().Get("peer"); peer != "" {
+		args.PeerName = peer
+	}
+
 	// Make the RPC request
 	var out structs.IndexedNodeDump
 	defer setMeta(resp, &out.QueryMeta)
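Note (illustrative, not part of the commit): with the hunk above, the UI node-info endpoint forwards an optional peer query parameter into the RPC request. A sketch of a call against a local agent; the agent address and node name are assumptions for the example:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Ask the agent for a node imported from peer "peer1".
	resp, err := http.Get("http://127.0.0.1:8500/v1/internal/ui/node/foo-peer?peer=peer1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}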
@@ -216,15 +233,17 @@ RPC:
 
 	// Store the names of the gateways associated with each service
 	var (
-		serviceGateways   = make(map[structs.ServiceName][]structs.ServiceName)
-		numLinkedServices = make(map[structs.ServiceName]int)
+		serviceGateways   = make(map[structs.PeeredServiceName][]structs.PeeredServiceName)
+		numLinkedServices = make(map[structs.PeeredServiceName]int)
 	)
 	for _, gs := range out.Gateways {
-		serviceGateways[gs.Service] = append(serviceGateways[gs.Service], gs.Gateway)
-		numLinkedServices[gs.Gateway] += 1
+		psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Service}
+		gpsn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Gateway}
+		serviceGateways[psn] = append(serviceGateways[psn], gpsn)
+		numLinkedServices[gpsn] += 1
 	}
 
-	summaries, hasProxy := summarizeServices(out.Nodes.ToServiceDump(), nil, "")
+	summaries, hasProxy := summarizeServices(append(out.Nodes, out.ImportedNodes...).ToServiceDump(), nil, "")
 	sorted := prepSummaryOutput(summaries, false)
 
 	// Ensure at least a zero length slice
@@ -233,17 +252,18 @@ RPC:
 		sum := ServiceListingSummary{ServiceSummary: *svc}
 
 		sn := structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
-		if hasProxy[sn] {
+		psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
+		if hasProxy[psn] {
 			sum.ConnectedWithProxy = true
 		}
 
 		// Verify that at least one of the gateways linked by config entry has an instance registered in the catalog
-		for _, gw := range serviceGateways[sn] {
+		for _, gw := range serviceGateways[psn] {
 			if s := summaries[gw]; s != nil && sum.InstanceCount > 0 {
 				sum.ConnectedWithGateway = true
 			}
 		}
-		sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[sn]
+		sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[psn]
 
 		result = append(result, &sum)
 	}
@@ -389,31 +409,43 @@ RPC:
 	return topo, nil
 }
 
-func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.ServiceName]*ServiceSummary, map[structs.ServiceName]bool) {
+func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.PeeredServiceName]*ServiceSummary, map[structs.PeeredServiceName]bool) {
 	var (
-		summary  = make(map[structs.ServiceName]*ServiceSummary)
-		hasProxy = make(map[structs.ServiceName]bool)
+		summary  = make(map[structs.PeeredServiceName]*ServiceSummary)
+		hasProxy = make(map[structs.PeeredServiceName]bool)
 	)
 
-	getService := func(service structs.ServiceName) *ServiceSummary {
-		serv, ok := summary[service]
+	getService := func(psn structs.PeeredServiceName) *ServiceSummary {
+		serv, ok := summary[psn]
 		if !ok {
 			serv = &ServiceSummary{
-				Name:           service.Name,
-				EnterpriseMeta: service.EnterpriseMeta,
+				Name:           psn.ServiceName.Name,
+				EnterpriseMeta: psn.ServiceName.EnterpriseMeta,
 				// the other code will increment this unconditionally so we
 				// shouldn't initialize it to 1
 				InstanceCount: 0,
+				PeerName:      psn.Peer,
 			}
-			summary[service] = serv
+			summary[psn] = serv
 		}
 		return serv
 	}
 
 	for _, csn := range dump {
+		var peerName string
+		// all entities will have the same peer name so it is safe to use the node's peer name
+		if csn.Node == nil {
+			// this can happen for gateway dumps that call this summarize func
+			peerName = structs.DefaultPeerKeyword
+		} else {
+			peerName = csn.Node.PeerName
+		}
+
 		if cfg != nil && csn.GatewayService != nil {
 			gwsvc := csn.GatewayService
-			sum := getService(gwsvc.Service)
+			psn := structs.PeeredServiceName{Peer: peerName, ServiceName: gwsvc.Service}
+			sum := getService(psn)
 			modifySummaryForGatewayService(cfg, dc, sum, gwsvc)
 		}
 
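Note (illustrative, not part of the commit): the heart of this refactor is keying the summary maps by (peer, service) instead of service name alone, so a local "web" and a "web" imported from peer1 no longer collide in the UI summaries. A self-contained sketch with simplified stand-ins for structs.PeeredServiceName and structs.ServiceName:

package main

import "fmt"

// Simplified stand-in for structs.PeeredServiceName; illustrative only.
type PeeredServiceName struct {
	Peer    string
	Service string
}

func main() {
	counts := map[PeeredServiceName]int{}
	counts[PeeredServiceName{Peer: "", Service: "web"}]++      // local (DefaultPeerKeyword)
	counts[PeeredServiceName{Peer: "peer1", Service: "web"}]++ // imported

	fmt.Println(len(counts)) // 2: same service name, distinct peers do not collide
}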
@@ -421,8 +453,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 		if csn.Service == nil {
 			continue
 		}
 
 		sn := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta)
-		sum := getService(sn)
+		psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
+		sum := getService(psn)
+
 		svc := csn.Service
 		sum.Nodes = append(sum.Nodes, csn.Node.Node)
@@ -432,9 +466,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 		sum.ConnectNative = svc.Connect.Native
 		if svc.Kind == structs.ServiceKindConnectProxy {
 			sn := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
-			hasProxy[sn] = true
+			psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
+			hasProxy[psn] = true
 
-			destination := getService(sn)
+			destination := getService(psn)
 			for _, check := range csn.Checks {
 				cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
 				uid := structs.UniqueID(csn.Node.Node, cid.String())
@@ -496,7 +531,7 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 	return summary, hasProxy
 }
 
-func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
+func prepSummaryOutput(summaries map[structs.PeeredServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
 	var resp []*ServiceSummary
 	// Ensure at least a zero length slice
 	resp = make([]*ServiceSummary, 0)
@@ -2,6 +2,7 @@ package agent
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -11,6 +12,7 @@ import (
 	"path/filepath"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	cleanhttp "github.com/hashicorp/go-cleanhttp"
 	"github.com/stretchr/testify/assert"
@@ -19,12 +21,14 @@ import (
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/proto/pbpeering"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
+	"github.com/hashicorp/consul/types"
 )
 
-func TestUiIndex(t *testing.T) {
+func TestUIIndex(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
@@ -74,7 +78,7 @@ func TestUiIndex(t *testing.T) {
 	}
 }
 
-func TestUiNodes(t *testing.T) {
+func TestUINodes(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
@@ -84,15 +88,42 @@ func TestUiNodes(t *testing.T) {
 	defer a.Shutdown()
 	testrpc.WaitForTestAgent(t, a.RPC, "dc1")
 
-	args := &structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "test",
-		Address:    "127.0.0.1",
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "test",
+			Address:    "127.0.0.1",
+		},
+		{
+			Datacenter: "dc1",
+			Node:       "foo-peer",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
+		},
 	}
 
-	var out struct{}
-	if err := a.RPC("Catalog.Register", args, &out); err != nil {
-		t.Fatalf("err: %v", err)
+	for _, reg := range args {
+		var out struct{}
+		err := a.RPC("Catalog.Register", reg, &out)
+		require.NoError(t, err)
+	}
+
+	// establish "peer1"
+	{
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		peerOne := &pbpeering.PeeringWriteRequest{
+			Peering: &pbpeering.Peering{
+				Name:                "peer1",
+				State:               pbpeering.PeeringState_INITIAL,
+				PeerCAPems:          nil,
+				PeerServerName:      "fooservername",
+				PeerServerAddresses: []string{"addr1"},
+			},
+		}
+		_, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne)
+		require.NoError(t, err)
 	}
 
 	req, _ := http.NewRequest("GET", "/v1/internal/ui/nodes/dc1", nil)
@@ -103,20 +134,32 @@ func TestUiNodes(t *testing.T) {
 	}
 	assertIndex(t, resp)
 
-	// Should be 2 nodes, and all the empty lists should be non-nil
+	// Should be 3 nodes, and all the empty lists should be non-nil
 	nodes := obj.(structs.NodeDump)
-	if len(nodes) != 2 ||
-		nodes[0].Node != a.Config.NodeName ||
-		nodes[0].Services == nil || len(nodes[0].Services) != 1 ||
-		nodes[0].Checks == nil || len(nodes[0].Checks) != 1 ||
-		nodes[1].Node != "test" ||
-		nodes[1].Services == nil || len(nodes[1].Services) != 0 ||
-		nodes[1].Checks == nil || len(nodes[1].Checks) != 0 {
-		t.Fatalf("bad: %v", obj)
-	}
+	require.Len(t, nodes, 3)
+
+	// check local nodes, services and checks
+	require.Equal(t, a.Config.NodeName, nodes[0].Node)
+	require.NotNil(t, nodes[0].Services)
+	require.Len(t, nodes[0].Services, 1)
+	require.NotNil(t, nodes[0].Checks)
+	require.Len(t, nodes[0].Checks, 1)
+	require.Equal(t, "test", nodes[1].Node)
+	require.NotNil(t, nodes[1].Services)
+	require.Len(t, nodes[1].Services, 0)
+	require.NotNil(t, nodes[1].Checks)
+	require.Len(t, nodes[1].Checks, 0)
+
+	// peered node
+	require.Equal(t, "foo-peer", nodes[2].Node)
+	require.Equal(t, "peer1", nodes[2].PeerName)
+	require.NotNil(t, nodes[2].Services)
+	require.Len(t, nodes[2].Services, 0)
+	require.NotNil(t, nodes[1].Checks)
+	require.Len(t, nodes[2].Services, 0)
 }
 
-func TestUiNodes_Filter(t *testing.T) {
+func TestUINodes_Filter(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
@@ -162,7 +205,7 @@ func TestUiNodes_Filter(t *testing.T) {
 	require.Empty(t, nodes[0].Checks)
 }
 
-func TestUiNodeInfo(t *testing.T) {
+func TestUINodeInfo(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
@@ -214,7 +257,7 @@ func TestUiNodeInfo(t *testing.T) {
 	}
 }
 
-func TestUiServices(t *testing.T) {
+func TestUIServices(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
 	}
@@ -318,6 +361,30 @@ func TestUiServices(t *testing.T) {
 				Tags: []string{},
 			},
 		},
+		// register peer node foo with peer service
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			ID:         types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
+			Address:    "127.0.0.2",
+			TaggedAddresses: map[string]string{
+				"lan": "127.0.0.2",
+				"wan": "198.18.0.2",
+			},
+			NodeMeta: map[string]string{
+				"env": "production",
+				"os":  "linux",
+			},
+			PeerName: "peer1",
+			Service: &structs.NodeService{
+				Kind:     structs.ServiceKindTypical,
+				ID:       "serviceID",
+				Service:  "service",
+				Port:     1235,
+				Address:  "198.18.1.2",
+				PeerName: "peer1",
+			},
+		},
 	}
 
 	for _, args := range requests {
@@ -325,6 +392,24 @@ func TestUiServices(t *testing.T) {
 		require.NoError(t, a.RPC("Catalog.Register", args, &out))
 	}
 
+	// establish "peer1"
+	{
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		peerOne := &pbpeering.PeeringWriteRequest{
+			Peering: &pbpeering.Peering{
+				Name:                "peer1",
+				State:               pbpeering.PeeringState_INITIAL,
+				PeerCAPems:          nil,
+				PeerServerName:      "fooservername",
+				PeerServerAddresses: []string{"addr1"},
+			},
+		}
+		_, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne)
+		require.NoError(t, err)
+	}
+
 	// Register a terminating gateway associated with api and cache
 	{
 		arg := structs.RegisterRequest{
@@ -393,7 +478,7 @@ func TestUiServices(t *testing.T) {
 
 		// Should be 2 nodes, and all the empty lists should be non-nil
 		summary := obj.([]*ServiceListingSummary)
-		require.Len(t, summary, 6)
+		require.Len(t, summary, 7)
 
 		// internal accounting that users don't see can be blown away
 		for _, sum := range summary {
@@ -493,6 +578,21 @@ func TestUiServices(t *testing.T) {
 					EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 				},
 			},
+			{
+				ServiceSummary: ServiceSummary{
+					Kind:           structs.ServiceKindTypical,
+					Name:           "service",
+					Datacenter:     "dc1",
+					Tags:           nil,
+					Nodes:          []string{"foo"},
+					InstanceCount:  1,
+					ChecksPassing:  0,
+					ChecksWarning:  0,
+					ChecksCritical: 0,
+					EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+					PeerName:       "peer1",
+				},
+			},
 		}
 		require.ElementsMatch(t, expected, summary)
 	})