open-nomad/client/state/upgrade.go

353 lines
9.1 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package state
import (
"bytes"
"container/list"
"fmt"
"os"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/helper/boltdd"
"github.com/hashicorp/nomad/nomad/structs"
"go.etcd.io/bbolt"
)
// NeedsUpgrade returns true if the BoltDB needs upgrading or false if it is
// already up to date.
func NeedsUpgrade(bdb *bbolt.DB) (upgradeTo09, upgradeTo13 bool, err error) {
upgradeTo09 = true
upgradeTo13 = true
err = bdb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(metaBucketName)
if b == nil {
// No meta bucket; upgrade
return nil
}
v := b.Get(metaVersionKey)
if len(v) == 0 {
// No version; upgrade
return nil
}
if bytes.Equal(v, []byte{'2'}) {
upgradeTo09 = false
return nil
}
if bytes.Equal(v, metaVersion) {
upgradeTo09 = false
upgradeTo13 = false
return nil
}
// Version exists but does not match. Abort.
return fmt.Errorf("incompatible state version. expected %q but found %q",
metaVersion, v)
})
return
}
// addMeta adds version metadata to BoltDB to mark it as upgraded and
// should be run at the end of the upgrade transaction.
func addMeta(tx *bbolt.Tx) error {
// Create the meta bucket if it doesn't exist
bkt, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
return bkt.Put(metaVersionKey, metaVersion)
}
// backupDB backs up the existing state database prior to upgrade overwriting
// previous backups.
func backupDB(bdb *bbolt.DB, dst string) error {
fd, err := os.Create(dst)
if err != nil {
return err
}
return bdb.View(func(tx *bbolt.Tx) error {
if _, err := tx.WriteTo(fd); err != nil {
fd.Close()
return err
}
return fd.Close()
})
}
// UpgradeAllocs upgrades the boltdb schema. Example 0.8 schema:
//
// allocations
// 15d83e8a-74a2-b4da-3f17-ed5c12895ea8
// echo
// simple-all (342 bytes)
// alloc (2827 bytes)
// alloc-dir (166 bytes)
// immutable (15 bytes)
// mutable (1294 bytes)
func UpgradeAllocs(logger hclog.Logger, tx *boltdd.Tx) error {
btx := tx.BoltTx()
allocationsBucket := btx.Bucket(allocationsBucketName)
if allocationsBucket == nil {
// No state!
return nil
}
// Gather alloc buckets and remove unexpected key/value pairs
allocBuckets := [][]byte{}
cur := allocationsBucket.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v != nil {
logger.Warn("deleting unexpected key in state db",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return fmt.Errorf("error deleting unexpected key %q: %v", string(k), err)
}
continue
}
allocBuckets = append(allocBuckets, k)
}
for _, allocBucket := range allocBuckets {
allocID := string(allocBucket)
bkt := allocationsBucket.Bucket(allocBucket)
if bkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", allocID)
}
allocLogger := logger.With("alloc_id", allocID)
if err := upgradeAllocBucket(allocLogger, tx, bkt, allocID); err != nil {
// Log and drop invalid allocs
allocLogger.Error("dropping invalid allocation due to error while upgrading state",
"error", err,
)
// If we can't delete the bucket something is seriously
// wrong, fail hard.
if err := allocationsBucket.DeleteBucket(allocBucket); err != nil {
return fmt.Errorf("error deleting invalid allocation state: %v", err)
}
}
}
return nil
}
// upgradeAllocBucket upgrades an alloc bucket.
func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bbolt.Bucket, allocID string) error {
allocFound := false
taskBuckets := [][]byte{}
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
switch string(k) {
case "alloc":
// Alloc has not changed; leave it be
allocFound = true
case "alloc-dir":
// Drop alloc-dir entries as they're no longer needed.
cur.Delete()
case "immutable":
// Drop immutable state. Nothing from it needs to be
// upgraded.
cur.Delete()
case "mutable":
// Decode and upgrade
if err := upgradeOldAllocMutable(tx, allocID, v); err != nil {
return err
}
cur.Delete()
default:
if v != nil {
logger.Warn("deleting unexpected state entry for allocation",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return err
}
continue
}
// Nested buckets are tasks
taskBuckets = append(taskBuckets, k)
}
}
// If the alloc entry was not found, abandon this allocation as the
// state has been corrupted.
if !allocFound {
return fmt.Errorf("alloc entry not found")
}
// Upgrade tasks
for _, taskBucket := range taskBuckets {
taskName := string(taskBucket)
taskLogger := logger.With("task_name", taskName)
taskBkt := bkt.Bucket(taskBucket)
if taskBkt == nil {
// This should never happen as we just read the bucket.
return fmt.Errorf("unexpected bucket missing %q", taskName)
}
oldState, err := upgradeTaskBucket(taskLogger, taskBkt)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
continue
}
// Convert 0.8 task state to 0.9 task state
localTaskState, err := oldState.Upgrade(allocID, taskName)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
continue
}
// Insert the new task state
if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {
return err
}
// Delete the old task bucket
if err := bkt.DeleteBucket(taskBucket); err != nil {
return err
}
taskLogger.Trace("upgraded", "from", oldState.Version)
}
return nil
}
// upgradeTaskBucket iterates over keys in a task bucket, deleting invalid keys
// and returning the 0.8 version of the state.
func upgradeTaskBucket(logger hclog.Logger, bkt *bbolt.Bucket) (*taskRunnerState08, error) {
simpleFound := false
var trState taskRunnerState08
cur := bkt.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
if v == nil {
// value is nil: delete unexpected bucket
logger.Warn("deleting unexpected task state bucket",
"bucket", string(k),
)
if err := bkt.DeleteBucket(k); err != nil {
return nil, fmt.Errorf("error deleting unexpected task bucket %q: %v", string(k), err)
}
continue
}
if !bytes.Equal(k, []byte("simple-all")) {
// value is non-nil: delete unexpected entry
logger.Warn("deleting unexpected task state entry",
"key", string(k), "value_bytes", len(v),
)
if err := cur.Delete(); err != nil {
return nil, fmt.Errorf("error delting unexpected task key %q: %v", string(k), err)
}
continue
}
// Decode simple-all
simpleFound = true
if err := codec.NewDecoderBytes(v, structs.MsgpackHandle).Decode(&trState); err != nil {
return nil, fmt.Errorf("failed to decode task state from 'simple-all' entry: %v", err)
}
}
if !simpleFound {
return nil, fmt.Errorf("task state entry not found")
}
return &trState, nil
}
// upgradeOldAllocMutable upgrades Nomad 0.8 alloc runner state.
func upgradeOldAllocMutable(tx *boltdd.Tx, allocID string, oldBytes []byte) error {
var oldMutable allocRunnerMutableState08
err := codec.NewDecoderBytes(oldBytes, structs.MsgpackHandle).Decode(&oldMutable)
if err != nil {
return err
}
// Upgrade Deployment Status
if err := putDeploymentStatusImpl(tx, allocID, oldMutable.DeploymentStatus); err != nil {
return err
}
// Upgrade Task States
for taskName, taskState := range oldMutable.TaskStates {
if err := putTaskStateImpl(tx, allocID, taskName, taskState); err != nil {
return err
}
}
return nil
}
func UpgradeDynamicPluginRegistry(logger hclog.Logger, tx *boltdd.Tx) error {
dynamicBkt := tx.Bucket(dynamicPluginBucketName)
if dynamicBkt == nil {
return nil // no previous plugins upgrade
}
oldState := &RegistryState12{}
if err := dynamicBkt.Get(registryStateKey, oldState); err != nil {
if !boltdd.IsErrNotFound(err) {
return fmt.Errorf("failed to read dynamic plugin registry state: %v", err)
}
}
newState := &dynamicplugins.RegistryState{
Plugins: make(map[string]map[string]*list.List),
}
for ptype, plugins := range oldState.Plugins {
newState.Plugins[ptype] = make(map[string]*list.List)
for pname, pluginInfo := range plugins {
newState.Plugins[ptype][pname] = list.New()
entry := list.Element{Value: pluginInfo}
newState.Plugins[ptype][pname].PushFront(entry)
}
}
return dynamicBkt.Put(registryStateKey, newState)
}