Add per-node max indexes (#12399)
Adds fine-grained node.[node] entries to the index table, allowing blocking queries to return fine-grained indexes that prevent them from returning immediately when unrelated nodes/services are updated. Co-authored-by: kisunji <ckim@hashicorp.com>
This commit is contained in:
parent
aaf3c051f2
commit
25f4c44268
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:enhancement
|
||||||
|
catalog: Add per-node indexes to reduce watchset firing for unrelated nodes and services.
|
||||||
|
```
|
|
@ -17,9 +17,15 @@ import (
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// indexServiceExtinction keeps track of the last raft index when the last instance
|
const (
|
||||||
// of any service was unregistered. This is used by blocking queries on missing services.
|
// indexServiceExtinction keeps track of the last raft index when the last instance
|
||||||
const indexServiceExtinction = "service_last_extinction"
|
// of any service was unregistered. This is used by blocking queries on missing services.
|
||||||
|
indexServiceExtinction = "service_last_extinction"
|
||||||
|
|
||||||
|
// indexNodeExtinction keeps track of the last raft index when the last instance
|
||||||
|
// of any node was unregistered. This is used by blocking queries on missing nodes.
|
||||||
|
indexNodeExtinction = "node_last_extinction"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// minUUIDLookupLen is used as a minimum length of a node name required before
|
// minUUIDLookupLen is used as a minimum length of a node name required before
|
||||||
|
@ -414,8 +420,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
|
||||||
// We are actually renaming a node, remove its reference first
|
// We are actually renaming a node, remove its reference first
|
||||||
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName)
|
err := s.deleteNodeTxn(tx, idx, n.Node, n.GetEnterpriseMeta(), n.PeerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s",
|
return fmt.Errorf("Error while renaming Node ID: %q (%s) from %s to %s: %w",
|
||||||
node.ID, node.Address, n.Node, node.Node)
|
node.ID, node.Address, n.Node, node.Node, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -764,6 +770,15 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string, entMeta
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up node entry from index table
|
||||||
|
if err := tx.Delete(tableIndex, &IndexEntry{Key: nodeIndexName(nodeName, entMeta, node.PeerName)}); err != nil {
|
||||||
|
return fmt.Errorf("failed deleting nodeIndex %q: %w", nodeIndexName(nodeName, entMeta, node.PeerName), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := catalogUpdateNodeExtinctionIndex(tx, idx, entMeta, node.PeerName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if peerName == "" {
|
if peerName == "" {
|
||||||
// Invalidate any sessions for this node.
|
// Invalidate any sessions for this node.
|
||||||
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
|
toDelete, err := allNodeSessionsTxn(tx, nodeName, entMeta.PartitionOrDefault())
|
||||||
|
@ -1683,9 +1698,6 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
|
||||||
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
|
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := catalogMaxIndex(tx, entMeta, peerName, false)
|
|
||||||
|
|
||||||
// Query the node by node name
|
// Query the node by node name
|
||||||
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
|
watchCh, n, err := tx.FirstWatch(tableNodes, indexID, Query{
|
||||||
Value: nodeNameOrID,
|
Value: nodeNameOrID,
|
||||||
|
@ -1712,16 +1724,16 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ws.Add(watchCh)
|
ws.Add(watchCh)
|
||||||
// TODO(sean@): We could/should log an error re: the uuid_prefix lookup
|
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
|
||||||
// failing once a logger has been introduced to the catalog.
|
return true, idx, nil, nil, nil
|
||||||
return true, 0, nil, nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n = iter.Next()
|
n = iter.Next()
|
||||||
if n == nil {
|
if n == nil {
|
||||||
// No nodes matched, even with the Node ID: add a watch on the node name.
|
// No nodes matched, even with the Node ID: add a watch on the node name.
|
||||||
ws.Add(watchCh)
|
ws.Add(watchCh)
|
||||||
return true, 0, nil, nil, nil
|
idx := catalogNodeLastExtinctionIndex(tx, entMeta, peerName)
|
||||||
|
return true, idx, nil, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
idWatchCh := iter.WatchCh()
|
idWatchCh := iter.WatchCh()
|
||||||
|
@ -1745,6 +1757,9 @@ func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *ac
|
||||||
}
|
}
|
||||||
ws.Add(services.WatchCh())
|
ws.Add(services.WatchCh())
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := catalogNodeMaxIndex(tx, nodeName, entMeta, peerName)
|
||||||
|
|
||||||
return false, idx, node, services, nil
|
return false, idx, node, services, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1902,10 +1917,17 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
|
|
||||||
svc := service.(*structs.ServiceNode)
|
svc := service.(*structs.ServiceNode)
|
||||||
if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil {
|
if err := catalogUpdateServicesIndexes(tx, idx, entMeta, svc.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating services indexes: %w", err)
|
||||||
}
|
}
|
||||||
if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
if err := catalogUpdateServiceKindIndexes(tx, idx, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating service-kind indexes: %w", err)
|
||||||
|
}
|
||||||
|
// Update the node indexes as the service information is included in node catalog queries.
|
||||||
|
if err := catalogUpdateNodesIndexes(tx, idx, entMeta, peerName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating nodes indexes: %w", err)
|
||||||
|
}
|
||||||
|
if err := catalogUpdateNodeIndexes(tx, idx, nodeName, entMeta, peerName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating node indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
name := svc.CompoundServiceName()
|
name := svc.CompoundServiceName()
|
||||||
|
@ -1930,7 +1952,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName)
|
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta, svc.PeerName)
|
||||||
if err == nil && serviceIndex != nil {
|
if err == nil && serviceIndex != nil {
|
||||||
// we found service.<serviceName> index, garbage collect it
|
// we found service.<serviceName> index, garbage collect it
|
||||||
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
|
if err := tx.Delete(tableIndex, serviceIndex); err != nil {
|
||||||
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -24,8 +24,12 @@ func serviceKindIndexName(kind structs.ServiceKind, _ *acl.EnterpriseMeta, peerN
|
||||||
return peeredIndexEntryName(base, peerName)
|
return peeredIndexEntryName(base, peerName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nodeIndexName(name string, _ *acl.EnterpriseMeta, peerName string) string {
|
||||||
|
return peeredIndexEntryName(fmt.Sprintf("node.%s", name), peerName)
|
||||||
|
}
|
||||||
|
|
||||||
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
// overall nodes index
|
// overall nodes index for snapshot and ListNodes RPC
|
||||||
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableNodes); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -38,12 +42,22 @@ func catalogUpdateNodesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, p
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// catalogUpdateNodeIndexes upserts the max index for a single node
|
||||||
|
func catalogUpdateNodeIndexes(tx WriteTxn, idx uint64, nodeName string, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
|
// per-node index
|
||||||
|
if err := indexUpdateMaxTxn(tx, idx, nodeIndexName(nodeName, nil, peerName)); err != nil {
|
||||||
|
return fmt.Errorf("failed updating node index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels
|
// catalogUpdateServicesIndexes upserts the max index for the entire services table with varying levels
|
||||||
// of granularity (no-op if `idx` is lower than what exists for that index key):
|
// of granularity (no-op if `idx` is lower than what exists for that index key):
|
||||||
// - all services
|
// - all services
|
||||||
// - all services in a specified peer (including internal)
|
// - all services in a specified peer (including internal)
|
||||||
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
func catalogUpdateServicesIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
// overall services index
|
// overall services index for snapshot
|
||||||
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableServices); err != nil {
|
||||||
return fmt.Errorf("failed updating index for services table: %w", err)
|
return fmt.Errorf("failed updating index for services table: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -84,14 +98,16 @@ func catalogUpdateServiceIndexes(tx WriteTxn, idx uint64, serviceName string, _
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
if err := indexUpdateMaxTxn(tx, idx, indexServiceExtinction); err != nil {
|
|
||||||
return fmt.Errorf("failed updating missing service extinction index: %w", err)
|
|
||||||
}
|
|
||||||
// update the peer index
|
|
||||||
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexServiceExtinction, peerName)); err != nil {
|
||||||
return fmt.Errorf("failed updating missing service extinction peered index: %w", err)
|
return fmt.Errorf("failed updating missing service extinction peered index: %w", err)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func catalogUpdateNodeExtinctionIndex(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
|
if err := indexUpdateMaxTxn(tx, idx, peeredIndexEntryName(indexNodeExtinction, peerName)); err != nil {
|
||||||
|
return fmt.Errorf("failed updating missing node extinction peered index: %w", err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +121,10 @@ func catalogInsertNode(tx WriteTxn, node *structs.Node) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil {
|
if err := catalogUpdateNodesIndexes(tx, node.ModifyIndex, node.GetEnterpriseMeta(), node.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating nodes indexes: %w", err)
|
||||||
|
}
|
||||||
|
if err := catalogUpdateNodeIndexes(tx, node.ModifyIndex, node.Node, node.GetEnterpriseMeta(), node.PeerName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating node indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the node's service indexes as the node information is included
|
// Update the node's service indexes as the node information is included
|
||||||
|
@ -125,15 +144,23 @@ func catalogInsertService(tx WriteTxn, svc *structs.ServiceNode) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
if err := catalogUpdateServicesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating services indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
if err := catalogUpdateServiceIndexes(tx, svc.ModifyIndex, svc.ServiceName, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating service indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
if err := catalogUpdateServiceKindIndexes(tx, svc.ModifyIndex, svc.ServiceKind, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
return err
|
return fmt.Errorf("failed updating service-kind indexes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the node indexes as the service information is included in node catalog queries.
|
||||||
|
if err := catalogUpdateNodesIndexes(tx, svc.ModifyIndex, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating nodes indexes: %w", err)
|
||||||
|
}
|
||||||
|
if err := catalogUpdateNodeIndexes(tx, svc.ModifyIndex, svc.Node, &svc.EnterpriseMeta, svc.PeerName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating node indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -143,6 +170,14 @@ func catalogNodesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) ui
|
||||||
return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName))
|
return maxIndexTxn(tx, peeredIndexEntryName(tableNodes, peerName))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func catalogNodeMaxIndex(tx ReadTxn, nodeName string, _ *acl.EnterpriseMeta, peerName string) uint64 {
|
||||||
|
return maxIndexTxn(tx, nodeIndexName(nodeName, nil, peerName))
|
||||||
|
}
|
||||||
|
|
||||||
|
func catalogNodeLastExtinctionIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
|
||||||
|
return maxIndexTxn(tx, peeredIndexEntryName(indexNodeExtinction, peerName))
|
||||||
|
}
|
||||||
|
|
||||||
func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
|
func catalogServicesMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string) uint64 {
|
||||||
return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName))
|
return maxIndexTxn(tx, peeredIndexEntryName(tableServices, peerName))
|
||||||
}
|
}
|
||||||
|
@ -185,7 +220,6 @@ func catalogMaxIndex(tx ReadTxn, _ *acl.EnterpriseMeta, peerName string, checks
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 {
|
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta, peerName string, checks bool) uint64 {
|
||||||
// TODO(peering_indexes): pipe peerName here
|
|
||||||
if checks {
|
if checks {
|
||||||
return maxIndexWatchTxn(tx, ws,
|
return maxIndexWatchTxn(tx, ws,
|
||||||
peeredIndexEntryName(tableChecks, peerName),
|
peeredIndexEntryName(tableChecks, peerName),
|
||||||
|
@ -200,7 +234,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *acl.EnterpriseMeta,
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *acl.EnterpriseMeta, peerName string) error {
|
||||||
// update the universal index entry
|
// update the overall index entry for snapshot
|
||||||
if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableChecks); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -555,7 +555,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
run := func(t *testing.T, peerName string) {
|
run := func(t *testing.T, peerName string) {
|
||||||
verifyNode := func(t *testing.T, s *Store, nodeLookup string) {
|
verifyNode := func(t *testing.T, s *Store, nodeLookup string, expectIdx uint64) {
|
||||||
idx, out, err := s.GetNode(nodeLookup, nil, peerName)
|
idx, out, err := s.GetNode(nodeLookup, nil, peerName)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
byID := false
|
byID := false
|
||||||
|
@ -566,7 +566,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NotNil(t, out)
|
require.NotNil(t, out)
|
||||||
require.Equal(t, uint64(1), idx)
|
require.Equal(t, expectIdx, idx)
|
||||||
|
|
||||||
require.Equal(t, "1.2.3.4", out.Address)
|
require.Equal(t, "1.2.3.4", out.Address)
|
||||||
if byID {
|
if byID {
|
||||||
|
@ -661,8 +661,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
require.NoError(t, restore.Commit())
|
require.NoError(t, restore.Commit())
|
||||||
|
|
||||||
// Retrieve the node and verify its contents.
|
// Retrieve the node and verify its contents.
|
||||||
verifyNode(t, s, nodeID)
|
verifyNode(t, s, nodeID, 1)
|
||||||
verifyNode(t, s, nodeName)
|
verifyNode(t, s, nodeName, 1)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Add in a service definition.
|
// Add in a service definition.
|
||||||
|
@ -686,8 +686,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
require.NoError(t, restore.Commit())
|
require.NoError(t, restore.Commit())
|
||||||
|
|
||||||
// Verify that the service got registered.
|
// Verify that the service got registered.
|
||||||
verifyNode(t, s, nodeID)
|
verifyNode(t, s, nodeID, 2)
|
||||||
verifyNode(t, s, nodeName)
|
verifyNode(t, s, nodeName, 2)
|
||||||
verifyService(t, s, nodeID)
|
verifyService(t, s, nodeID)
|
||||||
verifyService(t, s, nodeName)
|
verifyService(t, s, nodeName)
|
||||||
})
|
})
|
||||||
|
@ -726,8 +726,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
require.NoError(t, restore.Commit())
|
require.NoError(t, restore.Commit())
|
||||||
|
|
||||||
// Verify that the check got registered.
|
// Verify that the check got registered.
|
||||||
verifyNode(t, s, nodeID)
|
verifyNode(t, s, nodeID, 2)
|
||||||
verifyNode(t, s, nodeName)
|
verifyNode(t, s, nodeName, 2)
|
||||||
verifyService(t, s, nodeID)
|
verifyService(t, s, nodeID)
|
||||||
verifyService(t, s, nodeName)
|
verifyService(t, s, nodeName)
|
||||||
verifyCheck(t, s)
|
verifyCheck(t, s)
|
||||||
|
@ -776,8 +776,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
|
||||||
require.NoError(t, restore.Commit())
|
require.NoError(t, restore.Commit())
|
||||||
|
|
||||||
// Verify that the additional check got registered.
|
// Verify that the additional check got registered.
|
||||||
verifyNode(t, s, nodeID)
|
verifyNode(t, s, nodeID, 2)
|
||||||
verifyNode(t, s, nodeName)
|
verifyNode(t, s, nodeName, 2)
|
||||||
verifyService(t, s, nodeID)
|
verifyService(t, s, nodeID)
|
||||||
verifyService(t, s, nodeName)
|
verifyService(t, s, nodeName)
|
||||||
verifyChecks(t, s)
|
verifyChecks(t, s)
|
||||||
|
@ -976,7 +976,7 @@ func TestNodeRenamingNodes(t *testing.T) {
|
||||||
Address: "1.1.1.2",
|
Address: "1.1.1.2",
|
||||||
}
|
}
|
||||||
if err := s.EnsureNode(10, in2Modify); err != nil {
|
if err := s.EnsureNode(10, in2Modify); err != nil {
|
||||||
t.Fatalf("Renaming node2 into node1 should fail")
|
t.Fatalf("Renaming node2 into node1 should not fail: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the node again
|
// Retrieve the node again
|
||||||
|
@ -1550,20 +1550,16 @@ func TestStateStore_DeleteNode(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Indexes were updated.
|
// Indexes were updated.
|
||||||
for _, tbl := range []string{tableNodes, tableServices, tableChecks} {
|
assert.Equal(t, uint64(3), catalogChecksMaxIndex(tx, nil, ""))
|
||||||
if idx := s.maxIndex(tbl); idx != 3 {
|
assert.Equal(t, uint64(3), catalogServicesMaxIndex(tx, nil, ""))
|
||||||
t.Fatalf("bad index: %d (%s)", idx, tbl)
|
assert.Equal(t, uint64(3), catalogNodesMaxIndex(tx, nil, ""))
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deleting a nonexistent node should be idempotent and not return
|
// Deleting a nonexistent node should be idempotent and not return
|
||||||
// an error
|
// an error
|
||||||
if err := s.DeleteNode(4, "node1", nil, ""); err != nil {
|
if err := s.DeleteNode(4, "node1", nil, ""); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx := s.maxIndex(tableNodes); idx != 3 {
|
assert.Equal(t, uint64(3), catalogNodesMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_Node_Snapshot(t *testing.T) {
|
func TestStateStore_Node_Snapshot(t *testing.T) {
|
||||||
|
@ -1690,7 +1686,8 @@ func TestStateStore_EnsureService(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx != 30 {
|
// expect node1's max idx
|
||||||
|
if idx != 20 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1713,9 +1710,7 @@ func TestStateStore_EnsureService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index tables were updated.
|
// Index tables were updated.
|
||||||
if idx := s.maxIndex(tableServices); idx != 30 {
|
assert.Equal(t, uint64(30), catalogServicesMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update a service registration.
|
// Update a service registration.
|
||||||
ns1.Address = "1.1.1.2"
|
ns1.Address = "1.1.1.2"
|
||||||
|
@ -1744,9 +1739,7 @@ func TestStateStore_EnsureService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index tables were updated.
|
// Index tables were updated.
|
||||||
if idx := s.maxIndex(tableServices); idx != 40 {
|
assert.Equal(t, uint64(40), catalogServicesMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_EnsureService_connectProxy(t *testing.T) {
|
func TestStateStore_EnsureService_connectProxy(t *testing.T) {
|
||||||
|
@ -2571,21 +2564,15 @@ func TestStateStore_DeleteService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index tables were updated.
|
// Index tables were updated.
|
||||||
if idx := s.maxIndex(tableServices); idx != 4 {
|
assert.Equal(t, uint64(4), catalogChecksMaxIndex(tx, nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, ""))
|
||||||
}
|
|
||||||
if idx := s.maxIndex(tableChecks); idx != 4 {
|
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deleting a nonexistent service should be idempotent and not return an
|
// Deleting a nonexistent service should be idempotent and not return an
|
||||||
// error, nor fire a watch.
|
// error, nor fire a watch.
|
||||||
if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil {
|
if err := s.DeleteService(5, "node1", "service1", nil, ""); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx := s.maxIndex(tableServices); idx != 4 {
|
assert.Equal(t, uint64(4), catalogServicesMaxIndex(tx, nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
if watchFired(ws) {
|
if watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
@ -2906,9 +2893,7 @@ func TestStateStore_EnsureCheck(t *testing.T) {
|
||||||
testCheckOutput(t, 5, 5, "bbbmodified")
|
testCheckOutput(t, 5, 5, "bbbmodified")
|
||||||
|
|
||||||
// Index tables were updated
|
// Index tables were updated
|
||||||
if idx := s.maxIndex(tableChecks); idx != 5 {
|
assert.Equal(t, uint64(5), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) {
|
func TestStateStore_EnsureCheck_defaultStatus(t *testing.T) {
|
||||||
|
@ -3387,9 +3372,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
||||||
if idx, check, err := s.NodeCheck("node1", "check1", nil, ""); idx != 3 || err != nil || check != nil {
|
if idx, check, err := s.NodeCheck("node1", "check1", nil, ""); idx != 3 || err != nil || check != nil {
|
||||||
t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err)
|
t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err)
|
||||||
}
|
}
|
||||||
if idx := s.maxIndex(tableChecks); idx != 3 {
|
assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index for checks: %d", idx)
|
|
||||||
}
|
|
||||||
if !watchFired(ws) {
|
if !watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
@ -3407,18 +3390,14 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index tables were updated.
|
// Index tables were updated.
|
||||||
if idx := s.maxIndex(tableChecks); idx != 3 {
|
assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deleting a nonexistent check should be idempotent and not return an
|
// Deleting a nonexistent check should be idempotent and not return an
|
||||||
// error.
|
// error.
|
||||||
if err := s.DeleteCheck(4, "node1", "check1", nil, ""); err != nil {
|
if err := s.DeleteCheck(4, "node1", "check1", nil, ""); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx := s.maxIndex(tableChecks); idx != 3 {
|
assert.Equal(t, uint64(3), catalogChecksMaxIndex(s.db.ReadTxn(), nil, ""))
|
||||||
t.Fatalf("bad index: %d", idx)
|
|
||||||
}
|
|
||||||
if watchFired(ws) {
|
if watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
|
|
@ -263,25 +263,25 @@ func (s *Store) Abandon() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// maxIndex is a helper used to retrieve the highest known index
|
// maxIndex is a helper used to retrieve the highest known index
|
||||||
// amongst a set of tables in the db.
|
// amongst a set of index keys (e.g. table names) in the db.
|
||||||
func (s *Store) maxIndex(tables ...string) uint64 {
|
func (s *Store) maxIndex(keys ...string) uint64 {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
return maxIndexTxn(tx, tables...)
|
return maxIndexTxn(tx, keys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maxIndexTxn is a helper used to retrieve the highest known index
|
// maxIndexTxn is a helper used to retrieve the highest known index
|
||||||
// amongst a set of tables in the db.
|
// amongst a set of index keys (e.g. table names) in the db.
|
||||||
func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
|
func maxIndexTxn(tx ReadTxn, keys ...string) uint64 {
|
||||||
return maxIndexWatchTxn(tx, nil, tables...)
|
return maxIndexWatchTxn(tx, nil, keys...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
|
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, keys ...string) uint64 {
|
||||||
var lindex uint64
|
var lindex uint64
|
||||||
for _, table := range tables {
|
for _, key := range keys {
|
||||||
ch, ti, err := tx.FirstWatch(tableIndex, "id", table)
|
ch, ti, err := tx.FirstWatch(tableIndex, "id", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
|
panic(fmt.Sprintf("unknown index: %s err: %s", key, err))
|
||||||
}
|
}
|
||||||
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
|
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
|
||||||
lindex = idx.Value
|
lindex = idx.Value
|
||||||
|
|
Loading…
Reference in New Issue