Merge pull request #9728 from hashicorp/dnephin/state-index-table
state: document how index table is used
This commit is contained in:
commit
b17967827d
|
@ -285,7 +285,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// We must have initialized before this will ever be possible.
|
// We must have initialized before this will ever be possible.
|
||||||
existing, err := tx.First("index", "id", "acl-token-bootstrap")
|
existing, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("bootstrap check failed: %v", err)
|
return fmt.Errorf("bootstrap check failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -300,7 +300,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
|
||||||
if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil {
|
if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil {
|
||||||
return fmt.Errorf("failed inserting bootstrap token: %v", err)
|
return fmt.Errorf("failed inserting bootstrap token: %v", err)
|
||||||
}
|
}
|
||||||
if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"acl-token-bootstrap", idx}); err != nil {
|
||||||
return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err)
|
return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err)
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
|
@ -311,7 +311,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
|
||||||
// Lookup the bootstrap sentinel
|
// Lookup the bootstrap sentinel
|
||||||
out, err := tx.First("index", "id", "acl-token-bootstrap")
|
out, err := tx.First(tableIndex, indexID, "acl-token-bootstrap")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, 0, err
|
return false, 0, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -324,7 +324,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
|
||||||
if err := tx.Insert("nodes", node); err != nil {
|
if err := tx.Insert("nodes", node); err != nil {
|
||||||
return fmt.Errorf("failed inserting node: %s", err)
|
return fmt.Errorf("failed inserting node: %s", err)
|
||||||
}
|
}
|
||||||
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
// Update the node's service indexes as the node information is included
|
// Update the node's service indexes as the node information is included
|
||||||
|
@ -557,7 +557,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
||||||
if err := tx.Delete("coordinates", coord); err != nil {
|
if err := tx.Delete("coordinates", coord); err != nil {
|
||||||
return fmt.Errorf("failed deleting coordinate: %s", err)
|
return fmt.Errorf("failed deleting coordinate: %s", err)
|
||||||
}
|
}
|
||||||
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,7 +566,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error {
|
||||||
if err := tx.Delete("nodes", node); err != nil {
|
if err := tx.Delete("nodes", node); err != nil {
|
||||||
return fmt.Errorf("failed deleting node: %s", err)
|
return fmt.Errorf("failed deleting node: %s", err)
|
||||||
}
|
}
|
||||||
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1367,7 +1367,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||||
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta)
|
_, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta)
|
||||||
if err == nil && serviceIndex != nil {
|
if err == nil && serviceIndex != nil {
|
||||||
// we found service.<serviceName> index, garbage collect it
|
// we found service.<serviceName> index, garbage collect it
|
||||||
if errW := tx.Delete("index", serviceIndex); errW != nil {
|
if errW := tx.Delete(tableIndex, serviceIndex); errW != nil {
|
||||||
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
||||||
if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{indexServiceExtinction, idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating missing service extinction index: %s", err)
|
return fmt.Errorf("failed updating missing service extinction index: %s", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -86,7 +86,7 @@ func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil))
|
return tx.FirstWatch(tableIndex, "id", serviceIndexName(serviceName, nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
|
func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
|
||||||
|
@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||||
return tx.First("index", "id", indexServiceExtinction)
|
return tx.First(tableIndex, "id", indexServiceExtinction)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||||
|
@ -129,7 +129,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe
|
||||||
|
|
||||||
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
||||||
// update the universal index entry
|
// update the universal index entry
|
||||||
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -5,8 +5,9 @@ package state
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func kvsIndexer() *memdb.StringFieldIndex {
|
func kvsIndexer() *memdb.StringFieldIndex {
|
||||||
|
@ -26,7 +27,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e
|
||||||
return fmt.Errorf("failed updating kvs index: %v", err)
|
return fmt.Errorf("failed updating kvs index: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := tx.Insert("index", &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
|
||||||
return fmt.Errorf("failed updating kvs index: %s", err)
|
return fmt.Errorf("failed updating kvs index: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,7 +71,7 @@ func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,7 +88,7 @@ func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error
|
||||||
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating kvs index: %s", err)
|
return fmt.Errorf("failed updating kvs index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,14 +52,27 @@ func addTableSchemas(db *memdb.DBSchema, schemas ...func() *memdb.TableSchema) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// indexTableSchema returns a new table schema used for tracking various indexes
|
// IndexEntry keeps a record of the last index of a table or entity within a table.
|
||||||
// for the Raft log.
|
type IndexEntry struct {
|
||||||
|
Key string
|
||||||
|
Value uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
const tableIndex = "index"
|
||||||
|
|
||||||
|
// indexTableSchema returns a new table schema used for tracking various the
|
||||||
|
// latest raft index for a table or entities within a table.
|
||||||
|
//
|
||||||
|
// The index table is necessary for tables that do not use tombstones. If the latest
|
||||||
|
// items in the table are deleted, the max index of a table would appear to go
|
||||||
|
// backwards. With the index table we can keep track of the latest update to a
|
||||||
|
// table, even when that update is a delete of the most recent item.
|
||||||
func indexTableSchema() *memdb.TableSchema {
|
func indexTableSchema() *memdb.TableSchema {
|
||||||
return &memdb.TableSchema{
|
return &memdb.TableSchema{
|
||||||
Name: "index",
|
Name: tableIndex,
|
||||||
Indexes: map[string]*memdb.IndexSchema{
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
"id": {
|
indexID: {
|
||||||
Name: "id",
|
Name: indexID,
|
||||||
AllowMissing: false,
|
AllowMissing: false,
|
||||||
Unique: true,
|
Unique: true,
|
||||||
Indexer: &memdb.StringFieldIndex{
|
Indexer: &memdb.StringFieldIndex{
|
||||||
|
|
|
@ -134,12 +134,6 @@ type Restore struct {
|
||||||
tx *txn
|
tx *txn
|
||||||
}
|
}
|
||||||
|
|
||||||
// IndexEntry keeps a record of the last index per-table.
|
|
||||||
type IndexEntry struct {
|
|
||||||
Key string
|
|
||||||
Value uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// sessionCheck is used to create a many-to-one table such that
|
// sessionCheck is used to create a many-to-one table such that
|
||||||
// each check registered by a session can be mapped back to the
|
// each check registered by a session can be mapped back to the
|
||||||
// session table. This is only used internally in the state
|
// session table. This is only used internally in the state
|
||||||
|
@ -215,16 +209,12 @@ func (s *Snapshot) LastIndex() uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Snapshot) Indexes() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Indexes() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("index", "id")
|
return s.tx.Get(tableIndex, indexID)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return iter, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IndexRestore is used to restore an index
|
// IndexRestore is used to restore an index
|
||||||
func (s *Restore) IndexRestore(idx *IndexEntry) error {
|
func (s *Restore) IndexRestore(idx *IndexEntry) error {
|
||||||
if err := s.tx.Insert("index", idx); err != nil {
|
if err := s.tx.Insert(tableIndex, idx); err != nil {
|
||||||
return fmt.Errorf("index insert failed: %v", err)
|
return fmt.Errorf("index insert failed: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -285,7 +275,7 @@ func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
|
||||||
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
|
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
|
||||||
var lindex uint64
|
var lindex uint64
|
||||||
for _, table := range tables {
|
for _, table := range tables {
|
||||||
ch, ti, err := tx.FirstWatch("index", "id", table)
|
ch, ti, err := tx.FirstWatch(tableIndex, "id", table)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
|
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
|
||||||
}
|
}
|
||||||
|
@ -300,21 +290,22 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
|
||||||
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to
|
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to
|
||||||
// the given idx only if it's greater than the current index.
|
// the given idx only if it's greater than the current index.
|
||||||
func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
|
func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error {
|
||||||
ti, err := tx.First("index", "id", table)
|
ti, err := tx.First(tableIndex, indexID, table)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to retrieve existing index: %s", err)
|
return fmt.Errorf("failed to retrieve existing index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always take the first update, otherwise do the > check.
|
// Always take the first update, otherwise do the > check.
|
||||||
if ti == nil {
|
if ti == nil {
|
||||||
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
|
if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index %s", err)
|
return fmt.Errorf("failed updating index %s", err)
|
||||||
}
|
}
|
||||||
} else if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
|
return nil
|
||||||
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
|
}
|
||||||
|
if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
|
||||||
|
if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index %s", err)
|
return fmt.Errorf("failed updating index %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue