diff --git a/.changelog/9440.txt b/.changelog/9440.txt new file mode 100644 index 000000000..7c253c033 --- /dev/null +++ b/.changelog/9440.txt @@ -0,0 +1,3 @@ +```release-note:bug +state: fix computation of usage metrics to account for various places that can modify multiple services in a single transaction. +``` diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 6e43f3729..bde67d127 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -61,6 +61,7 @@ const ( // usage metrics that we track. func updateUsage(tx WriteTxn, changes Changes) error { usageDeltas := make(map[string]int) + serviceNameChanges := make(map[structs.ServiceName]int) for _, change := range changes.Changes { var delta int if change.Created() { @@ -75,65 +76,27 @@ func updateUsage(tx WriteTxn, changes Changes) error { case "services": svc := changeObject(change).(*structs.ServiceNode) usageDeltas[change.Table] += delta - serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) - if err != nil { - return err + addEnterpriseServiceInstanceUsage(usageDeltas, change) + + // Construct a mapping of all of the various service names that were + // changed, in order to compare it with the finished memdb state. + // Make sure to account for the fact that services can change their names. + if serviceNameChanged(change) { + serviceNameChanges[svc.CompoundServiceName()] += 1 + before := change.Before.(*structs.ServiceNode) + serviceNameChanges[before.CompoundServiceName()] -= 1 + } else { + serviceNameChanges[svc.CompoundServiceName()] += delta } - - var serviceState uniqueServiceState - if serviceIter.Next() == nil { - // If no services exist, we know we deleted the last service - // instance. - serviceState = Deleted - usageDeltas[serviceNamesUsageTable] -= 1 - } else if serviceIter.Next() == nil { - // If a second call to Next() returns nil, we know only a single - // instance exists. If, in addition, a new service name has been - // registered, either via creating a new service instance or via - // renaming an existing service, than we update our service count. - // - // We only care about two cases here: - // 1. A new service instance has been created with a unique name - // 2. An existing service instance has been updated with a new unique name - // - // These are the only ways a new unique service can be created. The - // other valid cases here: an update that does not change the service - // name, and a deletion, both do not impact the count of unique service - // names in the system. - - if change.Created() { - // Given a single existing service instance of the service: If a - // service has just been created, then we know this is a new unique - // service. - serviceState = Created - usageDeltas[serviceNamesUsageTable] += 1 - } else if serviceNameChanged(change) { - // Given a single existing service instance of the service: If a - // service has been updated with a new service name, then we know - // this is a new unique service. - serviceState = Created - usageDeltas[serviceNamesUsageTable] += 1 - - // Check whether the previous name was deleted in this rename, this - // is a special case of renaming a service which does not result in - // changing the count of unique service names. - before := change.Before.(*structs.ServiceNode) - beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) - if err != nil { - return err - } - if beforeSvc == nil { - usageDeltas[serviceNamesUsageTable] -= 1 - // set serviceState to NoChange since we have both gained and lost a - // service, cancelling each other out - serviceState = NoChange - } - } - } - addEnterpriseServiceUsage(usageDeltas, change, serviceState) } } + serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges) + if err != nil { + return err + } + addEnterpriseServiceUsage(usageDeltas, serviceStates) + idx := changes.Index // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. @@ -144,6 +107,46 @@ func updateUsage(tx WriteTxn, changes Changes) error { return writeUsageDeltas(tx, idx, usageDeltas) } +func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { + serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) + for svc, delta := range serviceNameChanges { + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta) + if err != nil { + return nil, err + } + + // Count the number of service instances associated with the given service + // name at the end of this transaction, and compare that with how many were + // added/removed during the transaction. This allows us to handle a single + // transaction committing multiple changes related to a single service + // name. + var svcCount int + for service := serviceIter.Next(); service != nil; service = serviceIter.Next() { + svcCount += 1 + } + + var serviceState uniqueServiceState + switch { + case svcCount == 0: + // If no services exist, we know we deleted the last service + // instance. + serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + case svcCount == delta: + // If the current number of service instances equals the number added, + // than we know we created a new service name. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + default: + serviceState = NoChange + } + + serviceStates[svc] = serviceState + } + + return serviceStates, nil +} + // serviceNameChanged returns a boolean that indicates whether the // provided change resulted in an update to the service's service name. func serviceNameChanged(change memdb.Change) bool { @@ -168,7 +171,11 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error if u == nil { if delta < 0 { - return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + // Don't return an error here, since we don't want to block updates + // from happening to the state store. But, set the delta to 0 so that + // we do not accidentally underflow the uint64 and begin reporting + // large numbers. + delta = 0 } err := tx.Insert("usage", &UsageEntry{ ID: id, @@ -179,12 +186,17 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error return fmt.Errorf("failed to update usage entry: %s", err) } } else if cur, ok := u.(*UsageEntry); ok { - if cur.Count+delta < 0 { - return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + updated := cur.Count + delta + if updated < 0 { + // Don't return an error here, since we don't want to block updates + // from happening to the state store. But, set the delta to 0 so that + // we do not accidentally underflow the uint64 and begin reporting + // large numbers. + updated = 0 } err := tx.Insert("usage", &UsageEntry{ ID: id, - Count: cur.Count + delta, + Count: updated, Index: idx, }) if err != nil { diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index f35576abf..355f1fbcc 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -3,12 +3,15 @@ package state import ( + "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) type EnterpriseServiceUsage struct{} -func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} +func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {} + +func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {} func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { return usage, nil diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index f608d7d75..3cb0b440e 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -55,6 +55,43 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { require.Equal(t, usage.ServiceInstances, 0) } +func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + + svc1 := &structs.NodeService{ + ID: "service1", + Service: "test", + Address: "1.1.1.1", + Port: 1111, + } + svc2 := &structs.NodeService{ + ID: "service2", + Service: "test", + Address: "1.1.1.1", + Port: 1111, + } + + // Register multiple instances on a single node to test that we do not + // double count deletions within the same transaction. + require.NoError(t, s.EnsureService(1, "node1", svc1)) + require.NoError(t, s.EnsureService(2, "node1", svc2)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 2) + + require.NoError(t, s.DeleteNode(3, "node1")) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(3)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + func TestStateStore_Usage_Restore(t *testing.T) { s := testStateStore(t) restore := s.Restore() @@ -67,12 +104,27 @@ func TestStateStore_Usage_Restore(t *testing.T) { Address: "198.18.0.2", }, }) + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql1", + Service: "mysql", + Port: 8081, + Address: "198.18.0.2", + }, + }) require.NoError(t, restore.Commit()) idx, count, err := s.NodeCount() require.NoError(t, err) require.Equal(t, idx, uint64(9)) require.Equal(t, count, 1) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 2) } func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { @@ -92,8 +144,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { } err := updateUsage(txn, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "negative count") + require.NoError(t, err) + + // Check that we do not underflow + u, err := txn.First("usage", "id", "nodes") + require.NoError(t, err) + require.Equal(t, 0, u.(*UsageEntry).Count) // A insert a change to create a usage entry changes = Changes{ @@ -128,8 +184,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { } err = updateUsage(txn, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "negative count") + require.NoError(t, err) + + // Check that we do not underflow + u, err = txn.First("usage", "id", "nodes") + require.NoError(t, err) + require.Equal(t, 0, u.(*UsageEntry).Count) } func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) {