open-nomad/helper/raftutil/fsm.go

163 lines
4.1 KiB
Go

package raftutil
import (
"fmt"
"os"
"path/filepath"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/raft"
)
// FSMState returns a dump of the FSM state found in the data directory p, as
// of the log index selected by plastIdx.
//
// The state is rebuilt in two steps: the first snapshot under p that restores
// cleanly is loaded (see restoreFromSnapshot), and then the command entries of
// the raft log stored in p/raft.db that follow it are applied in order.
//
// plastIdx is resolved against the last index of the raft log by lastIndex:
// 0 selects the last entry, a positive value is an absolute index capped at
// the last entry, and a negative value counts back from the last entry.
//
// On success the returned value is a map[string][]interface{} keyed by
// collection name ("Jobs", "Nodes", "Allocs", ...). A collection that cannot
// be read contributes a one-element slice holding the lookup error rather
// than failing the whole dump (see toArray). Errors opening the raft database
// or the snapshot directory, and errors reading a log entry, are wrapped with
// %w so callers can inspect the underlying cause with errors.Is / errors.As.
func FSMState(p string, plastIdx int64) (interface{}, error) {
	store, firstIdx, lastIdx, err := RaftStateInfo(filepath.Join(p, "raft.db"))
	if err != nil {
		return nil, fmt.Errorf("failed to open raft database %v: %w", p, err)
	}
	defer store.Close()

	snaps, err := raft.NewFileSnapshotStore(p, 1000, os.Stderr)
	if err != nil {
		return nil, fmt.Errorf("failed to open snapshot dir: %w", err)
	}

	logger := hclog.L()

	// use dummy non-enabled FSM dependencies
	periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
	blockedEvals := nomad.NewBlockedEvals(nil, logger)
	evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
	if err != nil {
		return nil, err
	}

	fsmConfig := &nomad.FSMConfig{
		EvalBroker: evalBroker,
		Periodic:   periodicDispatch,
		Blocked:    blockedEvals,
		Logger:     logger,
		Region:     "default",
	}

	fsm, err := nomad.NewFSM(fsmConfig)
	if err != nil {
		return nil, err
	}

	// restore from snapshot first
	sFirstIdx, err := restoreFromSnapshot(fsm, snaps, logger)
	if err != nil {
		return nil, err
	}

	// The log must pick up no later than the entry right after the restored
	// snapshot (or at index 1 when nothing was restored); otherwise there is
	// a gap that cannot be replayed.
	if sFirstIdx+1 < firstIdx {
		return nil, fmt.Errorf("missing logs after snapshot [%v,%v]", sFirstIdx+1, firstIdx-1)
	} else if sFirstIdx > 0 {
		// Everything up to sFirstIdx came from the snapshot, so replay
		// starts with the entry that follows it.
		firstIdx = sFirstIdx + 1
	}

	lastIdx = lastIndex(lastIdx, plastIdx)

	for i := firstIdx; i <= lastIdx; i++ {
		var e raft.Log
		err := store.GetLog(i, &e)
		if err != nil {
			return nil, fmt.Errorf("failed to read log entry at index %d: %w, firstIdx: %d, lastIdx: %d", i, err, firstIdx, lastIdx)
		}

		// Only command entries are applied; every other log type is skipped.
		// NOTE(review): the value returned by Apply is discarded here.
		if e.Type == raft.LogCommand {
			fsm.Apply(&e)
		}
	}

	state := fsm.State()
	result := map[string][]interface{}{
		"ACLPolicies":      toArray(state.ACLPolicies(nil)),
		"ACLTokens":        toArray(state.ACLTokens(nil)),
		"Allocs":           toArray(state.Allocs(nil)),
		"CSIPlugins":       toArray(state.CSIPlugins(nil)),
		"CSIVolumes":       toArray(state.CSIVolumes(nil)),
		"Deployments":      toArray(state.Deployments(nil)),
		"Evals":            toArray(state.Evals(nil)),
		"Indexes":          toArray(state.Indexes()),
		"JobSummaries":     toArray(state.JobSummaries(nil)),
		"JobVersions":      toArray(state.JobVersions(nil)),
		"Jobs":             toArray(state.Jobs(nil)),
		"Nodes":            toArray(state.Nodes(nil)),
		"PeriodicLaunches": toArray(state.PeriodicLaunches(nil)),
		"SITokenAccessors": toArray(state.SITokenAccessors(nil)),
		"ScalingEvents":    toArray(state.ScalingEvents(nil)),
		"ScalingPolicies":  toArray(state.ScalingPolicies(nil)),
		"VaultAccessors":   toArray(state.VaultAccessors(nil)),
	}

	// insertEnterpriseState is defined elsewhere; it receives the result map
	// and the state store (presumably to add further collections — confirm
	// against its definition).
	insertEnterpriseState(result, state)

	return result, nil
}
// restoreFromSnapshot loads fsm from the first snapshot in snaps that can be
// both opened and restored, trying them in the order snaps.List returns them.
//
// It returns the index recorded for the snapshot that was restored. Snapshots
// that fail to open or to restore are logged as warnings and skipped. When
// nothing could be restored (including when there are no snapshots at all) it
// returns 0 with a nil error; the only error it reports is a failure to list
// the snapshots.
func restoreFromSnapshot(fsm raft.FSM, snaps raft.SnapshotStore, logger hclog.Logger) (uint64, error) {
	logger = logger.Named("restoreFromSnapshot")

	available, err := snaps.List()
	if err != nil {
		return 0, err
	}
	logger.Debug("found snapshots", "count", len(available))

	for _, meta := range available {
		_, reader, openErr := snaps.Open(meta.ID)
		if openErr != nil {
			logger.Warn("failed to open a snapshot", "snapshot_id", meta.ID, "error", openErr)
			continue
		}

		// Close the reader before inspecting the outcome so it is released
		// on both the success and the failure path.
		restoreErr := fsm.Restore(reader)
		reader.Close()
		if restoreErr == nil {
			return meta.Index, nil
		}
		logger.Warn("failed to restore a snapshot", "snapshot_id", meta.ID, "error", restoreErr)
	}

	return 0, nil
}
// lastIndex resolves the last log index to replay from the last index present
// in the raft log (raftLastIdx) and the index requested by the caller
// (cliLastIdx):
//
//   - cliLastIdx < 0 counts back from raftLastIdx, bottoming out at 0;
//   - cliLastIdx == 0 means "up to the end of the log";
//   - cliLastIdx > 0 is an absolute index, capped at raftLastIdx.
func lastIndex(raftLastIdx uint64, cliLastIdx int64) uint64 {
	if cliLastIdx < 0 {
		// Magnitude of the negative offset.
		back := uint64(-cliLastIdx)
		if back >= raftLastIdx {
			return 0
		}
		return raftLastIdx - back
	}

	// Zero and anything at or past the end of the log both resolve to the
	// end of the log.
	if want := uint64(cliLastIdx); want != 0 && want < raftLastIdx {
		return want
	}
	return raftLastIdx
}
// toArray collects everything iter yields into a slice. Its parameters mirror
// an (iterator, error) return pair so that such a call can be passed straight
// in.
//
// If err is non-nil the result is a one-element slice containing err. If not,
// items are taken from iter until Next returns nil. The returned slice is
// never nil: a nil or exhausted iterator produces an empty, non-nil slice.
func toArray(iter memdb.ResultIterator, err error) []interface{} {
	if err != nil {
		return []interface{}{err}
	}

	items := []interface{}{}
	if iter == nil {
		return items
	}

	for next := iter.Next(); next != nil; next = iter.Next() {
		items = append(items, next)
	}
	return items
}