package nomad import ( "fmt" "io" "log" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" ) // The StateStore is responsible for maintaining all the Nomad // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide // high concurrency for read operations without blocking writes, and // to provide write availability in the face of reads. EVERY object // returned as a result of a read against the state store should be // considered a constant and NEVER modified in place. type StateStore struct { logger *log.Logger db *memdb.MemDB } // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore } // StateRestore is used to optimize the performance when // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { txn *memdb.Txn } // Abort is used to abort the restore operation func (s *StateRestore) Abort() { s.txn.Abort() } // Commit is used to commit the restore operation func (s *StateRestore) Commit() { s.txn.Commit() } // IndexEntry is used with the "index" table // for managing the latest Raft index affecting a table. type IndexEntry struct { Key string Value uint64 } // NewStateStore is used to create a new state store func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the MemDB db, err := memdb.NewMemDB(stateStoreSchema()) if err != nil { return nil, fmt.Errorf("state store setup failed: %v", err) } // Create the state store s := &StateStore{ logger: log.New(logOutput, "", log.LstdFlags), db: db, } return s, nil } // Snapshot is used to create a point in time snapshot. Because // we use MemDB, we just need to snapshot the state of the underlying // database. func (s *StateStore) Snapshot() (*StateSnapshot, error) { snap := &StateSnapshot{ StateStore: StateStore{ logger: s.logger, db: s.db.Snapshot(), }, } return snap, nil } // Restore is used to optimize the efficiency of rebuilding // state by minimizing the number of transactions and checking // overhead. func (s *StateStore) Restore() (*StateRestore, error) { txn := s.db.Txn(true) return &StateRestore{txn}, nil } // RegisterNode is used to register a node or update a node definition func (s *StateStore) RegisterNode(index uint64, node *structs.Node) error { txn := s.db.Txn(true) defer txn.Abort() // Check if the node already exists existing, err := txn.First("nodes", "id", node.ID) if err != nil { return fmt.Errorf("node lookup failed: %v", err) } // Setup the indexes correctly if existing != nil { node.CreateIndex = existing.(*structs.Node).CreateIndex node.ModifyIndex = index } else { node.CreateIndex = index node.ModifyIndex = index } // Insert the node if err := txn.Insert("nodes", node); err != nil { return fmt.Errorf("node insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // DeregisterNode is used to deregister a node func (s *StateStore) DeregisterNode(index uint64, nodeID string) error { txn := s.db.Txn(true) defer txn.Abort() // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { return fmt.Errorf("node lookup failed: %v", err) } if existing == nil { return fmt.Errorf("node not found") } // Delete the node if err := txn.Delete("nodes", existing); err != nil { return fmt.Errorf("node delete failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // UpdateNodeStatus is used to update the status of a node func (s *StateStore) UpdateNodeStatus(index uint64, nodeID string, status string) error { txn := s.db.Txn(true) defer txn.Abort() // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { return fmt.Errorf("node lookup failed: %v", err) } if existing == nil { return fmt.Errorf("node not found") } // Copy the existing node existingNode := existing.(*structs.Node) copyNode := new(structs.Node) *copyNode = *existingNode // Update the status in the copy copyNode.Status = status copyNode.ModifyIndex = index // Insert the node if err := txn.Insert("nodes", copyNode); err != nil { return fmt.Errorf("node update failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // GetNodeByID is used to lookup a node by ID func (s *StateStore) GetNodeByID(nodeID string) (*structs.Node, error) { txn := s.db.Txn(false) existing, err := txn.First("nodes", "id", nodeID) if err != nil { return nil, fmt.Errorf("node lookup failed: %v", err) } if existing != nil { return existing.(*structs.Node), nil } return nil, nil } // Nodes returns an iterator over all the nodes func (s *StateStore) Nodes() (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire nodes table iter, err := txn.Get("nodes", "id") if err != nil { return nil, err } return iter, nil } // RegisterJob is used to register a job or update a job definition func (s *StateStore) RegisterJob(index uint64, job *structs.Job) error { txn := s.db.Txn(true) defer txn.Abort() // Check if the job already exists existing, err := txn.First("jobs", "id", job.ID) if err != nil { return fmt.Errorf("job lookup failed: %v", err) } // Setup the indexes correctly if existing != nil { job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index } else { job.CreateIndex = index job.ModifyIndex = index } // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // DeregisterJob is used to deregister a job func (s *StateStore) DeregisterJob(index uint64, jobID string) error { txn := s.db.Txn(true) defer txn.Abort() // Lookup the node existing, err := txn.First("jobs", "id", jobID) if err != nil { return fmt.Errorf("job lookup failed: %v", err) } if existing == nil { return fmt.Errorf("job not found") } // Delete the node if err := txn.Delete("jobs", existing); err != nil { return fmt.Errorf("job delete failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // GetJobByID is used to lookup a job by its ID func (s *StateStore) GetJobByID(id string) (*structs.Job, error) { txn := s.db.Txn(false) existing, err := txn.First("jobs", "id", id) if err != nil { return nil, fmt.Errorf("job lookup failed: %v", err) } if existing != nil { return existing.(*structs.Job), nil } return nil, nil } // Jobs returns an iterator over all the jobs func (s *StateStore) Jobs() (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire jobs table iter, err := txn.Get("jobs", "id") if err != nil { return nil, err } return iter, nil } // UpsertEvaluation is used to upsert an evaluation func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) defer txn.Abort() // Do a nested upsert for _, eval := range evals { if err := s.nestedUpsertEval(txn, index, eval); err != nil { return err } } txn.Commit() return nil } // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation existing, err := txn.First("evals", "id", eval.ID) if err != nil { return fmt.Errorf("eval lookup failed: %v", err) } // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex eval.ModifyIndex = index } else { eval.CreateIndex = index eval.ModifyIndex = index } // Insert the eval if err := txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } return nil } // DeleteEval is used to delete an evaluation func (s *StateStore) DeleteEval(index uint64, evalID string) error { txn := s.db.Txn(true) defer txn.Abort() // Lookup the node existing, err := txn.First("evals", "id", evalID) if err != nil { return fmt.Errorf("eval lookup failed: %v", err) } if existing == nil { return fmt.Errorf("eval not found") } // Delete the node if err := txn.Delete("evals", existing); err != nil { return fmt.Errorf("eval delete failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // GetEvalByID is used to lookup an eval by its ID func (s *StateStore) GetEvalByID(id string) (*structs.Evaluation, error) { txn := s.db.Txn(false) existing, err := txn.First("evals", "id", id) if err != nil { return nil, fmt.Errorf("eval lookup failed: %v", err) } if existing != nil { return existing.(*structs.Evaluation), nil } return nil, nil } // Evals returns an iterator over all the evaluations func (s *StateStore) Evals() (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire table iter, err := txn.Get("evals", "id") if err != nil { return nil, err } return iter, nil } // UpdateAllocations is used to evict a set of allocations // and allocate new ones at the same time. func (s *StateStore) UpdateAllocations(index uint64, evicts []string, allocs []*structs.Allocation) error { txn := s.db.Txn(true) defer txn.Abort() // Handle evictions first for _, evict := range evicts { existing, err := txn.First("allocs", "id", evict) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) } if existing == nil { continue } if err := txn.Delete("allocs", existing); err != nil { return fmt.Errorf("alloc delete failed: %v", err) } } // Handle the allocations for _, alloc := range allocs { existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) } if existing == nil { alloc.CreateIndex = index alloc.ModifyIndex = index } else { alloc.CreateIndex = existing.(*structs.Allocation).CreateIndex alloc.ModifyIndex = index } if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } } // Update the indexes if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() return nil } // GetAllocByID is used to lookup an allocation by its ID func (s *StateStore) GetAllocByID(id string) (*structs.Allocation, error) { txn := s.db.Txn(false) existing, err := txn.First("allocs", "id", id) if err != nil { return nil, fmt.Errorf("alloc lookup failed: %v", err) } if existing != nil { return existing.(*structs.Allocation), nil } return nil, nil } // AllocsByNode returns all the allocations by node func (s *StateStore) AllocsByNode(node string) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations iter, err := txn.Get("allocs", "node", node) if err != nil { return nil, err } var out []*structs.Allocation for { raw := iter.Next() if raw == nil { break } out = append(out, raw.(*structs.Allocation)) } return out, nil } // AllocsByJob returns all the allocations by job id func (s *StateStore) AllocsByJob(jobID string) ([]*structs.Allocation, error) { txn := s.db.Txn(false) // Get an iterator over the node allocations iter, err := txn.Get("allocs", "job", jobID) if err != nil { return nil, err } var out []*structs.Allocation for { raw := iter.Next() if raw == nil { break } out = append(out, raw.(*structs.Allocation)) } return out, nil } // Allocs returns an iterator over all the evaluations func (s *StateStore) Allocs() (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire table iter, err := txn.Get("allocs", "id") if err != nil { return nil, err } return iter, nil } // GetIndex finds the matching index value func (s *StateStore) GetIndex(name string) (uint64, error) { txn := s.db.Txn(false) // Lookup the first matching index out, err := txn.First("index", "id", name) if err != nil { return 0, err } if out == nil { return 0, nil } return out.(*IndexEntry).Value, nil } // Indexes returns an iterator over all the indexes func (s *StateStore) Indexes() (memdb.ResultIterator, error) { txn := s.db.Txn(false) // Walk the entire nodes table iter, err := txn.Get("index", "id") if err != nil { return nil, err } return iter, nil } // NodeRestore is used to restore a node func (r *StateRestore) NodeRestore(node *structs.Node) error { if err := r.txn.Insert("nodes", node); err != nil { return fmt.Errorf("node insert failed: %v", err) } return nil } // JobRestore is used to restore a job func (r *StateRestore) JobRestore(job *structs.Job) error { if err := r.txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) } return nil } // EvalRestore is used to restore an evaluation func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { if err := r.txn.Insert("evals", eval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } return nil } // AllocRestore is used to restore an allocation func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } return nil } // IndexRestore is used to restore an index func (r *StateRestore) IndexRestore(idx *IndexEntry) error { if err := r.txn.Insert("index", idx); err != nil { return fmt.Errorf("index insert failed: %v", err) } return nil }