Merge pull request #5003 from hashicorp/f-state-upgrade

Support upgrading state from
This commit is contained in:
Michael Schurter 2018-12-19 11:49:46 -08:00 committed by GitHub
commit f43a8a1053
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1083 additions and 47 deletions

3
.gitignore vendored
View File

@ -39,6 +39,9 @@ website/npm-debug.log
# Test file
exit-code
# Don't commit uncompressed state test files
client/state/testdata/*.db
ui/.sass-cache
ui/static/base.css

View File

@ -334,6 +334,18 @@ func (ar *allocRunner) GetAllocDir() *allocdir.AllocDir {
// Restore state from database. Must be called after NewAllocRunner but before
// Run.
func (ar *allocRunner) Restore() error {
// Retrieve deployment status to avoid reseting it across agent
// restarts. Once a deployment status is set Nomad no longer monitors
// alloc health, so we must persist deployment state across restarts.
ds, err := ar.stateDB.GetDeploymentStatus(ar.id)
if err != nil {
return err
}
ar.stateLock.Lock()
ar.state.DeploymentStatus = ds
ar.stateLock.Unlock()
// Restore task runners
for _, tr := range ar.tasks {
if err := tr.Restore(); err != nil {
@ -344,6 +356,19 @@ func (ar *allocRunner) Restore() error {
return nil
}
// persistDeploymentStatus stores AllocDeploymentStatus.
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
// While any persistence errors are very bad, the worst case
// scenario for failing to persist deployment status is that if
// the agent is restarted it will monitor the deployment status
// again. This could cause a deployment's status to change when
// that shouldn't happen. However, allowing that seems better
// than failing the entire allocation.
ar.logger.Error("error storing deployment status", "error", err)
}
}
// TaskStateUpdated is called by TaskRunner when a task's state has been
// updated. It does not process the update synchronously but instead notifies a
// goroutine the state has change. Since processing the state change may cause

View File

@ -16,6 +16,13 @@ type allocHealthSetter struct {
ar *allocRunner
}
// HasHealth returns true if a deployment status is already set.
func (a *allocHealthSetter) HasHealth() bool {
a.ar.stateLock.Lock()
defer a.ar.stateLock.Unlock()
return a.ar.state.DeploymentStatus.HasHealth()
}
// ClearHealth allows the health watcher hook to clear the alloc's deployment
// health if the deployment id changes. It does not update the server as the
// status is only cleared when already receiving an update from the server.
@ -24,6 +31,7 @@ type allocHealthSetter struct {
func (a *allocHealthSetter) ClearHealth() {
a.ar.stateLock.Lock()
a.ar.state.ClearDeploymentStatus()
a.ar.persistDeploymentStatus(nil)
a.ar.stateLock.Unlock()
}
@ -37,6 +45,7 @@ func (a *allocHealthSetter) SetHealth(healthy, isDeploy bool, trackerTaskEvents
// ModifyIndex as they're only mutated by the server.
a.ar.stateLock.Lock()
a.ar.state.SetDeploymentStatus(time.Now(), healthy)
a.ar.persistDeploymentStatus(a.ar.state.DeploymentStatus)
a.ar.stateLock.Unlock()
// If deployment is unhealthy emit task events explaining why

View File

@ -16,6 +16,9 @@ import (
// healthMutator is able to set/clear alloc health.
type healthSetter interface {
// HasHealth returns true if health is already set.
HasHealth() bool
// Set health via the mutator
SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)
@ -100,7 +103,7 @@ func (h *allocHealthWatcherHook) Name() string {
// Not threadsafe so the caller should lock since Updates occur concurrently.
func (h *allocHealthWatcherHook) init() error {
// No need to watch health as it's already set
if h.alloc.DeploymentStatus.HasHealth() {
if h.healthSetter.HasHealth() {
return nil
}

View File

@ -76,6 +76,12 @@ func (m *mockHealthSetter) ClearHealth() {
m.taskEvents = nil
}
func (m *mockHealthSetter) HasHealth() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.healthy != nil
}
// TestHealthHook_PrerunPostrun asserts a health hook does not error if it is
// run and postrunned.
func TestHealthHook_PrerunPostrun(t *testing.T) {

View File

@ -26,6 +26,8 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
}
func (*artifactHook) Name() string {
// Copied in client/state when upgrading from <0.9 schemas, so if you
// change it here you also must change it there.
return "artifacts"
}

View File

@ -28,6 +28,8 @@ func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHo
}
func (*dispatchHook) Name() string {
// Copied in client/state when upgrading from <0.9 schemas, so if you
// change it here you also must change it there.
return "dispatch_payload"
}

View File

@ -76,6 +76,7 @@ func (h *HookState) Copy() *HookState {
c := new(HookState)
*c = *h
c.Data = helper.CopyMapStringString(c.Data)
c.Env = helper.CopyMapStringString(c.Env)
return c
}
@ -88,5 +89,9 @@ func (h *HookState) Equal(o *HookState) bool {
return false
}
return helper.CompareMapStringString(h.Data, o.Data)
if !helper.CompareMapStringString(h.Data, o.Data) {
return false
}
return helper.CompareMapStringString(h.Env, o.Env)
}

View File

@ -27,6 +27,8 @@ func newTaskDirHook(runner *TaskRunner, logger log.Logger) *taskDirHook {
}
func (h *taskDirHook) Name() string {
// Copied in client/state when upgrading from <0.9 schemas, so if you
// change it here you also must change it there.
return "task_dir"
}

View File

@ -462,10 +462,21 @@ func (c *Client) init() error {
c.logger.Info("using state directory", "state_dir", c.config.StateDir)
// Open the state database
db, err := state.GetStateDBFactory(c.config.DevMode)(c.config.StateDir)
db, err := state.GetStateDBFactory(c.config.DevMode)(c.logger, c.config.StateDir)
if err != nil {
return fmt.Errorf("failed to open state database: %v", err)
}
// Upgrade the state database
if err := db.Upgrade(); err != nil {
// Upgrade only returns an error on critical persistence
// failures in which an operator should intervene before the
// node is accessible. Upgrade drops and logs corrupt state it
// encounters, so failing to start the agent should be extremely
// rare.
return fmt.Errorf("failed to upgrade state database: %v", err)
}
c.stateDB = db
// Ensure the alloc dir exists if we have one

78
client/state/08types.go Normal file
View File

@ -0,0 +1,78 @@
package state
import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// allocRunnerMutableState08 is state that had to be written on each save as it
// changed over the life-cycle of the alloc_runner in Nomad 0.8.
//
// https://github.com/hashicorp/nomad/blob/v0.8.6/client/alloc_runner.go#L146-L153
//
type allocRunnerMutableState08 struct {
// AllocClientStatus does not need to be upgraded as it is computed
// from task states.
AllocClientStatus string
// AllocClientDescription does not need to be upgraded as it is computed
// from task states.
AllocClientDescription string
TaskStates map[string]*structs.TaskState
DeploymentStatus *structs.AllocDeploymentStatus
}
// taskRunnerState08 was used to snapshot the state of the task runner in Nomad
// 0.8.
//
// https://github.com/hashicorp/nomad/blob/v0.8.6/client/task_runner.go#L188-L197
//
type taskRunnerState08 struct {
Version string
HandleID string
ArtifactDownloaded bool
TaskDirBuilt bool
PayloadRendered bool
DriverNetwork *cstructs.DriverNetwork
// Created Resources are no longer used.
//CreatedResources *driver.CreatedResources
}
func (t *taskRunnerState08) Upgrade() *state.LocalState {
ls := state.NewLocalState()
// Reuse DriverNetwork
ls.DriverNetwork = t.DriverNetwork
// Upgrade artifact state
ls.Hooks["artifacts"] = &state.HookState{
PrestartDone: t.ArtifactDownloaded,
}
// Upgrade task dir state
ls.Hooks["task_dir"] = &state.HookState{
PrestartDone: t.TaskDirBuilt,
}
// Upgrade dispatch payload state
ls.Hooks["dispatch_payload"] = &state.HookState{
PrestartDone: t.PayloadRendered,
}
//TODO How to convert handles?! This does not work.
ls.TaskHandle = drivers.NewTaskHandle("TODO")
//TODO where do we get this from?
ls.TaskHandle.Config = nil
//TODO do we need to se this accurately? Or will RecoverTask handle it?
ls.TaskHandle.State = drivers.TaskStateUnknown
//TODO do we need an envelope so drivers know this is an old state?
ls.TaskHandle.SetDriverState(t.HandleID)
return ls
}

View File

@ -9,17 +9,18 @@ import (
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
func setupBoltStateDB(t *testing.T) (*BoltStateDB, func()) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
db, err := NewBoltStateDB(dir)
db, err := NewBoltStateDB(testlog.HCLogger(t), dir)
if err != nil {
if err := os.RemoveAll(dir); err != nil {
t.Logf("error removing boltdb dir: %v", err)
@ -40,7 +41,7 @@ func setupBoltDB(t *testing.T) (*BoltStateDB, func()) {
}
func testDB(t *testing.T, f func(*testing.T, StateDB)) {
boltdb, cleanup := setupBoltDB(t)
boltdb, cleanup := setupBoltStateDB(t)
defer cleanup()
memdb := NewMemDB()
@ -242,3 +243,13 @@ func TestStateDB_DriverManager(t *testing.T) {
require.Equal(state, ps)
})
}
// TestStateDB_Upgrade asserts calling Upgrade on new databases always
// succeeds.
func TestStateDB_Upgrade(t *testing.T) {
t.Parallel()
testDB(t, func(t *testing.T, db StateDB) {
require.NoError(t, db.Upgrade())
})
}

View File

@ -12,6 +12,11 @@ type StateDB interface {
// Name of implementation.
Name() string
// Upgrade ensures the layout of the database is at the latest version
// or returns an error. Corrupt data will be dropped when possible.
// Errors should be considered critical and unrecoverable.
Upgrade() error
// GetAllAllocations returns all valid allocations and a map of
// allocation IDs to retrieval errors.
//
@ -22,6 +27,11 @@ type StateDB interface {
// not be stored.
PutAllocation(*structs.Allocation) error
// Get/Put DeploymentStatus get and put the allocation's deployment
// status. It may be nil.
GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error)
PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. Either state may be nil if it is not found, but if an
// error is encountered only the error will be non-nil.

View File

@ -15,6 +15,9 @@ type MemDB struct {
// alloc_id -> value
allocs map[string]*structs.Allocation
// alloc_id -> value
deployStatus map[string]*structs.AllocDeploymentStatus
// alloc_id -> task_name -> value
localTaskState map[string]map[string]*state.LocalState
taskState map[string]map[string]*structs.TaskState
@ -31,6 +34,7 @@ type MemDB struct {
func NewMemDB() *MemDB {
return &MemDB{
allocs: make(map[string]*structs.Allocation),
deployStatus: make(map[string]*structs.AllocDeploymentStatus),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
}
@ -40,6 +44,10 @@ func (m *MemDB) Name() string {
return "memdb"
}
func (m *MemDB) Upgrade() error {
return nil
}
func (m *MemDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -59,6 +67,19 @@ func (m *MemDB) PutAllocation(alloc *structs.Allocation) error {
return nil
}
func (m *MemDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.deployStatus[allocID], nil
}
func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
m.mu.Lock()
m.deployStatus[allocID] = ds
defer m.mu.Unlock()
return nil
}
func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
m.mu.RLock()
defer m.mu.RUnlock()

View File

@ -14,6 +14,10 @@ func (n NoopDB) Name() string {
return "noopdb"
}
func (n NoopDB) Upgrade() error {
return nil
}
func (n NoopDB) GetAllAllocations() ([]*structs.Allocation, map[string]error, error) {
return nil, nil, nil
}
@ -22,6 +26,14 @@ func (n NoopDB) PutAllocation(*structs.Allocation) error {
return nil
}
func (n NoopDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
return nil, nil
}
func (n NoopDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
return nil
}
func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
return nil, nil, nil
}

View File

@ -2,8 +2,11 @@ package state
import (
"fmt"
"os"
"path/filepath"
"time"
hclog "github.com/hashicorp/go-hclog"
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
@ -14,34 +17,53 @@ import (
/*
The client has a boltDB backed state store. The schema as of 0.9 looks as follows:
allocations/ (bucket)
|--> <alloc-id>/ (bucket)
|--> alloc -> *structs.Allocation
|--> alloc_runner persisted objects (k/v)
|--> <task-name>/ (bucket)
|--> task_runner persisted objects (k/v)
meta/
|--> version -> '2' (not msgpack encoded)
|--> upgraded -> time.Now().Format(timeRFC3339)
allocations/
|--> <alloc-id>/
|--> alloc -> allocEntry{*structs.Allocation}
|--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus}
|--> task-<name>/
|--> local_state -> *trstate.LocalState # Local-only state
|--> task_state -> *structs.TaskState # Sync'd to servers
devicemanager/
|--> plugin-state -> *dmstate.PluginState
|--> plugin_state -> *dmstate.PluginState
drivermanager/
|--> plugin-state -> *driverstate.PluginState
|--> plugin_state -> *dmstate.PluginState
*/
var (
// metaBucketName is the name of the metadata bucket
metaBucketName = []byte("meta")
// metaVersionKey is the key the state schema version is stored under.
metaVersionKey = []byte("version")
// metaVersion is the value of the state schema version to detect when
// an upgrade is needed. It skips the usual boltdd/msgpack backend to
// be as portable and futureproof as possible.
metaVersion = []byte{'2'}
// metaUpgradedKey is the key that stores the timestamp of the last
// time the schema was upgraded.
metaUpgradedKey = []byte("upgraded")
// allocationsBucketName is the bucket name containing all allocation related
// data
allocationsBucketName = []byte("allocations")
// allocKey is the key serialized Allocations are stored under
// allocKey is the key Allocations are stored under encapsulated in
// allocEntry structs.
allocKey = []byte("alloc")
// taskRunnerStateAllKey holds all the task runners state. At the moment
// there is no need to split it
//XXX Old key - going to need to migrate
//taskRunnerStateAllKey = []byte("simple-all")
// allocDeployStatusKey is the key *structs.AllocDeploymentStatus is
// stored under.
allocDeployStatusKey = []byte("deploy_status")
// allocations -> $allocid -> $taskname -> the keys below
// allocations -> $allocid -> task-$taskname -> the keys below
taskLocalStateKey = []byte("local_state")
taskStateKey = []byte("task_state")
@ -58,14 +80,19 @@ var (
managerPluginStateKey = []byte("plugin_state")
)
// taskBucketName returns the bucket name for the given task name.
func taskBucketName(taskName string) []byte {
return []byte("task-" + taskName)
}
// NewStateDBFunc creates a StateDB given a state directory.
type NewStateDBFunc func(stateDir string) (StateDB, error)
type NewStateDBFunc func(logger hclog.Logger, stateDir string) (StateDB, error)
// GetStateDBFactory returns a func for creating a StateDB
func GetStateDBFactory(devMode bool) NewStateDBFunc {
// Return a noop state db implementation when in debug mode
if devMode {
return func(string) (StateDB, error) {
return func(hclog.Logger, string) (StateDB, error) {
return NoopDB{}, nil
}
}
@ -76,21 +103,42 @@ func GetStateDBFactory(devMode bool) NewStateDBFunc {
// BoltStateDB persists and restores Nomad client state in a boltdb. All
// methods are safe for concurrent access.
type BoltStateDB struct {
db *boltdd.DB
stateDir string
db *boltdd.DB
logger hclog.Logger
}
// NewBoltStateDB creates or opens an existing boltdb state file or returns an
// error.
func NewBoltStateDB(stateDir string) (StateDB, error) {
func NewBoltStateDB(logger hclog.Logger, stateDir string) (StateDB, error) {
fn := filepath.Join(stateDir, "state.db")
// Check to see if the DB already exists
fi, err := os.Stat(fn)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
firstRun := fi == nil
// Create or open the boltdb state database
db, err := boltdd.Open(filepath.Join(stateDir, "state.db"), 0600, nil)
db, err := boltdd.Open(fn, 0600, nil)
if err != nil {
return nil, fmt.Errorf("failed to create state database: %v", err)
}
sdb := &BoltStateDB{
db: db,
stateDir: stateDir,
db: db,
logger: logger,
}
// If db did not already exist, initialize metadata fields
if firstRun {
if err := sdb.init(); err != nil {
return nil, err
}
}
return sdb, nil
}
@ -181,6 +229,64 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error {
})
}
// deployStatusEntry wraps values for DeploymentStatus keys.
type deployStatusEntry struct {
DeploymentStatus *structs.AllocDeploymentStatus
}
// PutDeploymentStatus stores an allocation's DeploymentStatus or returns an
// error.
func (s *BoltStateDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentStatus) error {
return s.db.Update(func(tx *boltdd.Tx) error {
return putDeploymentStatusImpl(tx, allocID, ds)
})
}
func putDeploymentStatusImpl(tx *boltdd.Tx, allocID string, ds *structs.AllocDeploymentStatus) error {
allocBkt, err := getAllocationBucket(tx, allocID)
if err != nil {
return err
}
entry := deployStatusEntry{
DeploymentStatus: ds,
}
return allocBkt.Put(allocDeployStatusKey, &entry)
}
// GetDeploymentStatus retrieves an allocation's DeploymentStatus or returns an
// error.
func (s *BoltStateDB) GetDeploymentStatus(allocID string) (*structs.AllocDeploymentStatus, error) {
var entry deployStatusEntry
err := s.db.View(func(tx *boltdd.Tx) error {
allAllocsBkt := tx.Bucket(allocationsBucketName)
if allAllocsBkt == nil {
// No state, return
return nil
}
allocBkt := allAllocsBkt.Bucket([]byte(allocID))
if allocBkt == nil {
// No state for alloc, return
return nil
}
return allocBkt.Get(allocDeployStatusKey, &entry)
})
// It's valid for this field to be nil/missing
if boltdd.IsErrNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
return entry.DeploymentStatus, nil
}
// GetTaskRunnerState returns the LocalState and TaskState for a
// TaskRunner. LocalState or TaskState will be nil if they do not exist.
//
@ -202,7 +308,7 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
return nil
}
taskBkt := allocBkt.Bucket([]byte(taskName))
taskBkt := allocBkt.Bucket(taskBucketName(taskName))
if taskBkt == nil {
// No state for task, return
return nil
@ -243,31 +349,43 @@ func (s *BoltStateDB) GetTaskRunnerState(allocID, taskName string) (*trstate.Loc
// PutTaskRunnerLocalState stores TaskRunner's LocalState or returns an error.
func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val *trstate.LocalState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
taskBkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
return nil
return putTaskRunnerLocalStateImpl(tx, allocID, taskName, val)
})
}
// putTaskRunnerLocalStateImpl stores TaskRunner's LocalState in an ongoing
// transaction or returns an error.
func putTaskRunnerLocalStateImpl(tx *boltdd.Tx, allocID, taskName string, val *trstate.LocalState) error {
taskBkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := taskBkt.Put(taskLocalStateKey, val); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
return nil
}
// PutTaskState stores a task's state or returns an error.
func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.TaskState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
taskBkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
return taskBkt.Put(taskStateKey, state)
return putTaskStateImpl(tx, allocID, taskName, state)
})
}
// putTaskStateImpl stores a task's state in an ongoing transaction or returns
// an error.
func putTaskStateImpl(tx *boltdd.Tx, allocID, taskName string, state *structs.TaskState) error {
taskBkt, err := getTaskBucket(tx, allocID, taskName)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
return taskBkt.Put(taskStateKey, state)
}
// DeleteTaskBucket is used to delete a task bucket if it exists.
func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
return s.db.Update(func(tx *boltdd.Tx) error {
@ -284,7 +402,7 @@ func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error {
}
// Check if the bucket exists
key := []byte(taskName)
key := taskBucketName(taskName)
return alloc.DeleteBucket(key)
})
}
@ -359,7 +477,7 @@ func getTaskBucket(tx *boltdd.Tx, allocID, taskName string) (*boltdd.Bucket, err
// Retrieve the specific task bucket
w := tx.Writable()
key := []byte(taskName)
key := taskBucketName(taskName)
task := alloc.Bucket(key)
if task == nil {
if !w {
@ -468,3 +586,59 @@ func (s *BoltStateDB) GetDriverPluginState() (*driverstate.PluginState, error) {
return ps, nil
}
// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
return s.db.Update(func(tx *boltdd.Tx) error {
return addMeta(tx.BoltTx())
})
}
// Upgrade bolt state db from 0.8 schema to 0.9 schema. Noop if already using
// 0.9 schema. Creates a backup before upgrading.
func (s *BoltStateDB) Upgrade() error {
// Check to see if the underlying DB needs upgrading.
upgrade, err := NeedsUpgrade(s.db.BoltDB())
if err != nil {
return err
}
if !upgrade {
// No upgrade needed!
return nil
}
// Upgraded needed. Backup the boltdb first.
backupFileName := filepath.Join(s.stateDir, "state.db.backup")
if err := backupDB(s.db.BoltDB(), backupFileName); err != nil {
return fmt.Errorf("error backing up state db: %v", err)
}
// Perform the upgrade
if err := s.db.Update(func(tx *boltdd.Tx) error {
if err := UpgradeAllocs(s.logger, tx); err != nil {
return err
}
// Add standard metadata
if err := addMeta(tx.BoltTx()); err != nil {
return err
}
// Write the time the upgrade was done
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
return bkt.Put(metaUpgradedKey, time.Now().Format(time.RFC3339))
}); err != nil {
return err
}
s.logger.Info("successfully upgraded state")
return nil
}
// DB allows access to the underlying BoltDB for testing purposes.
func (s *BoltStateDB) DB() *boltdd.DB {
return s.db
}

BIN
client/state/testdata/state-0.7.1.db.gz (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
client/state/testdata/state-0.8.6-empty.db.gz (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
client/state/testdata/state-0.8.6-no-deploy.db.gz (Stored with Git LFS) vendored Normal file

Binary file not shown.

299
client/state/upgrade.go Normal file
View File

@ -0,0 +1,299 @@
package state
import (
"bytes"
"fmt"
"os"
"github.com/boltdb/bolt"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bolt.DB) (bool, error) {
needsUpgrade := true
err := bdb.View(func(tx *bolt.Tx) error {
b := tx.Bucket(metaBucketName)
if b == nil {
// No meta bucket; upgrade
return nil
}
v := b.Get(metaVersionKey)
if len(v) == 0 {
// No version; upgrade
return nil
}
if !bytes.Equal(v, metaVersion) {
// Version exists but does not match. Abort.
return fmt.Errorf("incompatible state version. expected %q but found %q",
metaVersion, v)
}
// Version matches! Assume migrated!
needsUpgrade = false
return nil
})
return needsUpgrade, err
}
// addMeta adds version metadata to BoltDB to mark it as upgraded and
// should be run at the end of the upgrade transaction.
func addMeta(tx *bolt.Tx) error {
// Create the meta bucket if it doesn't exist
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
return bkt.Put(metaVersionKey, metaVersion)
}
// backupDB backs up the existing state database prior to upgrade overwriting
// previous backups.
func backupDB(bdb *bolt.DB, dst string) error {
fd, err := os.Create(dst)
if err != nil {
return err
}
return bdb.View(func(tx *bolt.Tx) error {
if _, err := tx.WriteTo(fd); err != nil {
fd.Close()
return err
}
return fd.Close()
})
}
// UpgradeSchema upgrades the boltdb schema. Example 0.8 schema:
//
// * allocations
// * 15d83e8a-74a2-b4da-3f17-ed5c12895ea8
// * echo
// - simple-all (342 bytes)
// - alloc (2827 bytes)
// - alloc-dir (166 bytes)
// - immutable (15 bytes)
// - mutable (1294 bytes)
//
func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
btx := tx.BoltTx()
allocationsBucket := btx.Bucket(allocationsBucketName)
if allocationsBucket == nil {
// No state!
return nil
}
// Gather alloc buckets and remove unexpected key/value pairs
allocBuckets := [][]byte{}
cur := allocationsBucket.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v != nil {
logger.Warn("deleting unexpected key in state db",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return fmt.Errorf("error deleting unexpected key %q: %v", string(k), err)
}
continue
}
allocBuckets = append(allocBuckets, k)
}
for _, allocBucket := range allocBuckets {
allocID := string(allocBucket)
bkt := allocationsBucket.Bucket(allocBucket)
if bkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", allocID)
}
allocLogger := logger.With("alloc_id", allocID)
if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil {
// Log and drop invalid allocs
allocLogger.Error("dropping invalid allocation due to error while upgrading state",
"error", err,
)
// If we can't delete the bucket something is seriously
// wrong, fail hard.
if err := allocationsBucket.DeleteBucket(allocBucket); err != nil {
return fmt.Errorf("error deleting invalid allocation state: %v", err)
}
}
}
return nil
}
// upgradeAllocBucket upgrades an alloc bucket.
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, allocID string) error {
allocFound := false
taskBuckets := [][]byte{}
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
switch string(k) {
case "alloc":
// Alloc has not changed; leave it be
allocFound = true
case "alloc-dir":
// Drop alloc-dir entries as they're no longer needed.
cur.Delete()
case "immutable":
// Drop immutable state. Nothing from it needs to be
// upgraded.
cur.Delete()
case "mutable":
// Decode and upgrade
if err := upgradeOldAllocMutable(tx, allocID, v); err != nil {
return err
}
cur.Delete()
default:
if v != nil {
logger.Warn("deleting unexpected state entry for allocation",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return err
}
continue
}
// Nested buckets are tasks
taskBuckets = append(taskBuckets, k)
}
}
// If the alloc entry was not found, abandon this allocation as the
// state has been corrupted.
if !allocFound {
return fmt.Errorf("alloc entry not found")
}
// Upgrade tasks
for _, taskBucket := range taskBuckets {
taskName := string(taskBucket)
taskLogger := logger.With("task_name", taskName)
taskBkt := bkt.Bucket(taskBucket)
if taskBkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", taskName)
}
oldState, err := upgradeTaskBucket(taskLogger, taskBkt)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
}
// Convert 0.8 task state to 0.9 task state
localTaskState := oldState.Upgrade()
// Insert the new task state
if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {
return err
}
// Delete the old task bucket
if err := bkt.DeleteBucket(taskBucket); err != nil {
return err
}
taskLogger.Trace("upgraded", "from", oldState.Version)
}
return nil
}
// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
// and returning the 0.8 version of the state.
func upgradeTaskBucket(logger hclog.Logger, bkt *bolt.Bucket) (*taskRunnerState08, error) {
simpleFound := false
var trState taskRunnerState08
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v == nil {
// value is nil: delete unexpected bucket
logger.Warn("deleting unexpected task state bucket",
"bucket", string(k),
)
if err := bkt.DeleteBucket(k); err != nil {
return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err)
}
continue
}
if !bytes.Equal(k, []byte("simple-all")) {
// value is non-nil: delete unexpected entry
logger.Warn("deleting unexpected task state entry",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return nil, fmt.Errorf("error delting unexpected task key %q: %v", string(k), err)
}
continue
}
// Decode simple-all
simpleFound = true
if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
return nil, fmt.Errorf("failed to decode task state from 'simple-all' entry: %v", err)
}
}
if !simpleFound {
return nil, fmt.Errorf("task state entry not found")
}
return &trState, nil
}
// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state.
func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error {
var oldMutable allocRunnerMutableState08
err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable)
if err != nil {
return err
}
// Upgrade Deployment Status
if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil {
return err
}
// Upgrade Task States
for taskName, taskState := range oldMutable.TaskStates {
if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,159 @@
package state_test
import (
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
. "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestBoltStateDB_Upgrade_Ok asserts upgading an old state db does not error
// during upgrade and restore.
func TestBoltStateDB_UpgradeOld_Ok(t *testing.T) {
t.Parallel()
files, err := filepath.Glob("testdata/*.db*")
require.NoError(t, err)
for _, fn := range files {
t.Run(fn, func(t *testing.T) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
defer os.RemoveAll(dir)
var src io.ReadCloser
src, err = os.Open(fn)
require.NoError(t, err)
defer src.Close()
// testdata may be gzip'd; decode on copy
if strings.HasSuffix(fn, ".gz") {
src, err = gzip.NewReader(src)
require.NoError(t, err)
}
dst, err := os.Create(filepath.Join(dir, "state.db"))
require.NoError(t, err)
// Copy test files before testing them for safety
_, err = io.Copy(dst, src)
require.NoError(t, err)
require.NoError(t, src.Close())
dbI, err := NewBoltStateDB(testlog.HCLogger(t), dir)
require.NoError(t, err)
defer dbI.Close()
db := dbI.(*BoltStateDB)
// Simply opening old files should *not* alter them
require.NoError(t, db.DB().View(func(tx *boltdd.Tx) error {
b := tx.Bucket([]byte("meta"))
if b != nil {
return fmt.Errorf("meta bucket found but should not exist yet!")
}
return nil
}))
needsUpgrade, err := NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.True(t, needsUpgrade)
// Attept the upgrade
require.NoError(t, db.Upgrade())
needsUpgrade, err = NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.False(t, needsUpgrade)
// Ensure Allocations can be restored and
// NewAR/AR.Restore do not error.
allocs, errs, err := db.GetAllAllocations()
require.NoError(t, err)
assert.Len(t, errs, 0)
for _, alloc := range allocs {
checkUpgradedAlloc(t, dir, db, alloc)
}
// Should be nil for all upgrades
ps, err := db.GetDevicePluginState()
require.NoError(t, err)
require.Nil(t, ps)
ps = &dmstate.PluginState{
ReattachConfigs: map[string]*shared.ReattachConfig{
"test": {Pid: 1},
},
}
require.NoError(t, db.PutDevicePluginState(ps))
require.NoError(t, db.Close())
})
}
}
// checkUpgradedAlloc creates and restores an AllocRunner from an upgraded
// database.
//
// It does not call AR.Run as its intended to be used against a wide test
// corpus in testdata that may be expensive to run and require unavailable
// dependencies.
func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Allocation) {
_, err := db.GetDeploymentStatus(alloc.ID)
assert.NoError(t, err)
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
for _, task := range tg.Tasks {
_, _, err := db.GetTaskRunnerState(alloc.ID, task.Name)
require.NoError(t, err)
}
clientConf, cleanup := clientconfig.TestClientConfig(t)
// Does *not* cleanup overridden StateDir below. That's left alone for
// the caller to cleanup.
defer cleanup()
clientConf.StateDir = path
conf := &allocrunner.Config{
Alloc: alloc,
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: db,
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &allocrunner.MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
}
ar, err := allocrunner.NewAllocRunner(conf)
require.NoError(t, err)
// AllocRunner.Restore should not error
require.NoError(t, ar.Restore())
}

View File

@ -0,0 +1,190 @@
package state
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/boltdb/bolt"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
)
func setupBoltDB(t *testing.T) (*bolt.DB, func()) {
dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(t, err)
db, err := bolt.Open(filepath.Join(dir, "state.db"), 0666, nil)
if err != nil {
os.RemoveAll(dir)
require.NoError(t, err)
}
return db, func() {
require.NoError(t, db.Close())
require.NoError(t, os.RemoveAll(dir))
}
}
// TestUpgrade_NeedsUpgrade_New asserts new state dbs do not need upgrading.
func TestUpgrade_NeedsUpgrade_New(t *testing.T) {
t.Parallel()
// Setting up a new StateDB should initialize it at the latest version.
db, cleanup := setupBoltStateDB(t)
defer cleanup()
up, err := NeedsUpgrade(db.DB().BoltDB())
require.NoError(t, err)
require.False(t, up)
}
// TestUpgrade_NeedsUpgrade_Old asserts state dbs with just the alloctions
// bucket *do* need upgrading.
func TestUpgrade_NeedsUpgrade_Old(t *testing.T) {
t.Parallel()
db, cleanup := setupBoltDB(t)
defer cleanup()
// Create the allocations bucket which exists in both the old and 0.9
// schemas
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket(allocationsBucketName)
return err
}))
up, err := NeedsUpgrade(db)
require.NoError(t, err)
require.True(t, up)
// Adding meta should mark it as upgraded
require.NoError(t, db.Update(addMeta))
up, err = NeedsUpgrade(db)
require.NoError(t, err)
require.False(t, up)
}
// TestUpgrade_NeedsUpgrade_Error asserts that an error is returned from
// NeedsUpgrade if an invalid db version is found. This is a safety measure to
// prevent invalid and unintentional upgrades when downgrading Nomad.
func TestUpgrade_NeedsUpgrade_Error(t *testing.T) {
t.Parallel()
cases := [][]byte{
{'"', '2', '"'}, // wrong type
{'1'}, // wrong version (never existed)
{'3'}, // wrong version (future)
}
for _, tc := range cases {
tc := tc
t.Run(fmt.Sprintf("%v", tc), func(t *testing.T) {
db, cleanup := setupBoltDB(t)
defer cleanup()
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
require.NoError(t, err)
return bkt.Put(metaVersionKey, tc)
}))
_, err := NeedsUpgrade(db)
require.Error(t, err)
})
}
}
// TestUpgrade_DeleteInvalidAllocs asserts invalid allocations are deleted
// during state upgades instead of failing the entire agent.
func TestUpgrade_DeleteInvalidAllocs_NoAlloc(t *testing.T) {
t.Parallel()
bdb, cleanup := setupBoltDB(t)
defer cleanup()
db := boltdd.New(bdb)
allocID := []byte(uuid.Generate())
// Create an allocation bucket with no `alloc` key. This is an observed
// pre-0.9 state corruption that should result in the allocation being
// dropped while allowing the upgrade to continue.
require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
parentBkt, err := tx.CreateBucket(allocationsBucketName)
if err != nil {
return err
}
_, err = parentBkt.CreateBucket(allocID)
return err
}))
// Perform the Upgrade
require.NoError(t, db.Update(func(tx *boltdd.Tx) error {
return UpgradeAllocs(testlog.HCLogger(t), tx)
}))
// Assert invalid allocation bucket was removed
require.NoError(t, db.View(func(tx *boltdd.Tx) error {
parentBkt := tx.Bucket(allocationsBucketName)
if parentBkt == nil {
return fmt.Errorf("parent allocations bucket should not have been removed")
}
if parentBkt.Bucket(allocID) != nil {
return fmt.Errorf("invalid alloc bucket should have been deleted")
}
return nil
}))
}
// TestUpgrade_DeleteInvalidTaskEntries asserts invalid entries under a task
// bucket are deleted.
func TestUpgrade_upgradeTaskBucket_InvalidEntries(t *testing.T) {
t.Parallel()
db, cleanup := setupBoltDB(t)
defer cleanup()
taskName := []byte("fake-task")
// Insert unexpected bucket, unexpected key, and missing simple-all
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
bkt, err := tx.CreateBucket(taskName)
if err != nil {
return err
}
_, err = bkt.CreateBucket([]byte("unexpectedBucket"))
if err != nil {
return err
}
return bkt.Put([]byte("unexepectedKey"), []byte{'x'})
}))
require.NoError(t, db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(taskName)
// upgradeTaskBucket should fail
state, err := upgradeTaskBucket(testlog.HCLogger(t), bkt)
require.Nil(t, state)
require.Error(t, err)
// Invalid entries should have been deleted
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
t.Errorf("unexpected entry found: key=%q value=%q", k, v)
}
return nil
}))
}

View File

@ -1,4 +1,4 @@
// boltdd contains a wrapper around BoltDB to deduplicate writes and encode
// BOLTdd contains a wrapper around BoltDB to deduplicate writes and encode
// values using mgspack. (dd stands for DeDuplicate)
package boltdd
@ -54,10 +54,15 @@ func Open(path string, mode os.FileMode, options *bolt.Options) (*DB, error) {
return nil, err
}
return New(bdb), nil
}
// New deduplicating wrapper for the given boltdb.
func New(bdb *bolt.DB) *DB {
return &DB{
rootBuckets: make(map[string]*bucketMeta),
bdb: bdb,
}, nil
}
}
func (db *DB) bucket(btx *bolt.Tx, name []byte) *Bucket {