oss: Add overview UI internal endpoint

This commit is contained in:
Kyle Havlovitz 2022-03-22 16:58:41 -07:00
parent 0d07aa5c57
commit 04f1d9bcc9
10 changed files with 563 additions and 0 deletions

View File

@ -147,6 +147,28 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
})
}
func (m *Internal) CatalogOverview(args *structs.DCSpecificRequest, reply *structs.CatalogSummary) error {
if done, err := m.srv.ForwardRPC("Internal.CatalogOverview", args, reply); done {
return err
}
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if authz.OperatorRead(nil) != acl.Allow {
return acl.PermissionDeniedByACLUnnamed(authz, nil, acl.ResourceOperator, acl.AccessRead)
}
summary := m.srv.overviewManager.GetCurrentSummary()
if summary != nil {
*reply = *summary
}
return nil
}
func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error {
if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, reply); done {
return err

View File

@ -6,6 +6,7 @@ import (
"os"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
@ -2477,3 +2478,93 @@ service_prefix "mongo" { policy = "read" }
})
})
}
func TestInternal_CatalogOverview(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.MetricsReportingInterval = 100 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
retry.Run(t, func(r *retry.R) {
var out structs.CatalogSummary
if err := msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out); err != nil {
r.Fatalf("err: %v", err)
}
expected := structs.CatalogSummary{
Nodes: []structs.HealthSummary{
{
Total: 1,
Passing: 1,
EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(),
},
},
Services: []structs.HealthSummary{
{
Name: "consul",
Total: 1,
Passing: 1,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
},
Checks: []structs.HealthSummary{
{
Name: "Serf Health Status",
Total: 1,
Passing: 1,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
},
}
require.Equal(r, expected, out)
})
}
func TestInternal_CatalogOverview_ACLDeny(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = TestDefaultInitialManagementToken
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.CatalogSummary
err := msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out)
require.True(t, acl.IsErrPermissionDenied(err))
opReadToken, err := upsertTestTokenWithPolicyRules(
codec, TestDefaultInitialManagementToken, "dc1", `operator = "read"`)
require.NoError(t, err)
arg.Token = opReadToken.SecretID
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out))
}

View File

@ -308,6 +308,10 @@ type Server struct {
// Consul router.
statsFetcher *StatsFetcher
// overviewManager is used to periodically update the cluster overview
// and emit node/service/check health metrics.
overviewManager *OverviewManager
// reassertLeaderCh is used to signal the leader loop should re-run
// leadership actions after a snapshot restore.
reassertLeaderCh chan chan error
@ -613,6 +617,9 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
}
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder
go s.trackLeaderChanges()

View File

@ -0,0 +1,182 @@
package consul
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)
type OverviewManager struct {
stateProvider usagemetrics.StateProvider
logger hclog.Logger
interval time.Duration
currentSummary *structs.CatalogSummary
sync.RWMutex
}
func NewOverviewManager(logger hclog.Logger, sp usagemetrics.StateProvider, interval time.Duration) *OverviewManager {
return &OverviewManager{
stateProvider: sp,
logger: logger.Named("catalog-overview"),
interval: interval,
currentSummary: &structs.CatalogSummary{},
}
}
func (m *OverviewManager) GetCurrentSummary() *structs.CatalogSummary {
m.RLock()
defer m.RUnlock()
return m.currentSummary
}
func (m *OverviewManager) Run(ctx context.Context) {
ticker := time.NewTicker(m.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
state := m.stateProvider.State()
catalog, err := state.CatalogDump()
if err != nil {
m.logger.Error("failed to update overview", "error", err)
continue
}
summary := getCatalogOverview(catalog)
m.Lock()
m.currentSummary = summary
m.Unlock()
}
}
}
// getCatalogOverview returns a breakdown of the number of nodes, services, and checks
// in the passing/warning/critical states. In Enterprise, it will also return this
// breakdown for each partition and namespace.
func getCatalogOverview(catalog *structs.CatalogContents) *structs.CatalogSummary {
nodeChecks := make(map[string][]*structs.HealthCheck)
serviceInstanceChecks := make(map[string][]*structs.HealthCheck)
checkSummaries := make(map[string]structs.HealthSummary)
// Compute the health check summaries by taking the pass/warn/fail counts
// of each unique part/ns/checkname combo and storing them. Also store the
// per-node and per-service instance checks for their respective summaries below.
for _, check := range catalog.Checks {
checkID := fmt.Sprintf("%s/%s", check.EnterpriseMeta.String(), check.Name)
summary, ok := checkSummaries[checkID]
if !ok {
summary = structs.HealthSummary{
Name: check.Name,
EnterpriseMeta: check.EnterpriseMeta,
}
}
summary.Add(check.Status)
checkSummaries[checkID] = summary
if check.ServiceID != "" {
serviceInstanceID := fmt.Sprintf("%s/%s/%s", check.EnterpriseMeta.String(), check.Node, check.ServiceID)
serviceInstanceChecks[serviceInstanceID] = append(serviceInstanceChecks[serviceInstanceID], check)
} else {
nodeMeta := check.NodeIdentity().EnterpriseMeta
nodeID := fmt.Sprintf("%s/%s", nodeMeta.String(), check.Node)
nodeChecks[nodeID] = append(nodeChecks[nodeID], check)
}
}
// Compute the service instance summaries by taking the unhealthiest check for
// a given service instance as its health status and totaling the counts for each
// partition/ns/service combination.
serviceSummaries := make(map[string]structs.HealthSummary)
for _, svc := range catalog.Services {
sid := structs.NewServiceID(svc.ServiceName, &svc.EnterpriseMeta)
summary, ok := serviceSummaries[sid.String()]
if !ok {
summary = structs.HealthSummary{
Name: svc.ServiceName,
EnterpriseMeta: svc.EnterpriseMeta,
}
}
// Compute whether this service instance is healthy based on its associated checks.
serviceInstanceID := fmt.Sprintf("%s/%s/%s", svc.EnterpriseMeta.String(), svc.Node, svc.ServiceID)
status := api.HealthPassing
for _, checks := range serviceInstanceChecks[serviceInstanceID] {
if checks.Status == api.HealthWarning && status == api.HealthPassing {
status = api.HealthWarning
}
if checks.Status == api.HealthCritical {
status = api.HealthCritical
}
}
summary.Add(status)
serviceSummaries[sid.String()] = summary
}
// Compute the node summaries by taking the unhealthiest check for each node
// as its health status and totaling the passing/warning/critical counts for
// each partition.
nodeSummaries := make(map[string]structs.HealthSummary)
for _, node := range catalog.Nodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(node.Partition)
summary, ok := nodeSummaries[nodeMeta.String()]
if !ok {
summary = structs.HealthSummary{
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(node.Partition),
}
}
// Compute whether this node is healthy based on its associated checks.
status := api.HealthPassing
nodeID := fmt.Sprintf("%s/%s", nodeMeta.String(), node.Node)
for _, checks := range nodeChecks[nodeID] {
if checks.Status == api.HealthWarning && status == api.HealthPassing {
status = api.HealthWarning
}
if checks.Status == api.HealthCritical {
status = api.HealthCritical
}
}
summary.Add(status)
nodeSummaries[nodeMeta.String()] = summary
}
// Construct the summary.
summary := &structs.CatalogSummary{}
for _, healthSummary := range nodeSummaries {
summary.Nodes = append(summary.Nodes, healthSummary)
}
for _, healthSummary := range serviceSummaries {
summary.Services = append(summary.Services, healthSummary)
}
for _, healthSummary := range checkSummaries {
summary.Checks = append(summary.Checks, healthSummary)
}
summarySort := func(slice []structs.HealthSummary) func(int, int) bool {
return func(i, j int) bool {
if slice[i].Name < slice[j].Name {
return true
}
return slice[i].EnterpriseMeta.String() < slice[j].EnterpriseMeta.String()
}
}
sort.Slice(summary.Nodes, summarySort(summary.Nodes))
sort.Slice(summary.Services, summarySort(summary.Services))
sort.Slice(summary.Checks, summarySort(summary.Checks))
return summary
}

View File

@ -0,0 +1,166 @@
package consul
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)
func TestCatalogOverview(t *testing.T) {
cases := []struct {
name string
nodes []*structs.Node
services []*structs.ServiceNode
checks []*structs.HealthCheck
expected structs.CatalogSummary
}{
{
name: "empty",
expected: structs.CatalogSummary{},
},
{
name: "one node with no checks",
nodes: []*structs.Node{
{Node: "node1"},
},
expected: structs.CatalogSummary{
Nodes: []structs.HealthSummary{
{Total: 1, Passing: 1},
},
},
},
{
name: "one service with no checks",
services: []*structs.ServiceNode{
{Node: "node1", ServiceName: "service1"},
},
expected: structs.CatalogSummary{
Services: []structs.HealthSummary{
{Name: "service1", Total: 1, Passing: 1},
},
},
},
{
name: "three nodes with node checks",
nodes: []*structs.Node{
{Node: "node1"},
{Node: "node2"},
{Node: "node3"},
},
checks: []*structs.HealthCheck{
{Node: "node1", Name: "check1", CheckID: "check1", Status: api.HealthPassing},
{Node: "node2", Name: "check1", CheckID: "check1", Status: api.HealthWarning},
{Node: "node3", Name: "check1", CheckID: "check1", Status: api.HealthCritical},
},
expected: structs.CatalogSummary{
Nodes: []structs.HealthSummary{
{Total: 3, Passing: 1, Warning: 1, Critical: 1},
},
Checks: []structs.HealthSummary{
{Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1},
},
},
},
{
name: "three instances of one service with checks",
nodes: []*structs.Node{
{Node: "node1"},
},
services: []*structs.ServiceNode{
{Node: "node1", ServiceName: "service1", ServiceID: "id1"},
{Node: "node1", ServiceName: "service1", ServiceID: "id2"},
{Node: "node1", ServiceName: "service1", ServiceID: "id3"},
},
checks: []*structs.HealthCheck{
{Node: "node1", Name: "check1", CheckID: "check1", ServiceID: "id1", Status: api.HealthPassing},
{Node: "node1", Name: "check1", CheckID: "check2", ServiceID: "id2", Status: api.HealthWarning},
{Node: "node1", Name: "check1", CheckID: "check3", ServiceID: "id3", Status: api.HealthCritical},
},
expected: structs.CatalogSummary{
Nodes: []structs.HealthSummary{
{Total: 1, Passing: 1},
},
Services: []structs.HealthSummary{
{Name: "service1", Total: 3, Passing: 1, Warning: 1, Critical: 1},
},
Checks: []structs.HealthSummary{
{Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1},
},
},
},
{
name: "three instances of different services with checks",
nodes: []*structs.Node{
{Node: "node1"},
},
services: []*structs.ServiceNode{
{Node: "node1", ServiceName: "service1", ServiceID: "id1"},
{Node: "node1", ServiceName: "service2", ServiceID: "id2"},
{Node: "node1", ServiceName: "service3", ServiceID: "id3"},
},
checks: []*structs.HealthCheck{
{Node: "node1", Name: "check1", CheckID: "check1", ServiceID: "id1", Status: api.HealthPassing},
{Node: "node1", Name: "check1", CheckID: "check2", ServiceID: "id2", Status: api.HealthWarning},
{Node: "node1", Name: "check1", CheckID: "check3", ServiceID: "id3", Status: api.HealthCritical},
},
expected: structs.CatalogSummary{
Nodes: []structs.HealthSummary{
{Total: 1, Passing: 1},
},
Services: []structs.HealthSummary{
{Name: "service1", Total: 1, Passing: 1},
{Name: "service2", Total: 1, Warning: 1},
{Name: "service3", Total: 1, Critical: 1},
},
Checks: []structs.HealthSummary{
{Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1},
},
},
},
{
name: "many instances of the same check",
checks: []*structs.HealthCheck{
{Name: "check1", CheckID: "check1", Status: api.HealthPassing},
{Name: "check1", CheckID: "check2", Status: api.HealthWarning},
{Name: "check1", CheckID: "check3", Status: api.HealthCritical},
{Name: "check1", CheckID: "check4", Status: api.HealthPassing},
{Name: "check1", CheckID: "check5", Status: api.HealthCritical},
},
expected: structs.CatalogSummary{
Checks: []structs.HealthSummary{
{Name: "check1", Total: 5, Passing: 2, Warning: 1, Critical: 2},
},
},
},
{
name: "three different checks",
checks: []*structs.HealthCheck{
{Name: "check1", CheckID: "check1", Status: api.HealthPassing},
{Name: "check2", CheckID: "check2", Status: api.HealthWarning},
{Name: "check3", CheckID: "check3", Status: api.HealthCritical},
},
expected: structs.CatalogSummary{
Checks: []structs.HealthSummary{
{Name: "check1", Total: 1, Passing: 1},
{Name: "check2", Total: 1, Warning: 1},
{Name: "check3", Total: 1, Critical: 1},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
summary := getCatalogOverview(&structs.CatalogContents{
Nodes: tc.nodes,
Services: tc.services,
Checks: tc.checks,
})
require.ElementsMatch(t, tc.expected.Nodes, summary.Nodes)
require.ElementsMatch(t, tc.expected.Services, summary.Services)
require.ElementsMatch(t, tc.expected.Checks, summary.Checks)
})
}
}

View File

@ -4106,3 +4106,39 @@ func cleanupKindServiceName(tx WriteTxn, idx uint64, name structs.ServiceName, k
}
return nil
}
// CatalogDump returns all the contents of the node, service and check tables.
// In Enterprise, this will return entries across all partitions and namespaces.
func (s *Store) CatalogDump() (*structs.CatalogContents, error) {
tx := s.db.Txn(false)
contents := &structs.CatalogContents{}
nodes, err := tx.Get(tableNodes, indexID)
if err != nil {
return nil, fmt.Errorf("failed nodes lookup: %s", err)
}
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
contents.Nodes = append(contents.Nodes, n)
}
services, err := tx.Get(tableServices, indexID)
if err != nil {
return nil, fmt.Errorf("failed services lookup: %s", err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
contents.Services = append(contents.Services, svc)
}
checks, err := tx.Get(tableChecks, indexID)
if err != nil {
return nil, fmt.Errorf("failed checks lookup: %s", err)
}
for check := checks.Next(); check != nil; check = checks.Next() {
c := check.(*structs.HealthCheck)
contents.Checks = append(contents.Checks, c)
}
return contents, nil
}

View File

@ -91,6 +91,7 @@ func init() {
registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPHandlers).UINodes)
registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPHandlers).UINodeInfo)
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPHandlers).UIServices)
registerEndpoint("/v1/internal/ui/catalog-overview", []string{"GET"}, (*HTTPHandlers).UICatalogOverview)
registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPHandlers).UIGatewayServicesNodes)
registerEndpoint("/v1/internal/ui/gateway-intentions/", []string{"GET"}, (*HTTPHandlers).UIGatewayIntentions)
registerEndpoint("/v1/internal/ui/service-topology/", []string{"GET"}, (*HTTPHandlers).UIServiceTopology)

View File

@ -1,6 +1,7 @@
package structs
import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)
@ -19,3 +20,38 @@ const (
ConsulServiceID = "consul"
ConsulServiceName = "consul"
)
type CatalogContents struct {
Nodes []*Node
Services []*ServiceNode
Checks []*HealthCheck
}
type CatalogSummary struct {
Nodes []HealthSummary
Services []HealthSummary
Checks []HealthSummary
}
type HealthSummary struct {
Name string `json:",omitempty"`
Total int
Passing int
Warning int
Critical int
EnterpriseMeta
}
func (h *HealthSummary) Add(status string) {
h.Total++
switch status {
case api.HealthPassing:
h.Passing++
case api.HealthWarning:
h.Warning++
case api.HealthCritical:
h.Critical++
}
}

View File

@ -15,6 +15,10 @@ var emptyEnterpriseMeta = EnterpriseMeta{}
// EnterpriseMeta stub
type EnterpriseMeta struct{}
func (m *EnterpriseMeta) String() string {
return ""
}
func (m *EnterpriseMeta) ToEnterprisePolicyMeta() *acl.EnterprisePolicyMeta {
return nil
}

View File

@ -172,6 +172,24 @@ RPC:
return nil, nil
}
// UICatalogOverview is used to get a high-level overview of the health of nodes, services,
// and checks in the datacenter.
func (s *HTTPHandlers) UICatalogOverview(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse arguments
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Make the RPC request
var out structs.CatalogSummary
if err := s.agent.RPC("Internal.CatalogOverview", &args, &out); err != nil {
return nil, err
}
return out, nil
}
// UIServices is used to list the services in a given datacenter. We return a
// ServiceSummary which provides overview information for the service
func (s *HTTPHandlers) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {