package state import ( "fmt" "path/filepath" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state" "github.com/hashicorp/nomad/helper/boltdd" "github.com/hashicorp/nomad/nomad/structs" ) /* The client has a boltDB backed state store. The schema as of 0.9 looks as follows: allocations/ |--> / |--> alloc -> allocEntry{*structs.Allocation} |--> task-/ |--> local_state -> *trstate.LocalState # Local-only state |--> task_state -> *structs.TaskState # Sync'd to servers devicemanager/ |--> plugin_state -> *dmstate.PluginState drivermanager/ |--> plugin_state -> *dmstate.PluginState */ var ( // allocationsBucketName is the bucket name containing all allocation related // data allocationsBucketName = []byte("allocations") // allocKey is the key Allocations are stored under encapsulated in // allocEntry structs. allocKey = []byte("alloc") // allocations -> $allocid -> task-$taskname -> the keys below taskLocalStateKey = []byte("local_state") taskStateKey = []byte("task_state") // devManagerBucket is the bucket name containing all device manager related // data devManagerBucket = []byte("devicemanager") // driverManagerBucket is the bucket name container all driver manager // related data driverManagerBucket = []byte("drivermanager") // managerPluginStateKey is the key by which plugin manager plugin state is // stored at managerPluginStateKey = []byte("plugin_state") ) // taskBucketName returns the bucket name for the given task name. func taskBucketName(taskName string) []byte { return []byte("task-" + taskName) } // NewStateDBFunc creates a StateDB given a state directory. type NewStateDBFunc func(stateDir string) (StateDB, error) // GetStateDBFactory returns a func for creating a StateDB func GetStateDBFactory(devMode bool) NewStateDBFunc { // Return a noop state db implementation when in debug mode if devMode { return func(string) (StateDB, error) { return NoopDB{}, nil } } return NewBoltStateDB } // BoltStateDB persists and restores Nomad client state in a boltdb. All // methods are safe for concurrent access. type BoltStateDB struct { db *boltdd.DB } // NewBoltStateDB creates or opens an existing boltdb state file or returns an // error. func NewBoltStateDB(stateDir string) (StateDB, error) { // Create or open the boltdb state database db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil) if err != nil { return nil, fmt.Errorf("failed to create state database: %v", err) } sdb := &BoltStateDB{ db: db, } return sdb, nil } func (s *BoltStateDB) Name() string { return "boltdb" } // GetAllAllocations gets all allocations persisted by this client and returns // a map of alloc ids to errors for any allocations that could not be restored. // // If a fatal error was encountered it will be returned and the other two // values will be nil. func (s *BoltStateDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) { var allocs []*structs.Allocation var errs map[string]error err := s.db.View(func(tx *boltdd.Tx) error { allocs, errs = s.getAllAllocations(tx) return nil }) // db.View itself may return an error, so still check if err != nil { return nil, nil, err } return allocs, errs, nil } // allocEntry wraps values in the Allocations buckets type allocEntry struct { Alloc *structs.Allocation } func (s *BoltStateDB) getAllAllocations(tx *boltdd.Tx) ([]*structs.Allocation, map[string]error) { allocs := []*structs.Allocation{} errs := map[string]error{} allocationsBkt := tx.Bucket(allocationsBucketName) if allocationsBkt == nil { // No allocs return allocs, errs } // Create a cursor for iteration. c := allocationsBkt.BoltBucket().Cursor() // Iterate over all the allocation buckets for k, _ := c.First(); k != nil; k, _ = c.Next() { allocID := string(k) allocBkt := allocationsBkt.Bucket(k) if allocBkt == nil { errs[allocID] = fmt.Errorf("missing alloc bucket") continue } var ae allocEntry if err := allocBkt.Get(allocKey, &ae); err != nil { errs[allocID] = fmt.Errorf("failed to decode alloc: %v", err) continue } allocs = append(allocs, ae.Alloc) } return allocs, errs } // PutAllocation stores an allocation or returns an error. func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { return err } // Retrieve the specific allocations bucket key := []byte(alloc.ID) allocBkt, err := allocsBkt.CreateBucketIfNotExists(key) if err != nil { return err } allocState := allocEntry{ Alloc: alloc, } return allocBkt.Put(allocKey, &allocState) }) } // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. LocalState or TaskState will be nil if they do not exist. // // If an error is encountered both LocalState and TaskState will be nil. func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.LocalState, *structs.TaskState, error) { var ls *trstate.LocalState var ts *structs.TaskState err := s.db.View(func(tx *boltdd.Tx) error { allAllocsBkt := tx.Bucket(allocationsBucketName) if allAllocsBkt == nil { // No state, return return nil } allocBkt := allAllocsBkt.Bucket([]byte(allocID)) if allocBkt == nil { // No state for alloc, return return nil } taskBkt := allocBkt.Bucket(taskBucketName(taskName)) if taskBkt == nil { // No state for task, return return nil } // Restore Local State if it exists ls = &trstate.LocalState{} if err := taskBkt.Get(taskLocalStateKey, ls); err != nil { if !boltdd.IsErrNotFound(err) { return fmt.Errorf("failed to read local task runner state: %v", err) } // Key not found, reset ls to nil ls = nil } // Restore Task State if it exists ts = &structs.TaskState{} if err := taskBkt.Get(taskStateKey, ts); err != nil { if !boltdd.IsErrNotFound(err) { return fmt.Errorf("failed to read task state: %v", err) } // Key not found, reset ts to nil ts = nil } return nil }) if err != nil { return nil, nil, err } return ls, ts, nil } // PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error. func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error { return s.db.Update(func(tx *boltdd.Tx) error { taskBkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } if err := taskBkt.Put(taskLocalStateKey, val); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } return nil }) } // PutTaskState stores a task's state or returns an error. func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error { return s.db.Update(func(tx *boltdd.Tx) error { taskBkt, err := getTaskBucket(tx, allocID, taskName) if err != nil { return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } return taskBkt.Put(taskStateKey, state) }) } // DeleteTaskBucket is used to delete a task bucket if it exists. func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocations := tx.Bucket(allocationsBucketName) if allocations == nil { return nil } // Retrieve the specific allocations bucket alloc := allocations.Bucket([]byte(allocID)) if alloc == nil { return nil } // Check if the bucket exists key := taskBucketName(taskName) return alloc.DeleteBucket(key) }) } // DeleteAllocationBucket is used to delete an allocation bucket if it exists. func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root allocations bucket allocations := tx.Bucket(allocationsBucketName) if allocations == nil { return nil } key := []byte(allocID) return allocations.DeleteBucket(key) }) } // Close releases all database resources and unlocks the database file on disk. // All transactions must be closed before closing the database. func (s *BoltStateDB) Close() error { return s.db.Close() } // getAllocationBucket returns the bucket used to persist state about a // particular allocation. If the root allocation bucket or the specific // allocation bucket doesn't exist, it will be created as long as the // transaction is writable. func getAllocationBucket(tx *boltdd.Tx, allocID string) (*boltdd.Bucket, error) { var err error w := tx.Writable() // Retrieve the root allocations bucket allocations := tx.Bucket(allocationsBucketName) if allocations == nil { if !w { return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") } allocations, err = tx.CreateBucketIfNotExists(allocationsBucketName) if err != nil { return nil, err } } // Retrieve the specific allocations bucket key := []byte(allocID) alloc := allocations.Bucket(key) if alloc == nil { if !w { return nil, fmt.Errorf("Allocation bucket doesn't exist and transaction is not writable") } alloc, err = allocations.CreateBucket(key) if err != nil { return nil, err } } return alloc, nil } // getTaskBucket returns the bucket used to persist state about a // particular task. If the root allocation bucket, the specific // allocation or task bucket doesn't exist, they will be created as long as the // transaction is writable. func getTaskBucket(tx *boltdd.Tx, allocID, taskName string) (*boltdd.Bucket, error) { alloc, err := getAllocationBucket(tx, allocID) if err != nil { return nil, err } // Retrieve the specific task bucket w := tx.Writable() key := taskBucketName(taskName) task := alloc.Bucket(key) if task == nil { if !w { return nil, fmt.Errorf("Task bucket doesn't exist and transaction is not writable") } task, err = alloc.CreateBucket(key) if err != nil { return nil, err } } return task, nil } // PutDevicePluginState stores the device manager's plugin state or returns an // error. func (s *BoltStateDB) PutDevicePluginState(ps *dmstate.PluginState) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root device manager bucket devBkt, err := tx.CreateBucketIfNotExists(devManagerBucket) if err != nil { return err } return devBkt.Put(managerPluginStateKey, ps) }) } // GetDevicePluginState stores the device manager's plugin state or returns an // error. func (s *BoltStateDB) GetDevicePluginState() (*dmstate.PluginState, error) { var ps *dmstate.PluginState err := s.db.View(func(tx *boltdd.Tx) error { devBkt := tx.Bucket(devManagerBucket) if devBkt == nil { // No state, return return nil } // Restore Plugin State if it exists ps = &dmstate.PluginState{} if err := devBkt.Get(managerPluginStateKey, ps); err != nil { if !boltdd.IsErrNotFound(err) { return fmt.Errorf("failed to read device manager plugin state: %v", err) } // Key not found, reset ps to nil ps = nil } return nil }) if err != nil { return nil, err } return ps, nil } // PutDriverPluginState stores the driver manager's plugin state or returns an // error. func (s *BoltStateDB) PutDriverPluginState(ps *driverstate.PluginState) error { return s.db.Update(func(tx *boltdd.Tx) error { // Retrieve the root driver manager bucket driverBkt, err := tx.CreateBucketIfNotExists(driverManagerBucket) if err != nil { return err } return driverBkt.Put(managerPluginStateKey, ps) }) } // GetDriverPluginState stores the driver manager's plugin state or returns an // error. func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) { var ps *driverstate.PluginState err := s.db.View(func(tx *boltdd.Tx) error { driverBkt := tx.Bucket(driverManagerBucket) if driverBkt == nil { // No state, return return nil } // Restore Plugin State if it exists ps = &driverstate.PluginState{} if err := driverBkt.Get(managerPluginStateKey, ps); err != nil { if !boltdd.IsErrNotFound(err) { return fmt.Errorf("failed to read driver manager plugin state: %v", err) } // Key not found, reset ps to nil ps = nil } return nil }) if err != nil { return nil, err } return ps, nil }