259 lines
7.5 KiB
Go
259 lines
7.5 KiB
Go
package state
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
)
|
|
|
|
const (
|
|
serviceNamesUsageTable = "service-names"
|
|
)
|
|
|
|
// usageTableSchema returns a new table schema used for tracking various indexes
|
|
// for the Raft log.
|
|
func usageTableSchema() *memdb.TableSchema {
|
|
return &memdb.TableSchema{
|
|
Name: "usage",
|
|
Indexes: map[string]*memdb.IndexSchema{
|
|
"id": {
|
|
Name: "id",
|
|
AllowMissing: false,
|
|
Unique: true,
|
|
Indexer: &memdb.StringFieldIndex{
|
|
Field: "ID",
|
|
Lowercase: true,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
registerSchema(usageTableSchema)
|
|
}
|
|
|
|
// UsageEntry represents a count of some arbitrary identifier within the
|
|
// state store, along with the last seen index.
|
|
type UsageEntry struct {
|
|
ID string
|
|
Index uint64
|
|
Count int
|
|
}
|
|
|
|
// ServiceUsage contains all of the usage data related to services
|
|
type ServiceUsage struct {
|
|
Services int
|
|
ServiceInstances int
|
|
EnterpriseServiceUsage
|
|
}
|
|
|
|
type uniqueServiceState int
|
|
|
|
const (
|
|
NoChange uniqueServiceState = 0
|
|
Deleted uniqueServiceState = 1
|
|
Created uniqueServiceState = 2
|
|
)
|
|
|
|
// updateUsage takes a set of memdb changes and computes a delta for specific
|
|
// usage metrics that we track.
|
|
func updateUsage(tx WriteTxn, changes Changes) error {
|
|
usageDeltas := make(map[string]int)
|
|
for _, change := range changes.Changes {
|
|
var delta int
|
|
if change.Created() {
|
|
delta = 1
|
|
} else if change.Deleted() {
|
|
delta = -1
|
|
}
|
|
|
|
switch change.Table {
|
|
case "nodes":
|
|
usageDeltas[change.Table] += delta
|
|
case "services":
|
|
svc := changeObject(change).(*structs.ServiceNode)
|
|
usageDeltas[change.Table] += delta
|
|
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var serviceState uniqueServiceState
|
|
if serviceIter.Next() == nil {
|
|
// If no services exist, we know we deleted the last service
|
|
// instance.
|
|
serviceState = Deleted
|
|
usageDeltas[serviceNamesUsageTable] -= 1
|
|
} else if serviceIter.Next() == nil {
|
|
// If a second call to Next() returns nil, we know only a single
|
|
// instance exists. If, in addition, a new service name has been
|
|
// registered, either via creating a new service instance or via
|
|
// renaming an existing service, than we update our service count.
|
|
//
|
|
// We only care about two cases here:
|
|
// 1. A new service instance has been created with a unique name
|
|
// 2. An existing service instance has been updated with a new unique name
|
|
//
|
|
// These are the only ways a new unique service can be created. The
|
|
// other valid cases here: an update that does not change the service
|
|
// name, and a deletion, both do not impact the count of unique service
|
|
// names in the system.
|
|
|
|
if change.Created() {
|
|
// Given a single existing service instance of the service: If a
|
|
// service has just been created, then we know this is a new unique
|
|
// service.
|
|
serviceState = Created
|
|
usageDeltas[serviceNamesUsageTable] += 1
|
|
} else if serviceNameChanged(change) {
|
|
// Given a single existing service instance of the service: If a
|
|
// service has been updated with a new service name, then we know
|
|
// this is a new unique service.
|
|
serviceState = Created
|
|
usageDeltas[serviceNamesUsageTable] += 1
|
|
|
|
// Check whether the previous name was deleted in this rename, this
|
|
// is a special case of renaming a service which does not result in
|
|
// changing the count of unique service names.
|
|
before := change.Before.(*structs.ServiceNode)
|
|
beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if beforeSvc == nil {
|
|
usageDeltas[serviceNamesUsageTable] -= 1
|
|
// set serviceState to NoChange since we have both gained and lost a
|
|
// service, cancelling each other out
|
|
serviceState = NoChange
|
|
}
|
|
}
|
|
}
|
|
addEnterpriseServiceUsage(usageDeltas, change, serviceState)
|
|
}
|
|
}
|
|
|
|
idx := changes.Index
|
|
// This will happen when restoring from a snapshot, just take the max index
|
|
// of the tables we are tracking.
|
|
if idx == 0 {
|
|
idx = maxIndexTxn(tx, "nodes", servicesTableName)
|
|
}
|
|
|
|
return writeUsageDeltas(tx, idx, usageDeltas)
|
|
}
|
|
|
|
// serviceNameChanged returns a boolean that indicates whether the
|
|
// provided change resulted in an update to the service's service name.
|
|
func serviceNameChanged(change memdb.Change) bool {
|
|
if change.Updated() {
|
|
before := change.Before.(*structs.ServiceNode)
|
|
after := change.After.(*structs.ServiceNode)
|
|
return before.ServiceName != after.ServiceName
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// writeUsageDeltas will take in a map of IDs to deltas and update each
|
|
// entry accordingly, checking for integer underflow. The index that is
|
|
// passed in will be recorded on the entry as well.
|
|
func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error {
|
|
for id, delta := range usageDeltas {
|
|
u, err := tx.First("usage", "id", id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
|
|
}
|
|
|
|
if u == nil {
|
|
if delta < 0 {
|
|
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
|
}
|
|
err := tx.Insert("usage", &UsageEntry{
|
|
ID: id,
|
|
Count: delta,
|
|
Index: idx,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update usage entry: %s", err)
|
|
}
|
|
} else if cur, ok := u.(*UsageEntry); ok {
|
|
if cur.Count+delta < 0 {
|
|
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
|
}
|
|
err := tx.Insert("usage", &UsageEntry{
|
|
ID: id,
|
|
Count: cur.Count + delta,
|
|
Index: idx,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update usage entry: %s", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NodeCount returns the latest seen Raft index, a count of the number of nodes
|
|
// registered, and any errors.
|
|
func (s *Store) NodeCount() (uint64, int, error) {
|
|
tx := s.db.ReadTxn()
|
|
defer tx.Abort()
|
|
|
|
nodeUsage, err := firstUsageEntry(tx, "nodes")
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("failed nodes lookup: %s", err)
|
|
}
|
|
return nodeUsage.Index, nodeUsage.Count, nil
|
|
}
|
|
|
|
// ServiceUsage returns the latest seen Raft index, a compiled set of service
|
|
// usage data, and any errors.
|
|
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
|
tx := s.db.ReadTxn()
|
|
defer tx.Abort()
|
|
|
|
serviceInstances, err := firstUsageEntry(tx, servicesTableName)
|
|
if err != nil {
|
|
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
|
}
|
|
|
|
services, err := firstUsageEntry(tx, serviceNamesUsageTable)
|
|
if err != nil {
|
|
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
|
}
|
|
|
|
usage := ServiceUsage{
|
|
ServiceInstances: serviceInstances.Count,
|
|
Services: services.Count,
|
|
}
|
|
results, err := compileEnterpriseUsage(tx, usage)
|
|
if err != nil {
|
|
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
|
}
|
|
|
|
return serviceInstances.Index, results, nil
|
|
}
|
|
|
|
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
|
|
usage, err := tx.First("usage", "id", id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If no elements have been inserted, the usage entry will not exist. We
|
|
// return a valid value so that can be certain the return value is not nil
|
|
// when no error has occurred.
|
|
if usage == nil {
|
|
return &UsageEntry{ID: id, Count: 0}, nil
|
|
}
|
|
|
|
realUsage, ok := usage.(*UsageEntry)
|
|
if !ok {
|
|
return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage)
|
|
}
|
|
|
|
return realUsage, nil
|
|
}
|