open-consul/agent/consul/state/system_metadata.go
Daniel Nephin b747b27afd state: remove the need for registerSchema
registerSchema creates some indirection which is not necessary in this
case. newDBSchema can call each of the tables.

Enterprise tables can be added from the existing withEnterpriseSchema
shim.
2021-02-05 12:19:56 -05:00

192 lines
5.3 KiB
Go

package state
import (
"fmt"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
)
const tableSystemMetadata = "system-metadata"
func systemMetadataTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: tableSystemMetadata,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: true,
},
},
},
}
}
// SystemMetadataEntries used to pull all the system metadata entries for the snapshot.
func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, error) {
entries, err := s.tx.Get(tableSystemMetadata, "id")
if err != nil {
return nil, err
}
var ret []*structs.SystemMetadataEntry
for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() {
ret = append(ret, wrapped.(*structs.SystemMetadataEntry))
}
return ret, nil
}
// SystemMetadataEntry is used when restoring from a snapshot.
func (s *Restore) SystemMetadataEntry(entry *structs.SystemMetadataEntry) error {
// Insert
if err := s.tx.Insert(tableSystemMetadata, entry); err != nil {
return fmt.Errorf("failed restoring system metadata object: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, tableSystemMetadata); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// SystemMetadataSet is called to do an upsert of a set of system metadata entries.
func (s *Store) SystemMetadataSet(idx uint64, entry *structs.SystemMetadataEntry) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if err := systemMetadataSetTxn(tx, idx, entry); err != nil {
return err
}
return tx.Commit()
}
// systemMetadataSetTxn upserts a system metadata inside of a transaction.
func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntry) error {
// The only validation we care about is non-empty keys.
if entry.Key == "" {
return fmt.Errorf("missing key on system metadata")
}
// Check for existing.
var existing *structs.SystemMetadataEntry
existingRaw, err := tx.First(tableSystemMetadata, "id", entry.Key)
if err != nil {
return fmt.Errorf("failed system metadata lookup: %s", err)
}
if existingRaw != nil {
existing = existingRaw.(*structs.SystemMetadataEntry)
}
// Set the indexes
if existing != nil {
entry.CreateIndex = existing.CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
}
// Insert the system metadata and update the index
if err := tx.Insert(tableSystemMetadata, entry); err != nil {
return fmt.Errorf("failed inserting system metadata: %s", err)
}
if err := tx.Insert("index", &IndexEntry{tableSystemMetadata, idx}); err != nil {
return fmt.Errorf("failed updating index: %v", err)
}
return nil
}
// SystemMetadataGet is called to get a system metadata.
func (s *Store) SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return systemMetadataGetTxn(tx, ws, key)
}
func systemMetadataGetTxn(tx ReadTxn, ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
// Get the index
idx := maxIndexTxn(tx, tableSystemMetadata)
// Get the existing contents.
watchCh, existing, err := tx.FirstWatch(tableSystemMetadata, "id", key)
if err != nil {
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
}
ws.Add(watchCh)
if existing == nil {
return idx, nil, nil
}
entry, ok := existing.(*structs.SystemMetadataEntry)
if !ok {
return 0, nil, fmt.Errorf("system metadata %q is an invalid type: %T", key, existing)
}
return idx, entry, nil
}
// SystemMetadataList is called to get all system metadata objects.
func (s *Store) SystemMetadataList(ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return systemMetadataListTxn(tx, ws)
}
func systemMetadataListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
// Get the index
idx := maxIndexTxn(tx, tableSystemMetadata)
iter, err := tx.Get(tableSystemMetadata, "id")
if err != nil {
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results []*structs.SystemMetadataEntry
for v := iter.Next(); v != nil; v = iter.Next() {
results = append(results, v.(*structs.SystemMetadataEntry))
}
return idx, results, nil
}
func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEntry) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if err := systemMetadataDeleteTxn(tx, idx, entry.Key); err != nil {
return err
}
return tx.Commit()
}
func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error {
// Try to retrieve the existing system metadata.
existing, err := tx.First(tableSystemMetadata, "id", key)
if err != nil {
return fmt.Errorf("failed system metadata lookup: %s", err)
}
if existing == nil {
return nil
}
// Delete the system metadata from the DB and update the index.
if err := tx.Delete(tableSystemMetadata, existing); err != nil {
return fmt.Errorf("failed removing system metadata: %s", err)
}
if err := tx.Insert("index", &IndexEntry{tableSystemMetadata, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}