From 292207b7d1d5cdf06c79a60845805fa306520ac8 Mon Sep 17 00:00:00 2001 From: Peter Wilson Date: Wed, 25 Jan 2023 15:19:45 +0000 Subject: [PATCH] Parallel migration (#18815) (#18817) * Parallel migration (#18815) * flagParallel sanity check * Attempt to use ErrGroups * Updated docs * Allow 'start' and 'max-parallel' together * parallel flag renamed to max-parallel * tests for start + parallel * Removed permit pool * Updated docs to make it clearer that a high setting might not be honored based on storage backend setting * System dependent max int size * Default max-parallel 1 => 10 * Test folder/paths updated Co-authored-by: Tomasz Pawelczak <10206601+gites@users.noreply.github.com> Co-authored-by: Mike Palmiotto --- changelog/18817.txt | 3 + command/operator_migrate.go | 54 +++++--- command/operator_migrate_test.go | 121 +++++++++++++++--- .../docs/commands/operator/migrate.mdx | 8 ++ 4 files changed, 150 insertions(+), 36 deletions(-) create mode 100644 changelog/18817.txt diff --git a/changelog/18817.txt b/changelog/18817.txt new file mode 100644 index 000000000..17c93aab7 --- /dev/null +++ b/changelog/18817.txt @@ -0,0 +1,3 @@ +```release-note:improvement +migration: allow parallelization of key migration for `vault operator migrate` in order to speed up a migration. +``` \ No newline at end of file diff --git a/command/operator_migrate.go b/command/operator_migrate.go index a974f58d6..5d3fda46a 100644 --- a/command/operator_migrate.go +++ b/command/operator_migrate.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io/ioutil" + "math" "net/url" "os" "sort" @@ -22,6 +23,7 @@ import ( "github.com/mitchellh/cli" "github.com/pkg/errors" "github.com/posener/complete" + "golang.org/x/sync/errgroup" ) var ( @@ -39,6 +41,7 @@ type OperatorMigrateCommand struct { flagLogLevel string flagStart string flagReset bool + flagMaxParallel int logger log.Logger ShutdownCh chan struct{} } @@ -98,6 +101,14 @@ func (c *OperatorMigrateCommand) Flags() *FlagSets { Usage: "Reset the migration lock. No migration will occur.", }) + f.IntVar(&IntVar{ + Name: "max-parallel", + Default: 10, + Target: &c.flagMaxParallel, + Usage: "Specifies the maximum number of parallel migration threads (goroutines) that may be used when migrating. " + + "This can speed up the migration process on slow backends but uses more resources.", + }) + f.StringVar(&StringVar{ Name: "log-level", Target: &c.flagLogLevel, @@ -126,7 +137,6 @@ func (c *OperatorMigrateCommand) Run(args []string) int { c.UI.Error(err.Error()) return 1 } - c.flagLogLevel = strings.ToLower(c.flagLogLevel) validLevels := []string{"trace", "debug", "info", "warn", "error"} if !strutil.StrListContains(validLevels, c.flagLogLevel) { @@ -135,6 +145,11 @@ func (c *OperatorMigrateCommand) Run(args []string) int { } c.logger = logging.NewVaultLogger(log.LevelFromString(c.flagLogLevel)) + if c.flagMaxParallel < 1 { + c.UI.Error(fmt.Sprintf("Argument to flag -max-parallel must be between 1 and %d", math.MaxInt)) + return 1 + } + if c.flagConfig == "" { c.UI.Error("Must specify exactly one config path using -config") return 1 @@ -164,7 +179,7 @@ func (c *OperatorMigrateCommand) Run(args []string) int { } // migrate attempts to instantiate the source and destinations backends, -// and then invoke the migration the the root of the keyspace. +// and then invoke the migration the root of the keyspace. func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { from, err := c.newBackend(config.StorageSource.Type, config.StorageSource.Config) if err != nil { @@ -209,7 +224,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { doneCh := make(chan error) go func() { - doneCh <- c.migrateAll(ctx, from, to) + doneCh <- c.migrateAll(ctx, from, to, c.flagMaxParallel) }() select { @@ -225,8 +240,8 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { } // migrateAll copies all keys in lexicographic order. -func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend) error { - return dfsScan(ctx, from, func(ctx context.Context, path string) error { +func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend, maxParallel int) error { + return dfsScan(ctx, from, maxParallel, func(ctx context.Context, path string) error { if path < c.flagStart || path == storageMigrationLock || path == vault.CoreLockPath { return nil } @@ -365,10 +380,20 @@ func parseStorage(result *migratorConfig, list *ast.ObjectList, name string) err // dfsScan will invoke cb with every key from source. // Keys will be traversed in lexicographic, depth-first order. -func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.Context, path string) error) error { +func dfsScan(ctx context.Context, source physical.Backend, maxParallel int, cb func(ctx context.Context, path string) error) error { dfs := []string{""} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(maxParallel) + for l := len(dfs); l > 0; l = len(dfs) { + // Check for cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + key := dfs[len(dfs)-1] if key == "" || strings.HasSuffix(key, "/") { children, err := source.List(ctx, key) @@ -385,19 +410,14 @@ func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.C } } } else { - err := cb(ctx, key) - if err != nil { - return err - } + // Pooling + eg.Go(func() error { + return cb(ctx, key) + }) dfs = dfs[:len(dfs)-1] } - - select { - case <-ctx.Done(): - return nil - default: - } } - return nil + + return eg.Wait() } diff --git a/command/operator_migrate_test.go b/command/operator_migrate_test.go index 5db53ebbf..d292626d0 100644 --- a/command/operator_migrate_test.go +++ b/command/operator_migrate_test.go @@ -4,13 +4,13 @@ import ( "bytes" "context" "fmt" - "io/ioutil" "math/rand" "os" "path/filepath" "reflect" "sort" "strings" + "sync" "testing" "time" @@ -18,7 +18,6 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/base62" "github.com/hashicorp/vault/command/server" - "github.com/hashicorp/vault/helper/testhelpers" "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/vault" ) @@ -35,8 +34,45 @@ func TestMigration(t *testing.T) { fromFactory := physicalBackends["file"] - folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator")) - defer os.RemoveAll(folder) + folder := t.TempDir() + + confFrom := map[string]string{ + "path": folder, + } + + from, err := fromFactory(confFrom, nil) + if err != nil { + t.Fatal(err) + } + if err := storeData(from, data); err != nil { + t.Fatal(err) + } + + toFactory := physicalBackends["inmem"] + confTo := map[string]string{} + to, err := toFactory(confTo, nil) + if err != nil { + t.Fatal(err) + } + cmd := OperatorMigrateCommand{ + logger: log.NewNullLogger(), + } + if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil { + t.Fatal(err) + } + + if err := compareStoredData(to, data, ""); err != nil { + t.Fatal(err) + } + }) + + t.Run("Concurrent migration", func(t *testing.T) { + data := generateData() + + fromFactory := physicalBackends["file"] + + folder := t.TempDir() + confFrom := map[string]string{ "path": folder, } @@ -59,10 +95,10 @@ func TestMigration(t *testing.T) { cmd := OperatorMigrateCommand{ logger: log.NewNullLogger(), } - if err := cmd.migrateAll(context.Background(), from, to); err != nil { + + if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil { t.Fatal(err) } - if err := compareStoredData(to, data, ""); err != nil { t.Fatal(err) } @@ -82,8 +118,7 @@ func TestMigration(t *testing.T) { } toFactory := physicalBackends["file"] - folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator")) - defer os.RemoveAll(folder) + folder := t.TempDir() confTo := map[string]string{ "path": folder, } @@ -99,7 +134,46 @@ func TestMigration(t *testing.T) { logger: log.NewNullLogger(), flagStart: start, } - if err := cmd.migrateAll(context.Background(), from, to); err != nil { + if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil { + t.Fatal(err) + } + + if err := compareStoredData(to, data, start); err != nil { + t.Fatal(err) + } + }) + + t.Run("Start option (parallel)", func(t *testing.T) { + data := generateData() + + fromFactory := physicalBackends["inmem"] + confFrom := map[string]string{} + from, err := fromFactory(confFrom, nil) + if err != nil { + t.Fatal(err) + } + if err := storeData(from, data); err != nil { + t.Fatal(err) + } + + toFactory := physicalBackends["file"] + folder := t.TempDir() + confTo := map[string]string{ + "path": folder, + } + + to, err := toFactory(confTo, nil) + if err != nil { + t.Fatal(err) + } + + const start = "m" + + cmd := OperatorMigrateCommand{ + logger: log.NewNullLogger(), + flagStart: start, + } + if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil { t.Fatal(err) } @@ -110,9 +184,8 @@ func TestMigration(t *testing.T) { t.Run("Config parsing", func(t *testing.T) { cmd := new(OperatorMigrateCommand) - - cfgName := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator")) - ioutil.WriteFile(cfgName, []byte(` + cfgName := filepath.Join(t.TempDir(), "migrator") + os.WriteFile(cfgName, []byte(` storage_source "src_type" { path = "src_path" } @@ -120,7 +193,6 @@ storage_source "src_type" { storage_destination "dest_type" { path = "dest_path" }`), 0o644) - defer os.Remove(cfgName) expCfg := &migratorConfig{ StorageSource: &server.Storage{ @@ -145,7 +217,7 @@ storage_destination "dest_type" { } verifyBad := func(cfg string) { - ioutil.WriteFile(cfgName, []byte(cfg), 0o644) + os.WriteFile(cfgName, []byte(cfg), 0o644) _, err := cmd.loadMigratorConfig(cfgName) if err == nil { t.Fatalf("expected error but none received from: %v", cfg) @@ -192,6 +264,7 @@ storage_destination "dest_type2" { path = "dest_path" }`) }) + t.Run("DFS Scan", func(t *testing.T) { s, _ := physicalBackends["inmem"](map[string]string{}, nil) @@ -204,9 +277,16 @@ storage_destination "dest_type2" { l := randomLister{s} - var out []string - dfsScan(context.Background(), l, func(ctx context.Context, path string) error { - out = append(out, path) + type SafeAppend struct { + out []string + lock sync.Mutex + } + outKeys := SafeAppend{} + dfsScan(context.Background(), l, 10, func(ctx context.Context, path string) error { + outKeys.lock.Lock() + defer outKeys.lock.Unlock() + + outKeys.out = append(outKeys.out, path) return nil }) @@ -218,8 +298,11 @@ storage_destination "dest_type2" { keys = append(keys, key) } sort.Strings(keys) - if !reflect.DeepEqual(keys, out) { - t.Fatalf("expected equal: %v, %v", keys, out) + outKeys.lock.Lock() + sort.Strings(outKeys.out) + outKeys.lock.Unlock() + if !reflect.DeepEqual(keys, outKeys.out) { + t.Fatalf("expected equal: %v, %v", keys, outKeys.out) } }) } diff --git a/website/content/docs/commands/operator/migrate.mdx b/website/content/docs/commands/operator/migrate.mdx index ccbc57be6..17b69abca 100644 --- a/website/content/docs/commands/operator/migrate.mdx +++ b/website/content/docs/commands/operator/migrate.mdx @@ -132,3 +132,11 @@ The following flags are available for the `operator migrate` command. - `-reset` - Reset the migration lock. A lock file is added during migration to prevent starting the Vault server or another migration. The `-reset` option can be used to remove a stale lock file if present. + +- `-max-parallel` `int: 10` - Allows the operator to specify the maximum number of lightweight threads (goroutines) + which may be used to migrate data in parallel. This can potentially speed up migration on slower backends at + the cost of more resources (e.g. CPU, memory). Permitted values range from `1` (synchronous) to the maximum value + for an `integer`. If not supplied, a default of `10` parallel goroutines will be used. + + ~> Note: The maximum number of concurrent requests handled by a storage backend is ultimately governed by the + storage backend configuration setting, which enforces a maximum number of concurrent requests (`max_parallel`).