state: convert checks.ID index to new pattern

This commit is contained in:
Daniel Nephin 2021-02-12 18:06:56 -05:00
parent e95737dee2
commit 49938bc472
8 changed files with 68 additions and 34 deletions

View File

@ -194,7 +194,7 @@ func ensureNoNodeWithSimilarNameTxn(tx ReadTxn, node *structs.Node, allowClashWi
if strings.EqualFold(node.Node, enode.Node) && node.ID != enode.ID {
// Look up the existing node's Serf health check to see if it's failed.
// If it is, the node can be renamed.
_, enodeCheck, err := firstWatchCompoundWithTxn(tx, "checks", "id", structs.DefaultEnterpriseMeta(), enode.Node, string(structs.SerfCheckID))
_, enodeCheck, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *structs.DefaultEnterpriseMeta(), Node: enode.Node, CheckID: string(structs.SerfCheckID)})
if err != nil {
return fmt.Errorf("Cannot get status of node %s: %s", enode.Node, err)
}
@ -1471,7 +1471,7 @@ func (s *Store) ensureCheckCASTxn(tx WriteTxn, idx uint64, hc *structs.HealthChe
// checks with no matching node or service.
func (s *Store) ensureCheckTxn(tx WriteTxn, idx uint64, preserveIndexes bool, hc *structs.HealthCheck) error {
// Check if we have an existing health check
_, existing, err := firstWatchCompoundWithTxn(tx, "checks", "id", &hc.EnterpriseMeta, hc.Node, string(hc.CheckID))
_, existing, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: hc.EnterpriseMeta, Node: hc.Node, CheckID: string(hc.CheckID)})
if err != nil {
return fmt.Errorf("failed health check lookup: %s", err)
}
@ -1578,8 +1578,13 @@ func getNodeCheckTxn(tx ReadTxn, nodeName string, checkID types.CheckID, entMeta
// Get the table index.
idx := catalogChecksMaxIndex(tx, entMeta)
// TODO: accept non-pointer value
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Return the check.
_, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, nodeName, string(checkID))
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: nodeName, CheckID: string(checkID)})
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
@ -1702,7 +1707,7 @@ func checksInStateTxn(tx ReadTxn, ws memdb.WatchSet, state string, entMeta *stru
var iter memdb.ResultIterator
var err error
if state == api.HealthAny {
iter, err = catalogListChecks(tx, entMeta)
iter, err = tx.Get(tableChecks, indexID+"_prefix", entMeta)
} else {
iter, err = catalogListChecksInState(tx, state, entMeta)
}
@ -1800,8 +1805,12 @@ type NodeServiceQuery struct {
// deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
if entMeta == nil {
entMeta = structs.DefaultEnterpriseMeta()
}
// Try to retrieve the existing health check.
_, hc, err := firstWatchCompoundWithTxn(tx, "checks", "id", entMeta, node, string(checkID))
_, hc, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{EnterpriseMeta: *entMeta, Node: node, CheckID: string(checkID)})
if err != nil {
return fmt.Errorf("check lookup failed: %s", err)
}
@ -2149,7 +2158,7 @@ func parseCheckServiceNodes(
// We need a similar fallback for checks. Since services need the
// status of node + service-specific checks, we pull in a top-level
// watch over all checks.
allChecks, err := tx.Get("checks", "id")
allChecks, err := tx.Get(tableChecks, indexID)
if err != nil {
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
}
@ -2310,7 +2319,7 @@ func parseNodes(tx ReadTxn, ws memdb.WatchSet, idx uint64,
allServicesCh := allServices.WatchCh()
// We need a similar fallback for checks.
allChecks, err := tx.Get("checks", "id")
allChecks, err := tx.Get(tableChecks, indexID)
if err != nil {
return 0, nil, fmt.Errorf("failed checks lookup: %s", err)
}

View File

@ -103,6 +103,38 @@ func indexFromServiceNode(raw interface{}) ([]byte, error) {
return b.Bytes(), nil
}
func indexFromHealthCheck(raw interface{}) ([]byte, error) {
hc, ok := raw.(*structs.HealthCheck)
if !ok {
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
}
if hc.Node == "" || hc.CheckID == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(hc.Node))
b.String(strings.ToLower(string(hc.CheckID)))
return b.Bytes(), nil
}
func indexFromNodeCheckID(raw interface{}) ([]byte, error) {
hc, ok := raw.(NodeCheckID)
if !ok {
return nil, fmt.Errorf("unexpected type %T for NodeCheckID index", raw)
}
if hc.Node == "" || hc.CheckID == "" {
return nil, errMissingValueForIndex
}
var b indexBuilder
b.String(strings.ToLower(hc.Node))
b.String(strings.ToLower(hc.CheckID))
return b.Bytes(), nil
}
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
return fmt.Sprintf("service.%s", name)
}
@ -243,10 +275,6 @@ func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMet
return tx.Get("checks", "status", state)
}
func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get("checks", "id")
}
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
// Insert the check
if err := tx.Insert("checks", chk); err != nil {

View File

@ -123,17 +123,10 @@ func checksTableSchema() *memdb.TableSchema {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "CheckID",
Lowercase: true,
},
},
Indexer: indexerSingleWithPrefix{
readIndex: readIndex(indexFromNodeCheckID),
prefixIndex: prefixIndex(prefixIndexFromQuery),
writeIndex: writeIndex(indexFromHealthCheck),
},
},
indexStatus: {
@ -331,3 +324,10 @@ type upstreamDownstream struct {
structs.RaftIndex
}
// NodeCheckID is used to query the ID index of the checks table.
type NodeCheckID struct {
Node string
CheckID string
structs.EnterpriseMeta
}

View File

@ -1307,7 +1307,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
}
// Associated health check was removed.
checks, err := getCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1")
checks, err := tx.Get(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
if err != nil {
t.Fatalf("err: %s", err)
}
@ -2067,7 +2067,7 @@ func TestStateStore_DeleteService(t *testing.T) {
// that it actually is removed in the state store.
tx := s.db.Txn(false)
defer tx.Abort()
_, check, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, "node1", "check1")
_, check, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: "node1", CheckID: "check1"})
if err != nil || check != nil {
t.Fatalf("bad: %#v (err: %s)", check, err)
}

View File

@ -3,8 +3,9 @@
package state
import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
func firstWithTxn(tx ReadTxn,
@ -19,11 +20,6 @@ func firstWatchWithTxn(tx ReadTxn,
return tx.FirstWatch(table, index, idxVal)
}
func firstWatchCompoundWithTxn(tx ReadTxn,
table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch(table, index, idxVals...)
}
func getWithTxn(tx ReadTxn,
table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

View File

@ -5,9 +5,10 @@ package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-memdb"
)
func sessionIndexer() *memdb.UUIDFieldIndex {
@ -107,7 +108,7 @@ func sessionMaxIndex(tx ReadTxn, entMeta *structs.EnterpriseMeta) uint64 {
func validateSessionChecksTxn(tx *txn, session *structs.Session) error {
// Go over the session checks and ensure they exist.
for _, checkID := range session.CheckIDs() {
check, err := tx.First("checks", "id", session.Node, string(checkID))
check, err := tx.First(tableChecks, indexID, NodeCheckID{Node: session.Node, CheckID: string(checkID)})
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}

View File

@ -163,7 +163,7 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
tx := s.db.Txn(false)
defer tx.Abort()
_, c, err := firstWatchCompoundWithTxn(tx, "checks", "id", nil, nodeID, string(checkID))
_, c, err := tx.FirstWatch(tableChecks, indexID, NodeCheckID{Node: nodeID, CheckID: string(checkID)})
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -48,7 +48,7 @@ table=autopilot-config
table=checks
index=id unique
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=CheckID Lowercase=true] AllowMissing=false
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingleWithPrefix readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeCheckID writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromHealthCheck prefixIndex=github.com/hashicorp/consul/agent/consul/state.prefixIndexFromQuery
index=node allow-missing
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeIdentity
index=node_service allow-missing