diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 8b226b3e1..6e43f3729 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -3,9 +3,14 @@ package state import ( "fmt" + "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) +const ( + serviceNamesUsageTable = "service-names" +) + // usageTableSchema returns a new table schema used for tracking various indexes // for the Raft log. func usageTableSchema() *memdb.TableSchema { @@ -29,12 +34,29 @@ func init() { registerSchema(usageTableSchema) } +// UsageEntry represents a count of some arbitrary identifier within the +// state store, along with the last seen index. type UsageEntry struct { ID string Index uint64 Count int } +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +type uniqueServiceState int + +const ( + NoChange uniqueServiceState = 0 + Deleted uniqueServiceState = 1 + Created uniqueServiceState = 2 +) + // updateUsage takes a set of memdb changes and computes a delta for specific // usage metrics that we track. func updateUsage(tx WriteTxn, changes Changes) error { @@ -46,23 +68,98 @@ func updateUsage(tx WriteTxn, changes Changes) error { } else if change.Deleted() { delta = -1 } + switch change.Table { case "nodes": usageDeltas[change.Table] += delta case "services": + svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta - } + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } - addEnterpriseUsage(usageDeltas, change) + var serviceState uniqueServiceState + if serviceIter.Next() == nil { + // If no services exist, we know we deleted the last service + // instance. 
+ serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + } else if serviceIter.Next() == nil { + // If a second call to Next() returns nil, we know only a single + // instance exists. If, in addition, a new service name has been + // registered, either via creating a new service instance or via + // renaming an existing service, then we update our service count. + // + // We only care about two cases here: + // 1. A new service instance has been created with a unique name + // 2. An existing service instance has been updated with a new unique name + // + // These are the only ways a new unique service can be created. The + // other valid cases here: an update that does not change the service + // name, and a deletion, both do not impact the count of unique service + // names in the system. + + if change.Created() { + // Given a single existing service instance of the service: If a + // service has just been created, then we know this is a new unique + // service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + } else if serviceNameChanged(change) { + // Given a single existing service instance of the service: If a + // service has been updated with a new service name, then we know + // this is a new unique service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + + // Check whether the previous name was deleted in this rename; this + // is a special case of renaming a service which does not result in + // changing the count of unique service names. 
+ before := change.Before.(*structs.ServiceNode) + beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) + if err != nil { + return err + } + if beforeSvc == nil { + usageDeltas[serviceNamesUsageTable] -= 1 + // set serviceState to NoChange since we have both gained and lost a + // service, cancelling each other out + serviceState = NoChange + } + } + } + addEnterpriseServiceUsage(usageDeltas, change, serviceState) + } } idx := changes.Index // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. if idx == 0 { - idx = maxIndexTxn(tx, "nodes", "services") + idx = maxIndexTxn(tx, "nodes", servicesTableName) } + return writeUsageDeltas(tx, idx, usageDeltas) +} + +// serviceNameChanged returns a boolean that indicates whether the +// provided change resulted in an update to the service's service name. +func serviceNameChanged(change memdb.Change) bool { + if change.Updated() { + before := change.Before.(*structs.ServiceNode) + after := change.After.(*structs.ServiceNode) + return before.ServiceName != after.ServiceName + } + + return false +} + +// writeUsageDeltas will take in a map of IDs to deltas and update each +// entry accordingly, checking for integer underflow. The index that is +// passed in will be recorded on the entry as well. +func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { for id, delta := range usageDeltas { u, err := tx.First("usage", "id", id) if err != nil { @@ -98,34 +195,16 @@ func updateUsage(tx WriteTxn, changes Changes) error { return nil } -// ServiceUsage contains all of the usage data related to services -type ServiceUsage struct { - Services int - ServiceInstances int - EnterpriseServiceUsage -} - // NodeCount returns the latest seen Raft index, a count of the number of nodes // registered, and any errors. 
func (s *Store) NodeCount() (uint64, int, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := tx.First("usage", "id", "nodes") + nodeUsage, err := firstUsageEntry(tx, "nodes") if err != nil { return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) } - - // If no nodes have been registered, the usage entry will not exist. - if usage == nil { - return 0, 0, nil - } - - nodeUsage, ok := usage.(*UsageEntry) - if !ok { - return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) - } - return nodeUsage.Index, nodeUsage.Count, nil } @@ -135,17 +214,26 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := firstUsageEntry(tx, "services") + serviceInstances, err := firstUsageEntry(tx, servicesTableName) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - results, err := compileServiceUsage(tx, usage.Count) + services, err := firstUsageEntry(tx, serviceNamesUsageTable) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - return usage.Index, results, nil + usage := ServiceUsage{ + ServiceInstances: serviceInstances.Count, + Services: services.Count, + } + results, err := compileEnterpriseUsage(tx, usage) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return serviceInstances.Index, results, nil } func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index 825b80494..f35576abf 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -3,31 +3,13 @@ package state import ( - "fmt" - memdb "github.com/hashicorp/go-memdb" ) type EnterpriseServiceUsage struct{} -func addEnterpriseUsage(map[string]int, memdb.Change) {} +func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} -func compileServiceUsage(tx ReadTxn, 
totalInstances int) (ServiceUsage, error) { - var totalServices int - results, err := tx.Get( - "index", - "id_prefix", - serviceIndexName("", nil), - ) - if err != nil { - return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) - } - for i := results.Next(); i != nil; i = results.Next() { - totalServices += 1 - } - - return ServiceUsage{ - Services: totalServices, - ServiceInstances: totalInstances, - }, nil +func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { + return usage, nil } diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index a1c07f654..f608d7d75 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -131,3 +131,64 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "negative count") } + +func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + testRegisterService(t, s, 1, "node1", "service1") + + t.Run("rename service with a single instance", func(t *testing.T) { + svc := &structs.NodeService{ + ID: "service1", + Service: "after", + Address: "1.1.1.1", + Port: 1111, + } + require.NoError(t, s.EnsureService(2, "node1", svc)) + + // We renamed a service with a single instance, so we maintain 1 service. 
+ idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 1) + }) + + t.Run("rename service with a multiple instances", func(t *testing.T) { + svc2 := &structs.NodeService{ + ID: "service2", + Service: "before", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(3, "node1", svc2)) + + svc3 := &structs.NodeService{ + ID: "service3", + Service: "before", + Address: "1.1.1.3", + Port: 1111, + } + require.NoError(t, s.EnsureService(4, "node1", svc3)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(4)) + require.Equal(t, usage.Services, 2) + require.Equal(t, usage.ServiceInstances, 3) + + update := &structs.NodeService{ + ID: "service2", + Service: "another-name", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(5, "node1", update)) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(5)) + require.Equal(t, usage.Services, 3) + require.Equal(t, usage.ServiceInstances, 3) + }) +}