From 49938bc47213fda1d812c462224e147fdc3edade Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 12 Feb 2021 18:06:56 -0500 Subject: [PATCH 1/3] state: convert checks.ID index to new pattern --- agent/consul/state/catalog.go | 23 ++++++++---- agent/consul/state/catalog_oss.go | 36 ++++++++++++++++--- agent/consul/state/catalog_schema.go | 22 ++++++------ agent/consul/state/catalog_test.go | 4 +-- agent/consul/state/operations_oss.go | 8 ++--- agent/consul/state/session_oss.go | 5 +-- agent/consul/state/state_store_test.go | 2 +- .../testdata/TestStateStoreSchema.golden | 2 +- 8 files changed, 68 insertions(+), 34 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 0171d6b24..c93e87d9f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID { // Look up the existing node's Serf health check to see if it's failed. // If it is, the node can be renamed. - _, enodeCheck, err := firstWatchCompoundWithTxn(tx, "checks", "id", structs.DefaultEnterpriseMeta(), enode.Node, string(structs.SerfCheckID)) + _, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) if err != nil { return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err) } @@ -1471,7 +1471,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe // checks with no matching node or service. func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check - _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) + _, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) if err != nil { return fmt.Errorf("failed health check lookup: %s", err) } @@ -1578,8 +1578,13 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Return the check. - _, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, nodeName, string(checkID)) + _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1702,7 +1707,7 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru var iter memdb.ResultIterator var err error if state == api.HealthAny { - iter, err = catalogListChecks(tx, entMeta) + iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta) } else { iter, err = catalogListChecksInState(tx, state, entMeta) } @@ -1800,8 +1805,12 @@ type NodeServiceQuery struct { // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Try to retrieve the existing health check. - _, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID)) + _, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("check lookup failed: %s", err) } @@ -2149,7 +2158,7 @@ func parseCheckServiceNodes( // We need a similar fallback for checks. Since services need the // status of node + service-specific checks, we pull in a top-level // watch over all checks. - allChecks, err := tx.Get("checks", "id") + allChecks, err := tx.Get(tableChecks, indexID) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } @@ -2310,7 +2319,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, allServicesCh := allServices.WatchCh() // We need a similar fallback for checks. - allChecks, err := tx.Get("checks", "id") + allChecks, err := tx.Get(tableChecks, indexID) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 090422603..8a187066c 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -103,6 +103,38 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexFromHealthCheck(raw interface{}) ([]byte, error) { + hc, ok := raw.(*structs.HealthCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw) + } + + if hc.Node == "" || hc.CheckID == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(string(hc.CheckID))) + return b.Bytes(), nil +} + +func indexFromNodeCheckID(raw interface{}) ([]byte, error) { + hc, ok := raw.(NodeCheckID) + if !ok { + return nil, fmt.Errorf("unexpected type %T for NodeCheckID index", raw) + } + + if hc.Node == "" || hc.CheckID == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(hc.CheckID)) + return b.Bytes(), nil +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } @@ -243,10 +275,6 @@ func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMet return tx.Get("checks", "status", state) } -func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "id") -} - func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { // Insert the check if err := tx.Insert("checks", chk); err != nil { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 506da6f5a..eef6c6d2a 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -123,17 +123,10 @@ func checksTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "CheckID", - Lowercase: true, - }, - }, + Indexer: indexerSingleWithPrefix{ + readIndex: readIndex(indexFromNodeCheckID), + prefixIndex: prefixIndex(prefixIndexFromQuery), + writeIndex: writeIndex(indexFromHealthCheck), }, }, indexStatus: { @@ -331,3 +324,10 @@ type upstreamDownstream struct { structs.RaftIndex } + +// NodeCheckID is used to query the ID index of the checks table. +type NodeCheckID struct { + Node string + CheckID string + structs.EnterpriseMeta +} diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 1f22948d9..44f82e295 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1307,7 +1307,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Associated health check was removed. - checks, err := getCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1") + checks, err := tx.Get(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) { // that it actually is removed in the state store. tx := s.db.Txn(false) defer tx.Abort() - _, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1") + _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) if err != nil || check != nil { t.Fatalf("bad: %#v (err: %s)", check, err) } diff --git a/agent/consul/state/operations_oss.go b/agent/consul/state/operations_oss.go index 30deb7068..a3a8f396d 100644 --- a/agent/consul/state/operations_oss.go +++ b/agent/consul/state/operations_oss.go @@ -3,8 +3,9 @@ package state import ( - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) func firstWithTxn(tx ReadTxn, @@ -19,11 +20,6 @@ func firstWatchWithTxn(tx ReadTxn, return tx.FirstWatch(table, index, idxVal) } -func firstWatchCompoundWithTxn(tx ReadTxn, - table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) { - return tx.FirstWatch(table, index, idxVals...) -} - func getWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index b76deb500..cd4cad3ad 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -5,9 +5,10 @@ package state import ( "fmt" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-memdb" ) func sessionIndexer() *memdb.UUIDFieldIndex { @@ -107,7 +108,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { func validateSessionChecksTxn(tx *txn, session *structs.Session) error { // Go over the session checks and ensure they exist. for _, checkID := range session.CheckIDs() { - check, err := tx.First("checks", "id", session.Node, string(checkID)) + check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: session.Node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("failed check lookup: %s", err) } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index bc87eda27..938804c90 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64, tx := s.db.Txn(false) defer tx.Abort() - _, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID)) + _, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 4a6b75255..72bb71e3c 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -48,7 +48,7 @@ table=autopilot-config table=checks index=id unique - indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeCheckID writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromHealthCheck prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery index=node allow-missing indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity index=node_service allow-missing From abbe5c3701388f58cee195a6c8d0f542c6e62ab6 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 22 Mar 2021 17:09:22 -0400 Subject: [PATCH 2/3] state: use tx.First instead of tx.FirstWatch Where appropriate. After removing the helper function a bunch of these calls can be changed to tx.First. --- agent/consul/state/catalog.go | 23 +++++++++++------------ agent/consul/state/catalog_test.go | 2 +- agent/consul/state/state_store_test.go | 6 +++--- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index c93e87d9f..aa8034506 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -141,7 +141,7 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b // node info above to make sure we actually need to update the service // definition in order to prevent useless churn if nothing has changed. if req.Service != nil { - _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID}) + existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: req.Service.EnterpriseMeta, Node: req.Node, Service: req.Service.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID { // Look up the existing node's Serf health check to see if it's failed. // If it is, the node can be renamed. - _, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) + enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) if err != nil { return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err) } @@ -602,7 +602,7 @@ var errCASCompareFailed = errors.New("compare-and-set: comparison failed") // Returns an error if the write didn't happen and nil if write was successful. func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.NodeService) error { // Retrieve the existing service. - _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) + existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -627,7 +627,7 @@ func ensureServiceCASTxn(tx WriteTxn, idx uint64, node string, svc *structs.Node // existing memdb transaction. func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool, svc *structs.NodeService) error { // Check for existing service - _, existing, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) + existing, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: svc.EnterpriseMeta, Node: node, Service: svc.ID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1148,7 +1148,7 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs. } // Query the service - _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) if err != nil { return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } @@ -1321,8 +1321,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st entMeta = structs.DefaultEnterpriseMeta() } - // Look up the service. - _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: *entMeta, Node: nodeName, Service: serviceID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1471,7 +1470,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe // checks with no matching node or service. func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check - _, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) + existing, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) if err != nil { return fmt.Errorf("failed health check lookup: %s", err) } @@ -1503,7 +1502,7 @@ func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc // If the check is associated with a service, check that we have // a registration for the service. if hc.ServiceID != "" { - _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, Service: hc.ServiceID}) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } @@ -1584,7 +1583,7 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta } // Return the check. - _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) + check, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1810,7 +1809,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ } // Try to retrieve the existing health check. - _, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) + hc, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("check lookup failed: %s", err) } @@ -1825,7 +1824,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ return err } - _, svcRaw, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID}) + svcRaw, err := tx.First(tableServices, indexID, NodeServiceQuery{EnterpriseMeta: existing.EnterpriseMeta, Node: existing.Node, Service: existing.ServiceID}) if err != nil { return fmt.Errorf("failed retrieving service from state store: %v", err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 44f82e295..3302ffa8e 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) { // that it actually is removed in the state store. tx := s.db.Txn(false) defer tx.Abort() - _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) + check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) if err != nil || check != nil { t.Fatalf("bad: %#v (err: %s)", check, err) } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 938804c90..0e0db1f2b 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -105,7 +105,7 @@ func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, s tx := s.db.Txn(false) defer tx.Abort() - _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) if err != nil { t.Fatalf("err: %s", err) } @@ -138,7 +138,7 @@ func testRegisterIngressService(t *testing.T, s *Store, idx uint64, nodeID, serv tx := s.db.Txn(false) defer tx.Abort() - _, service, err := tx.FirstWatch(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) + service, err := tx.First(tableServices, indexID, NodeServiceQuery{Node: nodeID, Service: serviceID}) if err != nil { t.Fatalf("err: %s", err) } @@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64, tx := s.db.Txn(false) defer tx.Abort() - _, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)}) + c, err := tx.First(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)}) if err != nil { t.Fatalf("err: %s", err) } From 25b791ba47ebaa71901cfe914721f2b353917877 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 22 Mar 2021 17:29:58 -0400 Subject: [PATCH 3/3] state: add tests for checks.ID indexer --- agent/consul/state/catalog.go | 8 ++--- agent/consul/state/catalog_oss.go | 4 +-- agent/consul/state/catalog_oss_test.go | 42 ++++++++++++++++++++------ agent/consul/state/catalog_schema.go | 4 +-- agent/consul/state/catalog_test.go | 4 +-- agent/consul/state/session_oss.go | 2 +- agent/consul/state/state_store_test.go | 2 +- 7 files changed, 45 insertions(+), 21 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index aa8034506..d8d1b5a21 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID { // Look up the existing node's Serf health check to see if it's failed. // If it is, the node can be renamed. - enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) + enodeCheck, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) if err != nil { return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err) } @@ -1470,7 +1470,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe // checks with no matching node or service. func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check - existing, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) + existing, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) if err != nil { return fmt.Errorf("failed health check lookup: %s", err) } @@ -1583,7 +1583,7 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta } // Return the check. - check, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) + check, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1809,7 +1809,7 @@ func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID typ } // Try to retrieve the existing health check. - hc, err := tx.First(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) + hc, err := tx.First(tableChecks, indexID, NodeCheckQuery{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("check lookup failed: %s", err) } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 8a187066c..ab2d87bf3 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -120,9 +120,9 @@ func indexFromHealthCheck(raw interface{}) ([]byte, error) { } func indexFromNodeCheckID(raw interface{}) ([]byte, error) { - hc, ok := raw.(NodeCheckID) + hc, ok := raw.(NodeCheckQuery) if !ok { - return nil, fmt.Errorf("unexpected type %T for NodeCheckID index", raw) + return nil, fmt.Errorf("unexpected type %T for NodeCheckQuery index", raw) } if hc.Node == "" || hc.CheckID == "" { diff --git a/agent/consul/state/catalog_oss_test.go b/agent/consul/state/catalog_oss_test.go index 39d61ac10..925c130c4 100644 --- a/agent/consul/state/catalog_oss_test.go +++ b/agent/consul/state/catalog_oss_test.go @@ -2,10 +2,40 @@ package state -import "github.com/hashicorp/consul/agent/structs" +import ( + "github.com/hashicorp/consul/agent/structs" +) func testIndexerTableChecks() map[string]indexerTestCase { + obj := &structs.HealthCheck{ + Node: "NoDe", + ServiceID: "SeRvIcE", + CheckID: "CheckID", + } return map[string]indexerTestCase{ + indexID: { + read: indexValue{ + source: NodeCheckQuery{ + Node: "NoDe", + CheckID: "CheckId", + }, + expected: []byte("node\x00checkid\x00"), + }, + write: indexValue{ + source: obj, + expected: []byte("node\x00checkid\x00"), + }, + prefix: []indexValue{ + { + source: structs.EnterpriseMeta{}, + expected: nil, + }, + { + source: Query{Value: "nOdE"}, + expected: []byte("node\x00"), + }, + }, + }, indexNodeService: { read: indexValue{ source: NodeServiceQuery{ @@ -15,10 +45,7 @@ func testIndexerTableChecks() map[string]indexerTestCase { expected: []byte("node\x00service\x00"), }, write: indexValue{ - source: &structs.HealthCheck{ - Node: "NoDe", - ServiceID: "SeRvIcE", - }, + source: obj, expected: []byte("node\x00service\x00"), }, }, @@ -30,10 +57,7 @@ func testIndexerTableChecks() map[string]indexerTestCase { expected: []byte("node\x00"), }, write: indexValue{ - source: &structs.HealthCheck{ - Node: "NoDe", - ServiceID: "SeRvIcE", - }, + source: obj, expected: []byte("node\x00"), }, }, diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index eef6c6d2a..5c10c8e34 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -325,8 +325,8 @@ type upstreamDownstream struct { structs.RaftIndex } -// NodeCheckID is used to query the ID index of the checks table. -type NodeCheckID struct { +// NodeCheckQuery is used to query the ID index of the checks table. +type NodeCheckQuery struct { Node string CheckID string structs.EnterpriseMeta diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 3302ffa8e..b68a76aca 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1307,7 +1307,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Associated health check was removed. - checks, err := tx.Get(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) + checks, err := tx.Get(tableChecks, indexID, NodeCheckQuery{Node: "node1", CheckID: "check1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) { // that it actually is removed in the state store. tx := s.db.Txn(false) defer tx.Abort() - check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) + check, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: "node1", CheckID: "check1"}) if err != nil || check != nil { t.Fatalf("bad: %#v (err: %s)", check, err) } diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index cd4cad3ad..483ff840d 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -108,7 +108,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { func validateSessionChecksTxn(tx *txn, session *structs.Session) error { // Go over the session checks and ensure they exist. for _, checkID := range session.CheckIDs() { - check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: session.Node, CheckID: string(checkID)}) + check, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: session.Node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("failed check lookup: %s", err) } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index 0e0db1f2b..985e7540b 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64, tx := s.db.Txn(false) defer tx.Abort() - c, err := tx.First(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)}) + c, err := tx.First(tableChecks, indexID, NodeCheckQuery{Node: nodeID, CheckID: string(checkID)}) if err != nil { t.Fatalf("err: %s", err) }