Add the `operator usage instances` command and api endpoint (#16205)

This endpoint shows total services, connect service instances and
billable service instances in the local datacenter or globally. Billable
instances = total service instances - connect services - consul server instances.
This commit is contained in:
Kyle Havlovitz 2023-02-08 12:07:21 -08:00 committed by GitHub
parent fd010a326c
commit 220ca06201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 943 additions and 23 deletions

3
.changelog/16205.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
command: Adds the `operator usage instances` subcommand for displaying total services, connect service instances and billable service instances in the local datacenter or globally.
```

View File

@ -5,7 +5,7 @@ import (
"net/http"
"strings"
metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
cachetype "github.com/hashicorp/consul/agent/cache-types"

View File

@ -0,0 +1,62 @@
package consul
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
// Usage returns counts for service usage within catalog.
func (op *Operator) Usage(args *structs.OperatorUsageRequest, reply *structs.Usage) error {
reply.Usage = make(map[string]structs.ServiceUsage)
if args.Global {
remoteDCs := op.srv.router.GetDatacenters()
for _, dc := range remoteDCs {
remoteArgs := &structs.OperatorUsageRequest{
DCSpecificRequest: structs.DCSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: args.Token,
},
},
}
var resp structs.Usage
if _, err := op.srv.ForwardRPC("Operator.Usage", remoteArgs, &resp); err != nil {
op.logger.Warn("error forwarding usage request to remote datacenter", "datacenter", dc, "error", err)
}
if usage, ok := resp.Usage[dc]; ok {
reply.Usage[dc] = usage
}
}
}
var authzContext acl.AuthorizerContext
authz, err := op.srv.ResolveTokenAndDefaultMeta(args.Token, structs.DefaultEnterpriseMetaInDefaultPartition(), &authzContext)
if err != nil {
return err
}
err = authz.ToAllowAuthorizer().OperatorReadAllowed(&authzContext)
if err != nil {
return err
}
if err = op.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
return op.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
// Get service usage.
index, serviceUsage, err := state.ServiceUsage(ws)
if err != nil {
return err
}
reply.QueryMeta.Index, reply.Usage[op.srv.config.Datacenter] = index, serviceUsage
return nil
})
}

View File

@ -52,14 +52,6 @@ type UsageEntry struct {
Count int
}
// ServiceUsage contains all of the usage data related to services
type ServiceUsage struct {
Services int
ServiceInstances int
ConnectServiceInstances map[string]int
EnterpriseServiceUsage
}
// NodeUsage contains all of the usage data related to nodes
type NodeUsage struct {
Nodes int
@ -128,6 +120,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
addEnterpriseServiceInstanceUsage(usageDeltas, change)
connectDeltas(change, usageDeltas, delta)
billableServiceInstancesDeltas(change, usageDeltas, delta)
// Construct a mapping of all of the various service names that were
// changed, in order to compare it with the finished memdb state.
// Make sure to account for the fact that services can change their names.
@ -271,6 +265,53 @@ func connectDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
}
}
// billableServiceInstancesDeltas calculates deltas for the billable services. Billable services
// are of "typical" service kind (i.e. non-connect or connect-native), excluding the "consul" service.
func billableServiceInstancesDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
// Billable service instances = # of typical service instances (i.e. non-connect) + connect-native service instances.
// Specifically, it should exclude "consul" service instances from the count.
//
// If the service has been updated, then we check
// 1. If the service name changed to or from "consul" and update deltas such that we exclude consul server service instances.
// This case is a bit contrived because we don't expect consul service to change once it's registered (beyond changing its instance count).
// a) If changed to "consul" -> decrement deltas by one
// b) If changed from "consul" and it's not a "connect" service -> increase deltas by one
// 2. If the service kind changed to or from "typical", we need to we need to update deltas so that we only account
// for non-connect or connect-native instances.
if change.Updated() {
// When there's an update, the delta arg passed to this function is 0, and so we need to explicitly increment
// or decrement by 1 depending on the situation.
before := change.Before.(*structs.ServiceNode)
after := change.After.(*structs.ServiceNode)
// Service name changed away from "consul" means we now need to account for this service instances unless it's a "connect" service.
if before.ServiceName == structs.ConsulServiceName && after.ServiceName != structs.ConsulServiceName {
if after.ServiceKind == structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] += 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
}
}
if before.ServiceName != structs.ConsulServiceName && after.ServiceName == structs.ConsulServiceName {
usageDeltas[billableServiceInstancesTableName()] -= 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
}
if before.ServiceKind != structs.ServiceKindTypical && after.ServiceKind == structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] += 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
} else if before.ServiceKind == structs.ServiceKindTypical && after.ServiceKind != structs.ServiceKindTypical {
usageDeltas[billableServiceInstancesTableName()] -= 1
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
}
} else {
svc := changeObject(change).(*structs.ServiceNode)
// If it's not an update, only update delta if it's a typical service and not the "consul" service.
if svc.ServiceKind == structs.ServiceKindTypical && svc.ServiceName != structs.ConsulServiceName {
usageDeltas[billableServiceInstancesTableName()] += delta
addEnterpriseBillableServiceInstanceUsage(usageDeltas, svc, delta)
}
}
}
// writeUsageDeltas will take in a map of IDs to deltas and update each
// entry accordingly, checking for integer underflow. The index that is
// passed in will be recorded on the entry as well.
@ -289,7 +330,7 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
// large numbers.
delta = 0
}
err := tx.Insert(tableUsage, &UsageEntry{
err = tx.Insert(tableUsage, &UsageEntry{
ID: id,
Count: delta,
Index: idx,
@ -365,37 +406,43 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
// ServiceUsage returns the latest seen Raft index, a compiled set of service
// usage data, and any errors.
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) {
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, structs.ServiceUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
serviceInstances, err := firstUsageEntry(ws, tx, tableServices)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
serviceKindInstances := make(map[string]int)
for _, kind := range allConnectKind {
usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind))
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
serviceKindInstances[kind] = usage.Count
}
usage := ServiceUsage{
ServiceInstances: serviceInstances.Count,
Services: services.Count,
ConnectServiceInstances: serviceKindInstances,
billableServiceInstances, err := firstUsageEntry(ws, tx, billableServiceInstancesTableName())
if err != nil {
return 0, structs.ServiceUsage{}, fmt.Errorf("failed billable services lookup: %s", err)
}
usage := structs.ServiceUsage{
ServiceInstances: serviceInstances.Count,
Services: services.Count,
ConnectServiceInstances: serviceKindInstances,
BillableServiceInstances: billableServiceInstances.Count,
}
results, err := compileEnterpriseServiceUsage(ws, tx, usage)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
return serviceInstances.Index, results, nil
@ -469,3 +516,7 @@ func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, err
return realUsage, nil
}
func billableServiceInstancesTableName() string {
return fmt.Sprintf("billable-%s", tableServices)
}

View File

@ -25,11 +25,13 @@ func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueSer
func addEnterpriseConnectServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}
func addEnterpriseBillableServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}
func addEnterpriseKVUsage(map[string]int, memdb.Change) {}
func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {}
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage structs.ServiceUsage) (structs.ServiceUsage, error) {
return usage, nil
}

View File

@ -160,6 +160,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
for k := range usage.ConnectServiceInstances {
require.Equal(t, 0, usage.ConnectServiceInstances[k])
}
require.Equal(t, 0, usage.BillableServiceInstances)
}
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
@ -184,6 +185,7 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) {
require.Equal(t, 8, usage.ServiceInstances)
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 6, usage.BillableServiceInstances)
testRegisterSidecarProxy(t, s, 16, "node2", "service2")
@ -225,6 +227,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
require.Equal(t, 4, usage.ServiceInstances)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)
require.NoError(t, s.DeleteNode(4, "node1", nil, ""))
@ -236,6 +239,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
for k := range usage.ConnectServiceInstances {
require.Equal(t, 0, usage.ConnectServiceInstances[k])
}
require.Equal(t, 0, usage.BillableServiceInstances)
}
// Test that services from remote peers aren't counted in writes or deletes.
@ -263,6 +267,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
require.Equal(t, 3, usage.ServiceInstances)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 2, usage.BillableServiceInstances)
})
testutil.RunStep(t, "deletes", func(t *testing.T) {
@ -275,6 +280,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
require.Equal(t, 0, usage.ServiceInstances)
require.Equal(t, 0, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 0, usage.BillableServiceInstances)
})
}
@ -311,6 +317,7 @@ func TestStateStore_Usage_Restore(t *testing.T) {
require.Equal(t, idx, uint64(9))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2)
require.Equal(t, usage.BillableServiceInstances, 2)
}
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
@ -411,6 +418,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, usage.BillableServiceInstances, 1)
})
t.Run("update service to be connect native", func(t *testing.T) {
@ -432,6 +440,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 1, usage.BillableServiceInstances)
})
t.Run("update service to not be connect native", func(t *testing.T) {
@ -453,6 +462,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 1, usage.BillableServiceInstances)
})
t.Run("rename service with a multiple instances", func(t *testing.T) {
@ -484,6 +494,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 2)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)
update := &structs.NodeService{
ID: "service2",
@ -502,6 +513,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.Equal(t, usage.Services, 3)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
require.Equal(t, 3, usage.BillableServiceInstances)
})
}
@ -528,6 +540,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 1)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 0, usage.BillableServiceInstances)
})
t.Run("rename service with a multiple instances", func(t *testing.T) {
@ -554,6 +567,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 2)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 1, usage.BillableServiceInstances)
update := &structs.NodeService{
ID: "service3",
@ -569,6 +583,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.Equal(t, usage.Services, 3)
require.Equal(t, usage.ServiceInstances, 3)
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 2, usage.BillableServiceInstances)
})
}

View File

@ -88,6 +88,10 @@ var Gauges = []prometheus.GaugeDefinition{
Name: []string{"state", "config_entries"},
Help: "Measures the current number of unique configuration entries registered with Consul, labeled by Kind. It is only emitted by Consul servers. Added in v1.10.4.",
},
{
Name: []string{"state", "billable_service_instances"},
Help: "Total number of billable service instances in the local datacenter.",
},
}
type getMembersFunc func() []serf.Member

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
func (u *UsageMetricsReporter) emitNodeUsage(nodeUsage state.NodeUsage) {
@ -74,7 +75,7 @@ func (u *UsageMetricsReporter) emitMemberUsage(members []serf.Member) {
)
}
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage) {
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage structs.ServiceUsage) {
metrics.SetGaugeWithLabels(
[]string{"consul", "state", "services"},
float32(serviceUsage.Services),
@ -96,6 +97,11 @@ func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage)
float32(serviceUsage.ServiceInstances),
u.metricLabels,
)
metrics.SetGaugeWithLabels(
[]string{"state", "billable_service_instances"},
float32(serviceUsage.BillableServiceInstances),
u.metricLabels,
)
for k, i := range serviceUsage.ConnectServiceInstances {
metrics.SetGaugeWithLabels(

View File

@ -178,6 +178,13 @@ var baseCases = map[string]testCase{
{Name: "kind", Value: "connect-native"},
},
},
"consul.usage.test.state.billable_service_instances;datacenter=dc1": {
Name: "consul.usage.test.state.billable_service_instances",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
// --- kv ---
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": { // Legacy
Name: "consul.usage.test.consul.state.kv_entries",
@ -598,6 +605,13 @@ var baseCases = map[string]testCase{
{Name: "kind", Value: "connect-native"},
},
},
"consul.usage.test.state.billable_service_instances;datacenter=dc1": {
Name: "consul.usage.test.state.billable_service_instances",
Value: 0,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
},
// --- kv ---
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": { // Legacy
Name: "consul.usage.test.consul.state.kv_entries",
@ -1176,6 +1190,13 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
{Name: "kind", Value: "connect-native"},
},
}
nodesAndSvcsCase.expectedGauges["consul.usage.test.state.billable_service_instances;datacenter=dc1"] = metrics.GaugeValue{
Name: "consul.usage.test.state.billable_service_instances",
Value: 3,
Labels: []metrics.Label{
{Name: "datacenter", Value: "dc1"},
},
}
nodesAndSvcsCase.expectedGauges["consul.usage.test.consul.state.config_entries;datacenter=dc1;kind=ingress-gateway"] = metrics.GaugeValue{ // Legacy
Name: "consul.usage.test.consul.state.config_entries",
Value: 3,

View File

@ -11,7 +11,6 @@ import (
"github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
)
@ -206,5 +205,5 @@ func (c *Controller) countProxies(ctx context.Context) (<-chan error, uint32, er
type Store interface {
AbandonCh() <-chan struct{}
ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error)
ServiceUsage(ws memdb.WatchSet) (uint64, structs.ServiceUsage, error)
}

View File

@ -102,6 +102,7 @@ func init() {
registerEndpoint("/v1/operator/raft/transfer-leader", []string{"POST"}, (*HTTPHandlers).OperatorRaftTransferLeader)
registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer)
registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint)
registerEndpoint("/v1/operator/usage", []string{"GET"}, (*HTTPHandlers).OperatorUsage)
registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration)
registerEndpoint("/v1/operator/autopilot/health", []string{"GET"}, (*HTTPHandlers).OperatorServerHealth)
registerEndpoint("/v1/operator/autopilot/state", []string{"GET"}, (*HTTPHandlers).OperatorAutopilotState)

View File

@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/armon/go-metrics"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/proto/pboperator"
@ -366,6 +367,43 @@ func (s *HTTPHandlers) OperatorAutopilotState(resp http.ResponseWriter, req *htt
return out, nil
}
func (s *HTTPHandlers) OperatorUsage(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
metrics.IncrCounterWithLabels([]string{"client", "api", "operator_usage"}, 1,
s.nodeMetricsLabels())
var args structs.OperatorUsageRequest
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
if _, ok := req.URL.Query()["global"]; ok {
args.Global = true
}
// Make the RPC request
var out structs.Usage
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
err := s.agent.RPC(req.Context(), "Operator.Usage", &args, &out)
if err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "operator_usage"}, 1,
s.nodeMetricsLabels())
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
metrics.IncrCounterWithLabels([]string{"client", "api", "success", "operator_usage"}, 1,
s.nodeMetricsLabels())
return out, nil
}
func stringIDs(ids []raft.ServerID) []string {
out := make([]string, len(ids))
for i, id := range ids {

View File

@ -0,0 +1,88 @@
//go:build !consulent
// +build !consulent
package agent
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestOperator_Usage(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
req, err := http.NewRequest("GET", "/v1/operator/usage", nil)
require.NoError(t, err)
// Register a few services
require.NoError(t, upsertTestService(a.RPC, "", "dc1", "web", "test-node", "", func(svc *structs.NodeService) {
svc.ID = "web1"
}))
require.NoError(t, upsertTestService(a.RPC, "", "dc1", "web", "test-node", "", func(svc *structs.NodeService) {
svc.ID = "web2"
}))
require.NoError(t, upsertTestService(a.RPC, "", "dc1", "db", "test-node", ""))
require.NoError(t, upsertTestService(a.RPC, "", "dc1", "web-proxy", "test-node", "", func(svc *structs.NodeService) {
svc.Kind = structs.ServiceKindConnectProxy
svc.Proxy = structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
}
}))
// Add connect-native service to check that we include it in the billable service instances
require.NoError(t, upsertTestService(a.RPC, "", "dc1", "connect-native-app", "test-node", "", func(svc *structs.NodeService) {
svc.Connect.Native = true
}))
raw, err := a.srv.OperatorUsage(httptest.NewRecorder(), req)
require.NoError(t, err)
expected := map[string]structs.ServiceUsage{
"dc1": {
Services: 5,
ServiceInstances: 6,
ConnectServiceInstances: map[string]int{
"connect-native": 1,
"connect-proxy": 1,
"ingress-gateway": 0,
"mesh-gateway": 0,
"terminating-gateway": 0,
},
// 4 = 6 total service instances - 1 connect proxy - 1 consul service
BillableServiceInstances: 4,
},
}
require.Equal(t, expected, raw.(structs.Usage).Usage)
}
func upsertTestService(rpc rpcFn, secret, datacenter, name, node, partition string, modifyFuncs ...func(*structs.NodeService)) error {
req := structs.RegisterRequest{
Datacenter: datacenter,
Node: node,
SkipNodeUpdate: true,
Service: &structs.NodeService{
ID: name,
Service: name,
Port: 8080,
},
WriteRequest: structs.WriteRequest{Token: secret},
}
for _, modify := range modifyFuncs {
modify(req.Service)
}
var out struct{}
return rpc(context.Background(), "Catalog.Register", &req, &out)
}

View File

@ -629,6 +629,12 @@ func (r *DCSpecificRequest) CacheMinIndex() uint64 {
return r.QueryOptions.MinQueryIndex
}
type OperatorUsageRequest struct {
DCSpecificRequest
Global bool
}
type ServiceDumpRequest struct {
Datacenter string
ServiceKind ServiceKind
@ -2240,6 +2246,21 @@ type IndexedServices struct {
QueryMeta
}
type Usage struct {
Usage map[string]ServiceUsage
QueryMeta
}
// ServiceUsage contains all of the usage data related to services
type ServiceUsage struct {
Services int
ServiceInstances int
ConnectServiceInstances map[string]int
BillableServiceInstances int
EnterpriseServiceUsage
}
// PeeredServiceName is a basic tuple of ServiceName and peer
type PeeredServiceName struct {
ServiceName ServiceName

View File

@ -169,3 +169,5 @@ func (t *Intention) HasWildcardDestination() bool {
func (s *ServiceNode) NodeIdentity() Identity {
return Identity{ID: s.Node}
}
type EnterpriseServiceUsage struct{}

View File

@ -206,6 +206,10 @@ type QueryOptions struct {
// This can be used to ensure a full service definition is returned in the response
// especially when the service might not be written into the catalog that way.
MergeCentralConfig bool
// Global is used to request information from all datacenters. Currently only
// used for operator usage requests.
Global bool
}
func (o *QueryOptions) Context() context.Context {
@ -895,6 +899,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.MergeCentralConfig {
r.params.Set("merge-central-config", "")
}
if q.Global {
r.params.Set("global", "")
}
r.ctx = q.ctx
}

53
api/operator_usage.go Normal file
View File

@ -0,0 +1,53 @@
package api
type Usage struct {
// Usage is a map of datacenter -> usage information
Usage map[string]ServiceUsage
}
// ServiceUsage contains information about the number of services and service instances for a datacenter.
type ServiceUsage struct {
Services int
ServiceInstances int
ConnectServiceInstances map[string]int
// Billable services are of "typical" service kind (i.e. non-connect or connect-native),
// excluding the "consul" service.
BillableServiceInstances int
// A map of partition+namespace to number of unique services registered in that namespace
PartitionNamespaceServices map[string]map[string]int
// A map of partition+namespace to number of service instances registered in that namespace
PartitionNamespaceServiceInstances map[string]map[string]int
// A map of partition+namespace+kind to number of service-mesh instances registered in that namespace
PartitionNamespaceConnectServiceInstances map[string]map[string]map[string]int
// A map of partition+namespace to number of billable instances registered in that namespace
PartitionNamespaceBillableServiceInstances map[string]map[string]int
}
// Usage is used to query for usage information in the given datacenter.
func (op *Operator) Usage(q *QueryOptions) (*Usage, *QueryMeta, error) {
r := op.c.newRequest("GET", "/v1/operator/usage")
r.setQueryOptions(q)
rtt, resp, err := op.c.doRequest(r)
if err != nil {
return nil, nil, err
}
defer closeResponseBody(resp)
if err := requireOK(resp); err != nil {
return nil, nil, err
}
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out *Usage
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View File

@ -0,0 +1,63 @@
package api
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAPI_OperatorUsage(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
registerService := func(svc *AgentService) {
reg := &CatalogRegistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
Service: svc,
}
if _, err := c.Catalog().Register(reg, nil); err != nil {
t.Fatal(err)
}
}
registerService(&AgentService{
ID: "redis1",
Service: "redis",
Port: 8000,
})
registerService(&AgentService{
ID: "redis2",
Service: "redis",
Port: 8001,
})
registerService(&AgentService{
Kind: ServiceKindConnectProxy,
ID: "proxy1",
Service: "proxy",
Port: 9000,
Proxy: &AgentServiceConnectProxyConfig{DestinationServiceName: "foo"},
})
registerService(&AgentService{
ID: "web-native",
Service: "web",
Port: 8002,
Connect: &AgentServiceConnect{Native: true},
})
usage, _, err := c.Operator().Usage(nil)
require.NoError(t, err)
require.Contains(t, usage.Usage, "dc1")
require.Equal(t, 4, usage.Usage["dc1"].Services)
require.Equal(t, 5, usage.Usage["dc1"].ServiceInstances)
require.Equal(t, map[string]int{
"connect-native": 1,
"connect-proxy": 1,
"ingress-gateway": 0,
"mesh-gateway": 0,
"terminating-gateway": 0,
}, usage.Usage["dc1"].ConnectServiceInstances)
require.Equal(t, 3, usage.Usage["dc1"].BillableServiceInstances)
}

View File

@ -0,0 +1,241 @@
package instances
import (
"bytes"
"flag"
"fmt"
"sort"
"strings"
"text/tabwriter"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
func New(ui cli.Ui) *cmd {
c := &cmd{UI: ui}
c.init()
return c
}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
help string
// flags
onlyBillable bool
onlyConnect bool
allDatacenters bool
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.BoolVar(&c.onlyBillable, "billable", false, "Display only billable service info.")
c.flags.BoolVar(&c.onlyConnect, "connect", false, "Display only Connect service info.")
c.flags.BoolVar(&c.allDatacenters, "all-datacenters", false, "Display service counts from "+
"all datacenters.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
c.help = flags.Usage(help, c.flags)
}
func (c *cmd) Run(args []string) int {
if err := c.flags.Parse(args); err != nil {
return 1
}
if l := len(c.flags.Args()); l > 0 {
c.UI.Error(fmt.Sprintf("Too many arguments (expected 0, got %d)", l))
return 1
}
// Create and test the HTTP client
client, err := c.http.APIClient()
if err != nil {
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
billableTotal := 0
var datacenterBillableTotals []string
usage, _, err := client.Operator().Usage(&api.QueryOptions{Global: c.allDatacenters})
if err != nil {
c.UI.Error(fmt.Sprintf("Error fetching usage information: %s", err))
return 1
}
for dc, usage := range usage.Usage {
billableTotal += usage.BillableServiceInstances
datacenterBillableTotals = append(datacenterBillableTotals,
fmt.Sprintf("%s Billable Service Instances: %d", dc, usage.BillableServiceInstances))
}
// Output billable service counts
if !c.onlyConnect {
c.UI.Output(fmt.Sprintf("Billable Service Instances Total: %d", billableTotal))
sort.Strings(datacenterBillableTotals)
for _, datacenterTotal := range datacenterBillableTotals {
c.UI.Output(datacenterTotal)
}
c.UI.Output("\nBillable Services")
billableOutput, err := formatServiceCounts(usage.Usage, true, c.allDatacenters)
if err != nil {
c.UI.Error(err.Error())
return 1
}
c.UI.Output(billableOutput + "\n")
}
// Output Connect service counts
if !c.onlyBillable {
c.UI.Output("Connect Services")
connectOutput, err := formatServiceCounts(usage.Usage, false, c.allDatacenters)
if err != nil {
c.UI.Error(err.Error())
return 1
}
c.UI.Output(connectOutput)
}
return 0
}
func formatServiceCounts(usageStats map[string]api.ServiceUsage, billable, showDatacenter bool) (string, error) {
var output bytes.Buffer
tw := tabwriter.NewWriter(&output, 0, 2, 6, ' ', 0)
var serviceCounts []serviceCount
for datacenter, usage := range usageStats {
if billable {
serviceCounts = append(serviceCounts, getBillableInstanceCounts(usage, datacenter)...)
} else {
serviceCounts = append(serviceCounts, getConnectInstanceCounts(usage, datacenter)...)
}
}
sortServiceCounts(serviceCounts)
if showDatacenter {
fmt.Fprintf(tw, "Datacenter\t")
}
if showPartitionNamespace {
fmt.Fprintf(tw, "Partition\tNamespace\t")
}
if !billable {
fmt.Fprintf(tw, "Type\t")
} else {
fmt.Fprintf(tw, "Services\t")
}
fmt.Fprintf(tw, "Service instances\n")
serviceTotal := 0
instanceTotal := 0
for _, c := range serviceCounts {
if showDatacenter {
fmt.Fprintf(tw, "%s\t", c.datacenter)
}
if showPartitionNamespace {
fmt.Fprintf(tw, "%s\t%s\t", c.partition, c.namespace)
}
if !billable {
fmt.Fprintf(tw, "%s\t", c.serviceType)
} else {
fmt.Fprintf(tw, "%d\t", c.services)
}
fmt.Fprintf(tw, "%d\n", c.instanceCount)
serviceTotal += c.services
instanceTotal += c.instanceCount
}
// Show total counts if there's multiple rows because of datacenter or partition/ns view
if showDatacenter || showPartitionNamespace {
if showDatacenter {
fmt.Fprint(tw, "\t")
}
if showPartitionNamespace {
fmt.Fprint(tw, "\t\t")
}
fmt.Fprint(tw, "\t\n")
fmt.Fprintf(tw, "Total")
if showPartitionNamespace {
fmt.Fprint(tw, "\t")
if showDatacenter {
fmt.Fprint(tw, "\t")
}
}
if billable {
fmt.Fprintf(tw, "\t%d\t%d\n", serviceTotal, instanceTotal)
} else {
fmt.Fprintf(tw, "\t\t%d\n", instanceTotal)
}
}
if err := tw.Flush(); err != nil {
return "", fmt.Errorf("Error flushing tabwriter: %s", err)
}
return strings.TrimSpace(output.String()), nil
}
type serviceCount struct {
datacenter string
partition string
namespace string
serviceType string
instanceCount int
services int
}
// Sort entries by datacenter > partition > namespace
func sortServiceCounts(counts []serviceCount) {
sort.Slice(counts, func(i, j int) bool {
if counts[i].datacenter != counts[j].datacenter {
return counts[i].datacenter < counts[j].datacenter
}
if counts[i].partition != counts[j].partition {
return counts[i].partition < counts[j].partition
}
if counts[i].namespace != counts[j].namespace {
return counts[i].namespace < counts[j].namespace
}
return counts[i].serviceType < counts[j].serviceType
})
}
func (c *cmd) Synopsis() string {
return synopsis
}
func (c *cmd) Help() string {
return c.help
}
const (
synopsis = "Display service instance usage information"
help = `
Usage: consul usage instances [options]
Retrieves usage information about the number of services registered in a given
datacenter. By default, the datacenter of the local agent is queried.
To retrieve the service usage data:
$ consul usage instances
To show only billable service instance counts:
$ consul usage instances -billable
To show only connect service instance counts:
$ consul usage instances -connect
For a full list of options and examples, please see the Consul documentation.
`
)

View File

@ -0,0 +1,39 @@
//go:build !consulent
// +build !consulent
package instances
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/api"
)
const showPartitionNamespace = false
func getBillableInstanceCounts(usage api.ServiceUsage, datacenter string) []serviceCount {
return []serviceCount{
{
datacenter: datacenter,
partition: acl.DefaultPartitionName,
namespace: acl.DefaultNamespaceName,
instanceCount: usage.BillableServiceInstances,
services: usage.Services,
},
}
}
func getConnectInstanceCounts(usage api.ServiceUsage, datacenter string) []serviceCount {
var counts []serviceCount
for serviceType, instanceCount := range usage.ConnectServiceInstances {
counts = append(counts, serviceCount{
datacenter: datacenter,
partition: acl.DefaultPartitionName,
namespace: acl.DefaultNamespaceName,
serviceType: serviceType,
instanceCount: instanceCount,
})
}
return counts
}

View File

@ -0,0 +1,116 @@
//go:build !consulent
// +build !consulent
package instances
import (
"strings"
"testing"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)
func TestUsageInstances_formatServiceCounts(t *testing.T) {
usageBasic := map[string]api.ServiceUsage{
"dc1": {
Services: 10,
ServiceInstances: 35,
ConnectServiceInstances: map[string]int{
"connect-native": 1,
"connect-proxy": 3,
"ingress-gateway": 4,
"mesh-gateway": 2,
"terminating-gateway": 5,
},
BillableServiceInstances: 20,
},
}
usageMultiDC := map[string]api.ServiceUsage{
"dc1": {
Services: 10,
ServiceInstances: 35,
ConnectServiceInstances: map[string]int{
"connect-native": 1,
"connect-proxy": 3,
"ingress-gateway": 4,
"mesh-gateway": 2,
"terminating-gateway": 5,
},
BillableServiceInstances: 20,
},
"dc2": {
Services: 23,
ServiceInstances: 11,
ConnectServiceInstances: map[string]int{
"connect-native": 9,
"connect-proxy": 8,
"ingress-gateway": 7,
"mesh-gateway": 6,
"terminating-gateway": 0,
},
BillableServiceInstances: 33,
},
}
cases := []struct {
name string
usageStats map[string]api.ServiceUsage
showDatacenter bool
expectedBillable string
expectedConnect string
}{
{
name: "basic",
usageStats: usageBasic,
expectedBillable: `
Services Service instances
10 20`,
expectedConnect: `
Type Service instances
connect-native 1
connect-proxy 3
ingress-gateway 4
mesh-gateway 2
terminating-gateway 5`,
},
{
name: "multi-datacenter",
usageStats: usageMultiDC,
showDatacenter: true,
expectedBillable: `
Datacenter Services Service instances
dc1 10 20
dc2 23 33
Total 33 53`,
expectedConnect: `
Datacenter Type Service instances
dc1 connect-native 1
dc1 connect-proxy 3
dc1 ingress-gateway 4
dc1 mesh-gateway 2
dc1 terminating-gateway 5
dc2 connect-native 9
dc2 connect-proxy 8
dc2 ingress-gateway 7
dc2 mesh-gateway 6
dc2 terminating-gateway 0
Total 45`,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
billableOutput, err := formatServiceCounts(tc.usageStats, true, tc.showDatacenter)
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(tc.expectedBillable), billableOutput)
connectOutput, err := formatServiceCounts(tc.usageStats, false, tc.showDatacenter)
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(tc.expectedConnect), connectOutput)
})
}
}

View File

@ -0,0 +1,50 @@
package instances
import (
"testing"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestUsageInstancesCommand(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := agent.NewTestAgent(t, ``)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Add another 2 services for testing
if err := a.Client().Agent().ServiceRegister(&api.AgentServiceRegistration{
Name: "testing",
Port: 8080,
Address: "127.0.0.1",
}); err != nil {
t.Fatal(err)
}
if err := a.Client().Agent().ServiceRegister(&api.AgentServiceRegistration{
Name: "testing2",
Port: 8081,
Address: "127.0.0.1",
}); err != nil {
t.Fatal(err)
}
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad exit code %d: %s", code, ui.ErrorWriter.String())
}
output := ui.OutputWriter.String()
require.Contains(t, output, "Billable Service Instances Total: 2")
}

View File

@ -0,0 +1,34 @@
package usage
import (
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
func New() *cmd {
return &cmd{}
}
type cmd struct{}
func (c *cmd) Run(args []string) int {
return cli.RunResultHelp
}
func (c *cmd) Synopsis() string {
return synopsis
}
func (c *cmd) Help() string {
return flags.Usage(help, nil)
}
const synopsis = "Provides cluster-level usage information"
const help = `
Usage: consul operator usage <subcommand> [options] [args]
This command has subcommands for displaying usage information. The subcommands
default to working with services registered with the local datacenter.
For more examples, ask for subcommand help or view the documentation.
`

View File

@ -96,6 +96,8 @@ import (
operraftlist "github.com/hashicorp/consul/command/operator/raft/listpeers"
operraftremove "github.com/hashicorp/consul/command/operator/raft/removepeer"
"github.com/hashicorp/consul/command/operator/raft/transferleader"
"github.com/hashicorp/consul/command/operator/usage"
"github.com/hashicorp/consul/command/operator/usage/instances"
"github.com/hashicorp/consul/command/peering"
peerdelete "github.com/hashicorp/consul/command/peering/delete"
peerestablish "github.com/hashicorp/consul/command/peering/establish"
@ -223,6 +225,8 @@ func RegisteredCommands(ui cli.Ui) map[string]mcli.CommandFactory {
entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }},
entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }},
entry{"operator raft transfer-leader", func(ui cli.Ui) (cli.Command, error) { return transferleader.New(ui), nil }},
entry{"operator usage", func(ui cli.Ui) (cli.Command, error) { return usage.New(), nil }},
entry{"operator usage instances", func(ui cli.Ui) (cli.Command, error) { return instances.New(ui), nil }},
entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }},
entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }},
entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }},