state: Use the tableIndex constant

This commit is contained in:
Daniel Nephin 2021-02-05 17:53:08 -05:00
parent de841bd459
commit cdda3b9321
6 changed files with 30 additions and 30 deletions

View File

@ -285,7 +285,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
defer tx.Abort() defer tx.Abort()
// We must have initialized before this will ever be possible. // We must have initialized before this will ever be possible.
existing, err := tx.First("index", "id", "acl-token-bootstrap") existing, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
if err != nil { if err != nil {
return fmt.Errorf("bootstrap check failed: %v", err) return fmt.Errorf("bootstrap check failed: %v", err)
} }
@ -300,7 +300,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil { if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil {
return fmt.Errorf("failed inserting bootstrap token: %v", err) return fmt.Errorf("failed inserting bootstrap token: %v", err)
} }
if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err) return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err)
} }
return tx.Commit() return tx.Commit()
@ -311,7 +311,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
// Lookup the bootstrap sentinel // Lookup the bootstrap sentinel
out, err := tx.First("index", "id", "acl-token-bootstrap") out, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }

View File

@ -324,7 +324,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
if err := tx.Insert("nodes", node); err != nil { if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err) return fmt.Errorf("failed inserting node: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
// Update the node's service indexes as the node information is included // Update the node's service indexes as the node information is included
@ -557,7 +557,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
if err := tx.Delete("coordinates", coord); err != nil { if err := tx.Delete("coordinates", coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err) return fmt.Errorf("failed deleting coordinate: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
} }
@ -566,7 +566,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
if err := tx.Delete("nodes", node); err != nil { if err := tx.Delete("nodes", node); err != nil {
return fmt.Errorf("failed deleting node: %s", err) return fmt.Errorf("failed deleting node: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
@ -1367,7 +1367,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta) _, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta)
if err == nil && serviceIndex != nil { if err == nil && serviceIndex != nil {
// we found service.<serviceName> index, garbage collect it // we found service.<serviceName> index, garbage collect it
if errW := tx.Delete("index", serviceIndex); errW != nil { if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
} }
} }

View File

@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _
} }
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{indexServiceExtinction, idx}); err != nil {
return fmt.Errorf("failed updating missing service extinction index: %s", err) return fmt.Errorf("failed updating missing service extinction index: %s", err)
} }
return nil return nil
@ -86,7 +86,7 @@ func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
} }
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil)) return tx.FirstWatch(tableIndex, "id", serviceIndexName(serviceName, nil))
} }
func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 { func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En
} }
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) { func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
return tx.First("index", "id", indexServiceExtinction) return tx.First(tableIndex, "id", indexServiceExtinction)
} }
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
@ -129,7 +129,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
// update the universal index entry // update the universal index entry
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
return nil return nil

View File

@ -5,8 +5,9 @@ package state
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
) )
func kvsIndexer() *memdb.StringFieldIndex { func kvsIndexer() *memdb.StringFieldIndex {
@ -26,7 +27,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e
return fmt.Errorf("failed updating kvs index: %v", err) return fmt.Errorf("failed updating kvs index: %v", err)
} }
} else { } else {
if err := tx.Insert("index", &IndexEntry{"kvs", entry.ModifyIndex}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err) return fmt.Errorf("failed updating kvs index: %s", err)
} }
} }
@ -70,7 +71,7 @@ func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta
} }
} }
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
} }
@ -87,7 +88,7 @@ func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error
return fmt.Errorf("failed deleting kvs entry: %s", err) return fmt.Errorf("failed deleting kvs entry: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err) return fmt.Errorf("failed updating kvs index: %s", err)
} }

View File

@ -58,6 +58,8 @@ type IndexEntry struct {
Value uint64 Value uint64
} }
const tableIndex = "index"
// indexTableSchema returns a new table schema used for tracking various the // indexTableSchema returns a new table schema used for tracking various the
// latest raft index for a table or entities within a table. // latest raft index for a table or entities within a table.
// //
@ -67,10 +69,10 @@ type IndexEntry struct {
// table, even when that update is a delete of the most recent item. // table, even when that update is a delete of the most recent item.
func indexTableSchema() *memdb.TableSchema { func indexTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{ return &memdb.TableSchema{
Name: "index", Name: tableIndex,
Indexes: map[string]*memdb.IndexSchema{ Indexes: map[string]*memdb.IndexSchema{
"id": { indexID: {
Name: "id", Name: indexID,
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{

View File

@ -209,16 +209,12 @@ func (s *Snapshot) LastIndex() uint64 {
} }
func (s *Snapshot) Indexes() (memdb.ResultIterator, error) { func (s *Snapshot) Indexes() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("index", "id") return s.tx.Get(tableIndex, indexID)
if err != nil {
return nil, err
}
return iter, nil
} }
// IndexRestore is used to restore an index // IndexRestore is used to restore an index
func (s *Restore) IndexRestore(idx *IndexEntry) error { func (s *Restore) IndexRestore(idx *IndexEntry) error {
if err := s.tx.Insert("index", idx); err != nil { if err := s.tx.Insert(tableIndex, idx); err != nil {
return fmt.Errorf("index insert failed: %v", err) return fmt.Errorf("index insert failed: %v", err)
} }
return nil return nil
@ -279,7 +275,7 @@ func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
var lindex uint64 var lindex uint64
for _, table := range tables { for _, table := range tables {
ch, ti, err := tx.FirstWatch("index", "id", table) ch, ti, err := tx.FirstWatch(tableIndex, "id", table)
if err != nil { if err != nil {
panic(fmt.Sprintf("unknown index: %s err: %s", table, err)) panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
} }
@ -294,21 +290,22 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to // indexUpdateMaxTxn is used when restoring entries and sets the table's index to
// the given idx only if it's greater than the current index. // the given idx only if it's greater than the current index.
func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error { func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
ti, err := tx.First("index", "id", table) ti, err := tx.First(tableIndex, indexID, table)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve existing index: %s", err) return fmt.Errorf("failed to retrieve existing index: %s", err)
} }
// Always take the first update, otherwise do the > check. // Always take the first update, otherwise do the > check.
if ti == nil { if ti == nil {
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil { if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
return fmt.Errorf("failed updating index %s", err) return fmt.Errorf("failed updating index %s", err)
} }
} else if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value { return nil
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil { }
if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
return fmt.Errorf("failed updating index %s", err) return fmt.Errorf("failed updating index %s", err)
} }
} }
return nil return nil
} }