open-vault/physical/raft/fsm.go
Brian Kassouf ed14061578
Raft Storage Backend (#6888)
2019-06-20 12:14:58 -07:00

package raft
import (
"bytes"
"context"
"fmt"
"io"
"math"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
protoio "github.com/gogo/protobuf/io"
proto "github.com/golang/protobuf/proto"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/plugin/pb"
bolt "go.etcd.io/bbolt"
)
const (
deleteOp uint32 = 1 << iota
putOp
restoreCallbackOp
)
var (
// dataBucketName is the bucket we use to store Vault's data entries;
// configBucketName holds FSM metadata (latest index and configuration).
dataBucketName = []byte("data")
configBucketName = []byte("config")
latestIndexKey = []byte("latest_indexes")
latestConfigKey = []byte("latest_config")
)
// Verify FSM satisfies the correct interfaces
var _ physical.Backend = (*FSM)(nil)
var _ physical.Transactional = (*FSM)(nil)
var _ raft.FSM = (*FSM)(nil)
var _ raft.ConfigurationStore = (*FSM)(nil)
type restoreCallback func() error
// FSMApplyResponse is returned from an FSM apply. It indicates if the apply was
// successful or not.
type FSMApplyResponse struct {
Success bool
}
// FSM is Vault's primary state storage. It writes updates to a bolt db file
// that lives on local disk. FSM implements the raft.FSM and physical.Backend
// interfaces.
type FSM struct {
l sync.RWMutex
path string
logger log.Logger
permitPool *physical.PermitPool
noopRestore bool
db *bolt.DB
// restoreCb is called after we've restored a snapshot
restoreCb restoreCallback
// latestIndex and latestTerm are the index and term of the last log we
// received
latestIndex *uint64
latestTerm *uint64
// latestConfig is the latest server configuration we've seen
latestConfig atomic.Value
// This is only used in tests to disable storing the latest indexes and
// configs so we can conform to the standard backend tests, which expect no
// additional state in the backend.
storeLatestState bool
}
// NewFSM constructs an FSM using the given directory
func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
path, ok := conf["path"]
if !ok {
return nil, fmt.Errorf("'path' must be set")
}
dbPath := filepath.Join(path, "vault.db")
boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return nil, err
}
// Initialize the latest term, index, and config values
latestTerm := new(uint64)
latestIndex := new(uint64)
latestConfig := atomic.Value{}
atomic.StoreUint64(latestTerm, 0)
atomic.StoreUint64(latestIndex, 0)
latestConfig.Store((*ConfigurationValue)(nil))
err = boltDB.Update(func(tx *bolt.Tx) error {
// make sure we have the necessary buckets created
_, err := tx.CreateBucketIfNotExists(dataBucketName)
if err != nil {
return fmt.Errorf("failed to create bucket: %v", err)
}
b, err := tx.CreateBucketIfNotExists(configBucketName)
if err != nil {
return fmt.Errorf("failed to create bucket: %v", err)
}
// Read in our latest index and term and populate them in memory
val := b.Get(latestIndexKey)
if val != nil {
var latest IndexValue
err := proto.Unmarshal(val, &latest)
if err != nil {
return err
}
atomic.StoreUint64(latestTerm, latest.Term)
atomic.StoreUint64(latestIndex, latest.Index)
}
// Read in our latest config and populate it in memory
val = b.Get(latestConfigKey)
if val != nil {
var latest ConfigurationValue
err := proto.Unmarshal(val, &latest)
if err != nil {
return err
}
latestConfig.Store(&latest)
}
return nil
})
if err != nil {
return nil, err
}
storeLatestState := true
if _, ok := conf["doNotStoreLatestState"]; ok {
storeLatestState = false
}
return &FSM{
path: path,
logger: logger,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
db: boltDB,
latestTerm: latestTerm,
latestIndex: latestIndex,
latestConfig: latestConfig,
storeLatestState: storeLatestState,
}, nil
}
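// exampleFSMUsage is an illustrative sketch, not part of the original file: it
// shows how NewFSM might be constructed from a config map and then driven
// through the physical.Backend interface. The storage path and logger name
// below are assumptions made for the example.
func exampleFSMUsage() error {
	fsm, err := NewFSM(map[string]string{"path": "/tmp/vault-raft"}, log.New(&log.LoggerOptions{Name: "raft-fsm"}))
	if err != nil {
		return err
	}
	ctx := context.Background()
	// Write an entry and read it back through the Backend interface.
	if err := fsm.Put(ctx, &physical.Entry{Key: "core/example", Value: []byte("value")}); err != nil {
		return err
	}
	entry, err := fsm.Get(ctx, "core/example")
	if err != nil {
		return err
	}
	fmt.Printf("read back %q\n", entry.Value)
	return nil
}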
// LatestState returns the latest index and configuration values we have seen on
// this FSM.
func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
return &IndexValue{
Term: atomic.LoadUint64(f.latestTerm),
Index: atomic.LoadUint64(f.latestIndex),
}, f.latestConfig.Load().(*ConfigurationValue)
}
func (f *FSM) witnessIndex(i *IndexValue) {
seen, _ := f.LatestState()
if seen.Index < i.Index {
atomic.StoreUint64(f.latestIndex, i.Index)
atomic.StoreUint64(f.latestTerm, i.Term)
}
}
func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error {
var indexBytes []byte
latestIndex, _ := f.LatestState()
latestIndex.Index = index
latestIndex.Term = term
var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
return err
}
protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
return err
}
if f.storeLatestState {
err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(configBucketName)
err := b.Put(latestConfigKey, configBytes)
if err != nil {
return err
}
err = b.Put(latestIndexKey, indexBytes)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
}
atomic.StoreUint64(f.latestIndex, index)
atomic.StoreUint64(f.latestTerm, term)
f.latestConfig.Store(protoConfig)
return nil
}
// Delete deletes the given key from the bolt file.
func (f *FSM) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())
f.permitPool.Acquire()
defer f.permitPool.Release()
f.l.RLock()
defer f.l.RUnlock()
return f.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(dataBucketName).Delete([]byte(path))
})
}
// Get retrieves the value at the given path from the bolt file.
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
f.permitPool.Acquire()
defer f.permitPool.Release()
f.l.RLock()
defer f.l.RUnlock()
var valCopy []byte
var found bool
err := f.db.View(func(tx *bolt.Tx) error {
value := tx.Bucket(dataBucketName).Get([]byte(path))
if value != nil {
found = true
valCopy = make([]byte, len(value))
copy(valCopy, value)
}
return nil
})
if err != nil {
return nil, err
}
if !found {
return nil, nil
}
return &physical.Entry{
Key: path,
Value: valCopy,
}, nil
}
// Put writes the given entry to the bolt file.
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())
f.permitPool.Acquire()
defer f.permitPool.Release()
f.l.RLock()
defer f.l.RUnlock()
// Start a write transaction.
return f.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value)
})
}
// List retrieves the set of keys with the given prefix from the bolt file.
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
f.permitPool.Acquire()
defer f.permitPool.Release()
f.l.RLock()
defer f.l.RUnlock()
var keys []string
err := f.db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
c := tx.Bucket(dataBucketName).Cursor()
prefixBytes := []byte(prefix)
for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
key := string(k)
key = strings.TrimPrefix(key, prefix)
if i := strings.Index(key, "/"); i == -1 {
// Add objects only from the current 'folder'
keys = append(keys, key)
} else {
// Add truncated 'folder' paths
keys = strutil.AppendIfMissing(keys, string(key[:i+1]))
}
}
return nil
})
return keys, err
}
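// exampleListUsage is an illustrative sketch, not part of the original file,
// showing List's hierarchical semantics: only direct children of the prefix
// are returned, and deeper paths are truncated to their next "folder". The
// keys used here are assumptions for the example.
func exampleListUsage(ctx context.Context, f *FSM) ([]string, error) {
	if err := f.Put(ctx, &physical.Entry{Key: "foo/bar", Value: []byte("1")}); err != nil {
		return nil, err
	}
	if err := f.Put(ctx, &physical.Entry{Key: "foo/baz/zip", Value: []byte("2")}); err != nil {
		return nil, err
	}
	// Expected result: []string{"bar", "baz/"}
	return f.List(ctx, "foo/")
}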
// Transaction writes all the operations in the provided transaction to the bolt
// file.
func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
f.permitPool.Acquire()
defer f.permitPool.Release()
f.l.RLock()
defer f.l.RUnlock()
// TODO: should this be a Batch?
// Start a write transaction.
err := f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
for _, txn := range txns {
var err error
switch txn.Operation {
case physical.PutOperation:
err = b.Put([]byte(txn.Entry.Key), txn.Entry.Value)
case physical.DeleteOperation:
err = b.Delete([]byte(txn.Entry.Key))
default:
return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
}
if err != nil {
return err
}
}
return nil
})
return err
}
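// exampleTransactionUsage is an illustrative sketch, not part of the original
// file, showing how a mixed set of put and delete operations is applied
// atomically in one bolt write transaction via Transaction. The keys are
// assumptions for the example.
func exampleTransactionUsage(ctx context.Context, f *FSM) error {
	txns := []*physical.TxnEntry{
		{
			Operation: physical.PutOperation,
			Entry:     &physical.Entry{Key: "core/new", Value: []byte("value")},
		},
		{
			Operation: physical.DeleteOperation,
			Entry:     &physical.Entry{Key: "core/old"},
		},
	}
	return f.Transaction(ctx, txns)
}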
// Apply will apply a log value to the FSM. This is called from the raft
// library.
func (f *FSM) Apply(log *raft.Log) interface{} {
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
if err != nil {
panic("error proto unmarshaling log data")
}
f.l.RLock()
defer f.l.RUnlock()
// Only advance the latest pointer if this log has a higher index value than
// what we have seen in the past.
var logIndex []byte
latestIndex, _ := f.LatestState()
if latestIndex.Index < log.Index {
logIndex, err = proto.Marshal(&IndexValue{
Term: log.Term,
Index: log.Index,
})
if err != nil {
panic("failed to store data")
}
}
err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
for _, op := range command.Operations {
var err error
switch op.OpType {
case putOp:
err = b.Put([]byte(op.Key), op.Value)
case deleteOp:
err = b.Delete([]byte(op.Key))
case restoreCallbackOp:
if f.restoreCb != nil {
// Kick off the restore callback function in a goroutine
go f.restoreCb()
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
}
if err != nil {
return err
}
}
// TODO: benchmark so we can know how much time this adds
if f.storeLatestState && len(logIndex) > 0 {
b := tx.Bucket(configBucketName)
err = b.Put(latestIndexKey, logIndex)
if err != nil {
return err
}
}
return nil
})
if err != nil {
panic("failed to store data")
}
// If we advanced the latest value, update the in-memory representation too.
if len(logIndex) > 0 {
atomic.StoreUint64(f.latestTerm, log.Term)
atomic.StoreUint64(f.latestIndex, log.Index)
}
return &FSMApplyResponse{
Success: true,
}
}
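// exampleApplyUsage is an illustrative sketch, not part of the original file,
// showing the shape of data Apply expects: a raft.Log whose Data field is a
// proto-marshaled LogData holding put/delete operations. In normal operation
// the raft library constructs and delivers these logs; this sketch assumes the
// package's generated LogData/LogOperation proto types and uses made-up keys.
func exampleApplyUsage(f *FSM) error {
	data, err := proto.Marshal(&LogData{
		Operations: []*LogOperation{
			{OpType: putOp, Key: "core/example", Value: []byte("value")},
			{OpType: deleteOp, Key: "core/old"},
		},
	})
	if err != nil {
		return err
	}
	resp := f.Apply(&raft.Log{Index: 1, Term: 1, Data: data})
	if r, ok := resp.(*FSMApplyResponse); !ok || !r.Success {
		return fmt.Errorf("apply did not succeed")
	}
	return nil
}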
type writeErrorCloser interface {
io.WriteCloser
CloseWithError(error) error
}
// writeTo will copy the FSM's content to a remote sink. The data is written
// twice: once to determine various metadata attributes of the dataset (size,
// checksum, etc.) and a second time for the actual data sink. We use a proto
// delimited writer so we can stream proto messages to the sink.
func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
protoWriter := protoio.NewDelimitedWriter(sink)
metadataProtoWriter := protoio.NewDelimitedWriter(metaSink)
f.l.RLock()
defer f.l.RUnlock()
err := f.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
c := b.Cursor()
// Do the first scan of the data for metadata purposes.
for k, v := c.First(); k != nil; k, v = c.Next() {
err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{
Key: string(k),
Value: v,
})
if err != nil {
metaSink.CloseWithError(err)
return err
}
}
metaSink.Close()
// Do the second scan for copy purposes.
for k, v := c.First(); k != nil; k, v = c.Next() {
err := protoWriter.WriteMsg(&pb.StorageEntry{
Key: string(k),
Value: v,
})
if err != nil {
return err
}
}
return nil
})
sink.CloseWithError(err)
}
// Snapshot implements the raft.FSM interface. It returns a noop snapshot object.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
return &noopSnapshotter{}, nil
}
// SetNoopRestore is used to disable restore operations on raft startup. Because
// we are using persistent storage in our FSM we do not need to issue a restore
// on startup.
func (f *FSM) SetNoopRestore(enabled bool) {
f.l.Lock()
f.noopRestore = enabled
f.l.Unlock()
}
// Restore reads data from the provided reader and writes it into the FSM. It
// first deletes the existing bucket to clear all existing data, then recreates
// it so we can copy in the snapshot.
func (f *FSM) Restore(r io.ReadCloser) error {
if f.noopRestore {
return nil
}
protoReader := protoio.NewDelimitedReader(r, math.MaxInt64)
defer protoReader.Close()
f.l.Lock()
defer f.l.Unlock()
// Start a write transaction.
err := f.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(dataBucketName)
if err != nil {
return err
}
b, err := tx.CreateBucket(dataBucketName)
if err != nil {
return err
}
for {
s := new(pb.StorageEntry)
err := protoReader.ReadMsg(s)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
err = b.Put([]byte(s.Key), s.Value)
if err != nil {
return err
}
}
return nil
})
if err != nil {
f.logger.Error("could not restore snapshot", "error", err)
return err
}
return nil
}
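// nopReadCloser is a tiny helper for the illustrative sketch below; neither is
// part of the original file.
type nopReadCloser struct{ io.Reader }

func (nopReadCloser) Close() error { return nil }

// exampleRestoreUsage sketches the stream format Restore consumes: a sequence
// of length-delimited pb.StorageEntry messages, the same format writeTo
// produces for the snapshot sink. The key and value here are assumptions.
func exampleRestoreUsage(f *FSM) error {
	var buf bytes.Buffer
	w := protoio.NewDelimitedWriter(&buf)
	if err := w.WriteMsg(&pb.StorageEntry{Key: "core/example", Value: []byte("value")}); err != nil {
		return err
	}
	if err := w.Close(); err != nil {
		return err
	}
	return f.Restore(nopReadCloser{&buf})
}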
// noopSnapshotter implements the raft.FSMSnapshot interface. It doesn't do
// anything since our SnapshotStore reads data out of the FSM on Open().
type noopSnapshotter struct{}
// Persist doesn't do anything.
func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
return nil
}
// Release doesn't do anything.
func (s *noopSnapshotter) Release() {}
// StoreConfiguration satisfies the raft.ConfigurationStore interface and
// persists the latest raft server configuration to the bolt file.
func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) {
f.l.RLock()
defer f.l.RUnlock()
var indexBytes []byte
latestIndex, _ := f.LatestState()
// Only write the new index if we are advancing the pointer
if index > latestIndex.Index {
latestIndex.Index = index
var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
panic(fmt.Sprintf("unable to marshal latest index: %v", err))
}
}
protoConfig := raftConfigurationToProtoConfiguration(index, configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
panic(fmt.Sprintf("unable to marshal config: %v", err))
}
if f.storeLatestState {
err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(configBucketName)
err := b.Put(latestConfigKey, configBytes)
if err != nil {
return err
}
// TODO: benchmark so we can know how much time this adds
if len(indexBytes) > 0 {
err = b.Put(latestIndexKey, indexBytes)
if err != nil {
return err
}
}
return nil
})
if err != nil {
panic(fmt.Sprintf("unable to store latest configuration: %v", err))
}
}
f.witnessIndex(latestIndex)
f.latestConfig.Store(protoConfig)
}
// raftConfigurationToProtoConfiguration converts a raft configuration object to
// a proto value.
func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
servers := make([]*Server, len(configuration.Servers))
for i, s := range configuration.Servers {
servers[i] = &Server{
Suffrage: int32(s.Suffrage),
Id: string(s.ID),
Address: string(s.Address),
}
}
return &ConfigurationValue{
Index: index,
Servers: servers,
}
}
// protoConfigurationToRaftConfiguration converts a proto configuration object
// to a raft object.
func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) {
servers := make([]raft.Server, len(configuration.Servers))
for i, s := range configuration.Servers {
servers[i] = raft.Server{
Suffrage: raft.ServerSuffrage(s.Suffrage),
ID: raft.ServerID(s.Id),
Address: raft.ServerAddress(s.Address),
}
}
return configuration.Index, raft.Configuration{
Servers: servers,
}
}
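// exampleConfigurationRoundTrip is an illustrative sketch, not part of the
// original file, showing the conversion between the raft library's
// configuration type and the proto value this FSM persists in the bolt file.
// The server ID, address, and index are assumptions for the example.
func exampleConfigurationRoundTrip() {
	raftConfig := raft.Configuration{
		Servers: []raft.Server{
			{
				Suffrage: raft.Voter,
				ID:       raft.ServerID("node-1"),
				Address:  raft.ServerAddress("127.0.0.1:8201"),
			},
		},
	}
	protoConfig := raftConfigurationToProtoConfiguration(5, raftConfig)
	index, roundTripped := protoConfigurationToRaftConfiguration(protoConfig)
	fmt.Printf("index=%d servers=%d\n", index, len(roundTripped.Servers))
}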