From 80f923a47af203259d95491a3dcfbd54931ab851 Mon Sep 17 00:00:00 2001 From: Chris Piraino Date: Wed, 2 Sep 2020 10:24:21 -0500 Subject: [PATCH] Refactor state store usage to track unique service names This commit refactors the state store usage code to track unique service name changes on transaction commit. This means we only need to lookup usage entries when reading the information, as opposed to iterating over a large number of service indices. - Take into account a service instance's name being changed - Do not iterate through entire list of service instances, we only care about whether there is 0, 1, or more than 1. --- agent/consul/state/usage.go | 138 +++++++++++++++++++++++++------ agent/consul/state/usage_oss.go | 24 +----- agent/consul/state/usage_test.go | 61 ++++++++++++++ 3 files changed, 177 insertions(+), 46 deletions(-) diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 8b226b3e1..6e43f3729 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -3,9 +3,14 @@ package state import ( "fmt" + "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) +const ( + serviceNamesUsageTable = "service-names" +) + // usageTableSchema returns a new table schema used for tracking various indexes // for the Raft log. func usageTableSchema() *memdb.TableSchema { @@ -29,12 +34,29 @@ func init() { registerSchema(usageTableSchema) } +// UsageEntry represents a count of some arbitrary identifier within the +// state store, along with the last seen index. type UsageEntry struct { ID string Index uint64 Count int } +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +type uniqueServiceState int + +const ( + NoChange uniqueServiceState = 0 + Deleted uniqueServiceState = 1 + Created uniqueServiceState = 2 +) + // updateUsage takes a set of memdb changes and computes a delta for specific // usage metrics that we track. func updateUsage(tx WriteTxn, changes Changes) error { @@ -46,23 +68,98 @@ func updateUsage(tx WriteTxn, changes Changes) error { } else if change.Deleted() { delta = -1 } + switch change.Table { case "nodes": usageDeltas[change.Table] += delta case "services": + svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta - } + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } - addEnterpriseUsage(usageDeltas, change) + var serviceState uniqueServiceState + if serviceIter.Next() == nil { + // If no services exist, we know we deleted the last service + // instance. + serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + } else if serviceIter.Next() == nil { + // If a second call to Next() returns nil, we know only a single + // instance exists. If, in addition, a new service name has been + // registered, either via creating a new service instance or via + // renaming an existing service, than we update our service count. + // + // We only care about two cases here: + // 1. A new service instance has been created with a unique name + // 2. An existing service instance has been updated with a new unique name + // + // These are the only ways a new unique service can be created. The + // other valid cases here: an update that does not change the service + // name, and a deletion, both do not impact the count of unique service + // names in the system. + + if change.Created() { + // Given a single existing service instance of the service: If a + // service has just been created, then we know this is a new unique + // service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + } else if serviceNameChanged(change) { + // Given a single existing service instance of the service: If a + // service has been updated with a new service name, then we know + // this is a new unique service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + + // Check whether the previous name was deleted in this rename, this + // is a special case of renaming a service which does not result in + // changing the count of unique service names. + before := change.Before.(*structs.ServiceNode) + beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) + if err != nil { + return err + } + if beforeSvc == nil { + usageDeltas[serviceNamesUsageTable] -= 1 + // set serviceState to NoChange since we have both gained and lost a + // service, cancelling each other out + serviceState = NoChange + } + } + } + addEnterpriseServiceUsage(usageDeltas, change, serviceState) + } } idx := changes.Index // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. if idx == 0 { - idx = maxIndexTxn(tx, "nodes", "services") + idx = maxIndexTxn(tx, "nodes", servicesTableName) } + return writeUsageDeltas(tx, idx, usageDeltas) +} + +// serviceNameChanged returns a boolean that indicates whether the +// provided change resulted in an update to the service's service name. +func serviceNameChanged(change memdb.Change) bool { + if change.Updated() { + before := change.Before.(*structs.ServiceNode) + after := change.After.(*structs.ServiceNode) + return before.ServiceName != after.ServiceName + } + + return false +} + +// writeUsageDeltas will take in a map of IDs to deltas and update each +// entry accordingly, checking for integer underflow. The index that is +// passed in will be recorded on the entry as well. +func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { for id, delta := range usageDeltas { u, err := tx.First("usage", "id", id) if err != nil { @@ -98,34 +195,16 @@ func updateUsage(tx WriteTxn, changes Changes) error { return nil } -// ServiceUsage contains all of the usage data related to services -type ServiceUsage struct { - Services int - ServiceInstances int - EnterpriseServiceUsage -} - // NodeCount returns the latest seen Raft index, a count of the number of nodes // registered, and any errors. func (s *Store) NodeCount() (uint64, int, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := tx.First("usage", "id", "nodes") + nodeUsage, err := firstUsageEntry(tx, "nodes") if err != nil { return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) } - - // If no nodes have been registered, the usage entry will not exist. - if usage == nil { - return 0, 0, nil - } - - nodeUsage, ok := usage.(*UsageEntry) - if !ok { - return 0, 0, fmt.Errorf("failed nodes lookup: type %T is not *UsageEntry", usage) - } - return nodeUsage.Index, nodeUsage.Count, nil } @@ -135,17 +214,26 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - usage, err := firstUsageEntry(tx, "services") + serviceInstances, err := firstUsageEntry(tx, servicesTableName) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - results, err := compileServiceUsage(tx, usage.Count) + services, err := firstUsageEntry(tx, serviceNamesUsageTable) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - return usage.Index, results, nil + usage := ServiceUsage{ + ServiceInstances: serviceInstances.Count, + Services: services.Count, + } + results, err := compileEnterpriseUsage(tx, usage) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return serviceInstances.Index, results, nil } func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index 825b80494..f35576abf 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -3,31 +3,13 @@ package state import ( - "fmt" - memdb "github.com/hashicorp/go-memdb" ) type EnterpriseServiceUsage struct{} -func addEnterpriseUsage(map[string]int, memdb.Change) {} +func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} -func compileServiceUsage(tx ReadTxn, totalInstances int) (ServiceUsage, error) { - var totalServices int - results, err := tx.Get( - "index", - "id_prefix", - serviceIndexName("", nil), - ) - if err != nil { - return ServiceUsage{}, fmt.Errorf("failed services index lookup: %s", err) - } - for i := results.Next(); i != nil; i = results.Next() { - totalServices += 1 - } - - return ServiceUsage{ - Services: totalServices, - ServiceInstances: totalInstances, - }, nil +func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { + return usage, nil } diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index a1c07f654..f608d7d75 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -131,3 +131,64 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "negative count") } + +func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + testRegisterService(t, s, 1, "node1", "service1") + + t.Run("rename service with a single instance", func(t *testing.T) { + svc := &structs.NodeService{ + ID: "service1", + Service: "after", + Address: "1.1.1.1", + Port: 1111, + } + require.NoError(t, s.EnsureService(2, "node1", svc)) + + // We renamed a service with a single instance, so we maintain 1 service. + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 1) + }) + + t.Run("rename service with a multiple instances", func(t *testing.T) { + svc2 := &structs.NodeService{ + ID: "service2", + Service: "before", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(3, "node1", svc2)) + + svc3 := &structs.NodeService{ + ID: "service3", + Service: "before", + Address: "1.1.1.3", + Port: 1111, + } + require.NoError(t, s.EnsureService(4, "node1", svc3)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(4)) + require.Equal(t, usage.Services, 2) + require.Equal(t, usage.ServiceInstances, 3) + + update := &structs.NodeService{ + ID: "service2", + Service: "another-name", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(5, "node1", update)) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(5)) + require.Equal(t, usage.Services, 3) + require.Equal(t, usage.ServiceInstances, 3) + }) +}