From b6dd1191b244ff5ac43e518e744c3956dfa611a5 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 11 Jul 2022 10:48:00 -0400 Subject: [PATCH] snapshot restore-from-archive streaming and filtering (#13658) Stream snapshot to FSM when restoring from archive The `RestoreFromArchive` helper decompresses the snapshot archive to a temporary file before reading it into the FSM. For large snapshots this performs a lot of disk IO. Stream decompress the snapshot as we read it, without first writing to a temporary file. Add bexpr filters to the `RestoreFromArchive` helper. The operator can pass these as `-filter` arguments to `nomad operator snapshot state` (and other commands in the future) to include only desired data when reading the snapshot. --- .changelog/13658.txt | 3 + command/operator_snapshot_state.go | 33 ++++- helper/raftutil/fsm.go | 1 + helper/raftutil/snapshot.go | 45 +++--- helper/snapshot/snapshot.go | 19 ++- nomad/fsm.go | 197 +++++++++++++++++---------- scheduler/benchmarks/helpers_test.go | 2 +- 7 files changed, 196 insertions(+), 104 deletions(-) create mode 100644 .changelog/13658.txt diff --git a/.changelog/13658.txt b/.changelog/13658.txt new file mode 100644 index 000000000..b196bc500 --- /dev/null +++ b/.changelog/13658.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: `operator snapshot state` supports `-filter` expressions and avoids writing large temporary files +``` diff --git a/command/operator_snapshot_state.go b/command/operator_snapshot_state.go index 5ba28a56c..025b2a759 100644 --- a/command/operator_snapshot_state.go +++ b/command/operator_snapshot_state.go @@ -6,7 +6,9 @@ import ( "os" "strings" + flaghelper "github.com/hashicorp/nomad/helper/flags" "github.com/hashicorp/nomad/helper/raftutil" + "github.com/hashicorp/nomad/nomad" "github.com/posener/complete" ) @@ -16,13 +18,19 @@ type OperatorSnapshotStateCommand struct { func (c *OperatorSnapshotStateCommand) Help() string { helpText := ` -Usage: nomad operator snapshot state 
+Usage: nomad operator snapshot state [options] Displays a JSON representation of state in the snapshot. To inspect the file "backup.snap": $ nomad operator snapshot state backup.snap + +Snapshot State Options: + + -filter + Specifies an expression used to filter query results. + ` return strings.TrimSpace(helpText) } @@ -42,14 +50,31 @@ func (c *OperatorSnapshotStateCommand) Synopsis() string { func (c *OperatorSnapshotStateCommand) Name() string { return "operator snapshot state" } func (c *OperatorSnapshotStateCommand) Run(args []string) int { + var filterExpr flaghelper.StringFlag + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + + flags.Var(&filterExpr, "filter", "") + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + filter, err := nomad.NewFSMFilter(filterExpr.String()) + if err != nil { + c.Ui.Error(fmt.Sprintf("Invalid filter expression %q: %s", filterExpr, err)) + return 1 + } + // Check that we either got no filename or exactly one. 
- if len(args) != 1 { + if len(flags.Args()) != 1 { c.Ui.Error("This command takes one argument: ") c.Ui.Error(commandErrorText(c)) return 1 } - path := args[0] + path := flags.Args()[0] f, err := os.Open(path) if err != nil { c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err)) @@ -57,7 +82,7 @@ func (c *OperatorSnapshotStateCommand) Run(args []string) int { } defer f.Close() - state, meta, err := raftutil.RestoreFromArchive(f) + state, meta, err := raftutil.RestoreFromArchive(f, filter) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to read archive file: %s", err)) return 1 diff --git a/helper/raftutil/fsm.go b/helper/raftutil/fsm.go index cad222f2e..edebad0f3 100644 --- a/helper/raftutil/fsm.go +++ b/helper/raftutil/fsm.go @@ -20,6 +20,7 @@ type nomadFSM interface { raft.FSM State() *state.StateStore Restore(io.ReadCloser) error + RestoreWithFilter(io.ReadCloser, *nomad.FSMFilter) error } type FSMHelper struct { diff --git a/helper/raftutil/snapshot.go b/helper/raftutil/snapshot.go index 9374b0dca..a9c7377a9 100644 --- a/helper/raftutil/snapshot.go +++ b/helper/raftutil/snapshot.go @@ -3,16 +3,16 @@ package raftutil import ( "fmt" "io" - "io/ioutil" - "os" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/helper/snapshot" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/raft" + + "github.com/hashicorp/nomad/helper/snapshot" + "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/state" ) -func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMeta, error) { +func RestoreFromArchive(archive io.Reader, filter *nomad.FSMFilter) (*state.StateStore, *raft.SnapshotMeta, error) { logger := hclog.L() fsm, err := dummyFSM(logger) @@ -20,27 +20,30 @@ func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMet return nil, nil, fmt.Errorf("failed to create FSM: %w", err) } - snap, err := ioutil.TempFile("", "snap-") - if err != nil { - return nil, nil, fmt.Errorf("failed to 
create a temp file: %w", err) - } - defer os.Remove(snap.Name()) - defer snap.Close() + // r is closed by RestoreWithFilter, w is closed by CopySnapshot + r, w := io.Pipe() - meta, err := snapshot.CopySnapshot(archive, snap) - if err != nil { - return nil, nil, fmt.Errorf("failed to read snapshot: %w", err) - } + errCh := make(chan error) + metaCh := make(chan *raft.SnapshotMeta) - _, err = snap.Seek(0, 0) - if err != nil { - return nil, nil, fmt.Errorf("failed to seek: %w", err) - } + go func() { + meta, err := snapshot.CopySnapshot(archive, w) + if err != nil { + errCh <- fmt.Errorf("failed to read snapshot: %w", err) + } else { + metaCh <- meta + } + }() - err = fsm.Restore(snap) + err = fsm.RestoreWithFilter(r, filter) if err != nil { return nil, nil, fmt.Errorf("failed to restore from snapshot: %w", err) } - return fsm.State(), meta, nil + select { + case err := <-errCh: + return nil, nil, err + case meta := <-metaCh: + return fsm.State(), meta, nil + } } diff --git a/helper/snapshot/snapshot.go b/helper/snapshot/snapshot.go index 390856ea1..74965fd69 100644 --- a/helper/snapshot/snapshot.go +++ b/helper/snapshot/snapshot.go @@ -138,13 +138,22 @@ func (s *Snapshot) Close() error { return os.Remove(s.file.Name()) } -// Verify takes the snapshot from the reader and verifies its contents. -func Verify(in io.Reader) (*raft.SnapshotMeta, error) { - return CopySnapshot(in, ioutil.Discard) +type Discard struct { + io.Writer } -// CopySnapshot copies the snapshot content from snapshot archive to dest -func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) { +func (dc Discard) Close() error { return nil } + +// Verify takes the snapshot from the reader and verifies its contents. +func Verify(in io.Reader) (*raft.SnapshotMeta, error) { + return CopySnapshot(in, Discard{Writer: io.Discard}) +} + +// CopySnapshot copies the snapshot content from snapshot archive to dest. +// It will close the destination once complete. 
+func CopySnapshot(in io.Reader, dest io.WriteCloser) (*raft.SnapshotMeta, error) { + defer dest.Close() + // Wrap the reader in a gzip decompressor. decomp, err := gzip.NewReader(in) if err != nil { diff --git a/nomad/fsm.go b/nomad/fsm.go index e686e211b..7476d0fe9 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -8,6 +8,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" @@ -54,6 +55,7 @@ const ( ScalingEventsSnapshot SnapshotType = 19 EventSinkSnapshot SnapshotType = 20 ServiceRegistrationSnapshot SnapshotType = 21 + // Namespace appliers were moved from enterprise and therefore start at 64 NamespaceSnapshot SnapshotType = 64 ) @@ -1404,7 +1406,20 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { return ns, nil } +// Restore implements the raft.FSM interface, which doesn't support a +// filtering parameter func (n *nomadFSM) Restore(old io.ReadCloser) error { + return n.restoreImpl(old, nil) +} + +// RestoreWithFilter includes a set of bexpr filter evaluators, so +// that we can create a FSM that excludes a portion of a snapshot +// (typically for debugging and testing) +func (n *nomadFSM) RestoreWithFilter(old io.ReadCloser, filter *FSMFilter) error { + return n.restoreImpl(old, filter) +} + +func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { defer old.Close() // Create a new state store @@ -1459,12 +1474,11 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(node); err != nil { return err } - - // Handle upgrade paths - node.Canonicalize() - - if err := restore.NodeRestore(node); err != nil { - return err + if filter.Include(node) { + node.Canonicalize() // Handle upgrade paths + if err := restore.NodeRestore(node); err != nil { + return err + } } case JobSnapshot: @@ -1472,18 +1486,17 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := 
dec.Decode(job); err != nil { return err } - - /* Handle upgrade paths: - * - Empty maps and slices should be treated as nil to avoid - * un-intended destructive updates in scheduler since we use - * reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanitizes - * the incoming job. - * - Migrate from old style upgrade stanza that used only a stagger. - */ - job.Canonicalize() - - if err := restore.JobRestore(job); err != nil { - return err + if filter.Include(job) { + /* Handle upgrade paths: + * - Empty maps and slices should be treated as nil to avoid + * un-intended destructive updates in scheduler since we use + * reflect.DeepEqual. Job submission sanitizes the incoming job. + * - Migrate from old style upgrade stanza that used only a stagger. + */ + job.Canonicalize() + if err := restore.JobRestore(job); err != nil { + return err + } } case EvalSnapshot: @@ -1491,9 +1504,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(eval); err != nil { return err } - - if err := restore.EvalRestore(eval); err != nil { - return err + if filter.Include(eval) { + if err := restore.EvalRestore(eval); err != nil { + return err + } } case AllocSnapshot: @@ -1501,12 +1515,11 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(alloc); err != nil { return err } - - // Handle upgrade path - alloc.Canonicalize() - - if err := restore.AllocRestore(alloc); err != nil { - return err + if filter.Include(alloc) { + alloc.Canonicalize() // Handle upgrade path + if err := restore.AllocRestore(alloc); err != nil { + return err + } } case IndexSnapshot: @@ -1523,9 +1536,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(launch); err != nil { return err } - - if err := restore.PeriodicLaunchRestore(launch); err != nil { - return err + if filter.Include(launch) { + if err := restore.PeriodicLaunchRestore(launch); err != nil { + return err + } } case JobSummarySnapshot: @@ -1533,9 +1547,10 @@ func (n 
*nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(summary); err != nil { return err } - - if err := restore.JobSummaryRestore(summary); err != nil { - return err + if filter.Include(summary) { + if err := restore.JobSummaryRestore(summary); err != nil { + return err + } } case VaultAccessorSnapshot: @@ -1543,8 +1558,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(accessor); err != nil { return err } - if err := restore.VaultAccessorRestore(accessor); err != nil { - return err + if filter.Include(accessor) { + if err := restore.VaultAccessorRestore(accessor); err != nil { + return err + } } case ServiceIdentityTokenAccessorSnapshot: @@ -1552,8 +1569,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(accessor); err != nil { return err } - if err := restore.SITokenAccessorRestore(accessor); err != nil { - return err + if filter.Include(accessor) { + if err := restore.SITokenAccessorRestore(accessor); err != nil { + return err + } } case JobVersionSnapshot: @@ -1561,9 +1580,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(version); err != nil { return err } - - if err := restore.JobVersionRestore(version); err != nil { - return err + if filter.Include(version) { + if err := restore.JobVersionRestore(version); err != nil { + return err + } } case DeploymentSnapshot: @@ -1571,9 +1591,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(deployment); err != nil { return err } - - if err := restore.DeploymentRestore(deployment); err != nil { - return err + if filter.Include(deployment) { + if err := restore.DeploymentRestore(deployment); err != nil { + return err + } } case ACLPolicySnapshot: @@ -1581,8 +1602,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(policy); err != nil { return err } - if err := restore.ACLPolicyRestore(policy); err != nil { - return err + if filter.Include(policy) { + if err := 
restore.ACLPolicyRestore(policy); err != nil { + return err + } } case ACLTokenSnapshot: @@ -1590,8 +1613,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(token); err != nil { return err } - if err := restore.ACLTokenRestore(token); err != nil { - return err + if filter.Include(token) { + if err := restore.ACLTokenRestore(token); err != nil { + return err + } } case SchedulerConfigSnapshot: @@ -1618,9 +1643,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(jobScalingEvents); err != nil { return err } - - if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil { - return err + if filter.Include(jobScalingEvents) { + if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil { + return err + } } case ScalingPolicySnapshot: @@ -1628,13 +1654,13 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(scalingPolicy); err != nil { return err } - - // Handle upgrade path: - // - Set policy type if empty - scalingPolicy.Canonicalize() - - if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { - return err + if filter.Include(scalingPolicy) { + // Handle upgrade path: + // - Set policy type if empty + scalingPolicy.Canonicalize() + if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { + return err + } } case CSIPluginSnapshot: @@ -1642,19 +1668,21 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(plugin); err != nil { return err } - - if err := restore.CSIPluginRestore(plugin); err != nil { - return err + if filter.Include(plugin) { + if err := restore.CSIPluginRestore(plugin); err != nil { + return err + } } case CSIVolumeSnapshot: - plugin := new(structs.CSIVolume) - if err := dec.Decode(plugin); err != nil { + volume := new(structs.CSIVolume) + if err := dec.Decode(volume); err != nil { return err } - - if err := restore.CSIVolumeRestore(plugin); err != nil { - return err + if filter.Include(volume) { + if err 
:= restore.CSIVolumeRestore(volume); err != nil { + return err + } } case NamespaceSnapshot: @@ -1671,18 +1699,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return nil case ServiceRegistrationSnapshot: - - // Create a new ServiceRegistration object, so we can decode the - // message into it. serviceRegistration := new(structs.ServiceRegistration) - if err := dec.Decode(serviceRegistration); err != nil { return err } - - // Perform the restoration. - if err := restore.ServiceRegistrationRestore(serviceRegistration); err != nil { - return err + if filter.Include(serviceRegistration) { + // Perform the restoration. + if err := restore.ServiceRegistrationRestore(serviceRegistration); err != nil { + return err + } } default: @@ -1944,6 +1969,32 @@ func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.Messag return nil } +type FSMFilter struct { + evaluator *bexpr.Evaluator +} + +func NewFSMFilter(expr string) (*FSMFilter, error) { + if expr == "" { + return nil, nil + } + evaluator, err := bexpr.CreateEvaluator(expr) + if err != nil { + return nil, err + } + return &FSMFilter{evaluator: evaluator}, nil +} + +func (f *FSMFilter) Include(item interface{}) bool { + if f == nil { + return true + } + ok, err := f.evaluator.Evaluate(item) + if !ok || err != nil { + return false + } + return true +} + func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now()) // Register the nodes diff --git a/scheduler/benchmarks/helpers_test.go b/scheduler/benchmarks/helpers_test.go index 78d7b1b12..dd23deec9 100644 --- a/scheduler/benchmarks/helpers_test.go +++ b/scheduler/benchmarks/helpers_test.go @@ -70,7 +70,7 @@ func NewHarnessFromSnapshot(t testing.TB, snapshotPath string) (*scheduler.Harne } defer f.Close() - state, _, err := raftutil.RestoreFromArchive(f) + state, _, err := raftutil.RestoreFromArchive(f, nil) if err != nil { return nil, err }