Parallel migration (#18815) (#18817)

* Parallel migration (#18815)
* flagParallel sanity check
* Attempt to use ErrGroups
* Updated docs
* Allow 'start' and 'max-parallel' together
* parallel flag renamed to max-parallel
* tests for start + parallel
* Removed permit pool
* Updated docs to make it clearer that a high setting might not be honored based on storage backend setting
* System dependent max int size
* Default max-parallel 1 => 10
* Test folder/paths updated

Co-authored-by: Tomasz Pawelczak <10206601+gites@users.noreply.github.com>
Co-authored-by: Mike Palmiotto <mike.palmiotto@hashicorp.com>
Peter Wilson 2023-01-25 15:19:45 +00:00 committed by GitHub
parent 5dc4e5bc1d
commit 292207b7d1
4 changed files with 150 additions and 36 deletions

changelog/18817.txt

@@ -0,0 +1,3 @@
```release-note:improvement
migration: allow parallelization of key migration for `vault operator migrate` in order to speed up a migration.
```

command/operator_migrate.go

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"sort"
@@ -22,6 +23,7 @@ import (
"github.com/mitchellh/cli"
"github.com/pkg/errors"
"github.com/posener/complete"
"golang.org/x/sync/errgroup"
)
var (
@@ -39,6 +41,7 @@ type OperatorMigrateCommand struct {
flagLogLevel string
flagStart string
flagReset bool
flagMaxParallel int
logger log.Logger
ShutdownCh chan struct{}
}
@@ -98,6 +101,14 @@ func (c *OperatorMigrateCommand) Flags() *FlagSets {
Usage: "Reset the migration lock. No migration will occur.",
})
f.IntVar(&IntVar{
Name: "max-parallel",
Default: 10,
Target: &c.flagMaxParallel,
Usage: "Specifies the maximum number of parallel migration threads (goroutines) that may be used when migrating. " +
"This can speed up the migration process on slow backends but uses more resources.",
})
f.StringVar(&StringVar{
Name: "log-level",
Target: &c.flagLogLevel,
@@ -126,7 +137,6 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
c.UI.Error(err.Error())
return 1
}
c.flagLogLevel = strings.ToLower(c.flagLogLevel)
validLevels := []string{"trace", "debug", "info", "warn", "error"}
if !strutil.StrListContains(validLevels, c.flagLogLevel) {
@@ -135,6 +145,11 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}
c.logger = logging.NewVaultLogger(log.LevelFromString(c.flagLogLevel))
if c.flagMaxParallel < 1 {
c.UI.Error(fmt.Sprintf("Argument to flag -max-parallel must be between 1 and %d", math.MaxInt))
return 1
}
if c.flagConfig == "" {
c.UI.Error("Must specify exactly one config path using -config")
return 1
@@ -164,7 +179,7 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}
// migrate attempts to instantiate the source and destinations backends,
// and then invoke the migration the the root of the keyspace.
// and then invoke the migration the root of the keyspace.
func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
from, err := c.newBackend(config.StorageSource.Type, config.StorageSource.Config)
if err != nil {
@@ -209,7 +224,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
doneCh := make(chan error)
go func() {
doneCh <- c.migrateAll(ctx, from, to)
doneCh <- c.migrateAll(ctx, from, to, c.flagMaxParallel)
}()
select {
@@ -225,8 +240,8 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
}
// migrateAll copies all keys in lexicographic order.
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend) error {
return dfsScan(ctx, from, func(ctx context.Context, path string) error {
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend, maxParallel int) error {
return dfsScan(ctx, from, maxParallel, func(ctx context.Context, path string) error {
if path < c.flagStart || path == storageMigrationLock || path == vault.CoreLockPath {
return nil
}
@@ -365,10 +380,20 @@ func parseStorage(result *migratorConfig, list *ast.ObjectList, name string) err
// dfsScan will invoke cb with every key from source.
// Keys will be traversed in lexicographic, depth-first order.
func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.Context, path string) error) error {
func dfsScan(ctx context.Context, source physical.Backend, maxParallel int, cb func(ctx context.Context, path string) error) error {
dfs := []string{""}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(maxParallel)
for l := len(dfs); l > 0; l = len(dfs) {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
key := dfs[len(dfs)-1]
if key == "" || strings.HasSuffix(key, "/") {
children, err := source.List(ctx, key)
@@ -385,19 +410,14 @@ func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.C
}
}
} else {
err := cb(ctx, key)
if err != nil {
return err
}
// Pooling: hand the copy off to the errgroup; Go blocks once maxParallel goroutines are in flight
eg.Go(func() error {
return cb(ctx, key)
})
dfs = dfs[:len(dfs)-1]
}
select {
case <-ctx.Done():
return nil
default:
}
}
return nil
return eg.Wait()
}
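The rewritten `dfsScan` leans on errgroup's bounded concurrency: `SetLimit` caps the number of live goroutines, `Go` blocks once the cap is reached, `WithContext` cancels the group's context on the first failure, and `Wait` returns that first error. A minimal standalone sketch of the same pattern (the loop body is a hypothetical stand-in for the copy callback):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(10) // at most 10 goroutines in flight; Go blocks while the pool is full

	for i := 0; i < 100; i++ {
		i := i // capture the loop variable for the closure (pre-Go 1.22)
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err() // an earlier failure cancels the remaining work
			default:
			}
			fmt.Println("copying key", i) // stand-in for the real migration callback
			return nil
		})
	}

	// Wait blocks until all goroutines return and yields the first non-nil error.
	if err := eg.Wait(); err != nil {
		fmt.Println("migration failed:", err)
	}
}
```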

command/operator_migrate_test.go

@@ -4,13 +4,13 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
@@ -18,7 +18,6 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/base62"
"github.com/hashicorp/vault/command/server"
"github.com/hashicorp/vault/helper/testhelpers"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/vault"
)
@@ -35,8 +34,45 @@ func TestMigration(t *testing.T) {
fromFactory := physicalBackends["file"]
folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := t.TempDir()
confFrom := map[string]string{
"path": folder,
}
from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}
toFactory := physicalBackends["inmem"]
confTo := map[string]string{}
to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}
cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}
if err := compareStoredData(to, data, ""); err != nil {
t.Fatal(err)
}
})
t.Run("Concurrent migration", func(t *testing.T) {
data := generateData()
fromFactory := physicalBackends["file"]
folder := t.TempDir()
confFrom := map[string]string{
"path": folder,
}
@@ -59,10 +95,10 @@ func TestMigration(t *testing.T) {
cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}
if err := compareStoredData(to, data, ""); err != nil {
t.Fatal(err)
}
@@ -82,8 +118,7 @@ func TestMigration(t *testing.T) {
}
toFactory := physicalBackends["file"]
folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := t.TempDir()
confTo := map[string]string{
"path": folder,
}
@@ -99,7 +134,46 @@ func TestMigration(t *testing.T) {
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}
if err := compareStoredData(to, data, start); err != nil {
t.Fatal(err)
}
})
t.Run("Start option (parallel)", func(t *testing.T) {
data := generateData()
fromFactory := physicalBackends["inmem"]
confFrom := map[string]string{}
from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}
toFactory := physicalBackends["file"]
folder := t.TempDir()
confTo := map[string]string{
"path": folder,
}
to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}
const start = "m"
cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}
@@ -110,9 +184,8 @@ func TestMigration(t *testing.T) {
t.Run("Config parsing", func(t *testing.T) {
cmd := new(OperatorMigrateCommand)
cfgName := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
ioutil.WriteFile(cfgName, []byte(`
cfgName := filepath.Join(t.TempDir(), "migrator")
os.WriteFile(cfgName, []byte(`
storage_source "src_type" {
path = "src_path"
}
@ -120,7 +193,6 @@ storage_source "src_type" {
storage_destination "dest_type" {
path = "dest_path"
}`), 0o644)
defer os.Remove(cfgName)
expCfg := &migratorConfig{
StorageSource: &server.Storage{
@ -145,7 +217,7 @@ storage_destination "dest_type" {
}
verifyBad := func(cfg string) {
ioutil.WriteFile(cfgName, []byte(cfg), 0o644)
os.WriteFile(cfgName, []byte(cfg), 0o644)
_, err := cmd.loadMigratorConfig(cfgName)
if err == nil {
t.Fatalf("expected error but none received from: %v", cfg)
@ -192,6 +264,7 @@ storage_destination "dest_type2" {
path = "dest_path"
}`)
})
t.Run("DFS Scan", func(t *testing.T) {
s, _ := physicalBackends["inmem"](map[string]string{}, nil)
@ -204,9 +277,16 @@ storage_destination "dest_type2" {
l := randomLister{s}
var out []string
dfsScan(context.Background(), l, func(ctx context.Context, path string) error {
out = append(out, path)
type SafeAppend struct {
out []string
lock sync.Mutex
}
outKeys := SafeAppend{}
dfsScan(context.Background(), l, 10, func(ctx context.Context, path string) error {
outKeys.lock.Lock()
defer outKeys.lock.Unlock()
outKeys.out = append(outKeys.out, path)
return nil
})
@ -218,8 +298,11 @@ storage_destination "dest_type2" {
keys = append(keys, key)
}
sort.Strings(keys)
if !reflect.DeepEqual(keys, out) {
t.Fatalf("expected equal: %v, %v", keys, out)
outKeys.lock.Lock()
sort.Strings(outKeys.out)
outKeys.lock.Unlock()
if !reflect.DeepEqual(keys, outKeys.out) {
t.Fatalf("expected equal: %v, %v", keys, outKeys.out)
}
})
}
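Since the callback passed to `dfsScan` can now run on up to `maxParallel` goroutines at once, the test above collects paths through a mutex-guarded slice and sorts both sides before comparing, because completion order is nondeterministic. A small sketch of that collection pattern in isolation (the names are hypothetical, not from the test):

```go
package main

import (
	"fmt"
	"sync"
)

// safeAppend is a slice guarded by a mutex so that concurrent
// callbacks can record results without a data race.
type safeAppend struct {
	mu  sync.Mutex
	out []string
}

func (s *safeAppend) add(path string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.out = append(s.out, path)
}

func main() {
	var results safeAppend
	var wg sync.WaitGroup
	for _, p := range []string{"a", "b/c", "d"} {
		p := p
		wg.Add(1)
		go func() {
			defer wg.Done()
			results.add(p) // safe to call from any goroutine
		}()
	}
	wg.Wait()
	fmt.Println(len(results.out)) // 3; order is nondeterministic
}
```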

website/content/docs/commands/operator/migrate.mdx

@@ -132,3 +132,11 @@ The following flags are available for the `operator migrate` command.
- `-reset` - Reset the migration lock. A lock file is added during migration to prevent
starting the Vault server or another migration. The `-reset` option can be used to
remove a stale lock file if present.
- `-max-parallel` `(int: 10)` - Allows the operator to specify the maximum number of lightweight threads (goroutines)
which may be used to migrate data in parallel. This can potentially speed up migration on slower backends at
the cost of more resources (e.g. CPU and memory). Permitted values range from `1` (fully sequential) to the
system's maximum `int` value. If not supplied, a default of `10` parallel goroutines is used.
~> Note: The number of concurrent requests a storage backend will actually service is ultimately governed by that
backend's own configuration, which may enforce its own cap on concurrent requests (`max_parallel`), so a high
`-max-parallel` value passed to this command may not be fully honored.
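As an illustration (the config file name and the limit are arbitrary), a migration capped at 32 parallel goroutines could be invoked as:

```shell-session
$ vault operator migrate -config migrate.hcl -max-parallel 32
```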