From 4333312be956328651a7b76b3964092c239c535a Mon Sep 17 00:00:00 2001
From: alex <8968914+acpana@users.noreply.github.com>
Date: Fri, 24 Jun 2022 15:17:35 -0700
Subject: [PATCH] peering, internal: support UIServices, UINodes, UINodeInfo
 (#13577)

---
 agent/consul/internal_endpoint.go      | 106 ++++++-
 agent/consul/internal_endpoint_test.go | 407 +++++++++++++++++--------
 agent/structs/structs.go               |   8 +-
 agent/ui_endpoint.go                   |  79 +++--
 agent/ui_endpoint_test.go              | 146 +++++++--
 5 files changed, 566 insertions(+), 180 deletions(-)

diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go
index f1fda470f..5ed07524e 100644
--- a/agent/consul/internal_endpoint.go
+++ b/agent/consul/internal_endpoint.go
@@ -69,18 +69,60 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
 		&args.QueryOptions,
 		&reply.QueryMeta,
 		func(ws memdb.WatchSet, state *state.Store) error {
-			index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, args.PeerName)
-			if err != nil {
-				return err
+			// we don't support calling this endpoint for a specific peer
+			if args.PeerName != "" {
+				return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
 			}
-			reply.Index, reply.Dump = index, dump
+
+			// this maxIndex will be the max of the NodeDump calls and the PeeringList call
+			var maxIndex uint64
+
+			// Get data for local nodes
+			index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
+			if err != nil {
+				return fmt.Errorf("could not get a node dump for local nodes: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+			reply.Dump = dump
+
+			// get a list of all peerings
+			index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
+			if err != nil {
+				return fmt.Errorf("could not list peers for node dump: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+
+			// get node dumps for all peerings
+			for _, p := range listedPeerings {
+				index, importedDump, err := state.NodeDump(ws, &args.EnterpriseMeta, p.Name)
+				if err != nil {
+					return fmt.Errorf("could not get a node dump for peer %q: %w", p.Name, err)
+				}
+				reply.ImportedDump = append(reply.ImportedDump, importedDump...)
+
+				if index > maxIndex {
+					maxIndex = index
+				}
+			}
+			reply.Index = maxIndex
 
 			raw, err := filter.Execute(reply.Dump)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not filter local node dump: %w", err)
 			}
 			reply.Dump = raw.(structs.NodeDump)
 
+			importedRaw, err := filter.Execute(reply.ImportedDump)
+			if err != nil {
+				return fmt.Errorf("could not filter peer node dump: %w", err)
+			}
+			reply.ImportedDump = importedRaw.(structs.NodeDump)
+
 			// Note: we filter the results with ACLs *after* applying the user-supplied
 			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
 			// results that would be filtered out even if the user did have permission.
@@ -111,13 +153,47 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
 		&args.QueryOptions,
 		&reply.QueryMeta,
 		func(ws memdb.WatchSet, state *state.Store) error {
-			// Get, store, and filter nodes
-			maxIdx, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, args.PeerName)
+			// we don't support calling this endpoint for a specific peer
+			if args.PeerName != "" {
+				return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
+			}
+
+			// this maxIndex will be the max of the ServiceDump calls and the PeeringList call
+			var maxIndex uint64
+
+			// get a local dump for services
+			index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not get a service dump for local nodes: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
 			}
 			reply.Nodes = nodes
 
+			// get a list of all peerings
+			index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
+			if err != nil {
+				return fmt.Errorf("could not list peers for service dump: %w", err)
+			}
+
+			if index > maxIndex {
+				maxIndex = index
+			}
+
+			for _, p := range listedPeerings {
+				index, importedNodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, p.Name)
+				if err != nil {
+					return fmt.Errorf("could not get a service dump for peer %q: %w", p.Name, err)
+				}
+
+				if index > maxIndex {
+					maxIndex = index
+				}
+				reply.ImportedNodes = append(reply.ImportedNodes, importedNodes...)
+			}
+
 			// Get, store, and filter gateway services
 			idx, gatewayServices, err := state.DumpGatewayServices(ws)
 			if err != nil {
@@ -125,17 +201,23 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
 			}
 			reply.Gateways = gatewayServices
 
-			if idx > maxIdx {
-				maxIdx = idx
+			if idx > maxIndex {
+				maxIndex = idx
 			}
-			reply.Index = maxIdx
+			reply.Index = maxIndex
 
 			raw, err := filter.Execute(reply.Nodes)
 			if err != nil {
-				return err
+				return fmt.Errorf("could not filter local service dump: %w", err)
 			}
 			reply.Nodes = raw.(structs.CheckServiceNodes)
 
+			importedRaw, err := filter.Execute(reply.ImportedNodes)
+			if err != nil {
+				return fmt.Errorf("could not filter peer service dump: %w", err)
+			}
+			reply.ImportedNodes = importedRaw.(structs.CheckServiceNodes)
+
 			// Note: we filter the results with ACLs *after* applying the user-supplied
 			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
 			// results that would be filtered out even if the user did have permission.
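Both endpoint changes above follow the same shape: dump local data under the default peer keyword, then walk PeeringList and dump each peer, folding every raft index into a single maxIndex so blocking queries resume correctly. A minimal standalone sketch of that pattern (hypothetical names and types; not the actual state-store API):

package sketch

import "fmt"

// dumpFn stands in for state.NodeDump / state.ServiceDump; peer "" means local.
type dumpFn func(peer string) (index uint64, entries []string, err error)

// dumpAll mirrors the aggregation above: local dump first, then one dump per
// peer, returning the highest index observed across all calls.
func dumpAll(dump dumpFn, peerNames []string) (maxIndex uint64, local, imported []string, err error) {
	index, local, err := dump("")
	if err != nil {
		return 0, nil, nil, fmt.Errorf("could not get local dump: %w", err)
	}
	if index > maxIndex {
		maxIndex = index
	}
	for _, name := range peerNames {
		index, entries, err := dump(name)
		if err != nil {
			return 0, nil, nil, fmt.Errorf("could not get dump for peer %q: %w", name, err)
		}
		if index > maxIndex {
			maxIndex = index
		}
		imported = append(imported, entries...)
	}
	return maxIndex, local, imported, nil
}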
diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go
index 3737f3a08..d24f08d1b 100644
--- a/agent/consul/internal_endpoint_test.go
+++ b/agent/consul/internal_endpoint_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/consul-net-rpc/net/rpc"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -17,6 +18,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/lib/stringslice"
+	"github.com/hashicorp/consul/proto/pbpeering"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
@@ -29,56 +31,79 @@ func TestInternal_NodeInfo(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
-	var out struct{}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
+
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
 	}
 
-	var out2 structs.IndexedNodeDump
-	req := structs.NodeSpecificRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-	}
-	if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	t.Run("get local node", func(t *testing.T) {
+		var out structs.IndexedNodeDump
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       "foo",
+		}
+		if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
 
-	nodes := out2.Dump
-	if len(nodes) != 1 {
-		t.Fatalf("Bad: %v", nodes)
-	}
-	if nodes[0].Node != "foo" {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
-	if !stringslice.Contains(nodes[0].Services[0].Tags, "primary") {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
-	if nodes[0].Checks[0].Status != api.HealthPassing {
-		t.Fatalf("Bad: %v", nodes[0])
-	}
+		nodes := out.Dump
+		if len(nodes) != 1 {
+			t.Fatalf("Bad: %v", nodes)
+		}
+		if nodes[0].Node != "foo" {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+		if !stringslice.Contains(nodes[0].Services[0].Tags, "primary") {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+		if nodes[0].Checks[0].Status != api.HealthPassing {
+			t.Fatalf("Bad: %v", nodes[0])
+		}
+	})
+
+	t.Run("get peered node", func(t *testing.T) {
+		var out structs.IndexedNodeDump
+		req := structs.NodeSpecificRequest{
+			Datacenter: "dc1",
+			Node:       "foo",
+			PeerName:   "peer1",
+		}
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out))
+
+		nodes := out.Dump
+		require.Equal(t, 1, len(nodes))
+		require.Equal(t, "foo", nodes[0].Node)
+		require.Equal(t, "peer1", nodes[0].PeerName)
+	})
 }
 
 func TestInternal_NodeDump(t *testing.T) {
@@ -87,53 +112,61 @@ func TestInternal_NodeDump(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "bar",
+			Address:    "127.0.0.2",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"replica"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthWarning,
+				ServiceID: "db",
+			},
+		},
+		{
+			Datacenter: "dc1",
+			Node:       "foo-peer",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
-	var out struct{}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
 
-	arg = structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "bar",
-		Address:    "127.0.0.2",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"replica"},
-		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthWarning,
-			ServiceID: "db",
-		},
-	}
-	if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
 	}
 
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
+
 	var out2 structs.IndexedNodeDump
 	req := structs.DCSpecificRequest{
 		Datacenter: "dc1",
@@ -175,6 +208,10 @@ func TestInternal_NodeDump(t *testing.T) {
 	if !foundFoo || !foundBar {
 		t.Fatalf("missing foo or bar")
 	}
+
+	require.Len(t, out2.ImportedDump, 1)
+	require.Equal(t, "peer1", out2.ImportedDump[0].PeerName)
+	require.Equal(t, "foo-peer", out2.ImportedDump[0].Node)
 }
 
 func TestInternal_NodeDump_Filter(t *testing.T) {
@@ -183,60 +220,107 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
 	}
 
 	t.Parallel()
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
+	_, s1 := testServer(t)
 	codec := rpcClient(t, s1)
-	defer codec.Close()
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
-	arg := structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "foo",
-		Address:    "127.0.0.1",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"primary"},
+
+	args := []*structs.RegisterRequest{
+		{
+			Datacenter: "dc1",
+			Node:       "foo",
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"primary"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthPassing,
+				ServiceID: "db",
+			},
 		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthPassing,
-			ServiceID: "db",
+		{
+			Datacenter: "dc1",
+			Node:       "bar",
+			Address:    "127.0.0.2",
+			Service: &structs.NodeService{
+				ID:      "db",
+				Service: "db",
+				Tags:    []string{"replica"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "db connect",
+				Status:    api.HealthWarning,
+				ServiceID: "db",
+			},
+		},
+		{
+			Datacenter: "dc1",
+			Node:       "foo-peer",
+			Address:    "127.0.0.3",
+			PeerName:   "peer1",
 		},
 	}
-	var out struct{}
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
 
-	arg = structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       "bar",
-		Address:    "127.0.0.2",
-		Service: &structs.NodeService{
-			ID:      "db",
-			Service: "db",
-			Tags:    []string{"replica"},
-		},
-		Check: &structs.HealthCheck{
-			Name:      "db connect",
-			Status:    api.HealthWarning,
-			ServiceID: "db",
-		},
+	for _, reg := range args {
+		err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
+		require.NoError(t, err)
 	}
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
 
-	var out2 structs.IndexedNodeDump
-	req := structs.DCSpecificRequest{
-		Datacenter:   "dc1",
-		QueryOptions: structs.QueryOptions{Filter: "primary in Services.Tags"},
-	}
-	require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2))
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
 
-	nodes := out2.Dump
-	require.Len(t, nodes, 1)
-	require.Equal(t, "foo", nodes[0].Node)
+	t.Run("filter on the local node", func(t *testing.T) {
+		var out2 structs.IndexedNodeDump
+		req := structs.DCSpecificRequest{
+			Datacenter:   "dc1",
+			QueryOptions: structs.QueryOptions{Filter: "primary in Services.Tags"},
+		}
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2))
+
+		nodes := out2.Dump
+		require.Len(t, nodes, 1)
+		require.Equal(t, "foo", nodes[0].Node)
+	})
+
+	t.Run("filter on imported dump", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			Datacenter:   "dc1",
+			QueryOptions: structs.QueryOptions{Filter: "friend in PeerName"},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 0)
+	})
+
+	t.Run("filter to look for peer nodes (non-local nodes)", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: "PeerName != \"\""},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 1)
+	})
+
+	t.Run("filter to look for a specific peer", func(t *testing.T) {
+		var out3 structs.IndexedNodeDump
+		req2 := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: "PeerName == peer1"},
+		}
+
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
+		require.Len(t, out3.Dump, 0)
+		require.Len(t, out3.ImportedDump, 1)
+	})
 }
 
 func TestInternal_KeyringOperation(t *testing.T) {
@@ -1665,6 +1749,89 @@ func TestInternal_GatewayServiceDump_Ingress_ACL(t *testing.T) {
 	require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
 }
 
+func TestInternal_ServiceDump_Peering(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for testing.Short")
+	}
+
+	t.Parallel()
+	_, s1 := testServer(t)
+	codec := rpcClient(t, s1)
+
+	testrpc.WaitForLeader(t, s1.RPC, "dc1")
+
+	// prep the cluster with some data we can use in our filters
+	registerTestCatalogEntries(t, codec)
+
+	doRequest := func(t *testing.T, filter string) structs.IndexedNodesWithGateways {
+		t.Helper()
+		args := structs.DCSpecificRequest{
+			QueryOptions: structs.QueryOptions{Filter: filter},
+		}
+
+		var out structs.IndexedNodesWithGateways
+		require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out))
+
+		return out
+	}
+
+	t.Run("No peerings", func(t *testing.T) {
+		nodes := doRequest(t, "")
+		// redis (3), web (3), critical (1), warning (1) and consul (1)
+		require.Len(t, nodes.Nodes, 9)
+		require.Len(t, nodes.ImportedNodes, 0)
+	})
+
+	addPeerService(t, codec)
+
+	err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
+		ID:   "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
+		Name: "peer1",
+	})
+	require.NoError(t, err)
+
+	t.Run("peerings", func(t *testing.T) {
+		nodes := doRequest(t, "")
+		// redis (3), web (3), critical (1), warning (1) and consul (1)
+		require.Len(t, nodes.Nodes, 9)
+		// service (1)
+		require.Len(t, nodes.ImportedNodes, 1)
+	})
+
+	t.Run("peerings w filter", func(t *testing.T) {
+		nodes := doRequest(t, "Node.PeerName == foo")
+		require.Len(t, nodes.Nodes, 0)
+		require.Len(t, nodes.ImportedNodes, 0)
+
+		nodes2 := doRequest(t, "Node.PeerName == peer1")
+		require.Len(t, nodes2.Nodes, 0)
+		require.Len(t, nodes2.ImportedNodes, 1)
+	})
+}
+
+func addPeerService(t *testing.T, codec rpc.ClientCodec) {
+	// prep the cluster with some data we can use in our filters
+	registrations := map[string]*structs.RegisterRequest{
+		"Peer node foo with peer service": {
+			Datacenter: "dc1",
+			Node:       "foo",
+			ID:         types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
+			Address:    "127.0.0.2",
+			PeerName:   "peer1",
+			Service: &structs.NodeService{
+				Kind:     structs.ServiceKindTypical,
+				ID:       "serviceID",
+				Service:  "service",
+				Port:     1235,
+				Address:  "198.18.1.2",
+				PeerName: "peer1",
+			},
+		},
+	}
+
+	registerTestCatalogEntriesMap(t, codec, registrations)
+}
+
 func TestInternal_GatewayIntentions(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
diff --git a/agent/structs/structs.go b/agent/structs/structs.go
index 918597b8d..4039b547b 100644
--- a/agent/structs/structs.go
+++ b/agent/structs/structs.go
@@ -2239,8 +2239,9 @@ type IndexedCheckServiceNodes struct {
 }
 
 type IndexedNodesWithGateways struct {
-	Nodes    CheckServiceNodes
-	Gateways GatewayServices
+	ImportedNodes CheckServiceNodes
+	Nodes         CheckServiceNodes
+	Gateways      GatewayServices
 	QueryMeta
 }
 
@@ -2250,7 +2251,8 @@ type DatacenterIndexedCheckServiceNodes struct {
 }
 
 type IndexedNodeDump struct {
-	Dump NodeDump
+	ImportedDump NodeDump
+	Dump         NodeDump
 	QueryMeta
 }
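The structs change keeps peered results in separate ImportedNodes/ImportedDump fields, and the UI code below re-keys its summary maps from ServiceName to PeeredServiceName so a service imported from a peer never collapses into a same-named local service. Any comparable struct works as a Go map key, which is what makes this re-keying cheap; a trimmed-down sketch (field layout simplified; the real type carries a full ServiceName with EnterpriseMeta):

package sketch

// peeredServiceName is a simplified stand-in for structs.PeeredServiceName.
type peeredServiceName struct {
	Peer string // "" is the local (default) peer
	Name string
}

// countInstances buckets entries per (peer, name) pair: "web" imported from
// peer1 and a local "web" land under distinct keys.
func countInstances(entries []peeredServiceName) map[peeredServiceName]int {
	counts := make(map[peeredServiceName]int)
	for _, e := range entries {
		counts[e]++
	}
	return counts
}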
diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go
index 70d5d9def..2f74d8e59 100644
--- a/agent/ui_endpoint.go
+++ b/agent/ui_endpoint.go
@@ -37,6 +37,8 @@ type ServiceSummary struct {
 	transparentProxySet bool
 	ConnectNative       bool
 
+	PeerName string `json:",omitempty"`
+
 	acl.EnterpriseMeta
 }
 
@@ -117,7 +119,18 @@ RPC:
 	if out.Dump == nil {
 		out.Dump = make(structs.NodeDump, 0)
 	}
-	return out.Dump, nil
+
+	// Use empty list instead of nil
+	for _, info := range out.ImportedDump {
+		if info.Services == nil {
+			info.Services = make([]*structs.NodeService, 0)
+		}
+		if info.Checks == nil {
+			info.Checks = make([]*structs.HealthCheck, 0)
+		}
+	}
+
+	return append(out.Dump, out.ImportedDump...), nil
 }
 
 // UINodeInfo is used to get info on a single node in a given datacenter. We return a
@@ -139,6 +152,10 @@ func (s *HTTPHandlers) UINodeInfo(resp http.ResponseWriter, req *http.Request) (
 		return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
 	}
 
+	if peer := req.URL.Query().Get("peer"); peer != "" {
+		args.PeerName = peer
+	}
+
 	// Make the RPC request
 	var out structs.IndexedNodeDump
 	defer setMeta(resp, &out.QueryMeta)
@@ -216,15 +233,17 @@ RPC:
 
 	// Store the names of the gateways associated with each service
 	var (
-		serviceGateways   = make(map[structs.ServiceName][]structs.ServiceName)
-		numLinkedServices = make(map[structs.ServiceName]int)
+		serviceGateways   = make(map[structs.PeeredServiceName][]structs.PeeredServiceName)
+		numLinkedServices = make(map[structs.PeeredServiceName]int)
 	)
 	for _, gs := range out.Gateways {
-		serviceGateways[gs.Service] = append(serviceGateways[gs.Service], gs.Gateway)
-		numLinkedServices[gs.Gateway] += 1
+		psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Service}
+		gpsn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Gateway}
+		serviceGateways[psn] = append(serviceGateways[psn], gpsn)
+		numLinkedServices[gpsn] += 1
 	}
 
-	summaries, hasProxy := summarizeServices(out.Nodes.ToServiceDump(), nil, "")
+	summaries, hasProxy := summarizeServices(append(out.Nodes, out.ImportedNodes...).ToServiceDump(), nil, "")
 	sorted := prepSummaryOutput(summaries, false)
 
 	// Ensure at least a zero length slice
@@ -233,17 +252,18 @@ RPC:
 		sum := ServiceListingSummary{ServiceSummary: *svc}
 
 		sn := structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
-		if hasProxy[sn] {
+		psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
+		if hasProxy[psn] {
 			sum.ConnectedWithProxy = true
 		}
 
 		// Verify that at least one of the gateways linked by config entry has an instance registered in the catalog
-		for _, gw := range serviceGateways[sn] {
+		for _, gw := range serviceGateways[psn] {
 			if s := summaries[gw]; s != nil && sum.InstanceCount > 0 {
 				sum.ConnectedWithGateway = true
 			}
 		}
-		sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[sn]
+		sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[psn]
 
 		result = append(result, &sum)
 	}
@@ -389,31 +409,43 @@ RPC:
 	return topo, nil
 }
 
-func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.ServiceName]*ServiceSummary, map[structs.ServiceName]bool) {
+func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.PeeredServiceName]*ServiceSummary, map[structs.PeeredServiceName]bool) {
 	var (
-		summary  = make(map[structs.ServiceName]*ServiceSummary)
-		hasProxy = make(map[structs.ServiceName]bool)
+		summary  = make(map[structs.PeeredServiceName]*ServiceSummary)
+		hasProxy = make(map[structs.PeeredServiceName]bool)
 	)
 
-	getService := func(service structs.ServiceName) *ServiceSummary {
-		serv, ok := summary[service]
+	getService := func(psn structs.PeeredServiceName) *ServiceSummary {
+		serv, ok := summary[psn]
 		if !ok {
 			serv = &ServiceSummary{
-				Name:           service.Name,
-				EnterpriseMeta: service.EnterpriseMeta,
+				Name:           psn.ServiceName.Name,
+				EnterpriseMeta: psn.ServiceName.EnterpriseMeta,
 				// the other code will increment this unconditionally so we
 				// shouldn't initialize it to 1
 				InstanceCount: 0,
+				PeerName:      psn.Peer,
 			}
-			summary[service] = serv
+			summary[psn] = serv
 		}
 		return serv
 	}
 
 	for _, csn := range dump {
+		var peerName string
+		// all entities will have the same peer name so it is safe to use the node's peer name
+		if csn.Node == nil {
+			// this can happen for gateway dumps that call this summarize func
+			peerName = structs.DefaultPeerKeyword
+		} else {
+			peerName = csn.Node.PeerName
+		}
+
 		if cfg != nil && csn.GatewayService != nil {
 			gwsvc := csn.GatewayService
-			sum := getService(gwsvc.Service)
+
+			psn := structs.PeeredServiceName{Peer: peerName, ServiceName: gwsvc.Service}
+			sum := getService(psn)
 			modifySummaryForGatewayService(cfg, dc, sum, gwsvc)
 		}
 
@@ -421,8 +453,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 		if csn.Service == nil {
 			continue
 		}
+
 		sn := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta)
-		sum := getService(sn)
+		psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
+		sum := getService(psn)
 
 		svc := csn.Service
 		sum.Nodes = append(sum.Nodes, csn.Node.Node)
@@ -432,9 +466,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 		sum.ConnectNative = svc.Connect.Native
 		if svc.Kind == structs.ServiceKindConnectProxy {
 			sn := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
-			hasProxy[sn] = true
+			psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
+			hasProxy[psn] = true
 
-			destination := getService(sn)
+			destination := getService(psn)
 			for _, check := range csn.Checks {
 				cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
 				uid := structs.UniqueID(csn.Node.Node, cid.String())
@@ -496,7 +531,7 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
 	return summary, hasProxy
 }
 
-func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
+func prepSummaryOutput(summaries map[structs.PeeredServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
 	var resp []*ServiceSummary
 	// Ensure at least a zero length slice
 	resp = make([]*ServiceSummary, 0)
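With the handler change above, UINodeInfo honors an optional peer query parameter. A client-side sketch against a local agent (node and peer names are placeholders; the path is Consul's existing UI node-info route):

package sketch

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// fetchPeeredNode asks the agent's UI endpoint for a node imported from a peer.
func fetchPeeredNode(agentAddr, node, peer string) ([]byte, error) {
	u := fmt.Sprintf("http://%s/v1/internal/ui/node/%s?peer=%s",
		agentAddr, url.PathEscape(node), url.QueryEscape(peer))
	resp, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return io.ReadAll(resp.Body)
}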
a.RPC("Catalog.Register", reg, &out) + require.NoError(t, err) + } + + // establish "peer1" + { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + peerOne := &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "peer1", + State: pbpeering.PeeringState_INITIAL, + PeerCAPems: nil, + PeerServerName: "fooservername", + PeerServerAddresses: []string{"addr1"}, + }, + } + _, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne) + require.NoError(t, err) } req, _ := http.NewRequest("GET", "/v1/internal/ui/nodes/dc1", nil) @@ -103,20 +134,32 @@ func TestUiNodes(t *testing.T) { } assertIndex(t, resp) - // Should be 2 nodes, and all the empty lists should be non-nil + // Should be 3 nodes, and all the empty lists should be non-nil nodes := obj.(structs.NodeDump) - if len(nodes) != 2 || - nodes[0].Node != a.Config.NodeName || - nodes[0].Services == nil || len(nodes[0].Services) != 1 || - nodes[0].Checks == nil || len(nodes[0].Checks) != 1 || - nodes[1].Node != "test" || - nodes[1].Services == nil || len(nodes[1].Services) != 0 || - nodes[1].Checks == nil || len(nodes[1].Checks) != 0 { - t.Fatalf("bad: %v", obj) - } + require.Len(t, nodes, 3) + + // check local nodes, services and checks + require.Equal(t, a.Config.NodeName, nodes[0].Node) + require.NotNil(t, nodes[0].Services) + require.Len(t, nodes[0].Services, 1) + require.NotNil(t, nodes[0].Checks) + require.Len(t, nodes[0].Checks, 1) + require.Equal(t, "test", nodes[1].Node) + require.NotNil(t, nodes[1].Services) + require.Len(t, nodes[1].Services, 0) + require.NotNil(t, nodes[1].Checks) + require.Len(t, nodes[1].Checks, 0) + + // peered node + require.Equal(t, "foo-peer", nodes[2].Node) + require.Equal(t, "peer1", nodes[2].PeerName) + require.NotNil(t, nodes[2].Services) + require.Len(t, nodes[2].Services, 0) + require.NotNil(t, nodes[1].Checks) + require.Len(t, nodes[2].Services, 0) } -func TestUiNodes_Filter(t *testing.T) { +func TestUINodes_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -162,7 +205,7 @@ func TestUiNodes_Filter(t *testing.T) { require.Empty(t, nodes[0].Checks) } -func TestUiNodeInfo(t *testing.T) { +func TestUINodeInfo(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -214,7 +257,7 @@ func TestUiNodeInfo(t *testing.T) { } } -func TestUiServices(t *testing.T) { +func TestUIServices(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } @@ -318,6 +361,30 @@ func TestUiServices(t *testing.T) { Tags: []string{}, }, }, + // register peer node foo with peer service + { + Datacenter: "dc1", + Node: "foo", + ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"), + Address: "127.0.0.2", + TaggedAddresses: map[string]string{ + "lan": "127.0.0.2", + "wan": "198.18.0.2", + }, + NodeMeta: map[string]string{ + "env": "production", + "os": "linux", + }, + PeerName: "peer1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "serviceID", + Service: "service", + Port: 1235, + Address: "198.18.1.2", + PeerName: "peer1", + }, + }, } for _, args := range requests { @@ -325,6 +392,24 @@ func TestUiServices(t *testing.T) { require.NoError(t, a.RPC("Catalog.Register", args, &out)) } + // establish "peer1" + { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + peerOne := &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "peer1", + State: pbpeering.PeeringState_INITIAL, + PeerCAPems: nil, + PeerServerName: 
"fooservername", + PeerServerAddresses: []string{"addr1"}, + }, + } + _, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne) + require.NoError(t, err) + } + // Register a terminating gateway associated with api and cache { arg := structs.RegisterRequest{ @@ -393,7 +478,7 @@ func TestUiServices(t *testing.T) { // Should be 2 nodes, and all the empty lists should be non-nil summary := obj.([]*ServiceListingSummary) - require.Len(t, summary, 6) + require.Len(t, summary, 7) // internal accounting that users don't see can be blown away for _, sum := range summary { @@ -493,6 +578,21 @@ func TestUiServices(t *testing.T) { EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), }, }, + { + ServiceSummary: ServiceSummary{ + Kind: structs.ServiceKindTypical, + Name: "service", + Datacenter: "dc1", + Tags: nil, + Nodes: []string{"foo"}, + InstanceCount: 1, + ChecksPassing: 0, + ChecksWarning: 0, + ChecksCritical: 0, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + PeerName: "peer1", + }, + }, } require.ElementsMatch(t, expected, summary) })