open-consul/agent/consul/state/coordinate.go
Daniel Nephin b747b27afd state: remove the need for registerSchema
registerSchema creates some indirection which is not necessary in this
case. newDBSchema can call each of the tables.

Enterprise tables can be added from the existing withEnterpriseSchema
shim.
2021-02-05 12:19:56 -05:00

169 lines
4.6 KiB
Go

package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
// coordinatesTableSchema returns a new table schema used for storing
// network coordinates.
func coordinatesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "coordinates",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
},
},
},
},
"node": {
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
},
}
}
// Coordinates is used to pull all the coordinates from the snapshot.
func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("coordinates", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Coordinates is used when restoring from a snapshot. For general inserts, use
// CoordinateBatchUpdate. We do less vetting of the updates here because they
// already got checked on the way in during a batch update.
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
for _, update := range updates {
// Skip any bad data that may have gotten into the database from
// a bad client in the past.
if !update.Coord.IsValid() {
continue
}
if err := s.tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed restoring coordinate: %s", err)
}
}
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
tableIdx := maxIndexTxn(tx, "coordinates")
iter, err := tx.Get("coordinates", "node", node)
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
results := make(lib.CoordinateSet)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
}
return tableIdx, results, nil
}
// Coordinates queries for all nodes with coordinates.
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "coordinates")
// Pull all the coordinates.
iter, err := tx.Get("coordinates", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results structs.Coordinates
for coord := iter.Next(); coord != nil; coord = iter.Next() {
results = append(results, coord.(*structs.Coordinate))
}
return idx, results, nil
}
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
// them in a single transaction.
func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
// Upsert the coordinates.
for _, update := range updates {
// Skip any bad data that may have gotten into the database from
// a bad client in the past.
if !update.Coord.IsValid() {
continue
}
// Since the cleanup of coordinates is tied to deletion of
// nodes, we silently drop any updates for nodes that we don't
// know about. This might be possible during normal operation
// if we happen to get a coordinate update for a node that
// hasn't been able to add itself to the catalog yet. Since we
// don't carefully sequence this, and since it will fix itself
// on the next coordinate update from that node, we don't return
// an error or log anything.
node, err := tx.First("nodes", "id", update.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
continue
}
if err := tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed inserting coordinate: %s", err)
}
}
// Update the index.
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return tx.Commit()
}