open-nomad/client/state/upgrade.go
Seth Hoenig b3ea68948b build: run gofmt on all go source files
Go 1.19 will forecefully format all your doc strings. To get this
out of the way, here is one big commit with all the changes gofmt
wants to make.
2022-08-16 11:14:11 -05:00

350 lines
9 KiB
Go

package state
import (
"bytes"
"container/list"
"fmt"
"os"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
"go.etcd.io/bbolt"
)
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bbolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
upgradeTo09 = true
upgradeTo13 = true
err = bdb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(metaBucketName)
if b == nil {
// No meta bucket; upgrade
return nil
}
v := b.Get(metaVersionKey)
if len(v) == 0 {
// No version; upgrade
return nil
}
if bytes.Equal(v, []byte{'2'}) {
upgradeTo09 = false
return nil
}
if bytes.Equal(v, metaVersion) {
upgradeTo09 = false
upgradeTo13 = false
return nil
}
// Version exists but does not match. Abort.
return fmt.Errorf("incompatible state version. expected %q but found %q",
metaVersion, v)
})
return
}
// addMeta adds version metadata to BoltDB to mark it as upgraded and
// should be run at the end of the upgrade transaction.
func addMeta(tx *bbolt.Tx) error {
// Create the meta bucket if it doesn't exist
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
return bkt.Put(metaVersionKey, metaVersion)
}
// backupDB backs up the existing state database prior to upgrade overwriting
// previous backups.
func backupDB(bdb *bbolt.DB, dst string) error {
fd, err := os.Create(dst)
if err != nil {
return err
}
return bdb.View(func(tx *bbolt.Tx) error {
if _, err := tx.WriteTo(fd); err != nil {
fd.Close()
return err
}
return fd.Close()
})
}
// UpgradeAllocs upgrades the boltdb schema. Example 0.8 schema:
//
// allocations
// 15d83e8a-74a2-b4da-3f17-ed5c12895ea8
// echo
// simple-all (342 bytes)
// alloc (2827 bytes)
// alloc-dir (166 bytes)
// immutable (15 bytes)
// mutable (1294 bytes)
func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
btx := tx.BoltTx()
allocationsBucket := btx.Bucket(allocationsBucketName)
if allocationsBucket == nil {
// No state!
return nil
}
// Gather alloc buckets and remove unexpected key/value pairs
allocBuckets := [][]byte{}
cur := allocationsBucket.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v != nil {
logger.Warn("deleting unexpected key in state db",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return fmt.Errorf("error deleting unexpected key %q: %v", string(k), err)
}
continue
}
allocBuckets = append(allocBuckets, k)
}
for _, allocBucket := range allocBuckets {
allocID := string(allocBucket)
bkt := allocationsBucket.Bucket(allocBucket)
if bkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", allocID)
}
allocLogger := logger.With("alloc_id", allocID)
if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil {
// Log and drop invalid allocs
allocLogger.Error("dropping invalid allocation due to error while upgrading state",
"error", err,
)
// If we can't delete the bucket something is seriously
// wrong, fail hard.
if err := allocationsBucket.DeleteBucket(allocBucket); err != nil {
return fmt.Errorf("error deleting invalid allocation state: %v", err)
}
}
}
return nil
}
// upgradeAllocBucket upgrades an alloc bucket.
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bbolt.Bucket, allocID string) error {
allocFound := false
taskBuckets := [][]byte{}
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
switch string(k) {
case "alloc":
// Alloc has not changed; leave it be
allocFound = true
case "alloc-dir":
// Drop alloc-dir entries as they're no longer needed.
cur.Delete()
case "immutable":
// Drop immutable state. Nothing from it needs to be
// upgraded.
cur.Delete()
case "mutable":
// Decode and upgrade
if err := upgradeOldAllocMutable(tx, allocID, v); err != nil {
return err
}
cur.Delete()
default:
if v != nil {
logger.Warn("deleting unexpected state entry for allocation",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return err
}
continue
}
// Nested buckets are tasks
taskBuckets = append(taskBuckets, k)
}
}
// If the alloc entry was not found, abandon this allocation as the
// state has been corrupted.
if !allocFound {
return fmt.Errorf("alloc entry not found")
}
// Upgrade tasks
for _, taskBucket := range taskBuckets {
taskName := string(taskBucket)
taskLogger := logger.With("task_name", taskName)
taskBkt := bkt.Bucket(taskBucket)
if taskBkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", taskName)
}
oldState, err := upgradeTaskBucket(taskLogger, taskBkt)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
continue
}
// Convert 0.8 task state to 0.9 task state
localTaskState, err := oldState.Upgrade(allocID, taskName)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
continue
}
// Insert the new task state
if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {
return err
}
// Delete the old task bucket
if err := bkt.DeleteBucket(taskBucket); err != nil {
return err
}
taskLogger.Trace("upgraded", "from", oldState.Version)
}
return nil
}
// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
// and returning the 0.8 version of the state.
func upgradeTaskBucket(logger hclog.Logger, bkt *bbolt.Bucket) (*taskRunnerState08, error) {
simpleFound := false
var trState taskRunnerState08
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v == nil {
// value is nil: delete unexpected bucket
logger.Warn("deleting unexpected task state bucket",
"bucket", string(k),
)
if err := bkt.DeleteBucket(k); err != nil {
return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err)
}
continue
}
if !bytes.Equal(k, []byte("simple-all")) {
// value is non-nil: delete unexpected entry
logger.Warn("deleting unexpected task state entry",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return nil, fmt.Errorf("error delting unexpected task key %q: %v", string(k), err)
}
continue
}
// Decode simple-all
simpleFound = true
if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
return nil, fmt.Errorf("failed to decode task state from 'simple-all' entry: %v", err)
}
}
if !simpleFound {
return nil, fmt.Errorf("task state entry not found")
}
return &trState, nil
}
// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state.
func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error {
var oldMutable allocRunnerMutableState08
err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable)
if err != nil {
return err
}
// Upgrade Deployment Status
if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil {
return err
}
// Upgrade Task States
for taskName, taskState := range oldMutable.TaskStates {
if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil {
return err
}
}
return nil
}
func UpgradeDynamicPluginRegistry(logger hclog.Logger, tx *boltdd.Tx) error {
dynamicBkt := tx.Bucket(dynamicPluginBucketName)
if dynamicBkt == nil {
return nil // no previous plugins upgrade
}
oldState := &RegistryState12{}
if err := dynamicBkt.Get(registryStateKey, oldState); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read dynamic plugin registry state: %v", err)
}
}
newState := &dynamicplugins.RegistryState{
Plugins: make(map[string]map[string]*list.List),
}
for ptype, plugins := range oldState.Plugins {
newState.Plugins[ptype] = make(map[string]*list.List)
for pname, pluginInfo := range plugins {
newState.Plugins[ptype][pname] = list.New()
entry := list.Element{Value: pluginInfo}
newState.Plugins[ptype][pname].PushFront(entry)
}
}
return dynamicBkt.Put(registryStateKey, newState)
}