open-vault/physical/raft/fsm.go

876 lines
23 KiB
Go

package raft
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/errwrap"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-raftchunking"
"github.com/hashicorp/raft"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/plugin/pb"
bolt "go.etcd.io/bbolt"
)
// Operation types carried by the operations of a raft log entry. Each value
// occupies its own bit: deleteOp = 1, putOp = 2, restoreCallbackOp = 4.
const (
	deleteOp uint32 = 1 << iota
	putOp
	restoreCallbackOp
)

// Storage layout constants.
const (
	// chunkingPrefix is the key prefix under which chunk info is stored.
	chunkingPrefix = "raftchunking/"

	// databaseFilename is the name of the bolt file inside the FSM's path.
	databaseFilename = "vault.db"
)
// Bucket names inside the bolt file.
var (
	// dataBucketName is the bucket that holds the stored entries.
	dataBucketName = []byte("data")

	// configBucketName is the bucket that holds the raft metadata keys below.
	configBucketName = []byte("config")
)

// Keys stored inside the config bucket.
var (
	// latestIndexKey holds the marshaled IndexValue (term and index).
	latestIndexKey = []byte("latest_indexes")

	// latestConfigKey holds the marshaled ConfigurationValue.
	latestConfigKey = []byte("latest_config")
)
// Compile-time checks that *FSM implements every interface it is used as.
var (
	_ physical.Backend       = (*FSM)(nil)
	_ physical.Transactional = (*FSM)(nil)
	_ raft.FSM               = (*FSM)(nil)
	_ raft.BatchingFSM       = (*FSM)(nil)
)
// restoreCallback is the signature of the function the FSM launches in its own
// goroutine when it applies a restoreCallbackOp log operation.
type restoreCallback func(context.Context) error
// FSMApplyResponse is returned from an FSM apply. It indicates if the apply was
// successful or not.
type FSMApplyResponse struct {
// Success reports whether the log was applied. ApplyBatch sets it to true
// for every log it returns a response for.
Success bool
}
// FSM is Vault's primary state storage. It writes updates to a bolt db file
// that lives on local disk. FSM implements the raft.FSM and physical.Backend
// interfaces.
type FSM struct {
// latestIndex and latestTerm must stay at the top of this struct to be
// properly 64-bit aligned.
// latestIndex and latestTerm are the term and index of the last log we
// received. They are read and written with the sync/atomic functions.
latestIndex *uint64
latestTerm *uint64
// latestConfig is the latest server configuration we've seen. It always
// holds a *ConfigurationValue (possibly nil).
latestConfig atomic.Value
// l guards db and noopRestore. Storage operations take the read lock;
// Restore takes the write lock while it swaps the database file.
l sync.RWMutex
// path is the directory that contains the bolt file.
path string
logger log.Logger
// noopRestore, when true, makes Restore return immediately.
noopRestore bool
// db is the open bolt database; it is replaced by openDBFile.
db *bolt.DB
// restoreCb is called after we've restored a snapshot
restoreCb restoreCallback
// chunker wraps this FSM with raftchunking support, using FSMChunkStorage
// as its chunk store.
chunker *raftchunking.ChunkingBatchingFSM
}
// NewFSM constructs an FSM whose bolt file lives in the directory named by
// conf["path"].
//
// It returns an error when the "path" key is absent from conf, or when the
// bolt file cannot be opened and initialized.
func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
	path, ok := conf["path"]
	if !ok {
		return nil, fmt.Errorf("'path' must be set")
	}

	// new(uint64) is already zero, so the latest term and index need no
	// explicit initialization.
	f := &FSM{
		path:        path,
		logger:      logger,
		latestTerm:  new(uint64),
		latestIndex: new(uint64),
	}

	// Store directly into the struct field: an atomic.Value must not be copied
	// after its first use. Seeding it with a typed nil keeps the type assertion
	// in LatestState valid before any configuration has been seen.
	f.latestConfig.Store((*ConfigurationValue)(nil))

	f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
		f:   f,
		ctx: context.Background(),
	})

	dbPath := filepath.Join(path, databaseFilename)
	if err := f.openDBFile(dbPath); err != nil {
		return nil, errwrap.Wrapf("failed to open bolt file: {{err}}", err)
	}

	return f, nil
}
// openDBFile opens the bolt file at dbPath, makes sure the data and config
// buckets exist, loads the persisted latest index/term and configuration into
// memory, and installs the opened database as f.db.
//
// On any failure f.db and the in-memory state are left untouched and the
// newly opened handle is closed again.
func (f *FSM) openDBFile(dbPath string) error {
	if len(dbPath) == 0 {
		return errors.New("can not open empty filename")
	}

	boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		return err
	}

	// Decode into locals first so that a failure halfway through does not
	// leave the in-memory state partially updated.
	var (
		persistedIndex  *IndexValue
		persistedConfig *ConfigurationValue
	)

	err = boltDB.Update(func(tx *bolt.Tx) error {
		// make sure we have the necessary buckets created
		if _, err := tx.CreateBucketIfNotExists(dataBucketName); err != nil {
			return fmt.Errorf("failed to create bucket: %v", err)
		}
		b, err := tx.CreateBucketIfNotExists(configBucketName)
		if err != nil {
			return fmt.Errorf("failed to create bucket: %v", err)
		}

		// Read in our latest index and term
		if val := b.Get(latestIndexKey); val != nil {
			var latest IndexValue
			if err := proto.Unmarshal(val, &latest); err != nil {
				return err
			}
			persistedIndex = &latest
		}

		// Read in our latest config
		if val := b.Get(latestConfigKey); val != nil {
			var latest ConfigurationValue
			if err := proto.Unmarshal(val, &latest); err != nil {
				return err
			}
			persistedConfig = &latest
		}
		return nil
	})
	if err != nil {
		// Release the handle we just opened; nothing else references it. The
		// initialization error is the one worth reporting, so a failure to
		// close is deliberately ignored.
		_ = boltDB.Close()
		return err
	}

	// Populate the in-memory state only once the file is known to be good.
	if persistedIndex != nil {
		atomic.StoreUint64(f.latestTerm, persistedIndex.Term)
		atomic.StoreUint64(f.latestIndex, persistedIndex.Index)
	}
	if persistedConfig != nil {
		f.latestConfig.Store(persistedConfig)
	}

	f.db = boltDB
	return nil
}
// Close closes the underlying bolt database while holding the FSM's read lock.
func (f *FSM) Close() error {
	f.l.RLock()
	defer f.l.RUnlock()

	closeErr := f.db.Close()
	return closeErr
}
// writeSnapshotMetaToDB persists the term, index and configuration carried by
// the snapshot metadata into the config bucket of db, creating the bucket if
// it does not exist yet.
func writeSnapshotMetaToDB(metadata *raft.SnapshotMeta, db *bolt.DB) error {
	indexBytes, err := proto.Marshal(&IndexValue{
		Term:  metadata.Term,
		Index: metadata.Index,
	})
	if err != nil {
		return err
	}

	configBytes, err := proto.Marshal(
		raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration),
	)
	if err != nil {
		return err
	}

	// Both keys are written in one transaction.
	return db.Update(func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(configBucketName)
		if err != nil {
			return err
		}
		if err := bucket.Put(latestConfigKey, configBytes); err != nil {
			return err
		}
		return bucket.Put(latestIndexKey, indexBytes)
	})
}
// witnessSnapshot records the snapshot's term, index and configuration: first
// durably in the bolt file, then in the FSM's in-memory state. The in-memory
// state is only touched once the write has succeeded.
func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
	if err := writeSnapshotMetaToDB(metadata, f.db); err != nil {
		return err
	}

	atomic.StoreUint64(f.latestIndex, metadata.Index)
	atomic.StoreUint64(f.latestTerm, metadata.Term)

	protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration)
	f.latestConfig.Store(protoConfig)

	return nil
}
// LatestState reports the most recent term/index pair and the most recent
// configuration this FSM has recorded in memory. The configuration is nil
// until one has been stored.
func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
	term := atomic.LoadUint64(f.latestTerm)
	index := atomic.LoadUint64(f.latestIndex)
	config := f.latestConfig.Load().(*ConfigurationValue)

	return &IndexValue{Term: term, Index: index}, config
}
// Delete removes the entry stored at path from the data bucket of the bolt
// file.
func (f *FSM) Delete(ctx context.Context, path string) error {
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())

	f.l.RLock()
	defer f.l.RUnlock()

	removeKey := func(tx *bolt.Tx) error {
		return tx.Bucket(dataBucketName).Delete([]byte(path))
	}
	return f.db.Update(removeKey)
}
// DeletePrefix removes every key in the data bucket that starts with prefix,
// all within a single write transaction.
func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now())

	f.l.RLock()
	defer f.l.RUnlock()

	return f.db.Update(func(tx *bolt.Tx) error {
		// Assume bucket exists and has keys
		cursor := tx.Bucket(dataBucketName).Cursor()
		want := []byte(prefix)

		key, _ := cursor.Seek(want)
		for key != nil && bytes.HasPrefix(key, want) {
			if err := cursor.Delete(); err != nil {
				return err
			}
			key, _ = cursor.Next()
		}
		return nil
	})
}
// Get looks up path in the data bucket of the bolt file. It returns a nil
// entry (and a nil error) when the key is not present.
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
	// TODO: Remove this outdated metric name in a future release
	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())

	f.l.RLock()
	defer f.l.RUnlock()

	var entry *physical.Entry
	err := f.db.View(func(tx *bolt.Tx) error {
		stored := tx.Bucket(dataBucketName).Get([]byte(path))
		if stored == nil {
			return nil
		}

		// Hand the caller its own copy of the value rather than the slice
		// returned from inside the transaction.
		owned := make([]byte, len(stored))
		copy(owned, stored)

		entry = &physical.Entry{
			Key:   path,
			Value: owned,
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return entry, nil
}
// Put stores entry.Value under entry.Key in the data bucket of the bolt file.
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now())

	f.l.RLock()
	defer f.l.RUnlock()

	// The write runs in its own transaction.
	writeEntry := func(tx *bolt.Tx) error {
		return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value)
	}
	return f.db.Update(writeEntry)
}
// List returns the names found directly beneath prefix in the data bucket.
// Keys with no further "/" after the prefix are returned as-is (with the
// prefix removed); deeper keys are collapsed into a single entry for their
// first path segment, which keeps its trailing "/".
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
	// TODO: Remove this outdated metric name in a future release
	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())

	f.l.RLock()
	defer f.l.RUnlock()

	var keys []string
	err := f.db.View(func(tx *bolt.Tx) error {
		// Assume bucket exists and has keys
		cursor := tx.Bucket(dataBucketName).Cursor()
		want := []byte(prefix)

		for k, _ := cursor.Seek(want); k != nil && bytes.HasPrefix(k, want); k, _ = cursor.Next() {
			rest := strings.TrimPrefix(string(k), prefix)

			slash := strings.Index(rest, "/")
			if slash >= 0 {
				// Add truncated 'folder' paths
				keys = strutil.AppendIfMissing(keys, rest[:slash+1])
				continue
			}

			// Add objects only from the current 'folder'
			keys = append(keys, rest)
		}
		return nil
	})

	return keys, err
}
// Transaction applies every put and delete in txns to the data bucket inside
// one bolt write transaction. An operation of any other kind aborts the
// transaction with an error.
func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
	f.l.RLock()
	defer f.l.RUnlock()

	return f.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(dataBucketName)

		for _, txn := range txns {
			switch txn.Operation {
			case physical.PutOperation:
				if err := bucket.Put([]byte(txn.Entry.Key), txn.Entry.Value); err != nil {
					return err
				}
			case physical.DeleteOperation:
				if err := bucket.Delete([]byte(txn.Entry.Key)); err != nil {
					return err
				}
			default:
				return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
			}
		}
		return nil
	})
}
// ApplyBatch will apply a set of logs to the FSM. This is called from the raft
// library.
//
// The logs are decoded up front, then all of their operations are written to
// the bolt file in a single transaction together with the latest
// configuration and, when the batch advances it, the latest index/term. The
// in-memory copies of that state are updated only after the transaction has
// succeeded.
//
// It returns one *FSMApplyResponse per log. It panics when a log cannot be
// decoded, has an unexpected type, or when the transaction fails.
func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
	if len(logs) == 0 {
		return []interface{}{}
	}

	// Do the unmarshalling first so we don't hold locks
	var latestConfiguration *ConfigurationValue
	commands := make([]interface{}, 0, len(logs))
	for _, entry := range logs {
		switch entry.Type {
		case raft.LogCommand:
			command := &LogData{}
			if err := proto.Unmarshal(entry.Data, command); err != nil {
				f.logger.Error("error proto unmarshaling log data", "error", err)
				panic("error proto unmarshaling log data")
			}
			commands = append(commands, command)

		case raft.LogConfiguration:
			configuration := raft.DecodeConfiguration(entry.Data)
			config := raftConfigurationToProtoConfiguration(entry.Index, configuration)
			commands = append(commands, config)

			// Update the latest configuration the fsm has received; we will
			// store this after it has been committed to storage.
			latestConfiguration = config

		default:
			panic(fmt.Sprintf("got unexpected log type: %d", entry.Type))
		}
	}

	// Only advance latest pointer if this log has a higher index value than
	// what we have seen in the past.
	var logIndex []byte
	var err error
	latestIndex, _ := f.LatestState()
	lastLog := logs[len(logs)-1]
	if latestIndex.Index < lastLog.Index {
		logIndex, err = proto.Marshal(&IndexValue{
			Term:  lastLog.Term,
			Index: lastLog.Index,
		})
		if err != nil {
			f.logger.Error("unable to marshal latest index", "error", err)
			panic("unable to marshal latest index")
		}
	}

	f.l.RLock()
	defer f.l.RUnlock()

	err = f.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(dataBucketName)
		for _, commandRaw := range commands {
			switch command := commandRaw.(type) {
			case *LogData:
				for _, op := range command.Operations {
					var err error
					switch op.OpType {
					case putOp:
						err = b.Put([]byte(op.Key), op.Value)
					case deleteOp:
						err = b.Delete([]byte(op.Key))
					case restoreCallbackOp:
						if f.restoreCb != nil {
							// Kick off the restore callback function in a go routine
							go f.restoreCb(context.Background())
						}
					default:
						// OpType is numeric, so print it as a number; %q
						// would render it as a quoted character literal.
						return fmt.Errorf("%d is not a supported transaction operation", op.OpType)
					}
					if err != nil {
						return err
					}
				}

			case *ConfigurationValue:
				b := tx.Bucket(configBucketName)
				configBytes, err := proto.Marshal(command)
				if err != nil {
					return err
				}
				if err := b.Put(latestConfigKey, configBytes); err != nil {
					return err
				}
			}
		}

		if len(logIndex) > 0 {
			b := tx.Bucket(configBucketName)
			if err := b.Put(latestIndexKey, logIndex); err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		f.logger.Error("failed to store data", "error", err)
		panic("failed to store data")
	}

	// If we advanced the latest value, update the in-memory representation too.
	if len(logIndex) > 0 {
		atomic.StoreUint64(f.latestTerm, lastLog.Term)
		atomic.StoreUint64(f.latestIndex, lastLog.Index)
	}

	// If one or more configuration changes were processed, store the latest one.
	if latestConfiguration != nil {
		f.latestConfig.Store(latestConfiguration)
	}

	// Build the responses. The logs array is used here to ensure we reply to
	// all command values; even if they are not of the types we expect. This
	// should future proof this function from more log types being provided.
	resp := make([]interface{}, len(logs))
	for i := range logs {
		resp[i] = &FSMApplyResponse{
			Success: true,
		}
	}

	return resp
}
// Apply applies a single log to the FSM by running it through ApplyBatch as a
// batch of one and returning that batch's only response. This is called from
// the raft library.
func (f *FSM) Apply(log *raft.Log) interface{} {
	batch := []*raft.Log{log}
	responses := f.ApplyBatch(batch)
	return responses[0]
}
// writeErrorCloser is an io.WriteCloser that can additionally be closed with
// an error via CloseWithError. writeTo takes both of its sinks as this type.
type writeErrorCloser interface {
io.WriteCloser
CloseWithError(error) error
}
// writeTo will copy the FSM's content to a remote sink. The data is written
// twice, once for use in determining various metadata attributes of the dataset
// (size, checksum, etc) and a second for the sink of the data. We also use a
// proto delimited writer so we can stream proto messages to the sink.
//
// Both sinks are closed on every path: metaSink once the first scan finishes
// or fails, and sink at the very end with the overall error (nil on success).
// ctx is currently unused.
func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now())

	protoWriter := NewDelimitedWriter(sink)
	metadataProtoWriter := NewDelimitedWriter(metaSink)

	f.l.RLock()
	defer f.l.RUnlock()

	// metaClosed tracks whether the callback below got far enough to close
	// metaSink itself, so that it is closed exactly once.
	metaClosed := false

	err := f.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(dataBucketName)
		c := b.Cursor()

		// Do the first scan of the data for metadata purposes.
		for k, v := c.First(); k != nil; k, v = c.Next() {
			err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{
				Key:   string(k),
				Value: v,
			})
			if err != nil {
				metaClosed = true
				metaSink.CloseWithError(err)
				return err
			}
		}
		metaClosed = true
		metaSink.Close()

		// Do the second scan for copy purposes.
		for k, v := c.First(); k != nil; k, v = c.Next() {
			err := protoWriter.WriteMsg(&pb.StorageEntry{
				Key:   string(k),
				Value: v,
			})
			if err != nil {
				return err
			}
		}

		return nil
	})

	// If the transaction failed before the callback ran, metaSink is still
	// open; close it with the error so whoever consumes it is not left
	// waiting.
	if !metaClosed {
		metaSink.CloseWithError(err)
	}
	sink.CloseWithError(err)
}
// Snapshot implements the FSM interface. The snapshot object it hands back is
// a noopSnapshotter bound to this FSM; the call itself never fails.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
	snapshotter := &noopSnapshotter{fsm: f}
	return snapshotter, nil
}
// SetNoopRestore turns the FSM's restore step into a no-op (enabled == true)
// or back into a real restore (enabled == false). It exists because the FSM
// keeps its state in persistent storage, so no restore has to be issued on
// raft startup.
func (f *FSM) SetNoopRestore(enabled bool) {
	f.l.Lock()
	defer f.l.Unlock()

	f.noopRestore = enabled
}
// Restore installs a new snapshot from the provided reader. It does an atomic
// rename of the snapshot file into the database filepath. While a restore is
// happening the FSM is locked and no writes or reads can be performed.
//
// It returns nil immediately when no-op restores are enabled, and an error
// when r is not a *boltSnapshotInstaller, when the current database cannot be
// closed, or when installing or re-opening the database file fails (install
// and open failures are combined into one error).
func (f *FSM) Restore(r io.ReadCloser) error {
	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())

	// noopRestore is written under f.l by SetNoopRestore, so it has to be read
	// under the lock as well.
	f.l.RLock()
	skip := f.noopRestore
	f.l.RUnlock()
	if skip {
		return nil
	}

	snapshotInstaller, ok := r.(*boltSnapshotInstaller)
	if !ok {
		return errors.New("expected snapshot installer object")
	}

	f.l.Lock()
	defer f.l.Unlock()

	// Close the db file
	if err := f.db.Close(); err != nil {
		f.logger.Error("failed to close database file", "error", err)
		return err
	}

	dbPath := filepath.Join(f.path, databaseFilename)

	f.logger.Info("installing snapshot to FSM")

	// Install the new boltdb file
	var retErr *multierror.Error
	if err := snapshotInstaller.Install(dbPath); err != nil {
		f.logger.Error("failed to install snapshot", "error", err)
		retErr = multierror.Append(retErr, errwrap.Wrapf("failed to install snapshot database: {{err}}", err))
	} else {
		f.logger.Info("snapshot installed")
	}

	// Open the db file. We want to do this regardless of if the above install
	// worked. If the install failed we should try to open the old DB file.
	if err := f.openDBFile(dbPath); err != nil {
		f.logger.Error("failed to open new database file", "error", err)
		retErr = multierror.Append(retErr, errwrap.Wrapf("failed to open new bolt file: {{err}}", err))
	}

	return retErr.ErrorOrNil()
}
// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
// since our SnapshotStore reads data out of the FSM on Open().
type noopSnapshotter struct {
// fsm is the FSM whose raft metadata Persist updates.
fsm *FSM
}
// Persist implements the fsm.Snapshot interface. It doesn't need to persist any
// state data, but it does persist the raft metadata. This is necessary so we
// can be sure to capture indexes for operation types that are not sent to the
// FSM.
//
// It returns an error when sink is not a *BoltSnapshotSink or when the
// metadata cannot be written.
func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
	// Use a checked assertion so an unexpected sink type is reported as an
	// error instead of a panic.
	boltSnapshotSink, ok := sink.(*BoltSnapshotSink)
	if !ok {
		return errors.New("expected bolt snapshot sink object")
	}

	// We are processing a snapshot, fastforward the index, term, and
	// configuration to the latest seen by the raft system.
	return s.fsm.witnessSnapshot(&boltSnapshotSink.meta)
}
// Release implements the fsm.Snapshot interface. It doesn't do anything
// because noopSnapshotter holds nothing besides its FSM pointer.
func (s *noopSnapshotter) Release() {}
// raftConfigurationToProtoConfiguration builds the proto representation of a
// raft configuration, tagging it with the given index. Every server's
// suffrage, ID and address is carried over in order.
func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
	converted := make([]*Server, 0, len(configuration.Servers))
	for _, srv := range configuration.Servers {
		converted = append(converted, &Server{
			Suffrage: int32(srv.Suffrage),
			Id:       string(srv.ID),
			Address:  string(srv.Address),
		})
	}

	return &ConfigurationValue{
		Index:   index,
		Servers: converted,
	}
}
// protoConfigurationToRaftConfiguration is the inverse conversion: it turns a
// proto configuration value back into its index and the raft configuration
// object, preserving server order.
func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) {
	converted := make([]raft.Server, 0, len(configuration.Servers))
	for _, srv := range configuration.Servers {
		converted = append(converted, raft.Server{
			Suffrage: raft.ServerSuffrage(srv.Suffrage),
			ID:       raft.ServerID(srv.Id),
			Address:  raft.ServerAddress(srv.Address),
		})
	}

	raftConfig := raft.Configuration{Servers: converted}
	return configuration.Index, raftConfig
}
// FSMChunkStorage stores raftchunking chunk info in the FSM's data bucket,
// under keys that start with chunkingPrefix. NewFSM passes it to
// raftchunking.NewChunkingBatchingFSM as the chunk store.
type FSMChunkStorage struct {
// f is the FSM whose bolt database holds the chunks.
f *FSM
// ctx is the context passed to the FSM's storage methods.
ctx context.Context
}
// chunkPaths derives the storage locations for a chunk: the per-operation
// prefix "raftchunking/<OpNum>/" and, beneath it, the chunk's own key
// "raftchunking/<OpNum>/<SequenceNum>".
func (f *FSMChunkStorage) chunkPaths(chunk *raftchunking.ChunkInfo) (string, string) {
	opDir := chunkingPrefix + strconv.FormatUint(chunk.OpNum, 10) + "/"
	return opDir, opDir + fmt.Sprint(chunk.SequenceNum)
}
// StoreChunk writes the JSON-encoded chunk info under its chunk key and, in
// the same transaction, counts the names stored under the operation's prefix.
// It reports true when that count equals chunk.NumChunks.
func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error) {
	encoded, err := jsonutil.EncodeJSON(chunk)
	if err != nil {
		return false, errwrap.Wrapf("error encoding chunk info: {{err}}", err)
	}

	prefix, key := f.chunkPaths(chunk)

	f.f.l.RLock()
	defer f.f.l.RUnlock()

	var complete bool
	err = f.f.db.Update(func(tx *bolt.Tx) error {
		if err := tx.Bucket(dataBucketName).Put([]byte(key), encoded); err != nil {
			return errwrap.Wrapf("error storing chunk info: {{err}}", err)
		}

		// Assume bucket exists and has keys
		cursor := tx.Bucket(dataBucketName).Cursor()
		want := []byte(prefix)

		var stored []string
		for k, _ := cursor.Seek(want); k != nil && bytes.HasPrefix(k, want); k, _ = cursor.Next() {
			rest := strings.TrimPrefix(string(k), prefix)

			slash := strings.Index(rest, "/")
			if slash >= 0 {
				// Add truncated 'folder' paths
				stored = strutil.AppendIfMissing(stored, rest[:slash+1])
				continue
			}

			// Add objects only from the current 'folder'
			stored = append(stored, rest)
		}

		complete = uint32(len(stored)) == chunk.NumChunks
		return nil
	})
	if err != nil {
		return false, err
	}

	return complete, nil
}
// FinalizeOp collects the stored chunks for opNum and then deletes everything
// under that operation's prefix. The collected chunks are returned.
func (f *FSMChunkStorage) FinalizeOp(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
	chunks, err := f.chunksForOpNum(opNum)
	if err != nil {
		return nil, errwrap.Wrapf("error getting chunks for op keys: {{err}}", err)
	}

	opPrefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})
	err = f.f.DeletePrefix(f.ctx, opPrefix)
	if err != nil {
		return nil, errwrap.Wrapf("error deleting prefix after op finalization: {{err}}", err)
	}

	return chunks, nil
}
// chunksForOpNum loads every chunk stored for opNum and returns them in a
// slice indexed by sequence number. The slice length is the NumChunks value
// of the first chunk decoded; positions with no stored chunk stay nil. It
// returns a nil slice when nothing is stored for opNum.
//
// An error is returned when a key is not a valid sequence number, when a
// listed chunk cannot be read back or decoded, or when a sequence number does
// not fit in the slice.
func (f *FSMChunkStorage) chunksForOpNum(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
	prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})

	opChunkKeys, err := f.f.List(f.ctx, prefix)
	if err != nil {
		return nil, errwrap.Wrapf("error fetching op chunk keys: {{err}}", err)
	}

	if len(opChunkKeys) == 0 {
		return nil, nil
	}

	var ret []*raftchunking.ChunkInfo

	for _, v := range opChunkKeys {
		seqNum, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return nil, errwrap.Wrapf("error converting seqnum to integer: {{err}}", err)
		}

		entry, err := f.f.Get(f.ctx, prefix+v)
		if err != nil {
			return nil, errwrap.Wrapf("error fetching chunkinfo: {{err}}", err)
		}
		// Get returns a nil entry for a missing key; the key was listed a
		// moment ago, but guard against it instead of dereferencing nil.
		if entry == nil {
			return nil, fmt.Errorf("chunkinfo %q not found", prefix+v)
		}

		var ci raftchunking.ChunkInfo
		if err := jsonutil.DecodeJSON(entry.Value, &ci); err != nil {
			return nil, errwrap.Wrapf("error decoding chunkinfo json: {{err}}", err)
		}

		if ret == nil {
			ret = make([]*raftchunking.ChunkInfo, ci.NumChunks)
		}

		// Reject sequence numbers that do not fit the slice rather than
		// panicking on an out-of-range index.
		if seqNum < 0 || seqNum >= int64(len(ret)) {
			return nil, fmt.Errorf("chunk sequence number %d is out of range for %d chunks", seqNum, len(ret))
		}

		ret[seqNum] = &ci
	}

	return ret, nil
}
// GetChunks returns every stored chunk, grouped by operation number. It
// returns a nil map when no chunks are stored.
func (f *FSMChunkStorage) GetChunks() (raftchunking.ChunkMap, error) {
	opNums, err := f.f.List(f.ctx, chunkingPrefix)
	if err != nil {
		return nil, errwrap.Wrapf("error doing recursive list for chunk saving: {{err}}", err)
	}

	if len(opNums) == 0 {
		return nil, nil
	}

	ret := make(raftchunking.ChunkMap, len(opNums))
	for _, opNumStr := range opNums {
		// Chunks live at "<chunkingPrefix><opNum>/<seqNum>", so List reports
		// each operation as a folder name with a trailing "/". Strip it before
		// parsing, and parse as unsigned because op numbers are uint64.
		opNum, err := strconv.ParseUint(strings.TrimSuffix(opNumStr, "/"), 10, 64)
		if err != nil {
			return nil, errwrap.Wrapf("error parsing op num during chunk saving: {{err}}", err)
		}

		opChunks, err := f.chunksForOpNum(opNum)
		if err != nil {
			return nil, errwrap.Wrapf("error getting chunks for op keys during chunk saving: {{err}}", err)
		}

		ret[opNum] = opChunks
	}

	return ret, nil
}
// RestoreChunks replaces the stored chunk state with the given map: everything
// under chunkingPrefix is deleted first, then every non-nil chunk is stored
// again. A chunk whose OpNum differs from the map key it is filed under is
// rejected with an error.
func (f *FSMChunkStorage) RestoreChunks(chunks raftchunking.ChunkMap) error {
	err := f.f.DeletePrefix(f.ctx, chunkingPrefix)
	if err != nil {
		return errwrap.Wrapf("error deleting prefix for chunk restoration: {{err}}", err)
	}

	if len(chunks) == 0 {
		return nil
	}

	for opNum, opChunks := range chunks {
		for _, chunk := range opChunks {
			switch {
			case chunk == nil:
				// Gaps in the sequence are skipped.
				continue
			case chunk.OpNum != opNum:
				return errors.New("unexpected op number in chunk")
			}

			if _, err := f.StoreChunk(chunk); err != nil {
				return errwrap.Wrapf("error storing chunk during restoration: {{err}}", err)
			}
		}
	}

	return nil
}