state: convert services.node and checks.node indexes

Using NodeIdentity to share the indexes with both.
This commit is contained in:
Daniel Nephin 2021-02-12 17:31:02 -05:00
parent bfcd311159
commit 0b3930272d
9 changed files with 107 additions and 34 deletions

View File

@ -58,21 +58,13 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
// Services is used to pull the full list of services for a given node for use
// during snapshots.
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) {
iter, err := catalogServiceListByNode(s.tx, node, structs.WildcardEnterpriseMeta(), true)
if err != nil {
return nil, err
}
return iter, nil
return s.tx.Get(tableServices, indexNode, Query{Value: node})
}
// Checks is used to pull the full list of checks for a given node for use
// during snapshots.
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
iter, err := catalogListChecksByNode(s.tx, node, structs.WildcardEnterpriseMeta())
if err != nil {
return nil, err
}
return iter, nil
return s.tx.Get(tableChecks, indexNode, Query{Value: node})
}
// Registration is used to make sure a node, service, and check registration is
@ -509,7 +501,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
}
// Delete all services associated with the node and update the service index.
services, err := tx.Get("services", "node", nodeName)
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeName})
if err != nil {
return fmt.Errorf("failed service lookup: %s", err)
}
@ -535,7 +527,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
// Delete all checks associated with the node. This will invalidate
// sessions as necessary.
checks, err := tx.Get("checks", "node", nodeName)
checks, err := tx.Get(tableChecks, indexNode, Query{Value: nodeName})
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}
@ -1414,7 +1406,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error {
services, err := tx.Get("services", "node", nodeID)
services, err := tx.Get(tableServices, indexNode, Query{Value: nodeID})
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
}
@ -1589,11 +1581,15 @@ func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string, entMeta *structs.
tx := s.db.Txn(false)
defer tx.Abort()
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta)
// Return the checks.
iter, err := catalogListChecksByNode(tx, nodeName, entMeta)
iter, err := catalogListChecksByNode(tx, Query{Value: nodeName, EnterpriseMeta: *entMeta})
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
@ -2286,6 +2282,10 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind,
func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) {
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// We don't want to track an unlimited number of services, so we pull a
// top-level watch to use as a fallback.
allServices, err := tx.Get("services", "id")
@ -2326,7 +2326,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
}
// Query the service level checks
checks, err := catalogListChecksByNode(tx, node.Node, entMeta)
checks, err := catalogListChecksByNode(tx, Query{Value: node.Node, EnterpriseMeta: *entMeta})
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}

View File

@ -491,7 +491,7 @@ func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNod
// parseCheckServiceNodes but is more efficient since we know they are all on
// the same node.
func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
services, err := catalogServiceListByNode(tx, node, structs.WildcardEnterpriseMeta(), true)
services, err := tx.Get(tableServices, indexNode, Query{Value: node})
if err != nil {
return nil, err
}
@ -525,7 +525,7 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc
}
n := nodeRaw.(*structs.Node)
iter, err := catalogListChecksByNode(tx, node, structs.WildcardEnterpriseMeta())
iter, err := tx.Get(tableChecks, indexNode, Query{Value: node})
if err != nil {
return nil, nil, err
}

View File

@ -69,6 +69,24 @@ func indexFromNodeQuery(arg interface{}) ([]byte, error) {
return b.Bytes(), nil
}
func indexFromNodeIdentity(raw interface{}) ([]byte, error) {
n, ok := raw.(interface {
NodeIdentity() structs.Identity
})
if !ok {
return nil, fmt.Errorf("unexpected type %T for index, type must provide NodeIdentity()", raw)
}
id := n.NodeIdentity()
if id.ID == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(id.ID))
return b.Bytes(), nil
}
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
return fmt.Sprintf("service.%s", name)
}
@ -159,7 +177,7 @@ func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.E
}
func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
return tx.Get("services", "node", node)
return tx.Get(tableServices, indexNode, Query{Value: node})
}
func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
@ -196,8 +214,8 @@ func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "checks")
}
func catalogListChecksByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "node", node)
func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) {
return tx.Get(tableChecks, indexNode, q)
}
func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

View File

@ -22,6 +22,21 @@ func testIndexerTableChecks() map[string]indexerTestCase {
expected: []byte("node\x00service\x00"),
},
},
indexNode: {
read: indexValue{
source: Query{
Value: "NoDe",
},
expected: []byte("node\x00"),
},
write: indexValue{
source: &structs.HealthCheck{
Node: "NoDe",
ServiceID: "SeRvIcE",
},
expected: []byte("node\x00"),
},
},
}
}
@ -39,3 +54,23 @@ func testIndexerTableNodes() map[string]indexerTestCase {
},
}
}
func testIndexerTableServices() map[string]indexerTestCase {
return map[string]indexerTestCase{
indexNode: {
read: indexValue{
source: Query{
Value: "NoDe",
},
expected: []byte("node\x00"),
},
write: indexValue{
source: &structs.ServiceNode{
Node: "NoDe",
ServiceID: "SeRvIcE",
},
expected: []byte("node\x00"),
},
},
}
}

View File

@ -23,6 +23,7 @@ const (
indexKind = "kind"
indexStatus = "status"
indexNodeService = "node_service"
indexNode = "node"
)
// nodesTableSchema returns a new table schema used for storing struct.Node.
@ -81,13 +82,13 @@ func servicesTableSchema() *memdb.TableSchema {
},
},
},
"node": {
Name: "node",
indexNode: {
Name: indexNode,
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
Indexer: indexerSingle{
readIndex: readIndex(indexFromNodeQuery),
writeIndex: writeIndex(indexFromNodeIdentity),
},
},
indexServiceName: {
@ -157,13 +158,13 @@ func checksTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"node": {
Name: "node",
indexNode: {
Name: indexNode,
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
Indexer: indexerSingle{
readIndex: readIndex(indexFromNodeQuery),
writeIndex: writeIndex(indexFromNodeIdentity),
},
},
indexNodeService: {

View File

@ -129,6 +129,7 @@ func TestNewDBSchema_Indexers(t *testing.T) {
var testcases = map[string]func() map[string]indexerTestCase{
tableChecks: testIndexerTableChecks,
tableServices: testIndexerTableServices,
tableNodes: testIndexerTableNodes,
}

View File

@ -50,7 +50,7 @@ table=checks
index=id unique
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false
index=node allow-missing
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
index=node_service allow-missing
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck
index=service allow-missing
@ -154,7 +154,7 @@ table=services
index=kind
indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind
index=node
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
index=service allow-missing
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true

10
agent/structs/identity.go Normal file
View File

@ -0,0 +1,10 @@
package structs
// Identity of some entity (ex: service, node, check).
//
// TODO: this type should replace ServiceID, ServiceName, and CheckID which all
// have roughly identical implementations.
type Identity struct {
ID string
EnterpriseMeta
}

View File

@ -833,6 +833,10 @@ type ServiceNode struct {
RaftIndex `bexpr:"-"`
}
func (s *ServiceNode) NodeIdentity() Identity {
return Identity{ID: s.Node}
}
// PartialClone() returns a clone of the given service node, minus the node-
// related fields that get filled in later, Address and TaggedAddresses.
func (s *ServiceNode) PartialClone() *ServiceNode {
@ -1402,6 +1406,10 @@ type HealthCheck struct {
RaftIndex `bexpr:"-"`
}
func (hc *HealthCheck) NodeIdentity() Identity {
return Identity{ID: hc.Node}
}
func (hc *HealthCheck) CompoundServiceID() ServiceID {
id := hc.ServiceID
if id == "" {