Raft Debugging Improvements (#11414)

This commit is contained in:
Mahmood Ali 2021-11-04 10:16:12 -04:00 committed by GitHub
parent 992abe6597
commit 4fc6e50782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 407 additions and 63 deletions

View File

@ -566,6 +566,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator snapshot _state": func() (cli.Command, error) {
return &OperatorSnapshotStateCommand{
Meta: meta,
}, nil
},
"operator snapshot restore": func() (cli.Command, error) {
return &OperatorSnapshotRestoreCommand{
Meta: meta,

View File

@ -76,12 +76,20 @@ func (c *OperatorRaftStateCommand) Run(args []string) int {
return 1
}
state, err := raftutil.FSMState(raftPath, fLastIdx)
fsm, err := raftutil.NewFSM(raftPath)
if err != nil {
c.Ui.Error(err.Error())
return 1
}
defer fsm.Close()
_, _, err = fsm.ApplyAll()
if err != nil {
c.Ui.Error(err.Error())
return 1
}
state := fsm.StateAsMap()
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(state); err != nil {

View File

@ -0,0 +1,77 @@
package command
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/hashicorp/nomad/helper/raftutil"
"github.com/posener/complete"
)
type OperatorSnapshotStateCommand struct {
Meta
}
func (c *OperatorSnapshotStateCommand) Help() string {
helpText := `
Usage: nomad operator snapshot _state <file>
Displays a JSON representation of state in the snapshot.
To inspect the file "backup.snap":
$ nomad operator snapshot _state backup.snap
`
return strings.TrimSpace(helpText)
}
func (c *OperatorSnapshotStateCommand) AutocompleteFlags() complete.Flags {
return complete.Flags{}
}
func (c *OperatorSnapshotStateCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
func (c *OperatorSnapshotStateCommand) Synopsis() string {
return "Displays information about a Nomad snapshot file"
}
func (c *OperatorSnapshotStateCommand) Name() string { return "operator snapshot _state" }
func (c *OperatorSnapshotStateCommand) Run(args []string) int {
// Check that we either got no filename or exactly one.
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <file>")
c.Ui.Error(commandErrorText(c))
return 1
}
path := args[0]
f, err := os.Open(path)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %s", err))
return 1
}
defer f.Close()
state, meta, err := raftutil.RestoreFromArchive(f)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read archive file: %s", err))
return 1
}
sm := raftutil.StateAsMap(state)
sm["SnapshotMeta"] = []interface{}{meta}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(sm); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to encode output: %v", err))
return 1
}
return 0
}

View File

@ -2,30 +2,76 @@ package raftutil
import (
"fmt"
"os"
"io"
"path/filepath"
"strings"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
)
// FSMState returns a dump of the FSM state as found in data-dir, as of lastIndx value
func FSMState(p string, plastIdx int64) (interface{}, error) {
var ErrNoMoreLogs = fmt.Errorf("no more logs")
type nomadFSM interface {
raft.FSM
State() *state.StateStore
Restore(io.ReadCloser) error
}
type FSMHelper struct {
path string
logger hclog.Logger
// nomad state
store *raftboltdb.BoltStore
fsm nomadFSM
snaps *raft.FileSnapshotStore
// raft
logFirstIdx uint64
logLastIdx uint64
nextIdx uint64
}
func NewFSM(p string) (*FSMHelper, error) {
store, firstIdx, lastIdx, err := RaftStateInfo(filepath.Join(p, "raft.db"))
if err != nil {
return nil, fmt.Errorf("failed to open raft database %v: %v", p, err)
}
defer store.Close()
snaps, err := raft.NewFileSnapshotStore(p, 1000, os.Stderr)
if err != nil {
return nil, fmt.Errorf("failed to open snapshot dir: %v", err)
}
logger := hclog.L()
snaps, err := raft.NewFileSnapshotStoreWithLogger(p, 1000, logger)
if err != nil {
store.Close()
return nil, fmt.Errorf("failed to open snapshot dir: %v", err)
}
fsm, err := dummyFSM(logger)
if err != nil {
store.Close()
return nil, err
}
return &FSMHelper{
path: p,
logger: logger,
store: store,
fsm: fsm,
snaps: snaps,
logFirstIdx: firstIdx,
logLastIdx: lastIdx,
nextIdx: uint64(1),
}, nil
}
func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
@ -33,6 +79,7 @@ func FSMState(p string, plastIdx int64) (interface{}, error) {
if err != nil {
return nil, err
}
fsmConfig := &nomad.FSMConfig{
EvalBroker: evalBroker,
Periodic: periodicDispatch,
@ -41,38 +88,104 @@ func FSMState(p string, plastIdx int64) (interface{}, error) {
Region: "default",
}
fsm, err := nomad.NewFSM(fsmConfig)
if err != nil {
return nil, err
}
return nomad.NewFSM(fsmConfig)
}
// restore from snapshot first
sFirstIdx, err := restoreFromSnapshot(fsm, snaps, logger)
if err != nil {
return nil, err
}
func (f *FSMHelper) Close() {
f.store.Close()
if sFirstIdx+1 < firstIdx {
return nil, fmt.Errorf("missing logs after snapshot [%v,%v]", sFirstIdx+1, firstIdx-1)
} else if sFirstIdx > 0 {
firstIdx = sFirstIdx + 1
}
}
lastIdx = lastIndex(lastIdx, plastIdx)
for i := firstIdx; i <= lastIdx; i++ {
var e raft.Log
err := store.GetLog(i, &e)
func (f *FSMHelper) ApplyNext() (index uint64, term uint64, err error) {
if f.nextIdx == 1 {
// check snapshots first
index, term, err := f.restoreFromSnapshot()
if err != nil {
return nil, fmt.Errorf("failed to read log entry at index %d: %v, firstIdx: %d, lastIdx: %d", i, err, firstIdx, lastIdx)
return 0, 0, err
}
if e.Type == raft.LogCommand {
fsm.Apply(&e)
if index != 0 {
f.nextIdx = index + 1
return index, term, nil
}
}
state := fsm.State()
if f.nextIdx < f.logFirstIdx {
return 0, 0, fmt.Errorf("missing logs [%v, %v]", f.nextIdx, f.logFirstIdx-1)
}
if f.nextIdx > f.logLastIdx {
return 0, 0, ErrNoMoreLogs
}
var e raft.Log
err = f.store.GetLog(f.nextIdx, &e)
if err != nil {
return 0, 0, fmt.Errorf("failed to read log entry at index %d: %v", f.nextIdx, err)
}
defer func() {
r := recover()
if r != nil && strings.HasPrefix(fmt.Sprint(r), "failed to apply request") {
// Enterprise specific log entries will fail to load in OSS repository with "failed to apply request."
// If not relevant to investigation, we can ignore them and simply worn.
f.logger.Warn("failed to apply log; loading Enterprise data-dir in OSS binary?", "index", e.Index)
f.nextIdx++
} else if r != nil {
panic(r)
}
}()
if e.Type == raft.LogCommand {
f.fsm.Apply(&e)
}
f.nextIdx++
return e.Index, e.Term, nil
}
// ApplyUntil applies all raft entries until (inclusive) the passed index.
func (f *FSMHelper) ApplyUntil(stopIdx uint64) (idx uint64, term uint64, err error) {
var lastIdx, lastTerm uint64
for {
idx, term, err := f.ApplyNext()
if err == ErrNoMoreLogs {
return lastIdx, lastTerm, nil
} else if err != nil {
return lastIdx, lastTerm, err
} else if idx >= stopIdx {
return lastIdx, lastTerm, nil
}
lastIdx, lastTerm = idx, term
}
}
func (f *FSMHelper) ApplyAll() (index uint64, term uint64, err error) {
var lastIdx, lastTerm uint64
for {
idx, term, err := f.ApplyNext()
if err == ErrNoMoreLogs {
return lastIdx, lastTerm, nil
} else if err != nil {
return lastIdx, lastTerm, err
}
lastIdx, lastTerm = idx, term
}
}
func (f *FSMHelper) State() *state.StateStore {
return f.fsm.State()
}
func (f *FSMHelper) StateAsMap() map[string][]interface{} {
return StateAsMap(f.fsm.State())
}
// StateAsMap returns a json-able representation of the state
func StateAsMap(state *state.StateStore) map[string][]interface{} {
result := map[string][]interface{}{
"ACLPolicies": toArray(state.ACLPolicies(nil)),
"ACLTokens": toArray(state.ACLTokens(nil)),
@ -95,52 +208,35 @@ func FSMState(p string, plastIdx int64) (interface{}, error) {
insertEnterpriseState(result, state)
return result, nil
return result
}
func restoreFromSnapshot(fsm raft.FSM, snaps raft.SnapshotStore, logger hclog.Logger) (uint64, error) {
logger = logger.Named("restoreFromSnapshot")
snapshots, err := snaps.List()
func (f *FSMHelper) restoreFromSnapshot() (index uint64, term uint64, err error) {
snapshots, err := f.snaps.List()
if err != nil {
return 0, err
return 0, 0, err
}
logger.Debug("found snapshots", "count", len(snapshots))
f.logger.Debug("found snapshots", "count", len(snapshots))
for _, snapshot := range snapshots {
_, source, err := snaps.Open(snapshot.ID)
_, source, err := f.snaps.Open(snapshot.ID)
if err != nil {
logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
f.logger.Warn("failed to open a snapshot", "snapshot_id", snapshot.ID, "error", err)
continue
}
err = fsm.Restore(source)
err = f.fsm.Restore(source)
source.Close()
if err != nil {
logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
f.logger.Warn("failed to restore a snapshot", "snapshot_id", snapshot.ID, "error", err)
continue
}
return snapshot.Index, nil
return snapshot.Index, snapshot.Term, nil
}
return 0, nil
}
func lastIndex(raftLastIdx uint64, cliLastIdx int64) uint64 {
switch {
case cliLastIdx < 0:
if raftLastIdx > uint64(-cliLastIdx) {
return raftLastIdx - uint64(-cliLastIdx)
} else {
return 0
}
case cliLastIdx == 0:
return raftLastIdx
case uint64(cliLastIdx) < raftLastIdx:
return uint64(cliLastIdx)
default:
return raftLastIdx
}
return 0, 0, nil
}
func toArray(iter memdb.ResultIterator, err error) []interface{} {

View File

@ -0,0 +1,106 @@
package raftutil
import (
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
// TestSampleInvariant illustrates how to find offending log entry for an invariant
func TestSampleInvariant(t *testing.T) {
t.Skip("not a real test")
path := "/tmp/nomad-datadir/server/raft"
ns := "default"
parentID := "myjob"
fsm, err := NewFSM(path)
require.NoError(t, err)
state := fsm.State()
for {
idx, _, err := fsm.ApplyNext()
if err == ErrNoMoreLogs {
break
}
require.NoError(t, err)
// Test invariant for each entry
// For example, test job summary numbers against running jobs
summary, err := state.JobSummaryByID(nil, ns, parentID)
require.NoError(t, err)
if summary == nil {
// job hasn't been created yet
continue
}
summaryCount := summary.Children.Running + summary.Children.Pending + summary.Children.Dead
jobCountByParent := 0
iter, err := state.Jobs(nil)
require.NoError(t, err)
for {
rawJob := iter.Next()
if rawJob == nil {
break
}
job := rawJob.(*structs.Job)
if job.Namespace == ns && job.ParentID == parentID {
jobCountByParent++
}
}
require.Equalf(t, summaryCount, jobCountByParent, "job summary at idx=%v", idx)
}
// any post-assertion follow
}
// TestSchedulerLogic illustrates how to test how to test the scheduler
// logic for handling an eval
func TestSchedulerLogic(t *testing.T) {
t.Skip("not a real test")
path := "/tmp/nomad-datadir/server/raft"
ns := "default"
jobID := "myjob"
testIdx := uint64(3234)
fsm, err := NewFSM(path)
require.NoError(t, err)
_, _, err = fsm.ApplyUntil(testIdx)
require.NoError(t, err)
state := fsm.State()
job, err := state.JobByID(nil, ns, jobID)
require.NoError(t, err)
// Create an eval and schedule it!
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: ns,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
// Process the evaluation
h := scheduler.NewHarnessWithState(t, state)
err = h.Process(scheduler.NewServiceScheduler, eval)
require.NoError(t, err)
require.Len(t, h.Plans, 1)
pretty.Println(h.Plans[0])
}

View File

@ -0,0 +1,46 @@
package raftutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/raft"
)
func RestoreFromArchive(archive io.Reader) (*state.StateStore, *raft.SnapshotMeta, error) {
logger := hclog.L()
fsm, err := dummyFSM(logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to create FSM: %w", err)
}
snap, err := ioutil.TempFile("", "snap-")
if err != nil {
return nil, nil, fmt.Errorf("failed to create a temp file: %w", err)
}
defer os.Remove(snap.Name())
defer snap.Close()
meta, err := snapshot.CopySnapshot(archive, snap)
if err != nil {
return nil, nil, fmt.Errorf("failed to read snapshot: %w", err)
}
_, err = snap.Seek(0, 0)
if err != nil {
return nil, nil, fmt.Errorf("failed to seek: %w", err)
}
err = fsm.Restore(snap)
if err != nil {
return nil, nil, fmt.Errorf("failed to restore from snapshot: %w", err)
}
return fsm.State(), meta, nil
}

View File

@ -140,6 +140,11 @@ func (s *Snapshot) Close() error {
// Verify takes the snapshot from the reader and verifies its contents.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
return CopySnapshot(in, ioutil.Discard)
}
// CopySnapshot copies the snapshot content from snapshot archive to dest
func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) {
// Wrap the reader in a gzip decompressor.
decomp, err := gzip.NewReader(in)
if err != nil {
@ -149,7 +154,7 @@ func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
// Read the archive, throwing away the snapshot data.
var metadata raft.SnapshotMeta
if err := read(decomp, &metadata, ioutil.Discard); err != nil {
if err := read(decomp, &metadata, dest); err != nil {
return nil, fmt.Errorf("failed to read snapshot file: %v", err)
}

View File

@ -48,6 +48,7 @@ var (
"operator raft _info",
"operator raft _logs",
"operator raft _state",
"operator snapshot _state",
}
// aliases is the list of aliases we want users to be aware of. We hide