diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b5c4a26ba..b38beef0a 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -58,21 +58,13 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) { // Services is used to pull the full list of services for a given node for use // during snapshots. func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) { - iter, err := catalogServiceListByNode(s.tx, node, structs.WildcardEnterpriseMeta(), true) - if err != nil { - return nil, err - } - return iter, nil + return s.tx.Get(tableServices, indexNode, Query{Value: node}) } // Checks is used to pull the full list of checks for a given node for use // during snapshots. func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) { - iter, err := catalogListChecksByNode(s.tx, node, structs.WildcardEnterpriseMeta()) - if err != nil { - return nil, err - } - return iter, nil + return s.tx.Get(tableChecks, indexNode, Query{Value: node}) } // Registration is used to make sure a node, service, and check registration is @@ -509,7 +501,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { } // Delete all services associated with the node and update the service index. - services, err := tx.Get("services", "node", nodeName) + services, err := tx.Get(tableServices, indexNode, Query{Value: nodeName}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -535,7 +527,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { // Delete all checks associated with the node. This will invalidate // sessions as necessary. - checks, err := tx.Get("checks", "node", nodeName) + checks, err := tx.Get(tableChecks, indexNode, Query{Value: nodeName}) if err != nil { return fmt.Errorf("failed check lookup: %s", err) } @@ -1414,7 +1406,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { // updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node func updateAllServiceIndexesOfNode(tx WriteTxn, idx uint64, nodeID string) error { - services, err := tx.Get("services", "node", nodeID) + services, err := tx.Get(tableServices, indexNode, Query{Value: nodeID}) if err != nil { return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) } @@ -1589,11 +1581,15 @@ func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string, entMeta *structs. tx := s.db.Txn(false) defer tx.Abort() + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) // Return the checks. - iter, err := catalogListChecksByNode(tx, nodeName, entMeta) + iter, err := catalogListChecksByNode(tx, Query{Value: nodeName, EnterpriseMeta: *entMeta}) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -2286,6 +2282,10 @@ func serviceDumpKindTxn(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, iter memdb.ResultIterator, entMeta *structs.EnterpriseMeta) (uint64, structs.NodeDump, error) { + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // We don't want to track an unlimited number of services, so we pull a // top-level watch to use as a fallback. allServices, err := tx.Get("services", "id") @@ -2326,7 +2326,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, } // Query the service level checks - checks, err := catalogListChecksByNode(tx, node.Node, entMeta) + checks, err := catalogListChecksByNode(tx, Query{Value: node.Node, EnterpriseMeta: *entMeta}) if err != nil { return 0, nil, fmt.Errorf("failed node lookup: %s", err) } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 3f1e1c0e2..065aa518e 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -491,7 +491,7 @@ func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNod // parseCheckServiceNodes but is more efficient since we know they are all on // the same node. func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { - services, err := catalogServiceListByNode(tx, node, structs.WildcardEnterpriseMeta(), true) + services, err := tx.Get(tableServices, indexNode, Query{Value: node}) if err != nil { return nil, err } @@ -525,7 +525,7 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc } n := nodeRaw.(*structs.Node) - iter, err := catalogListChecksByNode(tx, node, structs.WildcardEnterpriseMeta()) + iter, err := tx.Get(tableChecks, indexNode, Query{Value: node}) if err != nil { return nil, nil, err } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 65ec52684..cf225056c 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -69,6 +69,24 @@ func indexFromNodeQuery(arg interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexFromNodeIdentity(raw interface{}) ([]byte, error) { + n, ok := raw.(interface { + NodeIdentity() structs.Identity + }) + if !ok { + return nil, fmt.Errorf("unexpected type %T for index, type must provide NodeIdentity()", raw) + } + + id := n.NodeIdentity() + if id.ID == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(id.ID)) + return b.Bytes(), nil +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } @@ -159,7 +177,7 @@ func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.E } func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) { - return tx.Get("services", "node", node) + return tx.Get(tableServices, indexNode, Query{Value: node}) } func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { @@ -196,8 +214,8 @@ func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { return maxIndexTxn(tx, "checks") } -func catalogListChecksByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "node", node) +func catalogListChecksByNode(tx ReadTxn, q Query) (memdb.ResultIterator, error) { + return tx.Get(tableChecks, indexNode, q) } func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 3c5c6327a..63cbff3cc 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -22,6 +22,21 @@ func testIndexerTableChecks() map[string]indexerTestCase { expected: []byte("node\x00service\x00"), }, }, + indexNode: { + read: indexValue{ + source: Query{ + Value: "NoDe", + }, + expected: []byte("node\x00"), + }, + write: indexValue{ + source: &structs.HealthCheck{ + Node: "NoDe", + ServiceID: "SeRvIcE", + }, + expected: []byte("node\x00"), + }, + }, } } @@ -39,3 +54,23 @@ func testIndexerTableNodes() map[string]indexerTestCase { }, } } + +func testIndexerTableServices() map[string]indexerTestCase { + return map[string]indexerTestCase{ + indexNode: { + read: indexValue{ + source: Query{ + Value: "NoDe", + }, + expected: []byte("node\x00"), + }, + write: indexValue{ + source: &structs.ServiceNode{ + Node: "NoDe", + ServiceID: "SeRvIcE", + }, + expected: []byte("node\x00"), + }, + }, + } +} diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 098f0f2cb..8769d1e75 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -23,6 +23,7 @@ const ( indexKind = "kind" indexStatus = "status" indexNodeService = "node_service" + indexNode = "node" ) // nodesTableSchema returns a new table schema used for storing struct.Node. @@ -81,13 +82,13 @@ func servicesTableSchema() *memdb.TableSchema { }, }, }, - "node": { - Name: "node", + indexNode: { + Name: indexNode, AllowMissing: false, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, + Indexer: indexerSingle{ + readIndex: readIndex(indexFromNodeQuery), + writeIndex: writeIndex(indexFromNodeIdentity), }, }, indexServiceName: { @@ -157,13 +158,13 @@ func checksTableSchema() *memdb.TableSchema { Lowercase: true, }, }, - "node": { - Name: "node", + indexNode: { + Name: indexNode, AllowMissing: true, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, + Indexer: indexerSingle{ + readIndex: readIndex(indexFromNodeQuery), + writeIndex: writeIndex(indexFromNodeIdentity), }, }, indexNodeService: { diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index 94d1f81cd..5d05cea7f 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -128,8 +128,9 @@ func TestNewDBSchema_Indexers(t *testing.T) { require.NoError(t, schema.Validate()) var testcases = map[string]func() map[string]indexerTestCase{ - tableChecks: testIndexerTableChecks, - tableNodes: testIndexerTableNodes, + tableChecks: testIndexerTableChecks, + tableServices: testIndexerTableServices, + tableNodes: testIndexerTableNodes, } for _, table := range schema.Tables { diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 74cedbb15..04510ece7 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -50,7 +50,7 @@ table=checks index=id unique indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false index=node allow-missing - indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity index=node_service allow-missing indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck index=service allow-missing @@ -154,7 +154,7 @@ table=services index=kind indexer=github.com/hashicorp/consul/agent/consul/state.IndexServiceKind index=node - indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity index=service allow-missing indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true diff --git a/agent/structs/identity.go b/agent/structs/identity.go new file mode 100644 index 000000000..afe560abf --- /dev/null +++ b/agent/structs/identity.go @@ -0,0 +1,10 @@ +package structs + +// Identity of some entity (ex: service, node, check). +// +// TODO: this type should replace ServiceID, ServiceName, and CheckID which all +// have roughly identical implementations. +type Identity struct { + ID string + EnterpriseMeta +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 5a10a36a1..5d639db19 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -833,6 +833,10 @@ type ServiceNode struct { RaftIndex `bexpr:"-"` } +func (s *ServiceNode) NodeIdentity() Identity { + return Identity{ID: s.Node} +} + // PartialClone() returns a clone of the given service node, minus the node- // related fields that get filled in later, Address and TaggedAddresses. func (s *ServiceNode) PartialClone() *ServiceNode { @@ -1402,6 +1406,10 @@ type HealthCheck struct { RaftIndex `bexpr:"-"` } +func (hc *HealthCheck) NodeIdentity() Identity { + return Identity{ID: hc.Node} +} + func (hc *HealthCheck) CompoundServiceID() ServiceID { id := hc.ServiceID if id == "" {