2360 lines
69 KiB
Go
2360 lines
69 KiB
Go
package state
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/consul/structs"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/serf/coordinate"
|
|
)
|
|
|
|
var (
	// ErrMissingNode is returned by operations that need an existing node
	// registration when no such node is present.
	ErrMissingNode = errors.New("Missing node registration")

	// ErrMissingService is returned by operations that need an existing
	// service registration when no such service is present.
	ErrMissingService = errors.New("Missing service registration")

	// ErrMissingSessionID is returned when a session registration is
	// attempted with an empty session ID.
	ErrMissingSessionID = errors.New("Missing session ID")

	// ErrMissingACLID is returned when an ACL set is attempted on an ACL
	// with an empty ID (call site is outside this part of the file).
	ErrMissingACLID = errors.New("Missing ACL ID")
)
|
|
|
|
// StateStore is where we store all of Consul's state, including
|
|
// records of node registrations, services, checks, key/value
|
|
// pairs and more. The DB is entirely in-memory and is constructed
|
|
// from the Raft log through the FSM.
|
|
type StateStore struct {
|
|
schema *memdb.DBSchema
|
|
db *memdb.MemDB
|
|
|
|
// tableWatches holds all the full table watches, indexed by table name.
|
|
tableWatches map[string]*FullTableWatch
|
|
|
|
// kvsWatch holds the special prefix watch for the key value store.
|
|
kvsWatch *PrefixWatch
|
|
|
|
// kvsGraveyard manages tombstones for the key value store.
|
|
kvsGraveyard *Graveyard
|
|
|
|
// lockDelay holds expiration times for locks associated with keys.
|
|
lockDelay *Delay
|
|
}
|
|
|
|
// StateSnapshot is used to provide a point-in-time snapshot. It
|
|
// works by starting a read transaction against the whole state store.
|
|
type StateSnapshot struct {
|
|
store *StateStore
|
|
tx *memdb.Txn
|
|
lastIndex uint64
|
|
}
|
|
|
|
// StateRestore is used to efficiently manage restoring a large amount of
|
|
// data to a state store.
|
|
type StateRestore struct {
|
|
store *StateStore
|
|
tx *memdb.Txn
|
|
watches *DumbWatchManager
|
|
}
|
|
|
|
// IndexEntry records the last index seen for a single table.
type IndexEntry struct {
	Key   string // table name the index belongs to
	Value uint64 // last index recorded for that table
}
|
|
|
|
// sessionCheck is a row of the many-to-one table that maps each check
// registered by a session back to the session table. It is only used
// internally by the state store and is therefore unexported.
type sessionCheck struct {
	Node    string // node the check belongs to
	CheckID string // ID of the check
	Session string // ID of the session that registered the check
}
|
|
|
|
// NewStateStore creates a new in-memory state storage layer.
|
|
func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
|
// Create the in-memory DB.
|
|
schema := stateStoreSchema()
|
|
db, err := memdb.NewMemDB(schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed setting up state store: %s", err)
|
|
}
|
|
|
|
// Build up the all-table watches.
|
|
tableWatches := make(map[string]*FullTableWatch)
|
|
for table, _ := range schema.Tables {
|
|
if table == "kvs" || table == "tombstones" {
|
|
continue
|
|
}
|
|
|
|
tableWatches[table] = NewFullTableWatch()
|
|
}
|
|
|
|
// Create and return the state store.
|
|
s := &StateStore{
|
|
schema: schema,
|
|
db: db,
|
|
tableWatches: tableWatches,
|
|
kvsWatch: NewPrefixWatch(),
|
|
kvsGraveyard: NewGraveyard(gc),
|
|
lockDelay: NewDelay(),
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Snapshot is used to create a point-in-time snapshot of the entire db.
|
|
func (s *StateStore) Snapshot() *StateSnapshot {
|
|
tx := s.db.Txn(false)
|
|
|
|
var tables []string
|
|
for table, _ := range s.schema.Tables {
|
|
tables = append(tables, table)
|
|
}
|
|
idx := maxIndexTxn(tx, tables...)
|
|
|
|
return &StateSnapshot{s, tx, idx}
|
|
}
|
|
|
|
// LastIndex returns that last index that affects the snapshotted data.
|
|
func (s *StateSnapshot) LastIndex() uint64 {
|
|
return s.lastIndex
|
|
}
|
|
|
|
// Close performs cleanup of a state snapshot.
|
|
func (s *StateSnapshot) Close() {
|
|
s.tx.Abort()
|
|
}
|
|
|
|
// Nodes is used to pull the full list of nodes for use during snapshots.
|
|
func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("nodes", "id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Services is used to pull the full list of services for a given node for use
|
|
// during snapshots.
|
|
func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("services", "node", node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Checks is used to pull the full list of checks for a given node for use
|
|
// during snapshots.
|
|
func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("checks", "node", node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// KVs is used to pull the full list of KVS entries for use during snapshots.
|
|
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("kvs", "id_prefix")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Tombstones is used to pull all the tombstones from the graveyard.
|
|
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
|
|
return s.store.kvsGraveyard.DumpTxn(s.tx)
|
|
}
|
|
|
|
// Sessions is used to pull the full list of sessions for use during snapshots.
|
|
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("sessions", "id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// ACLs is used to pull all the ACLs from the snapshot.
|
|
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("acls", "id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Coordinates is used to pull all the coordinates from the snapshot.
|
|
func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
|
|
iter, err := s.tx.Get("coordinates", "id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Restore is used to efficiently manage restoring a large amount of data into
|
|
// the state store. It works by doing all the restores inside of a single
|
|
// transaction.
|
|
func (s *StateStore) Restore() *StateRestore {
|
|
tx := s.db.Txn(true)
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
return &StateRestore{s, tx, watches}
|
|
}
|
|
|
|
// Abort abandons the changes made by a restore. This or Commit should always be
|
|
// called.
|
|
func (s *StateRestore) Abort() {
|
|
s.tx.Abort()
|
|
}
|
|
|
|
// Commit commits the changes made by a restore. This or Abort should always be
|
|
// called.
|
|
func (s *StateRestore) Commit() {
|
|
// Fire off a single KVS watch instead of a zillion prefix ones, and use
|
|
// a dumb watch manager to single-fire all the full table watches.
|
|
s.tx.Defer(func() { s.store.kvsWatch.Notify("", true) })
|
|
s.tx.Defer(func() { s.watches.Notify() })
|
|
|
|
s.tx.Commit()
|
|
}
|
|
|
|
// Registration is used to make sure a node, service, and check registration is
|
|
// performed within a single transaction to avoid race conditions on state
|
|
// updates.
|
|
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
|
if err := s.store.ensureRegistrationTxn(s.tx, idx, s.watches, req); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
|
|
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
|
|
if err := s.tx.Insert("kvs", entry); err != nil {
|
|
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
|
}
|
|
|
|
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
// We have a single top-level KVS watch trigger instead of doing
|
|
// tons of prefix watches.
|
|
return nil
|
|
}
|
|
|
|
// Tombstone is used when restoring from a snapshot. For general inserts, use
|
|
// Graveyard.InsertTxn.
|
|
func (s *StateRestore) Tombstone(stone *Tombstone) error {
|
|
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
|
|
return fmt.Errorf("failed restoring tombstone: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Session is used when restoring from a snapshot. For general inserts, use
|
|
// SessionCreate.
|
|
func (s *StateRestore) Session(sess *structs.Session) error {
|
|
// Insert the session.
|
|
if err := s.tx.Insert("sessions", sess); err != nil {
|
|
return fmt.Errorf("failed inserting session: %s", err)
|
|
}
|
|
|
|
// Insert the check mappings.
|
|
for _, checkID := range sess.Checks {
|
|
mapping := &sessionCheck{
|
|
Node: sess.Node,
|
|
CheckID: checkID,
|
|
Session: sess.ID,
|
|
}
|
|
if err := s.tx.Insert("session_checks", mapping); err != nil {
|
|
return fmt.Errorf("failed inserting session check mapping: %s", err)
|
|
}
|
|
}
|
|
|
|
// Update the index.
|
|
if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
s.watches.Arm("sessions")
|
|
return nil
|
|
}
|
|
|
|
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
|
|
func (s *StateRestore) ACL(acl *structs.ACL) error {
|
|
if err := s.tx.Insert("acls", acl); err != nil {
|
|
return fmt.Errorf("failed restoring acl: %s", err)
|
|
}
|
|
|
|
if err := indexUpdateMaxTxn(s.tx, acl.ModifyIndex, "acls"); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
s.watches.Arm("acls")
|
|
return nil
|
|
}
|
|
|
|
// Coordinates is used when restoring from a snapshot. For general inserts, use
|
|
// CoordinateBatchUpdate. We do less vetting of the updates here because they
|
|
// already got checked on the way in during a batch update.
|
|
func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
|
for _, update := range updates {
|
|
if err := s.tx.Insert("coordinates", update); err != nil {
|
|
return fmt.Errorf("failed restoring coordinate: %s", err)
|
|
}
|
|
}
|
|
|
|
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
s.watches.Arm("coordinates")
|
|
return nil
|
|
}
|
|
|
|
// maxIndex is a helper used to retrieve the highest known index
|
|
// amongst a set of tables in the db.
|
|
func (s *StateStore) maxIndex(tables ...string) uint64 {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
return maxIndexTxn(tx, tables...)
|
|
}
|
|
|
|
// maxIndexTxn is a helper used to retrieve the highest known index
|
|
// amongst a set of tables in the db.
|
|
func maxIndexTxn(tx *memdb.Txn, tables ...string) uint64 {
|
|
var lindex uint64
|
|
for _, table := range tables {
|
|
ti, err := tx.First("index", "id", table)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
|
|
}
|
|
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
|
|
lindex = idx.Value
|
|
}
|
|
}
|
|
return lindex
|
|
}
|
|
|
|
// indexUpdateMaxTxn is used when restoring entries and sets the table's index to
|
|
// the given idx only if it's greater than the current index.
|
|
func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error {
|
|
ti, err := tx.First("index", "id", table)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to retrieve existing index: %s", err)
|
|
}
|
|
|
|
// Always take the first update, otherwise do the > check.
|
|
if ti == nil {
|
|
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
|
|
return fmt.Errorf("failed updating index %s", err)
|
|
}
|
|
} else if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value {
|
|
if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil {
|
|
return fmt.Errorf("failed updating index %s", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReapTombstones is used to delete all the tombstones with an index
|
|
// less than or equal to the given index. This is used to prevent
|
|
// unbounded storage growth of the tombstones.
|
|
func (s *StateStore) ReapTombstones(index uint64) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
if err := s.kvsGraveyard.ReapTxn(tx, index); err != nil {
|
|
return fmt.Errorf("failed to reap kvs tombstones: %s", err)
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// getWatchTables returns the list of tables that should be watched and used for
|
|
// max index calculations for the given query method. This is used for all
|
|
// methods except for KVS. This will panic if the method is unknown.
|
|
func (s *StateStore) getWatchTables(method string) []string {
|
|
switch method {
|
|
case "GetNode", "Nodes":
|
|
return []string{"nodes"}
|
|
case "Services":
|
|
return []string{"services"}
|
|
case "ServiceNodes", "NodeServices":
|
|
return []string{"nodes", "services"}
|
|
case "NodeChecks", "ServiceChecks", "ChecksInState":
|
|
return []string{"checks"}
|
|
case "CheckServiceNodes", "NodeInfo", "NodeDump":
|
|
return []string{"nodes", "services", "checks"}
|
|
case "SessionGet", "SessionList", "NodeSessions":
|
|
return []string{"sessions"}
|
|
case "ACLGet", "ACLList":
|
|
return []string{"acls"}
|
|
case "Coordinates":
|
|
return []string{"coordinates"}
|
|
}
|
|
|
|
panic(fmt.Sprintf("Unknown method %s", method))
|
|
}
|
|
|
|
// getTableWatch returns a full table watch for the given table. This will panic
|
|
// if the table doesn't have a full table watch.
|
|
func (s *StateStore) getTableWatch(table string) Watch {
|
|
if watch, ok := s.tableWatches[table]; ok {
|
|
return watch
|
|
}
|
|
|
|
panic(fmt.Sprintf("Unknown watch for table %s", table))
|
|
}
|
|
|
|
// GetQueryWatch returns a watch for the given query method. This is
|
|
// used for all methods except for KV; you should call GetKVSWatch instead.
|
|
// This will panic if the method is unknown.
|
|
func (s *StateStore) GetQueryWatch(method string) Watch {
|
|
tables := s.getWatchTables(method)
|
|
if len(tables) == 1 {
|
|
return s.getTableWatch(tables[0])
|
|
}
|
|
|
|
var watches []Watch
|
|
for _, table := range tables {
|
|
watches = append(watches, s.getTableWatch(table))
|
|
}
|
|
return NewMultiWatch(watches...)
|
|
}
|
|
|
|
// GetKVSWatch returns a watch for the given prefix in the key value store.
|
|
func (s *StateStore) GetKVSWatch(prefix string) Watch {
|
|
return s.kvsWatch.GetSubwatch(prefix)
|
|
}
|
|
|
|
// EnsureRegistration is used to make sure a node, service, and check
|
|
// registration is performed within a single transaction to avoid race
|
|
// conditions on state updates.
|
|
func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.ensureRegistrationTxn(tx, idx, watches, req); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// ensureRegistrationTxn is used to make sure a node, service, and check
|
|
// registration is performed within a single transaction to avoid race
|
|
// conditions on state updates.
|
|
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
|
req *structs.RegisterRequest) error {
|
|
// Add the node.
|
|
node := &structs.Node{Node: req.Node, Address: req.Address}
|
|
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
|
return fmt.Errorf("failed inserting node: %s", err)
|
|
}
|
|
|
|
// Add the service, if any.
|
|
if req.Service != nil {
|
|
if err := s.ensureServiceTxn(tx, idx, watches, req.Node, req.Service); err != nil {
|
|
return fmt.Errorf("failed inserting service: %s", err)
|
|
}
|
|
}
|
|
|
|
// Add the checks, if any.
|
|
if req.Check != nil {
|
|
if err := s.ensureCheckTxn(tx, idx, watches, req.Check); err != nil {
|
|
return fmt.Errorf("failed inserting check: %s", err)
|
|
}
|
|
}
|
|
for _, check := range req.Checks {
|
|
if err := s.ensureCheckTxn(tx, idx, watches, check); err != nil {
|
|
return fmt.Errorf("failed inserting check: %s", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EnsureNode is used to upsert node registration or modification.
|
|
func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the node upsert
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// ensureNodeTxn is the inner function called to actually create a node
|
|
// registration or modify an existing one in the state store. It allows
|
|
// passing in a memdb transaction so it may be part of a larger txn.
|
|
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
|
node *structs.Node) error {
|
|
// Check for an existing node
|
|
existing, err := tx.First("nodes", "id", node.Node)
|
|
if err != nil {
|
|
return fmt.Errorf("node lookup failed: %s", err)
|
|
}
|
|
|
|
// Get the indexes
|
|
if existing != nil {
|
|
node.CreateIndex = existing.(*structs.Node).CreateIndex
|
|
node.ModifyIndex = idx
|
|
} else {
|
|
node.CreateIndex = idx
|
|
node.ModifyIndex = idx
|
|
}
|
|
|
|
// Insert the node and update the index
|
|
if err := tx.Insert("nodes", node); err != nil {
|
|
return fmt.Errorf("failed inserting node: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
watches.Arm("nodes")
|
|
return nil
|
|
}
|
|
|
|
// GetNode is used to retrieve a node registration by node ID.
|
|
func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("GetNode")...)
|
|
|
|
// Retrieve the node from the state store
|
|
node, err := tx.First("nodes", "id", id)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
|
|
}
|
|
if node != nil {
|
|
return idx, node.(*structs.Node), nil
|
|
}
|
|
return idx, nil, nil
|
|
}
|
|
|
|
// Nodes is used to return all of the known nodes.
|
|
func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
|
|
|
|
// Retrieve all of the nodes
|
|
nodes, err := tx.Get("nodes", "id")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
|
}
|
|
|
|
// Create and return the nodes list.
|
|
var results structs.Nodes
|
|
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
|
results = append(results, node.(*structs.Node))
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// DeleteNode is used to delete a given node by its ID.
|
|
func (s *StateStore) DeleteNode(idx uint64, nodeID string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the node deletion.
|
|
if err := s.deleteNodeTxn(tx, idx, nodeID); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// deleteNodeTxn is the inner method used for removing a node from
|
|
// the store within a given transaction.
|
|
func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) error {
|
|
// Look up the node.
|
|
node, err := tx.First("nodes", "id", nodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("node lookup failed: %s", err)
|
|
}
|
|
if node == nil {
|
|
return nil
|
|
}
|
|
|
|
// Use a watch manager since the inner functions can perform multiple
|
|
// ops per table.
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
|
|
// Delete all services associated with the node and update the service index.
|
|
services, err := tx.Get("services", "node", nodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
var sids []string
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
sids = append(sids, service.(*structs.ServiceNode).ServiceID)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, sid := range sids {
|
|
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, sid); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Delete all checks associated with the node. This will invalidate
|
|
// sessions as necessary.
|
|
checks, err := tx.Get("checks", "node", nodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
var cids []string
|
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
|
cids = append(cids, check.(*structs.HealthCheck).CheckID)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, cid := range cids {
|
|
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Delete any coordinate associated with this node.
|
|
coord, err := tx.First("coordinates", "id", nodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed coordinate lookup: %s", err)
|
|
}
|
|
if coord != nil {
|
|
if err := tx.Delete("coordinates", coord); err != nil {
|
|
return fmt.Errorf("failed deleting coordinate: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
watches.Arm("coordinates")
|
|
}
|
|
|
|
// Delete the node and update the index.
|
|
if err := tx.Delete("nodes", node); err != nil {
|
|
return fmt.Errorf("failed deleting node: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
// Invalidate any sessions for this node.
|
|
sessions, err := tx.Get("sessions", "node", nodeID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
var ids []string
|
|
for sess := sessions.Next(); sess != nil; sess = sessions.Next() {
|
|
ids = append(ids, sess.(*structs.Session).ID)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, id := range ids {
|
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
|
return fmt.Errorf("failed session delete: %s", err)
|
|
}
|
|
}
|
|
|
|
watches.Arm("nodes")
|
|
tx.Defer(func() { watches.Notify() })
|
|
return nil
|
|
}
|
|
|
|
// EnsureService is called to upsert creation of a given NodeService.
|
|
func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the service registration upsert
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.ensureServiceTxn(tx, idx, watches, node, svc); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// ensureServiceTxn is used to upsert a service registration within an
|
|
// existing memdb transaction.
|
|
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
|
node string, svc *structs.NodeService) error {
|
|
// Check for existing service
|
|
existing, err := tx.First("services", "id", node, svc.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
|
|
// Create the service node entry and populate the indexes. We leave the
|
|
// address blank and fill that in on the way out during queries.
|
|
entry := svc.ToServiceNode(node, "")
|
|
if existing != nil {
|
|
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
|
|
entry.ModifyIndex = idx
|
|
} else {
|
|
entry.CreateIndex = idx
|
|
entry.ModifyIndex = idx
|
|
}
|
|
|
|
// Get the node
|
|
n, err := tx.First("nodes", "id", node)
|
|
if err != nil {
|
|
return fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
if n == nil {
|
|
return ErrMissingNode
|
|
}
|
|
|
|
// Insert the service and update the index
|
|
if err := tx.Insert("services", entry); err != nil {
|
|
return fmt.Errorf("failed inserting service: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
watches.Arm("services")
|
|
return nil
|
|
}
|
|
|
|
// Services returns all services along with a list of associated tags.
|
|
func (s *StateStore) Services() (uint64, structs.Services, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("Services")...)
|
|
|
|
// List all the services.
|
|
services, err := tx.Get("services", "id")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed querying services: %s", err)
|
|
}
|
|
|
|
// Rip through the services and enumerate them and their unique set of
|
|
// tags.
|
|
unique := make(map[string]map[string]struct{})
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
svc := service.(*structs.ServiceNode)
|
|
tags, ok := unique[svc.ServiceName]
|
|
if !ok {
|
|
unique[svc.ServiceName] = make(map[string]struct{})
|
|
tags = unique[svc.ServiceName]
|
|
}
|
|
for _, tag := range svc.ServiceTags {
|
|
tags[tag] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Generate the output structure.
|
|
var results = make(structs.Services)
|
|
for service, tags := range unique {
|
|
results[service] = make([]string, 0)
|
|
for tag, _ := range tags {
|
|
results[service] = append(results[service], tag)
|
|
}
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// ServiceNodes returns the nodes associated with a given service name.
|
|
func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
|
|
|
|
// List all the services.
|
|
services, err := tx.Get("services", "service", serviceName)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
var results structs.ServiceNodes
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
results = append(results, service.(*structs.ServiceNode))
|
|
}
|
|
|
|
// Fill in the address details.
|
|
results, err = s.parseServiceNodes(tx, results)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// ServiceTagNodes returns the nodes associated with a given service, filtering
|
|
// out services that don't contain the given tag.
|
|
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
|
|
|
|
// List all the services.
|
|
services, err := tx.Get("services", "service", service)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
|
|
// Gather all the services and apply the tag filter.
|
|
var results structs.ServiceNodes
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
svc := service.(*structs.ServiceNode)
|
|
if !serviceTagFilter(svc, tag) {
|
|
results = append(results, svc)
|
|
}
|
|
}
|
|
|
|
// Fill in the address details.
|
|
results, err = s.parseServiceNodes(tx, results)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// serviceTagFilter returns true (should filter) if the given service node
|
|
// doesn't contain the given tag.
|
|
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
|
tag = strings.ToLower(tag)
|
|
|
|
// Look for the lower cased version of the tag.
|
|
for _, t := range sn.ServiceTags {
|
|
if strings.ToLower(t) == tag {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// If we didn't hit the tag above then we should filter.
|
|
return true
|
|
}
|
|
|
|
// parseServiceNodes iterates over a services query and fills in the node details,
|
|
// returning a ServiceNodes slice.
|
|
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
|
var results structs.ServiceNodes
|
|
for _, sn := range services {
|
|
// Note that we have to clone here because we don't want to
|
|
// modify the address field on the object in the database,
|
|
// which is what we are referencing.
|
|
s := sn.Clone()
|
|
|
|
// Fill in the address of the node.
|
|
n, err := tx.First("nodes", "id", sn.Node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
s.Address = n.(*structs.Node).Address
|
|
results = append(results, s)
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
// NodeServices is used to query service registrations by node ID.
|
|
func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeServices")...)
|
|
|
|
// Query the node
|
|
n, err := tx.First("nodes", "id", nodeID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
|
|
}
|
|
if n == nil {
|
|
return 0, nil, nil
|
|
}
|
|
node := n.(*structs.Node)
|
|
|
|
// Read all of the services
|
|
services, err := tx.Get("services", "node", nodeID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
|
|
}
|
|
|
|
// Initialize the node services struct
|
|
ns := &structs.NodeServices{
|
|
Node: node,
|
|
Services: make(map[string]*structs.NodeService),
|
|
}
|
|
|
|
// Add all of the services to the map.
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
svc := service.(*structs.ServiceNode).ToNodeService()
|
|
ns.Services[svc.ID] = svc
|
|
}
|
|
|
|
return idx, ns, nil
|
|
}
|
|
|
|
// DeleteService is used to delete a given service associated with a node.
|
|
func (s *StateStore) DeleteService(idx uint64, nodeID, serviceID string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the service deletion
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.deleteServiceTxn(tx, idx, watches, nodeID, serviceID); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// deleteServiceTxn is the inner method called to remove a service
|
|
// registration within an existing transaction.
|
|
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, nodeID, serviceID string) error {
|
|
// Look up the service.
|
|
service, err := tx.First("services", "id", nodeID, serviceID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
if service == nil {
|
|
return nil
|
|
}
|
|
|
|
// Delete any checks associated with the service. This will invalidate
|
|
// sessions as necessary.
|
|
checks, err := tx.Get("checks", "node_service", nodeID, serviceID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed service check lookup: %s", err)
|
|
}
|
|
var cids []string
|
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
|
cids = append(cids, check.(*structs.HealthCheck).CheckID)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, cid := range cids {
|
|
if err := s.deleteCheckTxn(tx, idx, watches, nodeID, cid); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Update the index.
|
|
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
// Delete the service and update the index
|
|
if err := tx.Delete("services", service); err != nil {
|
|
return fmt.Errorf("failed deleting service: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
watches.Arm("services")
|
|
return nil
|
|
}
|
|
|
|
// EnsureCheck is used to store a check registration in the db.
|
|
func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the check registration
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.ensureCheckTxn(tx, idx, watches, hc); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// ensureCheckTransaction is used as the inner method to handle inserting
|
|
// a health check into the state store. It ensures safety against inserting
|
|
// checks with no matching node or service.
|
|
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
|
hc *structs.HealthCheck) error {
|
|
// Check if we have an existing health check
|
|
existing, err := tx.First("checks", "id", hc.Node, hc.CheckID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed health check lookup: %s", err)
|
|
}
|
|
|
|
// Set the indexes
|
|
if existing != nil {
|
|
hc.CreateIndex = existing.(*structs.HealthCheck).CreateIndex
|
|
hc.ModifyIndex = idx
|
|
} else {
|
|
hc.CreateIndex = idx
|
|
hc.ModifyIndex = idx
|
|
}
|
|
|
|
// Use the default check status if none was provided
|
|
if hc.Status == "" {
|
|
hc.Status = structs.HealthCritical
|
|
}
|
|
|
|
// Get the node
|
|
node, err := tx.First("nodes", "id", hc.Node)
|
|
if err != nil {
|
|
return fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
if node == nil {
|
|
return ErrMissingNode
|
|
}
|
|
|
|
// If the check is associated with a service, check that we have
|
|
// a registration for the service.
|
|
if hc.ServiceID != "" {
|
|
service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
if service == nil {
|
|
return ErrMissingService
|
|
}
|
|
|
|
// Copy in the service name
|
|
hc.ServiceName = service.(*structs.ServiceNode).ServiceName
|
|
}
|
|
|
|
// Delete any sessions for this check if the health is critical.
|
|
if hc.Status == structs.HealthCritical {
|
|
mappings, err := tx.Get("session_checks", "node_check", hc.Node, hc.CheckID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
|
}
|
|
|
|
var ids []string
|
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
|
ids = append(ids, mapping.(*sessionCheck).Session)
|
|
}
|
|
|
|
// Delete the session in a separate loop so we don't trash the
|
|
// iterator.
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
for _, id := range ids {
|
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
|
return fmt.Errorf("failed deleting session: %s", err)
|
|
}
|
|
}
|
|
tx.Defer(func() { watches.Notify() })
|
|
}
|
|
|
|
// Persist the check registration in the db.
|
|
if err := tx.Insert("checks", hc); err != nil {
|
|
return fmt.Errorf("failed inserting service: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
watches.Arm("checks")
|
|
return nil
|
|
}
|
|
|
|
// NodeChecks is used to retrieve checks associated with the
|
|
// given node from the state store.
|
|
func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeChecks")...)
|
|
|
|
// Return the checks.
|
|
checks, err := tx.Get("checks", "node", nodeID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
return s.parseChecks(idx, checks)
|
|
}
|
|
|
|
// ServiceChecks is used to get all checks associated with a
|
|
// given service ID. The query is performed against a service
|
|
// _name_ instead of a service ID.
|
|
func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthChecks, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecks")...)
|
|
|
|
// Return the checks.
|
|
checks, err := tx.Get("checks", "service", serviceName)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
return s.parseChecks(idx, checks)
|
|
}
|
|
|
|
// ChecksInState is used to query the state store for all checks
|
|
// which are in the provided state.
|
|
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInState")...)
|
|
|
|
// Query all checks if HealthAny is passed
|
|
if state == structs.HealthAny {
|
|
checks, err := tx.Get("checks", "status")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
return s.parseChecks(idx, checks)
|
|
}
|
|
|
|
// Any other state we need to query for explicitly
|
|
checks, err := tx.Get("checks", "status", state)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
return s.parseChecks(idx, checks)
|
|
}
|
|
|
|
// parseChecks is a helper function used to deduplicate some
|
|
// repetitive code for returning health checks.
|
|
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
|
|
// Gather the health checks and return them properly type casted.
|
|
var results structs.HealthChecks
|
|
for check := iter.Next(); check != nil; check = iter.Next() {
|
|
results = append(results, check.(*structs.HealthCheck))
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// DeleteCheck is used to delete a health check registration.
|
|
func (s *StateStore) DeleteCheck(idx uint64, node, id string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the check deletion
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.deleteCheckTxn(tx, idx, watches, node, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// deleteCheckTxn is the inner method used to call a health
|
|
// check deletion within an existing transaction.
|
|
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, node, id string) error {
|
|
// Try to retrieve the existing health check.
|
|
hc, err := tx.First("checks", "id", node, id)
|
|
if err != nil {
|
|
return fmt.Errorf("check lookup failed: %s", err)
|
|
}
|
|
if hc == nil {
|
|
return nil
|
|
}
|
|
|
|
// Delete the check from the DB and update the index.
|
|
if err := tx.Delete("checks", hc); err != nil {
|
|
return fmt.Errorf("failed removing check: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
// Delete any sessions for this check.
|
|
mappings, err := tx.Get("session_checks", "node_check", node, id)
|
|
if err != nil {
|
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
|
}
|
|
var ids []string
|
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
|
ids = append(ids, mapping.(*sessionCheck).Session)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, id := range ids {
|
|
if err := s.deleteSessionTxn(tx, idx, watches, id); err != nil {
|
|
return fmt.Errorf("failed deleting session: %s", err)
|
|
}
|
|
}
|
|
|
|
watches.Arm("checks")
|
|
return nil
|
|
}
|
|
|
|
// CheckServiceNodes is used to query all nodes and checks for a given service
|
|
// The results are compounded into a CheckServiceNodes, and the index returned
|
|
// is the maximum index observed over any node, check, or service in the result
|
|
// set.
|
|
func (s *StateStore) CheckServiceNodes(serviceName string) (uint64, structs.CheckServiceNodes, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
|
|
|
|
// Query the state store for the service.
|
|
services, err := tx.Get("services", "service", serviceName)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
|
|
// Return the results.
|
|
var results structs.ServiceNodes
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
results = append(results, service.(*structs.ServiceNode))
|
|
}
|
|
return s.parseCheckServiceNodes(tx, idx, results, err)
|
|
}
|
|
|
|
// CheckServiceTagNodes is used to query all nodes and checks for a given
|
|
// service, filtering out services that don't contain the given tag. The results
|
|
// are compounded into a CheckServiceNodes, and the index returned is the maximum
|
|
// index observed over any node, check, or service in the result set.
|
|
func (s *StateStore) CheckServiceTagNodes(serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
|
|
|
|
// Query the state store for the service.
|
|
services, err := tx.Get("services", "service", serviceName)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
|
}
|
|
|
|
// Return the results, filtering by tag.
|
|
var results structs.ServiceNodes
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
svc := service.(*structs.ServiceNode)
|
|
if !serviceTagFilter(svc, tag) {
|
|
results = append(results, svc)
|
|
}
|
|
}
|
|
return s.parseCheckServiceNodes(tx, idx, results, err)
|
|
}
|
|
|
|
// parseCheckServiceNodes is used to parse through a given set of services,
|
|
// and query for an associated node and a set of checks. This is the inner
|
|
// method used to return a rich set of results from a more simple query.
|
|
func (s *StateStore) parseCheckServiceNodes(
|
|
tx *memdb.Txn, idx uint64, services structs.ServiceNodes,
|
|
err error) (uint64, structs.CheckServiceNodes, error) {
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
var results structs.CheckServiceNodes
|
|
for _, sn := range services {
|
|
// Retrieve the node.
|
|
n, err := tx.First("nodes", "id", sn.Node)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
if n == nil {
|
|
return 0, nil, ErrMissingNode
|
|
}
|
|
node := n.(*structs.Node)
|
|
|
|
// We need to return the checks specific to the given service
|
|
// as well as the node itself. Unfortunately, memdb won't let
|
|
// us use the index to do the latter query so we have to pull
|
|
// them all and filter.
|
|
var checks structs.HealthChecks
|
|
iter, err := tx.Get("checks", "node", sn.Node)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
for check := iter.Next(); check != nil; check = iter.Next() {
|
|
hc := check.(*structs.HealthCheck)
|
|
if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID {
|
|
checks = append(checks, hc)
|
|
}
|
|
}
|
|
|
|
// Append to the results.
|
|
results = append(results, structs.CheckServiceNode{
|
|
Node: node,
|
|
Service: sn.ToNodeService(),
|
|
Checks: checks,
|
|
})
|
|
}
|
|
|
|
return idx, results, nil
|
|
}
|
|
|
|
// NodeInfo is used to generate a dump of a single node. The dump includes
|
|
// all services and checks which are registered against the node.
|
|
func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeInfo")...)
|
|
|
|
// Query the node by the passed node
|
|
nodes, err := tx.Get("nodes", "id", node)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
return s.parseNodes(tx, idx, nodes)
|
|
}
|
|
|
|
// NodeDump is used to generate a dump of all nodes. This call is expensive
|
|
// as it has to query every node, service, and check. The response can also
|
|
// be quite large since there is currently no filtering applied.
|
|
func (s *StateStore) NodeDump() (uint64, structs.NodeDump, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeDump")...)
|
|
|
|
// Fetch all of the registered nodes
|
|
nodes, err := tx.Get("nodes", "id")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
return s.parseNodes(tx, idx, nodes)
|
|
}
|
|
|
|
// parseNodes takes an iterator over a set of nodes and returns a struct
|
|
// containing the nodes along with all of their associated services
|
|
// and/or health checks.
|
|
func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
|
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
|
|
|
|
var results structs.NodeDump
|
|
for n := iter.Next(); n != nil; n = iter.Next() {
|
|
node := n.(*structs.Node)
|
|
|
|
// Create the wrapped node
|
|
dump := &structs.NodeInfo{
|
|
Node: node.Node,
|
|
Address: node.Address,
|
|
}
|
|
|
|
// Query the node services
|
|
services, err := tx.Get("services", "node", node.Node)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
|
|
}
|
|
for service := services.Next(); service != nil; service = services.Next() {
|
|
ns := service.(*structs.ServiceNode).ToNodeService()
|
|
dump.Services = append(dump.Services, ns)
|
|
}
|
|
|
|
// Query the node checks
|
|
checks, err := tx.Get("checks", "node", node.Node)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
|
hc := check.(*structs.HealthCheck)
|
|
dump.Checks = append(dump.Checks, hc)
|
|
}
|
|
|
|
// Add the result to the slice
|
|
results = append(results, dump)
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// KVSSet is used to store a key/value pair.
|
|
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Perform the actual set.
|
|
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// kvsSetTxn is used to insert or update a key/value pair in the state
|
|
// store. It is the inner method used and handles only the actual storage.
|
|
// If updateSession is true, then the incoming entry will set the new
|
|
// session (should be validated before calling this). Otherwise, we will keep
|
|
// whatever the existing session is.
|
|
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
|
|
// Retrieve an existing KV pair
|
|
existing, err := tx.First("kvs", "id", entry.Key)
|
|
if err != nil {
|
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Set the indexes.
|
|
if existing != nil {
|
|
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
|
|
} else {
|
|
entry.CreateIndex = idx
|
|
}
|
|
entry.ModifyIndex = idx
|
|
|
|
// Preserve the existing session unless told otherwise. The "existing"
|
|
// session for a new entry is "no session".
|
|
if !updateSession {
|
|
if existing != nil {
|
|
entry.Session = existing.(*structs.DirEntry).Session
|
|
} else {
|
|
entry.Session = ""
|
|
}
|
|
}
|
|
|
|
// Store the kv pair in the state store and update the index.
|
|
if err := tx.Insert("kvs", entry); err != nil {
|
|
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
|
|
return nil
|
|
}
|
|
|
|
// KVSGet is used to retrieve a key/value pair from the state store.
|
|
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
|
|
|
// Retrieve the key.
|
|
entry, err := tx.First("kvs", "id", key)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
if entry != nil {
|
|
return idx, entry.(*structs.DirEntry), nil
|
|
}
|
|
return idx, nil, nil
|
|
}
|
|
|
|
// KVSList is used to list out all keys under a given prefix. If the
|
|
// prefix is left empty, all keys in the KVS will be returned. The returned
|
|
// is the max index of the returned kvs entries or applicable tombstones, or
|
|
// else it's the full table indexes for kvs and tombstones.
|
|
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table indexes.
|
|
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
|
|
|
// Query the prefix and list the available keys
|
|
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Gather all of the keys found in the store
|
|
var ents structs.DirEntries
|
|
var lindex uint64
|
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
e := entry.(*structs.DirEntry)
|
|
ents = append(ents, e)
|
|
if e.ModifyIndex > lindex {
|
|
lindex = e.ModifyIndex
|
|
}
|
|
}
|
|
|
|
// Check for the highest index in the graveyard. If the prefix is empty
|
|
// then just use the full table indexes since we are listing everything.
|
|
if prefix != "" {
|
|
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
|
|
}
|
|
if gindex > lindex {
|
|
lindex = gindex
|
|
}
|
|
} else {
|
|
lindex = idx
|
|
}
|
|
|
|
// Use the sub index if it was set and there are entries, otherwise use
|
|
// the full table index from above.
|
|
if lindex != 0 {
|
|
idx = lindex
|
|
}
|
|
return idx, ents, nil
|
|
}
|
|
|
|
// KVSListKeys is used to query the KV store for keys matching the given prefix.
|
|
// An optional separator may be specified, which can be used to slice off a part
|
|
// of the response so that only a subset of the prefix is returned. In this
|
|
// mode, the keys which are omitted are still counted in the returned index.
|
|
func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table indexes.
|
|
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
|
|
|
// Fetch keys using the specified prefix
|
|
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
prefixLen := len(prefix)
|
|
sepLen := len(sep)
|
|
|
|
var keys []string
|
|
var lindex uint64
|
|
var last string
|
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
e := entry.(*structs.DirEntry)
|
|
|
|
// Accumulate the high index
|
|
if e.ModifyIndex > lindex {
|
|
lindex = e.ModifyIndex
|
|
}
|
|
|
|
// Always accumulate if no separator provided
|
|
if sepLen == 0 {
|
|
keys = append(keys, e.Key)
|
|
continue
|
|
}
|
|
|
|
// Parse and de-duplicate the returned keys based on the
|
|
// key separator, if provided.
|
|
after := e.Key[prefixLen:]
|
|
sepIdx := strings.Index(after, sep)
|
|
if sepIdx > -1 {
|
|
key := e.Key[:prefixLen+sepIdx+sepLen]
|
|
if key != last {
|
|
keys = append(keys, key)
|
|
last = key
|
|
}
|
|
} else {
|
|
keys = append(keys, e.Key)
|
|
}
|
|
}
|
|
|
|
// Check for the highest index in the graveyard. If the prefix is empty
|
|
// then just use the full table indexes since we are listing everything.
|
|
if prefix != "" {
|
|
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
|
|
}
|
|
if gindex > lindex {
|
|
lindex = gindex
|
|
}
|
|
} else {
|
|
lindex = idx
|
|
}
|
|
|
|
// Use the sub index if it was set and there are entries, otherwise use
|
|
// the full table index from above.
|
|
if lindex != 0 {
|
|
idx = lindex
|
|
}
|
|
return idx, keys, nil
|
|
}
|
|
|
|
// KVSDelete is used to perform a shallow delete on a single key in the
|
|
// the state store.
|
|
func (s *StateStore) KVSDelete(idx uint64, key string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Perform the actual delete
|
|
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// kvsDeleteTxn is the inner method used to perform the actual deletion
|
|
// of a key/value pair within an existing transaction.
|
|
func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
|
|
// Look up the entry in the state store.
|
|
entry, err := tx.First("kvs", "id", key)
|
|
if err != nil {
|
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
if entry == nil {
|
|
return nil
|
|
}
|
|
|
|
// Create a tombstone.
|
|
if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil {
|
|
return fmt.Errorf("failed adding to graveyard: %s", err)
|
|
}
|
|
|
|
// Delete the entry and update the index.
|
|
if err := tx.Delete("kvs", entry); err != nil {
|
|
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.kvsWatch.Notify(key, false) })
|
|
return nil
|
|
}
|
|
|
|
// KVSDeleteCAS is used to try doing a KV delete operation with a given
|
|
// raft index. If the CAS index specified is not equal to the last
|
|
// observed index for the given key, then the call is a noop, otherwise
|
|
// a normal KV delete is invoked.
|
|
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Retrieve the existing kvs entry, if any exists.
|
|
entry, err := tx.First("kvs", "id", key)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// If the existing index does not match the provided CAS
|
|
// index arg, then we shouldn't update anything and can safely
|
|
// return early here.
|
|
e, ok := entry.(*structs.DirEntry)
|
|
if !ok || e.ModifyIndex != cidx {
|
|
return entry == nil, nil
|
|
}
|
|
|
|
// Call the actual deletion if the above passed.
|
|
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
tx.Commit()
|
|
return true, nil
|
|
}
|
|
|
|
// KVSSetCAS is used to do a check-and-set operation on a KV entry. The
|
|
// ModifyIndex in the provided entry is used to determine if we should
|
|
// write the entry to the state store or bail. Returns a bool indicating
|
|
// if a write happened and any error.
|
|
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Retrieve the existing entry.
|
|
existing, err := tx.First("kvs", "id", entry.Key)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Check if the we should do the set. A ModifyIndex of 0 means that
|
|
// we are doing a set-if-not-exists.
|
|
if entry.ModifyIndex == 0 && existing != nil {
|
|
return false, nil
|
|
}
|
|
if entry.ModifyIndex != 0 && existing == nil {
|
|
return false, nil
|
|
}
|
|
e, ok := existing.(*structs.DirEntry)
|
|
if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex {
|
|
return false, nil
|
|
}
|
|
|
|
// If we made it this far, we should perform the set.
|
|
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
tx.Commit()
|
|
return true, nil
|
|
}
|
|
|
|
// KVSDeleteTree is used to do a recursive delete on a key prefix
|
|
// in the state store. If any keys are modified, the last index is
|
|
// set, otherwise this is a no-op.
|
|
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Get an iterator over all of the keys with the given prefix.
|
|
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
|
if err != nil {
|
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Go over all of the keys and remove them. We call the delete
|
|
// directly so that we only update the index once. We also add
|
|
// tombstones as we go.
|
|
var modified bool
|
|
var objs []interface{}
|
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
e := entry.(*structs.DirEntry)
|
|
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
|
|
return fmt.Errorf("failed adding to graveyard: %s", err)
|
|
}
|
|
objs = append(objs, entry)
|
|
modified = true
|
|
}
|
|
|
|
// Do the actual deletes in a separate loop so we don't trash the
|
|
// iterator as we go.
|
|
for _, obj := range objs {
|
|
if err := tx.Delete("kvs", obj); err != nil {
|
|
return fmt.Errorf("failed deleting kvs entry: %s", err)
|
|
}
|
|
}
|
|
|
|
// Update the index
|
|
if modified {
|
|
tx.Defer(func() { s.kvsWatch.Notify(prefix, true) })
|
|
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// KVSLockDelay returns the expiration time for any lock delay associated with
|
|
// the given key.
|
|
func (s *StateStore) KVSLockDelay(key string) time.Time {
|
|
return s.lockDelay.GetExpiration(key)
|
|
}
|
|
|
|
// KVSLock is similar to KVSSet but only performs the set if the lock can be
|
|
// acquired.
|
|
func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Verify that a session is present.
|
|
if entry.Session == "" {
|
|
return false, fmt.Errorf("missing session")
|
|
}
|
|
|
|
// Verify that the session exists.
|
|
sess, err := tx.First("sessions", "id", entry.Session)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
if sess == nil {
|
|
return false, fmt.Errorf("invalid session %#v", entry.Session)
|
|
}
|
|
|
|
// Retrieve the existing entry.
|
|
existing, err := tx.First("kvs", "id", entry.Key)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Set up the entry, using the existing entry if present.
|
|
if existing != nil {
|
|
e := existing.(*structs.DirEntry)
|
|
if e.Session == entry.Session {
|
|
// We already hold this lock, good to go.
|
|
entry.CreateIndex = e.CreateIndex
|
|
entry.LockIndex = e.LockIndex
|
|
} else if e.Session != "" {
|
|
// Bail out, someone else holds this lock.
|
|
return false, nil
|
|
} else {
|
|
// Set up a new lock with this session.
|
|
entry.CreateIndex = e.CreateIndex
|
|
entry.LockIndex = e.LockIndex + 1
|
|
}
|
|
} else {
|
|
entry.CreateIndex = idx
|
|
entry.LockIndex = 1
|
|
}
|
|
entry.ModifyIndex = idx
|
|
|
|
// If we made it this far, we should perform the set.
|
|
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
tx.Commit()
|
|
return true, nil
|
|
}
|
|
|
|
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
|
|
// unlocked (the key must already exist and be locked).
|
|
func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Verify that a session is present.
|
|
if entry.Session == "" {
|
|
return false, fmt.Errorf("missing session")
|
|
}
|
|
|
|
// Retrieve the existing entry.
|
|
existing, err := tx.First("kvs", "id", entry.Key)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
|
|
// Bail if there's no existing key.
|
|
if existing == nil {
|
|
return false, nil
|
|
}
|
|
|
|
// Make sure the given session is the lock holder.
|
|
e := existing.(*structs.DirEntry)
|
|
if e.Session != entry.Session {
|
|
return false, nil
|
|
}
|
|
|
|
// Clear the lock and update the entry.
|
|
entry.Session = ""
|
|
entry.LockIndex = e.LockIndex
|
|
entry.CreateIndex = e.CreateIndex
|
|
entry.ModifyIndex = idx
|
|
|
|
// If we made it this far, we should perform the set.
|
|
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
tx.Commit()
|
|
return true, nil
|
|
}
|
|
|
|
// SessionCreate is used to register a new session in the state store.
|
|
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// This code is technically able to (incorrectly) update an existing
|
|
// session but we never do that in practice. The upstream endpoint code
|
|
// always adds a unique ID when doing a create operation so we never hit
|
|
// an existing session again. It isn't worth the overhead to verify
|
|
// that here, but it's worth noting that we should never do this in the
|
|
// future.
|
|
|
|
// Call the session creation
|
|
if err := s.sessionCreateTxn(tx, idx, sess); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// sessionCreateTxn is the inner method used for creating session entries in
|
|
// an open transaction. Any health checks registered with the session will be
|
|
// checked for failing status. Returns any error encountered.
|
|
func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
|
|
// Check that we have a session ID
|
|
if sess.ID == "" {
|
|
return ErrMissingSessionID
|
|
}
|
|
|
|
// Verify the session behavior is valid
|
|
switch sess.Behavior {
|
|
case "":
|
|
// Release by default to preserve backwards compatibility
|
|
sess.Behavior = structs.SessionKeysRelease
|
|
case structs.SessionKeysRelease:
|
|
case structs.SessionKeysDelete:
|
|
default:
|
|
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
|
|
}
|
|
|
|
// Assign the indexes. ModifyIndex likely will not be used but
|
|
// we set it here anyways for sanity.
|
|
sess.CreateIndex = idx
|
|
sess.ModifyIndex = idx
|
|
|
|
// Check that the node exists
|
|
node, err := tx.First("nodes", "id", sess.Node)
|
|
if err != nil {
|
|
return fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
if node == nil {
|
|
return ErrMissingNode
|
|
}
|
|
|
|
// Go over the session checks and ensure they exist.
|
|
for _, checkID := range sess.Checks {
|
|
check, err := tx.First("checks", "id", sess.Node, checkID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed check lookup: %s", err)
|
|
}
|
|
if check == nil {
|
|
return fmt.Errorf("Missing check '%s' registration", checkID)
|
|
}
|
|
|
|
// Check that the check is not in critical state
|
|
status := check.(*structs.HealthCheck).Status
|
|
if status == structs.HealthCritical {
|
|
return fmt.Errorf("Check '%s' is in %s state", checkID, status)
|
|
}
|
|
}
|
|
|
|
// Insert the session
|
|
if err := tx.Insert("sessions", sess); err != nil {
|
|
return fmt.Errorf("failed inserting session: %s", err)
|
|
}
|
|
|
|
// Insert the check mappings
|
|
for _, checkID := range sess.Checks {
|
|
mapping := &sessionCheck{
|
|
Node: sess.Node,
|
|
CheckID: checkID,
|
|
Session: sess.ID,
|
|
}
|
|
if err := tx.Insert("session_checks", mapping); err != nil {
|
|
return fmt.Errorf("failed inserting session check mapping: %s", err)
|
|
}
|
|
}
|
|
|
|
// Update the index
|
|
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.tableWatches["sessions"].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// SessionGet is used to retrieve an active session from the state store.
|
|
func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("SessionGet")...)
|
|
|
|
// Look up the session by its ID
|
|
session, err := tx.First("sessions", "id", sessionID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
if session != nil {
|
|
return idx, session.(*structs.Session), nil
|
|
}
|
|
return idx, nil, nil
|
|
}
|
|
|
|
// SessionList returns a slice containing all of the active sessions.
|
|
func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("SessionList")...)
|
|
|
|
// Query all of the active sessions.
|
|
sessions, err := tx.Get("sessions", "id")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
|
|
// Go over the sessions and create a slice of them.
|
|
var result structs.Sessions
|
|
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
|
result = append(result, session.(*structs.Session))
|
|
}
|
|
return idx, result, nil
|
|
}
|
|
|
|
// NodeSessions returns a set of active sessions associated
|
|
// with the given node ID. The returned index is the highest
|
|
// index seen from the result set.
|
|
func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("NodeSessions")...)
|
|
|
|
// Get all of the sessions which belong to the node
|
|
sessions, err := tx.Get("sessions", "node", nodeID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
|
|
// Go over all of the sessions and return them as a slice
|
|
var result structs.Sessions
|
|
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
|
result = append(result, session.(*structs.Session))
|
|
}
|
|
return idx, result, nil
|
|
}
|
|
|
|
// SessionDestroy is used to remove an active session. This will
|
|
// implicitly invalidate the session and invoke the specified
|
|
// session destroy behavior.
|
|
func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the session deletion.
|
|
watches := NewDumbWatchManager(s.tableWatches)
|
|
if err := s.deleteSessionTxn(tx, idx, watches, sessionID); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Defer(func() { watches.Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// deleteSessionTxn is the inner method, which is used to do the actual
|
|
// session deletion and handle session invalidation, watch triggers, etc.
|
|
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, sessionID string) error {
|
|
// Look up the session.
|
|
sess, err := tx.First("sessions", "id", sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed session lookup: %s", err)
|
|
}
|
|
if sess == nil {
|
|
return nil
|
|
}
|
|
|
|
// Delete the session and write the new index.
|
|
if err := tx.Delete("sessions", sess); err != nil {
|
|
return fmt.Errorf("failed deleting session: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
// Enforce the max lock delay.
|
|
session := sess.(*structs.Session)
|
|
delay := session.LockDelay
|
|
if delay > structs.MaxLockDelay {
|
|
delay = structs.MaxLockDelay
|
|
}
|
|
|
|
// Snag the current now time so that all the expirations get calculated
|
|
// the same way.
|
|
now := time.Now()
|
|
|
|
// Get an iterator over all of the keys with the given session.
|
|
entries, err := tx.Get("kvs", "session", sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed kvs lookup: %s", err)
|
|
}
|
|
var kvs []interface{}
|
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
kvs = append(kvs, entry)
|
|
}
|
|
|
|
// Invalidate any held locks.
|
|
switch session.Behavior {
|
|
case structs.SessionKeysRelease:
|
|
for _, obj := range kvs {
|
|
// Note that we clone here since we are modifying the
|
|
// returned object and want to make sure our set op
|
|
// respects the transaction we are in.
|
|
e := obj.(*structs.DirEntry).Clone()
|
|
e.Session = ""
|
|
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
|
|
return fmt.Errorf("failed kvs update: %s", err)
|
|
}
|
|
|
|
// Apply the lock delay if present.
|
|
if delay > 0 {
|
|
s.lockDelay.SetExpiration(e.Key, now, delay)
|
|
}
|
|
}
|
|
case structs.SessionKeysDelete:
|
|
for _, obj := range kvs {
|
|
e := obj.(*structs.DirEntry)
|
|
if err := s.kvsDeleteTxn(tx, idx, e.Key); err != nil {
|
|
return fmt.Errorf("failed kvs delete: %s", err)
|
|
}
|
|
|
|
// Apply the lock delay if present.
|
|
if delay > 0 {
|
|
s.lockDelay.SetExpiration(e.Key, now, delay)
|
|
}
|
|
}
|
|
default:
|
|
return fmt.Errorf("unknown session behavior %#v", session.Behavior)
|
|
}
|
|
|
|
// Delete any check mappings.
|
|
mappings, err := tx.Get("session_checks", "session", sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed session checks lookup: %s", err)
|
|
}
|
|
var objs []interface{}
|
|
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
|
|
objs = append(objs, mapping)
|
|
}
|
|
|
|
// Do the delete in a separate loop so we don't trash the iterator.
|
|
for _, obj := range objs {
|
|
if err := tx.Delete("session_checks", obj); err != nil {
|
|
return fmt.Errorf("failed deleting session check: %s", err)
|
|
}
|
|
}
|
|
|
|
watches.Arm("sessions")
|
|
return nil
|
|
}
|
|
|
|
// ACLSet is used to insert an ACL rule into the state store.
|
|
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call set on the ACL
|
|
if err := s.aclSetTxn(tx, idx, acl); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// aclSetTxn is the inner method used to insert an ACL rule with the
|
|
// proper indexes into the state store.
|
|
func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
|
|
// Check that the ID is set
|
|
if acl.ID == "" {
|
|
return ErrMissingACLID
|
|
}
|
|
|
|
// Check for an existing ACL
|
|
existing, err := tx.First("acls", "id", acl.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed acl lookup: %s", err)
|
|
}
|
|
|
|
// Set the indexes
|
|
if existing != nil {
|
|
acl.CreateIndex = existing.(*structs.ACL).CreateIndex
|
|
acl.ModifyIndex = idx
|
|
} else {
|
|
acl.CreateIndex = idx
|
|
acl.ModifyIndex = idx
|
|
}
|
|
|
|
// Insert the ACL
|
|
if err := tx.Insert("acls", acl); err != nil {
|
|
return fmt.Errorf("failed inserting acl: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// ACLGet is used to look up an existing ACL by ID.
|
|
func (s *StateStore) ACLGet(aclID string) (uint64, *structs.ACL, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ACLGet")...)
|
|
|
|
// Query for the existing ACL
|
|
acl, err := tx.First("acls", "id", aclID)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
|
|
}
|
|
if acl != nil {
|
|
return idx, acl.(*structs.ACL), nil
|
|
}
|
|
return idx, nil, nil
|
|
}
|
|
|
|
// ACLList is used to list out all of the ACLs in the state store.
|
|
func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
|
|
|
|
// Return the ACLs.
|
|
acls, err := s.aclListTxn(tx)
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
|
|
}
|
|
return idx, acls, nil
|
|
}
|
|
|
|
// aclListTxn is used to list out all of the ACLs in the state store. This is a
|
|
// function vs. a method so it can be called from the snapshotter.
|
|
func (s *StateStore) aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
|
|
// Query all of the ACLs in the state store
|
|
acls, err := tx.Get("acls", "id")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed acl lookup: %s", err)
|
|
}
|
|
|
|
// Go over all of the ACLs and build the response
|
|
var result structs.ACLs
|
|
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
|
a := acl.(*structs.ACL)
|
|
result = append(result, a)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// ACLDelete is used to remove an existing ACL from the state store. If
|
|
// the ACL does not exist this is a no-op and no error is returned.
|
|
func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Call the ACL delete
|
|
if err := s.aclDeleteTxn(tx, idx, aclID); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.Commit()
|
|
return nil
|
|
}
|
|
|
|
// aclDeleteTxn is used to delete an ACL from the state store within
|
|
// an existing transaction.
|
|
func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
|
|
// Look up the existing ACL
|
|
acl, err := tx.First("acls", "id", aclID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed acl lookup: %s", err)
|
|
}
|
|
if acl == nil {
|
|
return nil
|
|
}
|
|
|
|
// Delete the ACL from the state store and update indexes
|
|
if err := tx.Delete("acls", acl); err != nil {
|
|
return fmt.Errorf("failed deleting acl: %s", err)
|
|
}
|
|
if err := tx.Insert("index", &IndexEntry{"acls", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.tableWatches["acls"].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// CoordinateGetRaw queries for the coordinate of the given node. This is an
|
|
// unusual state store method because it just returns the raw coordinate or
|
|
// nil, none of the Raft or node information is returned. This hits the 90%
|
|
// internal-to-Consul use case for this data, and this isn't exposed via an
|
|
// endpoint, so it doesn't matter that the Raft info isn't available.
|
|
func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Pull the full coordinate entry.
|
|
coord, err := tx.First("coordinates", "id", node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
|
}
|
|
|
|
// Pick out just the raw coordinate.
|
|
if coord != nil {
|
|
return coord.(*structs.Coordinate).Coord, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// Coordinates queries for all nodes with coordinates.
|
|
func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
|
|
tx := s.db.Txn(false)
|
|
defer tx.Abort()
|
|
|
|
// Get the table index.
|
|
idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
|
|
|
|
// Pull all the coordinates.
|
|
coords, err := tx.Get("coordinates", "id")
|
|
if err != nil {
|
|
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
|
|
}
|
|
var results structs.Coordinates
|
|
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
|
results = append(results, coord.(*structs.Coordinate))
|
|
}
|
|
return idx, results, nil
|
|
}
|
|
|
|
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
|
|
// them in a single transaction.
|
|
func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
|
|
tx := s.db.Txn(true)
|
|
defer tx.Abort()
|
|
|
|
// Upsert the coordinates.
|
|
for _, update := range updates {
|
|
// Since the cleanup of coordinates is tied to deletion of
|
|
// nodes, we silently drop any updates for nodes that we don't
|
|
// know about. This might be possible during normal operation
|
|
// if we happen to get a coordinate update for a node that
|
|
// hasn't been able to add itself to the catalog yet. Since we
|
|
// don't carefully sequence this, and since it will fix itself
|
|
// on the next coordinate update from that node, we don't return
|
|
// an error or log anything.
|
|
node, err := tx.First("nodes", "id", update.Node)
|
|
if err != nil {
|
|
return fmt.Errorf("failed node lookup: %s", err)
|
|
}
|
|
if node == nil {
|
|
continue
|
|
}
|
|
|
|
if err := tx.Insert("coordinates", update); err != nil {
|
|
return fmt.Errorf("failed inserting coordinate: %s", err)
|
|
}
|
|
}
|
|
|
|
// Update the index.
|
|
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
}
|
|
|
|
tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
|
|
tx.Commit()
|
|
return nil
|
|
}
|