Fix bug in usage metrics when multiple service instances are changed in a single transaction (#9440)
* Fix bug in usage metrics that caused a negative count to occur There were a couple of instances were usage metrics would do the wrong thing and result in incorrect counts, causing the count to attempt to decrement below zero and return an error. The usage metrics did not account for various places where a single transaction could delete/update/add multiple service instances at once. We also remove the error when attempting to decrement below zero, and instead just make sure we do not accidentally underflow the unsigned integer. This is a more graceful failure than returning an error and not allowing a transaction to commit. * Add changelog
This commit is contained in:
parent
acc843f04d
commit
baad708929
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
state: fix computation of usage metrics to account for various places that can modify multiple services in a single transaction.
|
||||||
|
```
|
|
@ -61,6 +61,7 @@ const (
|
||||||
// usage metrics that we track.
|
// usage metrics that we track.
|
||||||
func updateUsage(tx WriteTxn, changes Changes) error {
|
func updateUsage(tx WriteTxn, changes Changes) error {
|
||||||
usageDeltas := make(map[string]int)
|
usageDeltas := make(map[string]int)
|
||||||
|
serviceNameChanges := make(map[structs.ServiceName]int)
|
||||||
for _, change := range changes.Changes {
|
for _, change := range changes.Changes {
|
||||||
var delta int
|
var delta int
|
||||||
if change.Created() {
|
if change.Created() {
|
||||||
|
@ -75,64 +76,26 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
||||||
case "services":
|
case "services":
|
||||||
svc := changeObject(change).(*structs.ServiceNode)
|
svc := changeObject(change).(*structs.ServiceNode)
|
||||||
usageDeltas[change.Table] += delta
|
usageDeltas[change.Table] += delta
|
||||||
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta)
|
addEnterpriseServiceInstanceUsage(usageDeltas, change)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var serviceState uniqueServiceState
|
// Construct a mapping of all of the various service names that were
|
||||||
if serviceIter.Next() == nil {
|
// changed, in order to compare it with the finished memdb state.
|
||||||
// If no services exist, we know we deleted the last service
|
// Make sure to account for the fact that services can change their names.
|
||||||
// instance.
|
if serviceNameChanged(change) {
|
||||||
serviceState = Deleted
|
serviceNameChanges[svc.CompoundServiceName()] += 1
|
||||||
usageDeltas[serviceNamesUsageTable] -= 1
|
|
||||||
} else if serviceIter.Next() == nil {
|
|
||||||
// If a second call to Next() returns nil, we know only a single
|
|
||||||
// instance exists. If, in addition, a new service name has been
|
|
||||||
// registered, either via creating a new service instance or via
|
|
||||||
// renaming an existing service, than we update our service count.
|
|
||||||
//
|
|
||||||
// We only care about two cases here:
|
|
||||||
// 1. A new service instance has been created with a unique name
|
|
||||||
// 2. An existing service instance has been updated with a new unique name
|
|
||||||
//
|
|
||||||
// These are the only ways a new unique service can be created. The
|
|
||||||
// other valid cases here: an update that does not change the service
|
|
||||||
// name, and a deletion, both do not impact the count of unique service
|
|
||||||
// names in the system.
|
|
||||||
|
|
||||||
if change.Created() {
|
|
||||||
// Given a single existing service instance of the service: If a
|
|
||||||
// service has just been created, then we know this is a new unique
|
|
||||||
// service.
|
|
||||||
serviceState = Created
|
|
||||||
usageDeltas[serviceNamesUsageTable] += 1
|
|
||||||
} else if serviceNameChanged(change) {
|
|
||||||
// Given a single existing service instance of the service: If a
|
|
||||||
// service has been updated with a new service name, then we know
|
|
||||||
// this is a new unique service.
|
|
||||||
serviceState = Created
|
|
||||||
usageDeltas[serviceNamesUsageTable] += 1
|
|
||||||
|
|
||||||
// Check whether the previous name was deleted in this rename, this
|
|
||||||
// is a special case of renaming a service which does not result in
|
|
||||||
// changing the count of unique service names.
|
|
||||||
before := change.Before.(*structs.ServiceNode)
|
before := change.Before.(*structs.ServiceNode)
|
||||||
beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta)
|
serviceNameChanges[before.CompoundServiceName()] -= 1
|
||||||
|
} else {
|
||||||
|
serviceNameChanges[svc.CompoundServiceName()] += delta
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if beforeSvc == nil {
|
addEnterpriseServiceUsage(usageDeltas, serviceStates)
|
||||||
usageDeltas[serviceNamesUsageTable] -= 1
|
|
||||||
// set serviceState to NoChange since we have both gained and lost a
|
|
||||||
// service, cancelling each other out
|
|
||||||
serviceState = NoChange
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
addEnterpriseServiceUsage(usageDeltas, change, serviceState)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idx := changes.Index
|
idx := changes.Index
|
||||||
// This will happen when restoring from a snapshot, just take the max index
|
// This will happen when restoring from a snapshot, just take the max index
|
||||||
|
@ -144,6 +107,46 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
||||||
return writeUsageDeltas(tx, idx, usageDeltas)
|
return writeUsageDeltas(tx, idx, usageDeltas)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
||||||
|
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
||||||
|
for svc, delta := range serviceNameChanges {
|
||||||
|
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count the number of service instances associated with the given service
|
||||||
|
// name at the end of this transaction, and compare that with how many were
|
||||||
|
// added/removed during the transaction. This allows us to handle a single
|
||||||
|
// transaction committing multiple changes related to a single service
|
||||||
|
// name.
|
||||||
|
var svcCount int
|
||||||
|
for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
|
||||||
|
svcCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
var serviceState uniqueServiceState
|
||||||
|
switch {
|
||||||
|
case svcCount == 0:
|
||||||
|
// If no services exist, we know we deleted the last service
|
||||||
|
// instance.
|
||||||
|
serviceState = Deleted
|
||||||
|
usageDeltas[serviceNamesUsageTable] -= 1
|
||||||
|
case svcCount == delta:
|
||||||
|
// If the current number of service instances equals the number added,
|
||||||
|
// than we know we created a new service name.
|
||||||
|
serviceState = Created
|
||||||
|
usageDeltas[serviceNamesUsageTable] += 1
|
||||||
|
default:
|
||||||
|
serviceState = NoChange
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceStates[svc] = serviceState
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceStates, nil
|
||||||
|
}
|
||||||
|
|
||||||
// serviceNameChanged returns a boolean that indicates whether the
|
// serviceNameChanged returns a boolean that indicates whether the
|
||||||
// provided change resulted in an update to the service's service name.
|
// provided change resulted in an update to the service's service name.
|
||||||
func serviceNameChanged(change memdb.Change) bool {
|
func serviceNameChanged(change memdb.Change) bool {
|
||||||
|
@ -168,7 +171,11 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
|
||||||
|
|
||||||
if u == nil {
|
if u == nil {
|
||||||
if delta < 0 {
|
if delta < 0 {
|
||||||
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
// Don't return an error here, since we don't want to block updates
|
||||||
|
// from happening to the state store. But, set the delta to 0 so that
|
||||||
|
// we do not accidentally underflow the uint64 and begin reporting
|
||||||
|
// large numbers.
|
||||||
|
delta = 0
|
||||||
}
|
}
|
||||||
err := tx.Insert("usage", &UsageEntry{
|
err := tx.Insert("usage", &UsageEntry{
|
||||||
ID: id,
|
ID: id,
|
||||||
|
@ -179,12 +186,17 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
|
||||||
return fmt.Errorf("failed to update usage entry: %s", err)
|
return fmt.Errorf("failed to update usage entry: %s", err)
|
||||||
}
|
}
|
||||||
} else if cur, ok := u.(*UsageEntry); ok {
|
} else if cur, ok := u.(*UsageEntry); ok {
|
||||||
if cur.Count+delta < 0 {
|
updated := cur.Count + delta
|
||||||
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
|
if updated < 0 {
|
||||||
|
// Don't return an error here, since we don't want to block updates
|
||||||
|
// from happening to the state store. But, set the delta to 0 so that
|
||||||
|
// we do not accidentally underflow the uint64 and begin reporting
|
||||||
|
// large numbers.
|
||||||
|
updated = 0
|
||||||
}
|
}
|
||||||
err := tx.Insert("usage", &UsageEntry{
|
err := tx.Insert("usage", &UsageEntry{
|
||||||
ID: id,
|
ID: id,
|
||||||
Count: cur.Count + delta,
|
Count: updated,
|
||||||
Index: idx,
|
Index: idx,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -3,12 +3,15 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EnterpriseServiceUsage struct{}
|
type EnterpriseServiceUsage struct{}
|
||||||
|
|
||||||
func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {}
|
func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {}
|
||||||
|
|
||||||
|
func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {}
|
||||||
|
|
||||||
func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
||||||
return usage, nil
|
return usage, nil
|
||||||
|
|
|
@ -55,6 +55,43 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
|
||||||
require.Equal(t, usage.ServiceInstances, 0)
|
require.Equal(t, usage.ServiceInstances, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
testRegisterNode(t, s, 1, "node1")
|
||||||
|
|
||||||
|
svc1 := &structs.NodeService{
|
||||||
|
ID: "service1",
|
||||||
|
Service: "test",
|
||||||
|
Address: "1.1.1.1",
|
||||||
|
Port: 1111,
|
||||||
|
}
|
||||||
|
svc2 := &structs.NodeService{
|
||||||
|
ID: "service2",
|
||||||
|
Service: "test",
|
||||||
|
Address: "1.1.1.1",
|
||||||
|
Port: 1111,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register multiple instances on a single node to test that we do not
|
||||||
|
// double count deletions within the same transaction.
|
||||||
|
require.NoError(t, s.EnsureService(1, "node1", svc1))
|
||||||
|
require.NoError(t, s.EnsureService(2, "node1", svc2))
|
||||||
|
|
||||||
|
idx, usage, err := s.ServiceUsage()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, idx, uint64(2))
|
||||||
|
require.Equal(t, usage.Services, 1)
|
||||||
|
require.Equal(t, usage.ServiceInstances, 2)
|
||||||
|
|
||||||
|
require.NoError(t, s.DeleteNode(3, "node1"))
|
||||||
|
|
||||||
|
idx, usage, err = s.ServiceUsage()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, idx, uint64(3))
|
||||||
|
require.Equal(t, usage.Services, 0)
|
||||||
|
require.Equal(t, usage.ServiceInstances, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_Usage_Restore(t *testing.T) {
|
func TestStateStore_Usage_Restore(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
restore := s.Restore()
|
restore := s.Restore()
|
||||||
|
@ -67,12 +104,27 @@ func TestStateStore_Usage_Restore(t *testing.T) {
|
||||||
Address: "198.18.0.2",
|
Address: "198.18.0.2",
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
restore.Registration(9, &structs.RegisterRequest{
|
||||||
|
Node: "test-node",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "mysql1",
|
||||||
|
Service: "mysql",
|
||||||
|
Port: 8081,
|
||||||
|
Address: "198.18.0.2",
|
||||||
|
},
|
||||||
|
})
|
||||||
require.NoError(t, restore.Commit())
|
require.NoError(t, restore.Commit())
|
||||||
|
|
||||||
idx, count, err := s.NodeCount()
|
idx, count, err := s.NodeCount()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, idx, uint64(9))
|
require.Equal(t, idx, uint64(9))
|
||||||
require.Equal(t, count, 1)
|
require.Equal(t, count, 1)
|
||||||
|
|
||||||
|
idx, usage, err := s.ServiceUsage()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, idx, uint64(9))
|
||||||
|
require.Equal(t, usage.Services, 1)
|
||||||
|
require.Equal(t, usage.ServiceInstances, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
||||||
|
@ -92,8 +144,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := updateUsage(txn, changes)
|
err := updateUsage(txn, changes)
|
||||||
require.Error(t, err)
|
require.NoError(t, err)
|
||||||
require.Contains(t, err.Error(), "negative count")
|
|
||||||
|
// Check that we do not underflow
|
||||||
|
u, err := txn.First("usage", "id", "nodes")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, u.(*UsageEntry).Count)
|
||||||
|
|
||||||
// A insert a change to create a usage entry
|
// A insert a change to create a usage entry
|
||||||
changes = Changes{
|
changes = Changes{
|
||||||
|
@ -128,8 +184,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateUsage(txn, changes)
|
err = updateUsage(txn, changes)
|
||||||
require.Error(t, err)
|
require.NoError(t, err)
|
||||||
require.Contains(t, err.Error(), "negative count")
|
|
||||||
|
// Check that we do not underflow
|
||||||
|
u, err = txn.First("usage", "id", "nodes")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, u.(*UsageEntry).Count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) {
|
func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue