Add KVUsage to consul state usage metrics
This change will add the number of entries in the consul KV store to the already existing usage metrics.
This commit is contained in:
parent
08b222cfc3
commit
698fc291a9
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
const (
|
||||
serviceNamesUsageTable = "service-names"
|
||||
kvUsageTable = "kv-entries"
|
||||
|
||||
tableUsage = "usage"
|
||||
)
|
||||
|
@ -54,6 +55,11 @@ type NodeUsage struct {
|
|||
EnterpriseNodeUsage
|
||||
}
|
||||
|
||||
type KVUsage struct {
|
||||
KVCount int
|
||||
EnterpriseKVUsage
|
||||
}
|
||||
|
||||
type uniqueServiceState int
|
||||
|
||||
const (
|
||||
|
@ -95,6 +101,9 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||
} else {
|
||||
serviceNameChanges[svc.CompoundServiceName()] += delta
|
||||
}
|
||||
case "kvs":
|
||||
usageDeltas[change.Table] += delta
|
||||
addEnterpriseKVUsage(usageDeltas, change)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,6 +278,26 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
|||
return serviceInstances.Index, results, nil
|
||||
}
|
||||
|
||||
func (s *Store) KVUsage() (uint64, KVUsage, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
kvs, err := firstUsageEntry(tx, "kvs")
|
||||
if err != nil {
|
||||
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
||||
usage := KVUsage{
|
||||
KVCount: kvs.Count,
|
||||
}
|
||||
results, err := compileEnterpriseKVUsage(tx, usage)
|
||||
if err != nil {
|
||||
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
|
||||
}
|
||||
|
||||
return kvs.Index, results, nil
|
||||
}
|
||||
|
||||
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
|
||||
usage, err := tx.First(tableUsage, indexID, id)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package state
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
|
||||
type EnterpriseServiceUsage struct{}
|
||||
type EnterpriseNodeUsage struct{}
|
||||
type EnterpriseKVUsage struct{}
|
||||
|
||||
func addEnterpriseNodeUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
|
@ -17,6 +19,8 @@ func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {}
|
|||
|
||||
func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {}
|
||||
|
||||
func addEnterpriseKVUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
||||
return usage, nil
|
||||
}
|
||||
|
@ -24,3 +28,7 @@ func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage
|
|||
func compileEnterpriseNodeUsage(tx ReadTxn, usage NodeUsage) (NodeUsage, error) {
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
func compileEnterpriseKVUsage(tx ReadTxn, usage KVUsage) (KVUsage, error) {
|
||||
return usage, nil
|
||||
}
|
||||
|
|
|
@ -45,6 +45,44 @@ func TestStateStore_Usage_NodeUsage_Delete(t *testing.T) {
|
|||
require.Equal(t, usage.Nodes, 1)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_KVUsage(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// No nodes have been registered, and thus no usage entry exists
|
||||
idx, usage, err := s.KVUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(0))
|
||||
require.Equal(t, usage.KVCount, 0)
|
||||
|
||||
testSetKey(t, s, 0, "key-1", "0", nil)
|
||||
testSetKey(t, s, 1, "key-2", "0", nil)
|
||||
testSetKey(t, s, 2, "key-2", "1", nil)
|
||||
|
||||
idx, usage, err = s.KVUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, usage.KVCount, 2)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_KVUsage_Delete(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
testSetKey(t, s, 0, "key-1", "0", nil)
|
||||
testSetKey(t, s, 1, "key-2", "0", nil)
|
||||
testSetKey(t, s, 2, "key-2", "1", nil)
|
||||
|
||||
idx, usage, err := s.KVUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, usage.KVCount, 2)
|
||||
|
||||
require.NoError(t, s.KVSDelete(3, "key-2", nil))
|
||||
idx, usage, err = s.KVUsage()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, idx, uint64(3))
|
||||
require.Equal(t, usage.KVCount, 1)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -36,6 +36,10 @@ var Gauges = []prometheus.GaugeDefinition{
|
|||
Name: []string{"consul", "members", "servers"},
|
||||
Help: "Measures the current number of server agents registered with Consul. It is only emitted by Consul servers. Added in v1.9.6.",
|
||||
},
|
||||
{
|
||||
Name: []string{"consul", "kv", "entries"},
|
||||
Help: "Measures the current number of server agents registered with Consul. It is only emitted by Consul servers. Added in v1.10.3.",
|
||||
},
|
||||
}
|
||||
|
||||
type getMembersFunc func() []serf.Member
|
||||
|
@ -145,6 +149,7 @@ func (u *UsageMetricsReporter) Run(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (u *UsageMetricsReporter) runOnce() {
|
||||
u.logger.Debug("Starting usage run")
|
||||
state := u.stateProvider.State()
|
||||
|
||||
_, nodeUsage, err := state.NodeUsage()
|
||||
|
@ -163,6 +168,14 @@ func (u *UsageMetricsReporter) runOnce() {
|
|||
|
||||
members := u.memberUsage()
|
||||
u.emitMemberUsage(members)
|
||||
|
||||
_, kvUsage, err := state.KVUsage()
|
||||
if err != nil {
|
||||
u.logger.Warn("failed to retrieve kv entries from state store", "error", err)
|
||||
}
|
||||
|
||||
u.emitKVUsage(kvUsage)
|
||||
|
||||
}
|
||||
|
||||
func (u *UsageMetricsReporter) memberUsage() []serf.Member {
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package usagemetrics
|
||||
|
@ -58,3 +59,11 @@ func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage)
|
|||
u.metricLabels,
|
||||
)
|
||||
}
|
||||
|
||||
func (u *UsageMetricsReporter) emitKVUsage(kvUsage state.KVUsage) {
|
||||
metrics.SetGaugeWithLabels(
|
||||
[]string{"consul", "state", "kv_entries"},
|
||||
float32(kvUsage.KVCount),
|
||||
u.metricLabels,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package usagemetrics
|
||||
|
@ -57,6 +58,11 @@ func TestUsageReporter_emitNodeUsage_OSS(t *testing.T) {
|
|||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
getMembersFunc: func() []serf.Member { return []serf.Member{} },
|
||||
},
|
||||
|
@ -114,6 +120,11 @@ func TestUsageReporter_emitNodeUsage_OSS(t *testing.T) {
|
|||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -199,6 +210,11 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
|
|||
{Name: "datacenter", Value: "dc1"},
|
||||
},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
getMembersFunc: func() []serf.Member { return []serf.Member{} },
|
||||
},
|
||||
|
@ -276,6 +292,11 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
|
|||
{Name: "datacenter", Value: "dc1"},
|
||||
},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -314,3 +335,156 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUsageReporter_emitKVUsage_OSS(t *testing.T) {
|
||||
type testCase struct {
|
||||
modfiyStateStore func(t *testing.T, s *state.Store)
|
||||
getMembersFunc getMembersFunc
|
||||
expectedGauges map[string]metrics.GaugeValue
|
||||
}
|
||||
cases := map[string]testCase{
|
||||
"empty-state": {
|
||||
expectedGauges: map[string]metrics.GaugeValue{
|
||||
// --- node ---
|
||||
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.nodes",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
// --- member ---
|
||||
"consul.usage.test.consul.members.clients;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.members.clients",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.members.servers;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.members.servers",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
// --- service ---
|
||||
"consul.usage.test.consul.state.services;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.services",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.service_instances",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
getMembersFunc: func() []serf.Member { return []serf.Member{} },
|
||||
},
|
||||
"nodes": {
|
||||
modfiyStateStore: func(t *testing.T, s *state.Store) {
|
||||
require.NoError(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||
require.NoError(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
|
||||
require.NoError(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))
|
||||
|
||||
require.NoError(t, s.KVSSet(4, &structs.DirEntry{Key: "a", Value: []byte{1}}))
|
||||
require.NoError(t, s.KVSSet(5, &structs.DirEntry{Key: "b", Value: []byte{1}}))
|
||||
require.NoError(t, s.KVSSet(6, &structs.DirEntry{Key: "c", Value: []byte{1}}))
|
||||
require.NoError(t, s.KVSSet(7, &structs.DirEntry{Key: "d", Value: []byte{1}}))
|
||||
require.NoError(t, s.KVSDelete(8, "d", &structs.EnterpriseMeta{}))
|
||||
require.NoError(t, s.KVSDelete(9, "c", &structs.EnterpriseMeta{}))
|
||||
require.NoError(t, s.KVSSet(10, &structs.DirEntry{Key: "e", Value: []byte{1}}))
|
||||
require.NoError(t, s.KVSSet(11, &structs.DirEntry{Key: "f", Value: []byte{1}}))
|
||||
},
|
||||
getMembersFunc: func() []serf.Member {
|
||||
return []serf.Member{
|
||||
{
|
||||
Name: "foo",
|
||||
Tags: map[string]string{"role": "consul"},
|
||||
Status: serf.StatusAlive,
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Tags: map[string]string{"role": "consul"},
|
||||
Status: serf.StatusAlive,
|
||||
},
|
||||
{
|
||||
Name: "baz",
|
||||
Tags: map[string]string{"role": "node"},
|
||||
Status: serf.StatusAlive,
|
||||
},
|
||||
}
|
||||
},
|
||||
expectedGauges: map[string]metrics.GaugeValue{
|
||||
// --- node ---
|
||||
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.nodes",
|
||||
Value: 3,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
// --- member ---
|
||||
"consul.usage.test.consul.members.servers;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.members.servers",
|
||||
Value: 2,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.members.clients;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.members.clients",
|
||||
Value: 1,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
// --- service ---
|
||||
"consul.usage.test.consul.state.services;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.services",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.service_instances",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
Value: 5,
|
||||
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// Only have a single interval for the test
|
||||
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
|
||||
cfg := metrics.DefaultConfig("consul.usage.test")
|
||||
cfg.EnableHostname = false
|
||||
metrics.NewGlobal(cfg, sink)
|
||||
|
||||
mockStateProvider := &mockStateProvider{}
|
||||
s, err := newStateStore()
|
||||
require.NoError(t, err)
|
||||
if tcase.modfiyStateStore != nil {
|
||||
tcase.modfiyStateStore(t, s)
|
||||
}
|
||||
mockStateProvider.On("State").Return(s)
|
||||
|
||||
reporter, err := NewUsageMetricsReporter(
|
||||
new(Config).
|
||||
WithStateProvider(mockStateProvider).
|
||||
WithLogger(testutil.Logger(t)).
|
||||
WithDatacenter("dc1").
|
||||
WithGetMembersFunc(tcase.getMembersFunc),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
reporter.runOnce()
|
||||
|
||||
intervals := sink.Data()
|
||||
require.Len(t, intervals, 1)
|
||||
intv := intervals[0]
|
||||
|
||||
assertEqualGaugeMaps(t, tcase.expectedGauges, intv.Gauges)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue