snapshot restore-from-archive streaming and filtering (#13658)

Stream snapshot to FSM when restoring from archive
The `RestoreFromArchive` helper decompresses the snapshot archive to a
temporary file before reading it into the FSM. For large snapshots
this performs a lot of disk IO. Instead, stream-decompress the snapshot as
we read it, without first writing to a temporary file.

Add bexpr filters to the `RestoreFromArchive` helper.
The operator can pass these as `-filter` arguments to `nomad operator
snapshot state` (and other commands in the future) to include only
desired data when reading the snapshot.
This commit is contained in:
Tim Gross 2022-07-11 10:48:00 -04:00 committed by GitHub
parent 353323d171
commit b6dd1191b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 196 additions and 104 deletions

3
.changelog/13658.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
cli: `operator snapshot state` supports `-filter` expressions and avoids writing large temporary files
```

View File

@ -6,7 +6,9 @@ import (
"os"
"strings"
flaghelper "github.com/hashicorp/nomad/helper/flags"
"github.com/hashicorp/nomad/helper/raftutil"
"github.com/hashicorp/nomad/nomad"
"github.com/posener/complete"
)
@ -16,13 +18,19 @@ type OperatorSnapshotStateCommand struct {
// Help returns the command's usage text with leading and trailing
// whitespace trimmed.
func (c *OperatorSnapshotStateCommand) Help() string {
helpText := `
Usage: nomad operator snapshot state <file>
Usage: nomad operator snapshot state [options] <file>
Displays a JSON representation of state in the snapshot.
To inspect the file "backup.snap":
$ nomad operator snapshot state backup.snap
Snapshot State Options:
-filter
Specifies an expression used to filter query results.
`
return strings.TrimSpace(helpText)
}
@ -42,14 +50,31 @@ func (c *OperatorSnapshotStateCommand) Synopsis() string {
// Name returns the full command name, "operator snapshot state".
func (c *OperatorSnapshotStateCommand) Name() string { return "operator snapshot state" }
func (c *OperatorSnapshotStateCommand) Run(args []string) int {
var filterExpr flaghelper.StringFlag
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.Var(&filterExpr, "filter", "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
filter, err := nomad.NewFSMFilter(filterExpr.String())
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid filter expression %q: %s", filterExpr, err))
return 1
}
// Check that we got exactly one positional argument: the snapshot file.
if len(args) != 1 {
if len(flags.Args()) != 1 {
c.Ui.Error("This command takes one argument: <file>")
c.Ui.Error(commandErrorText(c))
return 1
}
path := args[0]
path := flags.Args()[0]
f, err := os.Open(path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err))
@ -57,7 +82,7 @@ func (c *OperatorSnapshotStateCommand) Run(args []string) int {
}
defer f.Close()
state, meta, err := raftutil.RestoreFromArchive(f)
state, meta, err := raftutil.RestoreFromArchive(f, filter)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read archive file: %s", err))
return 1

View File

@ -20,6 +20,7 @@ type nomadFSM interface {
raft.FSM
State() *state.StateStore
Restore(io.ReadCloser) error
RestoreWithFilter(io.ReadCloser, *nomad.FSMFilter) error
}
type FSMHelper struct {

View File

@ -3,16 +3,16 @@ package raftutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/state"
)
func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMeta, error) {
func RestoreFromArchive(archive io.Reader, filter *nomad.FSMFilter) (*state.StateStore, *raft.SnapshotMeta, error) {
logger := hclog.L()
fsm, err := dummyFSM(logger)
@ -20,27 +20,30 @@ func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMet
return nil, nil, fmt.Errorf("failed to create FSM: %w", err)
}
snap, err := ioutil.TempFile("", "snap-")
if err != nil {
return nil, nil, fmt.Errorf("failed to create a temp file: %w", err)
}
defer os.Remove(snap.Name())
defer snap.Close()
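// r is closed by RestoreWithFilter, w is closed by CopySnapshot.
// NOTE(review): errCh and metaCh are unbuffered, so if RestoreWithFilter
// fails and we return early, the copy goroutine blocks forever on its send.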
r, w := io.Pipe()
meta, err := snapshot.CopySnapshot(archive, snap)
if err != nil {
return nil, nil, fmt.Errorf("failed to read snapshot: %w", err)
}
errCh := make(chan error)
metaCh := make(chan *raft.SnapshotMeta)
_, err = snap.Seek(0, 0)
go func() {
meta, err := snapshot.CopySnapshot(archive, w)
if err != nil {
return nil, nil, fmt.Errorf("failed to seek: %w", err)
errCh <- fmt.Errorf("failed to read snapshot: %w", err)
} else {
metaCh <- meta
}
}()
err = fsm.Restore(snap)
err = fsm.RestoreWithFilter(r, filter)
if err != nil {
return nil, nil, fmt.Errorf("failed to restore from snapshot: %w", err)
}
select {
case err := <-errCh:
return nil, nil, err
case meta := <-metaCh:
return fsm.State(), meta, nil
}
}

View File

@ -138,13 +138,22 @@ func (s *Snapshot) Close() error {
return os.Remove(s.file.Name())
}
// Verify takes the snapshot from the reader and verifies its contents.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
return CopySnapshot(in, ioutil.Discard)
// Discard embeds an io.Writer and, together with its no-op Close method,
// lets a plain writer be passed where an io.WriteCloser is required
// (Verify uses it to wrap io.Discard for CopySnapshot).
type Discard struct {
io.Writer
}
// CopySnapshot copies the snapshot content from snapshot archive to dest
func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) {
// Close is a no-op that always returns nil; it exists so that Discard
// satisfies io.WriteCloser.
func (dc Discard) Close() error { return nil }
// Verify takes the snapshot from the reader and verifies its contents.
// It runs CopySnapshot against a Discard wrapper around io.Discard, so the
// snapshot content is thrown away and only the metadata (or an error) is
// returned.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
return CopySnapshot(in, Discard{Writer: io.Discard})
}
// CopySnapshot copies the snapshot content from snapshot archive to dest.
// It will close the destination once complete.
func CopySnapshot(in io.Reader, dest io.WriteCloser) (*raft.SnapshotMeta, error) {
defer dest.Close()
// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
@ -54,6 +55,7 @@ const (
ScalingEventsSnapshot SnapshotType = 19
EventSinkSnapshot SnapshotType = 20
ServiceRegistrationSnapshot SnapshotType = 21
// Namespace appliers were moved from enterprise and therefore start at 64
NamespaceSnapshot SnapshotType = 64
)
@ -1404,7 +1406,20 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
return ns, nil
}
// Restore implements the raft.FSM interface, which doesn't support a
// filtering parameter. It delegates to restoreImpl with a nil filter.
func (n *nomadFSM) Restore(old io.ReadCloser) error {
return n.restoreImpl(old, nil)
}
// RestoreWithFilter includes a set of bexpr filter evaluators, so
// that we can create a FSM that excludes a portion of a snapshot
// (typically for debugging and testing). It delegates to restoreImpl,
// passing the filter through unchanged.
func (n *nomadFSM) RestoreWithFilter(old io.ReadCloser, filter *FSMFilter) error {
return n.restoreImpl(old, filter)
}
func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
defer old.Close()
// Create a new state store
@ -1459,55 +1474,53 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(node); err != nil {
return err
}
// Handle upgrade paths
node.Canonicalize()
if filter.Include(node) {
node.Canonicalize() // Handle upgrade paths
if err := restore.NodeRestore(node); err != nil {
return err
}
}
case JobSnapshot:
job := new(structs.Job)
if err := dec.Decode(job); err != nil {
return err
}
if filter.Include(job) {
/* Handle upgrade paths:
* - Empty maps and slices should be treated as nil to avoid
* un-intended destructive updates in scheduler since we use
* reflect.DeepEqual. Starting Nomad 0.4.1, job submission sanitizes
* the incoming job.
* reflect.DeepEqual. Job submission sanitizes the incoming job.
* - Migrate from old style upgrade stanza that used only a stagger.
*/
job.Canonicalize()
if err := restore.JobRestore(job); err != nil {
return err
}
}
case EvalSnapshot:
eval := new(structs.Evaluation)
if err := dec.Decode(eval); err != nil {
return err
}
if filter.Include(eval) {
if err := restore.EvalRestore(eval); err != nil {
return err
}
}
case AllocSnapshot:
alloc := new(structs.Allocation)
if err := dec.Decode(alloc); err != nil {
return err
}
// Handle upgrade path
alloc.Canonicalize()
if filter.Include(alloc) {
alloc.Canonicalize() // Handle upgrade path
if err := restore.AllocRestore(alloc); err != nil {
return err
}
}
case IndexSnapshot:
idx := new(state.IndexEntry)
@ -1523,76 +1536,88 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(launch); err != nil {
return err
}
if filter.Include(launch) {
if err := restore.PeriodicLaunchRestore(launch); err != nil {
return err
}
}
case JobSummarySnapshot:
summary := new(structs.JobSummary)
if err := dec.Decode(summary); err != nil {
return err
}
if filter.Include(summary) {
if err := restore.JobSummaryRestore(summary); err != nil {
return err
}
}
case VaultAccessorSnapshot:
accessor := new(structs.VaultAccessor)
if err := dec.Decode(accessor); err != nil {
return err
}
if filter.Include(accessor) {
if err := restore.VaultAccessorRestore(accessor); err != nil {
return err
}
}
case ServiceIdentityTokenAccessorSnapshot:
accessor := new(structs.SITokenAccessor)
if err := dec.Decode(accessor); err != nil {
return err
}
if filter.Include(accessor) {
if err := restore.SITokenAccessorRestore(accessor); err != nil {
return err
}
}
case JobVersionSnapshot:
version := new(structs.Job)
if err := dec.Decode(version); err != nil {
return err
}
if filter.Include(version) {
if err := restore.JobVersionRestore(version); err != nil {
return err
}
}
case DeploymentSnapshot:
deployment := new(structs.Deployment)
if err := dec.Decode(deployment); err != nil {
return err
}
if filter.Include(deployment) {
if err := restore.DeploymentRestore(deployment); err != nil {
return err
}
}
case ACLPolicySnapshot:
policy := new(structs.ACLPolicy)
if err := dec.Decode(policy); err != nil {
return err
}
if filter.Include(policy) {
if err := restore.ACLPolicyRestore(policy); err != nil {
return err
}
}
case ACLTokenSnapshot:
token := new(structs.ACLToken)
if err := dec.Decode(token); err != nil {
return err
}
if filter.Include(token) {
if err := restore.ACLTokenRestore(token); err != nil {
return err
}
}
case SchedulerConfigSnapshot:
schedConfig := new(structs.SchedulerConfiguration)
@ -1618,44 +1643,47 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(jobScalingEvents); err != nil {
return err
}
if filter.Include(jobScalingEvents) {
if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil {
return err
}
}
case ScalingPolicySnapshot:
scalingPolicy := new(structs.ScalingPolicy)
if err := dec.Decode(scalingPolicy); err != nil {
return err
}
if filter.Include(scalingPolicy) {
// Handle upgrade path:
// - Set policy type if empty
scalingPolicy.Canonicalize()
if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
}
case CSIPluginSnapshot:
plugin := new(structs.CSIPlugin)
if err := dec.Decode(plugin); err != nil {
return err
}
if filter.Include(plugin) {
if err := restore.CSIPluginRestore(plugin); err != nil {
return err
}
case CSIVolumeSnapshot:
plugin := new(structs.CSIVolume)
if err := dec.Decode(plugin); err != nil {
return err
}
if err := restore.CSIVolumeRestore(plugin); err != nil {
case CSIVolumeSnapshot:
volume := new(structs.CSIVolume)
if err := dec.Decode(volume); err != nil {
return err
}
if filter.Include(volume) {
if err := restore.CSIVolumeRestore(volume); err != nil {
return err
}
}
case NamespaceSnapshot:
namespace := new(structs.Namespace)
@ -1671,19 +1699,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return nil
case ServiceRegistrationSnapshot:
// Create a new ServiceRegistration object, so we can decode the
// message into it.
serviceRegistration := new(structs.ServiceRegistration)
if err := dec.Decode(serviceRegistration); err != nil {
return err
}
if filter.Include(serviceRegistration) {
// Perform the restoration.
if err := restore.ServiceRegistrationRestore(serviceRegistration); err != nil {
return err
}
}
default:
// Check if this is an enterprise only object being restored
@ -1944,6 +1969,32 @@ func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.Messag
return nil
}
// FSMFilter wraps a bexpr evaluator used to decide which snapshot items
// are included when restoring the FSM. A nil *FSMFilter includes
// everything (see Include).
type FSMFilter struct {
evaluator *bexpr.Evaluator
}
// NewFSMFilter compiles expr into an FSMFilter.
//
// An empty expression yields a nil filter and a nil error; a nil *FSMFilter
// is valid and includes every item. Any error from bexpr.CreateEvaluator is
// returned unchanged.
func NewFSMFilter(expr string) (*FSMFilter, error) {
	if len(expr) == 0 {
		// No expression means "no filtering".
		return nil, nil
	}

	eval, err := bexpr.CreateEvaluator(expr)
	if err != nil {
		return nil, err
	}

	filter := &FSMFilter{evaluator: eval}
	return filter, nil
}
// Include reports whether item passes the filter.
//
// A nil filter includes every item. Otherwise the item is included only when
// the evaluator reports a match without error; an evaluation error is treated
// the same as a non-match and is not surfaced to the caller.
func (f *FSMFilter) Include(item interface{}) bool {
	if f == nil {
		return true
	}

	matched, err := f.evaluator.Evaluate(item)
	return err == nil && matched
}
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes

View File

@ -70,7 +70,7 @@ func NewHarnessFromSnapshot(t testing.TB, snapshotPath string) (*scheduler.Harne
}
defer f.Close()
state, _, err := raftutil.RestoreFromArchive(f)
state, _, err := raftutil.RestoreFromArchive(f, nil)
if err != nil {
return nil, err
}