open-nomad/helper/raftutil/state.go
Michael Schurter 4949488217 cli: return error from raft commands if db is open
Before this change trying to run `nomad operator raft {info,logs}` on an
inuse raft.db would cause the command to block until the agent using
raft.db is closed.

After this change the command will block for 1s before returning a
(hopefully) helpful error message.

This change also sets the ReadOnly mode on the underlying BoltDb to
ensure diagnostics make no changes to the underlying store. We have no
evidence this has ever occurred, but it seems like a useful safety
measure.

No changelog added since this is a minor tweak in a "new" feature (it
was hidden in previous relases).
2021-12-16 11:41:01 -08:00

257 lines
6.3 KiB
Go

package raftutil
import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/boltdb/bolt"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
)
var (
errAlreadyOpen = errors.New("unable to open raft logs that are in use")
)
// RaftStateInfo returns info about the nomad state, as found in the passed data-dir directory
func RaftStateInfo(p string) (store *raftboltdb.BoltStore, firstIdx uint64, lastIdx uint64, err error) {
opts := raftboltdb.Options{
Path: p,
BoltOptions: &bolt.Options{
ReadOnly: true,
Timeout: 1 * time.Second,
},
}
s, err := raftboltdb.New(opts)
if err != nil {
if strings.HasSuffix(err.Error(), "timeout") {
return nil, 0, 0, errAlreadyOpen
}
return nil, 0, 0, fmt.Errorf("failed to open raft logs: %v", err)
}
firstIdx, err = s.FirstIndex()
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to fetch first index: %v", err)
}
lastIdx, err = s.LastIndex()
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to fetch last index: %v", err)
}
return s, firstIdx, lastIdx, nil
}
// LogEntries reads the raft logs found in the data directory found at
// the path `p`, and returns a channel of logs, and a channel of
// warnings. If opening the raft state returns an error, both channels
// will be nil.
func LogEntries(p string) (<-chan interface{}, <-chan error, error) {
store, firstIdx, lastIdx, err := RaftStateInfo(p)
if err != nil {
return nil, nil, fmt.Errorf("failed to open raft logs: %v", err)
}
entries := make(chan interface{})
warnings := make(chan error)
go func() {
defer store.Close()
defer close(entries)
for i := firstIdx; i <= lastIdx; i++ {
var e raft.Log
err := store.GetLog(i, &e)
if err != nil {
warnings <- fmt.Errorf(
"failed to read log entry at index %d (firstIdx: %d, lastIdx: %d): %v",
i, firstIdx, lastIdx, err)
continue
}
entry, err := decode(&e)
if err != nil {
warnings <- fmt.Errorf(
"failed to decode log entry at index %d: %v", i, err)
continue
}
entries <- entry
}
}()
return entries, warnings, nil
}
type logMessage struct {
LogType string
Term uint64
Index uint64
CommandType string `json:",omitempty"`
IgnoreUnknownTypeFlag bool `json:",omitempty"`
Body interface{} `json:",omitempty"`
}
func decode(e *raft.Log) (*logMessage, error) {
m := &logMessage{
LogType: logTypes[e.Type],
Term: e.Term,
Index: e.Index,
}
if m.LogType == "" {
m.LogType = fmt.Sprintf("%d", e.Type)
}
var data []byte
if e.Type == raft.LogCommand {
if len(e.Data) == 0 {
return nil, fmt.Errorf("command did not include data")
}
msgType := structs.MessageType(e.Data[0])
m.CommandType = commandName(msgType & ^structs.IgnoreUnknownTypeFlag)
m.IgnoreUnknownTypeFlag = (msgType & structs.IgnoreUnknownTypeFlag) != 0
data = e.Data[1:]
} else {
data = e.Data
}
if len(data) != 0 {
decoder := codec.NewDecoder(bytes.NewReader(data), structs.MsgpackHandle)
var v interface{}
var err error
if m.CommandType == commandName(structs.JobBatchDeregisterRequestType) {
var vr structs.JobBatchDeregisterRequest
err = decoder.Decode(&vr)
v = jsonifyJobBatchDeregisterRequest(&vr)
} else {
var vr interface{}
err = decoder.Decode(&vr)
v = vr
}
if err != nil {
fmt.Fprintf(os.Stderr, "failed to decode log entry at index %d: failed to decode body of %v.%v %v\n", e.Index, e.Type, m.CommandType, err)
v = "FAILED TO DECODE DATA"
}
fixTime(v)
m.Body = v
}
return m, nil
}
// jsonifyJobBatchDeregisterRequest special case JsonBatchDeregisterRequest object
// as the actual type is not json friendly.
func jsonifyJobBatchDeregisterRequest(v *structs.JobBatchDeregisterRequest) interface{} {
var data struct {
Jobs map[string]*structs.JobDeregisterOptions
Evals []*structs.Evaluation
structs.WriteRequest
}
data.Evals = v.Evals
data.WriteRequest = v.WriteRequest
data.Jobs = make(map[string]*structs.JobDeregisterOptions, len(v.Jobs))
if len(v.Jobs) != 0 {
for k, v := range v.Jobs {
data.Jobs[k.Namespace+"."+k.ID] = v
}
}
return data
}
var logTypes = map[raft.LogType]string{
raft.LogCommand: "LogCommand",
raft.LogNoop: "LogNoop",
raft.LogAddPeerDeprecated: "LogAddPeerDeprecated",
raft.LogRemovePeerDeprecated: "LogRemovePeerDeprecated",
raft.LogBarrier: "LogBarrier",
raft.LogConfiguration: "LogConfiguration",
}
func commandName(mt structs.MessageType) string {
n := msgTypeNames[mt]
if n != "" {
return n
}
return fmt.Sprintf("%v", mt)
}
// FindRaftFile finds raft.db and returns path
func FindRaftFile(p string) (raftpath string, err error) {
// Try known locations before traversal to avoid walking deep structure
if _, err = os.Stat(filepath.Join(p, "server", "raft", "raft.db")); err == nil {
raftpath = filepath.Join(p, "server", "raft", "raft.db")
} else if _, err = os.Stat(filepath.Join(p, "raft", "raft.db")); err == nil {
raftpath = filepath.Join(p, "raft", "raft.db")
} else if _, err = os.Stat(filepath.Join(p, "raft.db")); err == nil {
raftpath = filepath.Join(p, "raft.db")
} else if _, err = os.Stat(p); err == nil && filepath.Ext(p) == ".db" {
// Also accept path to .db file
raftpath = p
} else {
raftpath, err = FindFileInPath("raft.db", p)
}
if err != nil {
return "", err
}
return raftpath, nil
}
// FindRaftDir finds raft.db and returns parent directory path
func FindRaftDir(p string) (raftpath string, err error) {
raftpath, err = FindRaftFile(p)
if err != nil {
return "", err
}
if raftpath == "" {
return "", fmt.Errorf("failed to find raft dir in %s", p)
}
return filepath.Dir(raftpath), nil
}
// FindFileInPath searches for file in path p
func FindFileInPath(file string, p string) (path string, err error) {
// Define walk function to find file
walkFn := func(walkPath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if filepath.Base(walkPath) == file {
path = walkPath
}
return nil
}
// Walk path p looking for file
walkErr := filepath.Walk(p, walkFn)
if walkErr != nil {
return "", walkErr
}
if path == "" {
return "", fmt.Errorf("File %s not found in path %s", file, p)
}
return path, nil
}