Add new usage memdb table that tracks usage counts of various elements

We update the usage table on Commit() by using the TrackedChanges() API
of memdb.

Track memdb changes on restore so that usage data can be compiled
This commit is contained in:
Chris Piraino 2020-09-02 10:24:16 -05:00
parent 736a3c89e7
commit 3af96930eb
6 changed files with 387 additions and 10 deletions

View File

@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, fedState2, fedStateLoaded2)
// Verify usage data is correctly updated
idx, nodeCount, err := fsm2.state.NodeCount()
require.NoError(t, err)
require.Equal(t, len(nodes), nodeCount)
require.NotZero(t, idx)
// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)

View File

@ -89,17 +89,20 @@ func (c *changeTrackerDB) publish(changes Changes) error {
return nil
}
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
// tracking enabled. This should only be used in Restore where we need to
// replace the entire contents of the Store without a need to track the changes.
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
// at one index - the effect is to write many values that were previously
// WriteTxnRestore returns a wrapped RW transaction that should only be used in
// Restore where we need to replace the entire contents of the Store.
// WriteTxnRestore uses a zero index since the whole restore doesn't really
// occur at one index - the effect is to write many values that were previously
// written across many indexes.
func (c *changeTrackerDB) WriteTxnRestore() *txn {
return &txn{
t := &txn{
Txn: c.db.Txn(true),
Index: 0,
}
// We enable change tracking so that usage data is correctly populated.
t.Txn.TrackChanges()
return t
}
// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher.
@ -125,14 +128,21 @@ type txn struct {
// by the caller. A non-nil error indicates that a commit failed and was not
// applied.
func (tx *txn) Commit() error {
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
}
if len(changes.Changes) > 0 {
if err := updateUsage(tx, changes); err != nil {
return err
}
}
// publish may be nil if this is a read-only or WriteTxnRestore transaction.
// In those cases changes should also be empty, and there will be nothing
// to publish.
if tx.publish != nil {
if err := tx.publish(changes); err != nil {
return err
}

170
agent/consul/state/usage.go Normal file
View File

@ -0,0 +1,170 @@
package state
import (
"fmt"
memdb "github.com/hashicorp/go-memdb"
)
// usageTableSchema returns a new table schema used for tracking various indexes
// for the Raft log.
func usageTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "usage",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
Lowercase: true,
},
},
},
}
}
func init() {
registerSchema(usageTableSchema)
}
type UsageEntry struct {
ID string
Index uint64
Count int
}
// updateUsage takes a set of memdb changes and computes a delta for specific
// usage metrics that we track.
func updateUsage(tx *txn, changes Changes) error {
usageDeltas := make(map[string]int)
for _, change := range changes.Changes {
var delta int
if change.Created() {
delta = 1
} else if change.Deleted() {
delta = -1
}
switch change.Table {
case "nodes":
usageDeltas[change.Table] += delta
case "services":
usageDeltas[change.Table] += delta
}
addEnterpriseUsage(usageDeltas, change)
}
idx := changes.Index
// This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking.
if idx == 0 {
idx = maxIndexTxn(tx, "nodes", "services")
}
for id, delta := range usageDeltas {
u, err := tx.First("usage", "id", id)
if err != nil {
return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
}
if u == nil {
if delta < 0 {
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
}
err := tx.Insert("usage", &UsageEntry{
ID: id,
Count: delta,
Index: idx,
})
if err != nil {
return fmt.Errorf("failed to update usage entry: %s", err)
}
} else if cur, ok := u.(*UsageEntry); ok {
if cur.Count+delta < 0 {
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
}
err := tx.Insert("usage", &UsageEntry{
ID: id,
Count: cur.Count + delta,
Index: idx,
})
if err != nil {
return fmt.Errorf("failed to update usage entry: %s", err)
}
}
}
return nil
}
// ServiceUsage contains all of the usage data related to services
type ServiceUsage struct {
Services int
ServiceInstances int
EnterpriseServiceUsage
}
// NodeCount returns the latest seen Raft index, a count of the number of nodes
// registered, and any errors.
func (s *Store) NodeCount() (uint64, int, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
usage, err := tx.First("usage", "id", "nodes")
if err != nil {
return 0, 0, fmt.Errorf("failed nodes lookup: %s", err)
}
// If no nodes have been registered, the usage entry will not exist.
if usage == nil {
return 0, 0, nil
}
nodeUsage, ok := usage.(*UsageEntry)
if !ok {
return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage)
}
return nodeUsage.Index, nodeUsage.Count, nil
}
// ServiceUsage returns the latest seen Raft index, a compiled set of service
// usage data, and any errors.
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
usage, err := firstUsageEntry(tx, "services")
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
results, err := s.compileServiceUsage(tx, usage.Count)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
return usage.Index, results, nil
}
func firstUsageEntry(tx *txn, id string) (*UsageEntry, error) {
usage, err := tx.First("usage", "id", id)
if err != nil {
return nil, err
}
// If no elements have been inserted, the usage entry will not exist. We
// return a valid value so that can be certain the return value is not nil
// when no error has occurred.
if usage == nil {
return &UsageEntry{ID: id, Count: 0}, nil
}
realUsage, ok := usage.(*UsageEntry)
if !ok {
return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage)
}
return realUsage, nil
}

View File

@ -0,0 +1,33 @@
// +build !consulent
package state
import (
"fmt"
memdb "github.com/hashicorp/go-memdb"
)
type EnterpriseServiceUsage struct{}
func addEnterpriseUsage(map[string]int, memdb.Change) {}
func (s *Store) compileServiceUsage(tx *txn, totalInstances int) (ServiceUsage, error) {
var totalServices int
results, err := tx.Get(
"index",
"id_prefix",
serviceIndexName("", nil),
)
if err != nil {
return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err)
}
for i := results.Next(); i != nil; i = results.Next() {
totalServices += 1
}
return ServiceUsage{
Services: totalServices,
ServiceInstances: totalInstances,
}, nil
}

View File

@ -0,0 +1,25 @@
// +build !consulent
package state
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
testRegisterService(t, s, 8, "node1", "service1")
testRegisterService(t, s, 9, "node2", "service1")
testRegisterService(t, s, 10, "node2", "service2")
idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(10))
require.Equal(t, 2, usage.Services)
require.Equal(t, 3, usage.ServiceInstances)
}

View File

@ -0,0 +1,133 @@
package state
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
func TestStateStore_Usage_NodeCount(t *testing.T) {
s := testStateStore(t)
// No nodes have been registered, and thus no usage entry exists
idx, count, err := s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(0))
require.Equal(t, count, 0)
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
idx, count, err = s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2)
}
func TestStateStore_Usage_NodeCount_Delete(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 0, "node1")
testRegisterNode(t, s, 1, "node2")
idx, count, err := s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2)
require.NoError(t, s.DeleteNode(2, "node2"))
idx, count, err = s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, count, 1)
}
func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
s := testStateStore(t)
// No services have been registered, and thus no usage entry exists
idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(0))
require.Equal(t, usage.Services, 0)
require.Equal(t, usage.ServiceInstances, 0)
}
func TestStateStore_Usage_Restore(t *testing.T) {
s := testStateStore(t)
restore := s.Restore()
restore.Registration(9, &structs.RegisterRequest{
Node: "test-node",
Service: &structs.NodeService{
ID: "mysql",
Service: "mysql",
Port: 8080,
Address: "198.18.0.2",
},
})
require.NoError(t, restore.Commit())
idx, count, err := s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(9))
require.Equal(t, count, 1)
}
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
s := testStateStore(t)
txn := s.db.WriteTxn(1)
// A single delete change will cause a negative count
changes := Changes{
Index: 1,
Changes: memdb.Changes{
{
Table: "nodes",
Before: &structs.Node{},
After: nil,
},
},
}
err := updateUsage(txn, changes)
require.Error(t, err)
require.Contains(t, err.Error(), "negative count")
// A insert a change to create a usage entry
changes = Changes{
Index: 1,
Changes: memdb.Changes{
{
Table: "nodes",
Before: nil,
After: &structs.Node{},
},
},
}
err = updateUsage(txn, changes)
require.NoError(t, err)
// Two deletes will cause a negative count now
changes = Changes{
Index: 1,
Changes: memdb.Changes{
{
Table: "nodes",
Before: &structs.Node{},
After: nil,
},
{
Table: "nodes",
Before: &structs.Node{},
After: nil,
},
},
}
err = updateUsage(txn, changes)
require.Error(t, err)
require.Contains(t, err.Error(), "negative count")
}