b6dd1191b2
Stream snapshot to FSM when restoring from archive

The `RestoreFromArchive` helper decompresses the snapshot archive to a temporary file before reading it into the FSM. For large snapshots this performs a lot of disk IO. Stream-decompress the snapshot as we read it instead, without first writing to a temporary file.

Also add bexpr filters to the `RestoreFromArchive` helper. The operator can pass these as `-filter` arguments to `nomad operator snapshot state` (and other commands in the future) to include only desired data when reading the snapshot.
260 lines · 6.1 KiB · Go
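As a rough sketch of the streaming approach this commit describes (the actual `RestoreFromArchive` helper is not part of the file below): wrap the compressed archive in a streaming decompressor and feed it directly to the FSM restore, evaluating a bexpr `-filter` expression against each decoded object. The names `restoreStreaming` and `keepObject` and the gzip framing are assumptions for illustration only, using `compress/gzip` from the standard library and the `github.com/hashicorp/go-bexpr` evaluator:

```go
// Sketch only: not the actual RestoreFromArchive implementation.
// restoreStreaming and keepObject are hypothetical names.
package sketch

import (
	"compress/gzip"
	"io"

	"github.com/hashicorp/go-bexpr"
)

// restoreStreaming wraps r in a gzip reader and hands the decompressed
// stream straight to restore (e.g. an FSM restore method), so no
// temporary file is written before the FSM reads the snapshot.
func restoreStreaming(r io.Reader, restore func(io.ReadCloser) error) error {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return err
	}
	defer gz.Close()
	return restore(io.NopCloser(gz))
}

// keepObject evaluates a bexpr expression (such as one passed via -filter)
// against a decoded snapshot object and reports whether it should be kept.
func keepObject(expr string, obj interface{}) (bool, error) {
	eval, err := bexpr.CreateEvaluator(expr)
	if err != nil {
		return false, err
	}
	return eval.Evaluate(obj)
}
```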
package raftutil

import (
	"fmt"
	"io"
	"path/filepath"
	"strings"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb/v2"
)

// ErrNoMoreLogs is returned by ApplyNext once every entry in the raft
// log store has been applied.
var ErrNoMoreLogs = fmt.Errorf("no more logs")

// nomadFSM captures the subset of nomad.FSM behavior this helper relies on.
type nomadFSM interface {
	raft.FSM
	State() *state.StateStore
	Restore(io.ReadCloser) error
	RestoreWithFilter(io.ReadCloser, *nomad.FSMFilter) error
}

// FSMHelper rehydrates a Nomad FSM from an on-disk raft data directory,
// restoring from the newest usable snapshot and then replaying log entries.
type FSMHelper struct {
	path string

	logger hclog.Logger

	// nomad state
	store *raftboltdb.BoltStore
	fsm   nomadFSM
	snaps *raft.FileSnapshotStore

	// raft
	logFirstIdx uint64
	logLastIdx  uint64
	nextIdx     uint64
}

// NewFSM opens the raft database and snapshot store found in the data
// directory p and returns a helper ready to apply entries.
func NewFSM(p string) (*FSMHelper, error) {
	store, firstIdx, lastIdx, err := RaftStateInfo(filepath.Join(p, "raft.db"))
	if err != nil {
		return nil, fmt.Errorf("failed to open raft database %v: %v", p, err)
	}

	logger := hclog.L()

	snaps, err := raft.NewFileSnapshotStoreWithLogger(p, 1000, logger)
	if err != nil {
		store.Close()
		return nil, fmt.Errorf("failed to open snapshot dir: %v", err)
	}

	fsm, err := dummyFSM(logger)
	if err != nil {
		store.Close()
		return nil, err
	}

	return &FSMHelper{
		path:   p,
		logger: logger,
		store:  store,
		fsm:    fsm,
		snaps:  snaps,

		logFirstIdx: firstIdx,
		logLastIdx:  lastIdx,
		nextIdx:     uint64(1),
	}, nil
}

func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
	// use dummy non-enabled FSM dependencies
	periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
	blockedEvals := nomad.NewBlockedEvals(nil, logger)
	evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
	if err != nil {
		return nil, err
	}

	fsmConfig := &nomad.FSMConfig{
		EvalBroker: evalBroker,
		Periodic:   periodicDispatch,
		Blocked:    blockedEvals,
		Logger:     logger,
		Region:     "default",
	}

	return nomad.NewFSM(fsmConfig)
}

// Close releases the underlying raft log store.
func (f *FSMHelper) Close() {
	f.store.Close()
}

// ApplyNext applies the next unapplied raft entry to the FSM, restoring
// from the newest usable snapshot first, and returns its index and term.
func (f *FSMHelper) ApplyNext() (index uint64, term uint64, err error) {
	if f.nextIdx == 1 {
		// check snapshots first
		index, term, err := f.restoreFromSnapshot()
		if err != nil {
			return 0, 0, err
		}

		if index != 0 {
			f.nextIdx = index + 1
			return index, term, nil
		}
	}

	if f.nextIdx < f.logFirstIdx {
		return 0, 0, fmt.Errorf("missing logs [%v, %v]", f.nextIdx, f.logFirstIdx-1)
	}

	if f.nextIdx > f.logLastIdx {
		return 0, 0, ErrNoMoreLogs
	}

	var e raft.Log
	err = f.store.GetLog(f.nextIdx, &e)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to read log entry at index %d: %v", f.nextIdx, err)
	}

	defer func() {
		r := recover()
		if r != nil && strings.HasPrefix(fmt.Sprint(r), "failed to apply request") {
			// Enterprise-specific log entries fail to apply in the OSS
			// repository with "failed to apply request". If they aren't
			// relevant to the investigation, we can skip them and simply warn.
			f.logger.Warn("failed to apply log; loading Enterprise data-dir in OSS binary?", "index", e.Index)

			f.nextIdx++
		} else if r != nil {
			panic(r)
		}
	}()

	if e.Type == raft.LogCommand {
		f.fsm.Apply(&e)
	}

	f.nextIdx++
	return e.Index, e.Term, nil
}

// ApplyUntil applies all raft entries until (inclusive) the passed index.
func (f *FSMHelper) ApplyUntil(stopIdx uint64) (idx uint64, term uint64, err error) {
	var lastIdx, lastTerm uint64
	for {
		idx, term, err := f.ApplyNext()
		if err == ErrNoMoreLogs {
			return lastIdx, lastTerm, nil
		} else if err != nil {
			return lastIdx, lastTerm, err
		} else if idx >= stopIdx {
			return lastIdx, lastTerm, nil
		}

		lastIdx, lastTerm = idx, term
	}
}

// ApplyAll applies every remaining raft entry and returns the index and
// term of the last entry applied.
func (f *FSMHelper) ApplyAll() (index uint64, term uint64, err error) {
	var lastIdx, lastTerm uint64
	for {
		idx, term, err := f.ApplyNext()
		if err == ErrNoMoreLogs {
			return lastIdx, lastTerm, nil
		} else if err != nil {
			return lastIdx, lastTerm, err
		}

		lastIdx, lastTerm = idx, term
	}
}

// State returns the FSM's underlying state store.
func (f *FSMHelper) State() *state.StateStore {
	return f.fsm.State()
}

// StateAsMap returns a json-able representation of the FSM state.
func (f *FSMHelper) StateAsMap() map[string][]interface{} {
	return StateAsMap(f.fsm.State())
}

// StateAsMap returns a json-able representation of the state
func StateAsMap(store *state.StateStore) map[string][]interface{} {
	result := map[string][]interface{}{
		"ACLPolicies":      toArray(store.ACLPolicies(nil)),
		"ACLTokens":        toArray(store.ACLTokens(nil, state.SortDefault)),
		"Allocs":           toArray(store.Allocs(nil, state.SortDefault)),
		"CSIPlugins":       toArray(store.CSIPlugins(nil)),
		"CSIVolumes":       toArray(store.CSIVolumes(nil)),
		"Deployments":      toArray(store.Deployments(nil, state.SortDefault)),
		"Evals":            toArray(store.Evals(nil, state.SortDefault)),
		"Indexes":          toArray(store.Indexes()),
		"JobSummaries":     toArray(store.JobSummaries(nil)),
		"JobVersions":      toArray(store.JobVersions(nil)),
		"Jobs":             toArray(store.Jobs(nil)),
		"Nodes":            toArray(store.Nodes(nil)),
		"PeriodicLaunches": toArray(store.PeriodicLaunches(nil)),
		"SITokenAccessors": toArray(store.SITokenAccessors(nil)),
		"ScalingEvents":    toArray(store.ScalingEvents(nil)),
		"ScalingPolicies":  toArray(store.ScalingPolicies(nil)),
		"VaultAccessors":   toArray(store.VaultAccessors(nil)),
	}

	insertEnterpriseState(result, store)

	return result
}

// restoreFromSnapshot restores the FSM from the first snapshot that opens
// and restores cleanly, returning its index and term, or zeros if no
// snapshot was usable.
func (f *FSMHelper) restoreFromSnapshot() (index uint64, term uint64, err error) {
	snapshots, err := f.snaps.List()
	if err != nil {
		return 0, 0, err
	}
	f.logger.Debug("found snapshots", "count", len(snapshots))

	for _, snapshot := range snapshots {
		_, source, err := f.snaps.Open(snapshot.ID)
		if err != nil {
			f.logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
			continue
		}

		err = f.fsm.Restore(source)
		source.Close()
		if err != nil {
			f.logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
			continue
		}

		return snapshot.Index, snapshot.Term, nil
	}

	return 0, 0, nil
}

// toArray drains a memdb iterator into a slice; on error it returns the
// error as the single element so it still appears in the JSON output.
func toArray(iter memdb.ResultIterator, err error) []interface{} {
	if err != nil {
		return []interface{}{err}
	}

	r := []interface{}{}

	if iter != nil {
		item := iter.Next()
		for item != nil {
			r = append(r, item)
			item = iter.Next()
		}
	}

	return r
}
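To put the helper in context, here is a hedged end-to-end usage sketch that replays a server's raft data dir through the FSM and dumps the reconstructed state as JSON. The data-dir path is a placeholder, and the import path assumes the package lives at `helper/raftutil` as in the Nomad repository:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/helper/raftutil"
)

func main() {
	// Hypothetical data-dir path; point this at a real server's raft dir.
	fsm, err := raftutil.NewFSM("/var/nomad/server/raft")
	if err != nil {
		log.Fatal(err)
	}
	defer fsm.Close()

	// Restore from the newest usable snapshot, then apply all remaining logs.
	if _, _, err := fsm.ApplyAll(); err != nil {
		log.Fatal(err)
	}

	// Emit the reconstructed state as indented JSON.
	out, err := json.MarshalIndent(fsm.StateAsMap(), "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
}
```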