diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 0171d6b24..c93e87d9f 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID { // Look up the existing node's Serf health check to see if it's failed. // If it is, the node can be renamed. - _, enodeCheck, err := firstWatchCompoundWithTxn(tx, "checks", "id", structs.DefaultEnterpriseMeta(), enode.Node, string(structs.SerfCheckID)) + _, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)}) if err != nil { return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err) } @@ -1471,7 +1471,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe // checks with no matching node or service. func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error { // Check if we have an existing health check - _, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID)) + _, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)}) if err != nil { return fmt.Errorf("failed health check lookup: %s", err) } @@ -1578,8 +1578,13 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta // Get the table index. idx := catalogChecksMaxIndex(tx, entMeta) + // TODO: accept non-pointer value + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Return the check. - _, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, nodeName, string(checkID)) + _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)}) if err != nil { return 0, nil, fmt.Errorf("failed check lookup: %s", err) } @@ -1702,7 +1707,7 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru var iter memdb.ResultIterator var err error if state == api.HealthAny { - iter, err = catalogListChecks(tx, entMeta) + iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta) } else { iter, err = catalogListChecksInState(tx, state, entMeta) } @@ -1800,8 +1805,12 @@ type NodeServiceQuery struct { // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { + if entMeta == nil { + entMeta = structs.DefaultEnterpriseMeta() + } + // Try to retrieve the existing health check. - _, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID)) + _, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("check lookup failed: %s", err) } @@ -2149,7 +2158,7 @@ func parseCheckServiceNodes( // We need a similar fallback for checks. Since services need the // status of node + service-specific checks, we pull in a top-level // watch over all checks. - allChecks, err := tx.Get("checks", "id") + allChecks, err := tx.Get(tableChecks, indexID) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } @@ -2310,7 +2319,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64, allServicesCh := allServices.WatchCh() // We need a similar fallback for checks. - allChecks, err := tx.Get("checks", "id") + allChecks, err := tx.Get(tableChecks, indexID) if err != nil { return 0, nil, fmt.Errorf("failed checks lookup: %s", err) } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 090422603..8a187066c 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -103,6 +103,38 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) { return b.Bytes(), nil } +func indexFromHealthCheck(raw interface{}) ([]byte, error) { + hc, ok := raw.(*structs.HealthCheck) + if !ok { + return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw) + } + + if hc.Node == "" || hc.CheckID == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(string(hc.CheckID))) + return b.Bytes(), nil +} + +func indexFromNodeCheckID(raw interface{}) ([]byte, error) { + hc, ok := raw.(NodeCheckID) + if !ok { + return nil, fmt.Errorf("unexpected type %T for NodeCheckID index", raw) + } + + if hc.Node == "" || hc.CheckID == "" { + return nil, errMissingValueForIndex + } + + var b indexBuilder + b.String(strings.ToLower(hc.Node)) + b.String(strings.ToLower(hc.CheckID)) + return b.Bytes(), nil +} + func serviceIndexName(name string, _ *structs.EnterpriseMeta) string { return fmt.Sprintf("service.%s", name) } @@ -243,10 +275,6 @@ func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMet return tx.Get("checks", "status", state) } -func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get("checks", "id") -} - func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error { // Insert the check if err := tx.Insert("checks", chk); err != nil { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 506da6f5a..eef6c6d2a 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -123,17 +123,10 @@ func checksTableSchema() *memdb.TableSchema { Name: indexID, AllowMissing: false, Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Node", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "CheckID", - Lowercase: true, - }, - }, + Indexer: indexerSingleWithPrefix{ + readIndex: readIndex(indexFromNodeCheckID), + prefixIndex: prefixIndex(prefixIndexFromQuery), + writeIndex: writeIndex(indexFromHealthCheck), }, }, indexStatus: { @@ -331,3 +324,10 @@ type upstreamDownstream struct { structs.RaftIndex } + +// NodeCheckID is used to query the ID index of the checks table. +type NodeCheckID struct { + Node string + CheckID string + structs.EnterpriseMeta +} diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 1f22948d9..44f82e295 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1307,7 +1307,7 @@ func TestStateStore_DeleteNode(t *testing.T) { } // Associated health check was removed. - checks, err := getCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1") + checks, err := tx.Get(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) if err != nil { t.Fatalf("err: %s", err) } @@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) { // that it actually is removed in the state store. tx := s.db.Txn(false) defer tx.Abort() - _, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1") + _, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"}) if err != nil || check != nil { t.Fatalf("bad: %#v (err: %s)", check, err) } diff --git a/agent/consul/state/operations_oss.go b/agent/consul/state/operations_oss.go index 30deb7068..a3a8f396d 100644 --- a/agent/consul/state/operations_oss.go +++ b/agent/consul/state/operations_oss.go @@ -3,8 +3,9 @@ package state import ( - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) func firstWithTxn(tx ReadTxn, @@ -19,11 +20,6 @@ func firstWatchWithTxn(tx ReadTxn, return tx.FirstWatch(table, index, idxVal) } -func firstWatchCompoundWithTxn(tx ReadTxn, - table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) { - return tx.FirstWatch(table, index, idxVals...) -} - func getWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { diff --git a/agent/consul/state/session_oss.go b/agent/consul/state/session_oss.go index b76deb500..cd4cad3ad 100644 --- a/agent/consul/state/session_oss.go +++ b/agent/consul/state/session_oss.go @@ -5,9 +5,10 @@ package state import ( "fmt" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-memdb" ) func sessionIndexer() *memdb.UUIDFieldIndex { @@ -107,7 +108,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 { func validateSessionChecksTxn(tx *txn, session *structs.Session) error { // Go over the session checks and ensure they exist. for _, checkID := range session.CheckIDs() { - check, err := tx.First("checks", "id", session.Node, string(checkID)) + check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: session.Node, CheckID: string(checkID)}) if err != nil { return fmt.Errorf("failed check lookup: %s", err) } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index bc87eda27..938804c90 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64, tx := s.db.Txn(false) defer tx.Abort() - _, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID)) + _, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)}) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/consul/state/testdata/TestStateStoreSchema.golden b/agent/consul/state/testdata/TestStateStoreSchema.golden index 4a6b75255..72bb71e3c 100644 --- a/agent/consul/state/testdata/TestStateStoreSchema.golden +++ b/agent/consul/state/testdata/TestStateStoreSchema.golden @@ -48,7 +48,7 @@ table=autopilot-config table=checks index=id unique - indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false + indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeCheckID writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromHealthCheck prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery index=node allow-missing indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity index=node_service allow-missing