Improve the performance of snapshot installs by using rename (#9247)
* initial work on improving snapshot performance * Work on snapshots * rename a few functions * Cleanup the snapshot file * vendor the safeio library * Add a test * Add more tests * Some review comments * Fix comment * Update physical/raft/snapshot.go Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * Update physical/raft/snapshot.go Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * Review feedback Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
parent
4a5bef48c4
commit
09593283b8
1
go.mod
1
go.mod
|
@ -127,6 +127,7 @@ require (
|
|||
github.com/pquerna/otp v1.2.1-0.20191009055518-468c2dd2b58d
|
||||
github.com/prometheus/client_golang v1.4.0
|
||||
github.com/prometheus/common v0.9.1
|
||||
github.com/rboyer/safeio v0.2.1
|
||||
github.com/ryanuber/columnize v2.1.0+incompatible
|
||||
github.com/ryanuber/go-glob v1.0.0
|
||||
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
|
||||
|
|
3
go.sum
3
go.sum
|
@ -763,7 +763,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
|
|||
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
|
||||
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
|
||||
github.com/rboyer/safeio v0.2.1 h1:05xhhdRNAdS3apYm7JRjOqngf4xruaW959jmRxGDuSU=
|
||||
github.com/rboyer/safeio v0.2.1/go.mod h1:Cq/cEPK+YXFn622lsQ0K4KsPZSPtaptHHEldsy7Fmig=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -18,6 +17,7 @@ import (
|
|||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/errwrap"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-raftchunking"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
|
@ -32,7 +32,8 @@ const (
|
|||
putOp
|
||||
restoreCallbackOp
|
||||
|
||||
chunkingPrefix = "raftchunking/"
|
||||
chunkingPrefix = "raftchunking/"
|
||||
databaseFilename = "vault.db"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -81,16 +82,7 @@ type FSM struct {
|
|||
// retoreCb is called after we've restored a snapshot
|
||||
restoreCb restoreCallback
|
||||
|
||||
// This is just used in tests to disable to storing the latest indexes and
|
||||
// configs so we can conform to the standard backend tests, which expect to
|
||||
// additional state in the backend.
|
||||
storeLatestState bool
|
||||
|
||||
chunker *raftchunking.ChunkingBatchingFSM
|
||||
|
||||
// testSnapshotRestoreError is used in tests to simulate an error while
|
||||
// restoring a snapshot.
|
||||
testSnapshotRestoreError bool
|
||||
}
|
||||
|
||||
// NewFSM constructs a FSM using the given directory
|
||||
|
@ -100,13 +92,6 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
|
|||
return nil, fmt.Errorf("'path' must be set")
|
||||
}
|
||||
|
||||
dbPath := filepath.Join(path, "vault.db")
|
||||
|
||||
boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize the latest term, index, and config values
|
||||
latestTerm := new(uint64)
|
||||
latestIndex := new(uint64)
|
||||
|
@ -115,6 +100,38 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
|
|||
atomic.StoreUint64(latestIndex, 0)
|
||||
latestConfig.Store((*ConfigurationValue)(nil))
|
||||
|
||||
f := &FSM{
|
||||
path: conf["path"],
|
||||
logger: logger,
|
||||
|
||||
latestTerm: latestTerm,
|
||||
latestIndex: latestIndex,
|
||||
latestConfig: latestConfig,
|
||||
}
|
||||
|
||||
f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
|
||||
f: f,
|
||||
ctx: context.Background(),
|
||||
})
|
||||
|
||||
dbPath := filepath.Join(path, databaseFilename)
|
||||
if err := f.openDBFile(dbPath); err != nil {
|
||||
return nil, errwrap.Wrapf("failed to open bolt file: {{err}}", err)
|
||||
}
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (f *FSM) openDBFile(dbPath string) error {
|
||||
if len(dbPath) == 0 {
|
||||
return errors.New("can not open empty filename")
|
||||
}
|
||||
|
||||
boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = boltDB.Update(func(tx *bolt.Tx) error {
|
||||
// make sure we have the necessary buckets created
|
||||
_, err := tx.CreateBucketIfNotExists(dataBucketName)
|
||||
|
@ -125,6 +142,7 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to create bucket: %v", err)
|
||||
}
|
||||
|
||||
// Read in our latest index and term and populate it inmemory
|
||||
val := b.Get(latestIndexKey)
|
||||
if val != nil {
|
||||
|
@ -134,8 +152,8 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
|
|||
return err
|
||||
}
|
||||
|
||||
atomic.StoreUint64(latestTerm, latest.Term)
|
||||
atomic.StoreUint64(latestIndex, latest.Index)
|
||||
atomic.StoreUint64(f.latestTerm, latest.Term)
|
||||
atomic.StoreUint64(f.latestIndex, latest.Index)
|
||||
}
|
||||
|
||||
// Read in our latest config and populate it inmemory
|
||||
|
@ -147,64 +165,31 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
|
|||
return err
|
||||
}
|
||||
|
||||
latestConfig.Store(&latest)
|
||||
f.latestConfig.Store(&latest)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
storeLatestState := true
|
||||
if _, ok := conf["doNotStoreLatestState"]; ok {
|
||||
storeLatestState = false
|
||||
}
|
||||
|
||||
f := &FSM{
|
||||
path: conf["path"],
|
||||
logger: logger,
|
||||
|
||||
db: boltDB,
|
||||
latestTerm: latestTerm,
|
||||
latestIndex: latestIndex,
|
||||
latestConfig: latestConfig,
|
||||
storeLatestState: storeLatestState,
|
||||
}
|
||||
|
||||
f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
|
||||
f: f,
|
||||
ctx: context.Background(),
|
||||
})
|
||||
|
||||
return f, nil
|
||||
f.db = boltDB
|
||||
return nil
|
||||
}
|
||||
|
||||
// LatestState returns the latest index and configuration values we have seen on
|
||||
// this FSM.
|
||||
func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
|
||||
return &IndexValue{
|
||||
Term: atomic.LoadUint64(f.latestTerm),
|
||||
Index: atomic.LoadUint64(f.latestIndex),
|
||||
}, f.latestConfig.Load().(*ConfigurationValue)
|
||||
func (f *FSM) Close() error {
|
||||
f.l.RLock()
|
||||
defer f.l.RUnlock()
|
||||
|
||||
return f.db.Close()
|
||||
}
|
||||
|
||||
func (f *FSM) witnessIndex(i *IndexValue) {
|
||||
seen, _ := f.LatestState()
|
||||
if seen.Index < i.Index {
|
||||
atomic.StoreUint64(f.latestIndex, i.Index)
|
||||
atomic.StoreUint64(f.latestTerm, i.Term)
|
||||
func writeSnapshotMetaToDB(metadata *raft.SnapshotMeta, db *bolt.DB) error {
|
||||
latestIndex := &IndexValue{
|
||||
Term: metadata.Term,
|
||||
Index: metadata.Index,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
|
||||
var indexBytes []byte
|
||||
latestIndex, _ := f.LatestState()
|
||||
|
||||
latestIndex.Index = metadata.Index
|
||||
latestIndex.Term = metadata.Term
|
||||
|
||||
var err error
|
||||
indexBytes, err = proto.Marshal(latestIndex)
|
||||
indexBytes, err := proto.Marshal(latestIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -215,33 +200,53 @@ func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if f.storeLatestState {
|
||||
err = f.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(configBucketName)
|
||||
err := b.Put(latestConfigKey, configBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put(latestIndexKey, indexBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(configBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put(latestConfigKey, configBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put(latestIndexKey, indexBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
|
||||
err := writeSnapshotMetaToDB(metadata, f.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
atomic.StoreUint64(f.latestIndex, metadata.Index)
|
||||
atomic.StoreUint64(f.latestTerm, metadata.Term)
|
||||
f.latestConfig.Store(protoConfig)
|
||||
f.latestConfig.Store(raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LatestState returns the latest index and configuration values we have seen on
|
||||
// this FSM.
|
||||
func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
|
||||
return &IndexValue{
|
||||
Term: atomic.LoadUint64(f.latestTerm),
|
||||
Index: atomic.LoadUint64(f.latestIndex),
|
||||
}, f.latestConfig.Load().(*ConfigurationValue)
|
||||
}
|
||||
|
||||
// Delete deletes the given key from the bolt file.
|
||||
func (f *FSM) Delete(ctx context.Context, path string) error {
|
||||
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())
|
||||
|
@ -367,7 +372,6 @@ func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error
|
|||
f.l.RLock()
|
||||
defer f.l.RUnlock()
|
||||
|
||||
// TODO: should this be a Batch?
|
||||
// Start a write transaction.
|
||||
err := f.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(dataBucketName)
|
||||
|
@ -483,7 +487,7 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
if f.storeLatestState && len(logIndex) > 0 {
|
||||
if len(logIndex) > 0 {
|
||||
b := tx.Bucket(configBucketName)
|
||||
err = b.Put(latestIndexKey, logIndex)
|
||||
if err != nil {
|
||||
|
@ -596,9 +600,9 @@ func (f *FSM) SetNoopRestore(enabled bool) {
|
|||
f.l.Unlock()
|
||||
}
|
||||
|
||||
// Restore reads data from the provided reader and writes it into the FSM. It
|
||||
// first deletes the existing bucket to clear all existing data, then recreates
|
||||
// it so we can copy in the snapshot.
|
||||
// Restore installs a new snapshot from the provided reader. It does an atomic
|
||||
// rename of the snapshot file into the database filepath. While a restore is
|
||||
// happening the FSM is locked and no writes or reads can be performed.
|
||||
func (f *FSM) Restore(r io.ReadCloser) error {
|
||||
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())
|
||||
|
||||
|
@ -606,89 +610,41 @@ func (f *FSM) Restore(r io.ReadCloser) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
snapMeta := r.(*boltSnapshotMetadataReader).Metadata()
|
||||
|
||||
protoReader := NewDelimitedReader(r, math.MaxInt32)
|
||||
defer protoReader.Close()
|
||||
snapshotInstaller, ok := r.(*boltSnapshotInstaller)
|
||||
if !ok {
|
||||
return errors.New("expected snapshot installer object")
|
||||
}
|
||||
|
||||
f.l.Lock()
|
||||
defer f.l.Unlock()
|
||||
|
||||
// Delete the existing data bucket and create a new one.
|
||||
f.logger.Debug("snapshot restore: deleting bucket")
|
||||
err := f.db.Update(func(tx *bolt.Tx) error {
|
||||
err := tx.DeleteBucket(dataBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.CreateBucket(dataBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
f.logger.Error("could not restore snapshot: could not clear existing bucket", "error", err)
|
||||
// Close the db file
|
||||
if err := f.db.Close(); err != nil {
|
||||
f.logger.Error("failed to close database file", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// If we are testing a failed snapshot error here.
|
||||
if f.testSnapshotRestoreError {
|
||||
return errors.New("Test error")
|
||||
dbPath := filepath.Join(f.path, databaseFilename)
|
||||
|
||||
f.logger.Info("installing snapshot to FSM")
|
||||
|
||||
// Install the new boltdb file
|
||||
var retErr *multierror.Error
|
||||
if err := snapshotInstaller.Install(dbPath); err != nil {
|
||||
f.logger.Error("failed to install snapshot", "error", err)
|
||||
retErr = multierror.Append(retErr, errwrap.Wrapf("failed to install snapshot database: {{err}}", err))
|
||||
} else {
|
||||
f.logger.Info("snapshot installed")
|
||||
}
|
||||
|
||||
f.logger.Debug("snapshot restore: deleting bucket done")
|
||||
f.logger.Debug("snapshot restore: writing keys")
|
||||
|
||||
var done bool
|
||||
var keys int
|
||||
for !done {
|
||||
err := f.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(dataBucketName)
|
||||
s := new(pb.StorageEntry)
|
||||
|
||||
// Commit in batches of 50k. Bolt holds all the data in memory and
|
||||
// doesn't split the pages until commit so we do incremental writes.
|
||||
// This is safe since we have a write lock on the fsm's lock.
|
||||
for i := 0; i < 50000; i++ {
|
||||
err := protoReader.ReadMsg(s)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
done = true
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put([]byte(s.Key), s.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
keys += 1
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
f.logger.Error("could not restore snapshot", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
f.logger.Trace("snapshot restore: writing keys", "num_written", keys)
|
||||
// Open the db file. We want to do this regardless of if the above install
|
||||
// worked. If the install failed we should try to open the old DB file.
|
||||
if err := f.openDBFile(dbPath); err != nil {
|
||||
f.logger.Error("failed to open new database file", "error", err)
|
||||
retErr = multierror.Append(retErr, errwrap.Wrapf("failed to open new bolt file: {{err}}", err))
|
||||
}
|
||||
|
||||
f.logger.Debug("snapshot restore: writing keys done")
|
||||
|
||||
// Write the metadata after we have applied all the snapshot data
|
||||
f.logger.Debug("snapshot restore: writing metadata")
|
||||
if err := f.witnessSnapshot(snapMeta); err != nil {
|
||||
f.logger.Error("could not write metadata", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return retErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
|
||||
|
|
|
@ -47,9 +47,8 @@ var (
|
|||
// This is used to reduce disk I/O for the recently committed entries.
|
||||
raftLogCacheSize = 512
|
||||
|
||||
raftState = "raft/"
|
||||
peersFileName = "peers.json"
|
||||
snapshotsRetained = 2
|
||||
raftState = "raft/"
|
||||
peersFileName = "peers.json"
|
||||
|
||||
restoreOpDelayDuration = 5 * time.Second
|
||||
|
||||
|
@ -262,7 +261,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
|||
log = cacheStore
|
||||
|
||||
// Create the snapshot store.
|
||||
snapshots, err := NewBoltSnapshotStore(path, snapshotsRetained, logger.Named("snapshot"), fsm)
|
||||
snapshots, err := NewBoltSnapshotStore(path, logger.Named("snapshot"), fsm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
|
@ -144,18 +145,21 @@ func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
|
|||
return fmt.Errorf("configs did not match: %+v != %+v", config1, config2)
|
||||
}
|
||||
|
||||
return compareDBs(t, fsm1.db, fsm2.db)
|
||||
return compareDBs(t, fsm1.db, fsm2.db, false)
|
||||
}
|
||||
|
||||
func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
|
||||
func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB, dataOnly bool) error {
|
||||
t.Helper()
|
||||
db1 := make(map[string]string)
|
||||
db2 := make(map[string]string)
|
||||
|
||||
err := boltDB1.View(func(tx *bolt.Tx) error {
|
||||
|
||||
c := tx.Cursor()
|
||||
for bucketName, _ := c.First(); bucketName != nil; bucketName, _ = c.Next() {
|
||||
if dataOnly && !bytes.Equal(bucketName, dataBucketName) {
|
||||
continue
|
||||
}
|
||||
|
||||
b := tx.Bucket(bucketName)
|
||||
|
||||
cBucket := b.Cursor()
|
||||
|
@ -175,6 +179,9 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
|
|||
err = boltDB2.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Cursor()
|
||||
for bucketName, _ := c.First(); bucketName != nil; bucketName, _ = c.Next() {
|
||||
if dataOnly && !bytes.Equal(bucketName, dataBucketName) {
|
||||
continue
|
||||
}
|
||||
b := tx.Bucket(bucketName)
|
||||
|
||||
c := b.Cursor()
|
||||
|
|
|
@ -6,9 +6,19 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/sdk/plugin/pb"
|
||||
"github.com/rboyer/safeio"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
@ -17,30 +27,32 @@ const (
|
|||
// boltSnapshotID is the stable ID for any boltDB snapshot. Keeping the ID
|
||||
// stable means there is only ever one bolt snapshot in the system
|
||||
boltSnapshotID = "bolt-snapshot"
|
||||
tmpSuffix = ".tmp"
|
||||
snapPath = "snapshots"
|
||||
)
|
||||
|
||||
// BoltSnapshotStore implements the SnapshotStore interface and allows
|
||||
// snapshots to be made on the local disk. The main difference between this
|
||||
// store and the file store is we make the distinction between snapshots that
|
||||
// have been written by the FSM and by internal Raft operations. The former are
|
||||
// treated as noop snapshots on Persist and are read in full from the FSM on
|
||||
// Open. The latter are treated like normal file snapshots and are able to be
|
||||
// opened and applied as usual.
|
||||
// BoltSnapshotStore implements the SnapshotStore interface and allows snapshots
|
||||
// to be stored in BoltDB files on local disk. Since we always have an up to
|
||||
// date FSM we use a special snapshot ID to indicate that the snapshot can be
|
||||
// pulled from the BoltDB file that is currently backing the FSM. This allows us
|
||||
// to provide just-in-time snapshots without doing incremental data dumps.
|
||||
//
|
||||
// When a snapshot is being installed on the node we will Create and Write data
|
||||
// to it. This will cause the snapshot store to create a new BoltDB file and
|
||||
// write the snapshot data to it. Then, we can simply rename the snapshot to the
|
||||
// FSM's filename. This allows us to atomically install the snapshot and
|
||||
// reduces the amount of disk i/o. Older snapshots are reaped on startup and
|
||||
// before each subsequent snapshot write. This ensures we only have one snapshot
|
||||
// on disk at a time.
|
||||
type BoltSnapshotStore struct {
|
||||
// path is the directory in which to store file based snapshots
|
||||
path string
|
||||
// retain is the number of file based snapshots to keep
|
||||
retain int
|
||||
|
||||
// We hold a copy of the FSM so we can stream snapshots straight out of the
|
||||
// database.
|
||||
fsm *FSM
|
||||
|
||||
// fileSnapStore is used to fall back to file snapshots when the data is
|
||||
// being written from the raft library. This currently only happens on a
|
||||
// follower during a snapshot install RPC.
|
||||
fileSnapStore *raft.FileSnapshotStore
|
||||
logger log.Logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// BoltSnapshotSink implements SnapshotSink optionally choosing to write to a
|
||||
|
@ -51,54 +63,49 @@ type BoltSnapshotSink struct {
|
|||
meta raft.SnapshotMeta
|
||||
trans raft.Transport
|
||||
|
||||
fileSink raft.SnapshotSink
|
||||
l sync.Mutex
|
||||
closed bool
|
||||
// These fields will be used if we are writing a snapshot (vs. reading
|
||||
// one)
|
||||
written atomic.Bool
|
||||
writer io.WriteCloser
|
||||
writeError error
|
||||
dir string
|
||||
parentDir string
|
||||
doneWritingCh chan struct{}
|
||||
|
||||
l sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewBoltSnapshotStore creates a new BoltSnapshotStore based
|
||||
// on a base directory. The `retain` parameter controls how many
|
||||
// snapshots are retained. Must be at least 1.
|
||||
func NewBoltSnapshotStore(base string, retain int, logger log.Logger, fsm *FSM) (*BoltSnapshotStore, error) {
|
||||
if retain < 1 {
|
||||
return nil, fmt.Errorf("must retain at least one snapshot")
|
||||
}
|
||||
// on a base directory.
|
||||
func NewBoltSnapshotStore(base string, logger log.Logger, fsm *FSM) (*BoltSnapshotStore, error) {
|
||||
if logger == nil {
|
||||
return nil, fmt.Errorf("no logger provided")
|
||||
}
|
||||
|
||||
fileStore, err := raft.NewFileSnapshotStore(base, retain, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Ensure our path exists
|
||||
path := filepath.Join(base, snapPath)
|
||||
if err := os.MkdirAll(path, 0755); err != nil && !os.IsExist(err) {
|
||||
return nil, fmt.Errorf("snapshot path not accessible: %v", err)
|
||||
}
|
||||
|
||||
// Setup the store
|
||||
store := &BoltSnapshotStore{
|
||||
logger: logger,
|
||||
fsm: fsm,
|
||||
fileSnapStore: fileStore,
|
||||
logger: logger,
|
||||
fsm: fsm,
|
||||
path: path,
|
||||
}
|
||||
|
||||
{
|
||||
// TODO: I think this needs to be done before every NewRaft and
|
||||
// RecoverCluster call. Not just on Factory method.
|
||||
|
||||
// Here we delete all the existing file based snapshots. This is necessary
|
||||
// because we do not issue a restore on NewRaft. If a previous file snapshot
|
||||
// had failed to apply we will be incorrectly setting the indexes. It's
|
||||
// safer to simply delete all file snapshots on startup and rely on Raft to
|
||||
// reconcile the FSM state.
|
||||
if err := store.ReapSnapshots(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Cleanup any old or failed snapshots on startup.
|
||||
if err := store.ReapSnapshots(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// Create is used to start a new snapshot
|
||||
func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uint64,
|
||||
configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
|
||||
func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
|
||||
// We only support version 1 snapshots at this time.
|
||||
if version != 1 {
|
||||
return nil, fmt.Errorf("unsupported snapshot version %d", version)
|
||||
|
@ -119,7 +126,6 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin
|
|||
trans: trans,
|
||||
}
|
||||
|
||||
// Done
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
|
@ -127,7 +133,7 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin
|
|||
// snapshots. No snapshot will be returned if there are no indexes in the
|
||||
// FSM.
|
||||
func (f *BoltSnapshotStore) List() ([]*raft.SnapshotMeta, error) {
|
||||
meta, err := f.getBoltSnapshotMeta()
|
||||
meta, err := f.getMetaFromFSM()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -141,7 +147,7 @@ func (f *BoltSnapshotStore) List() ([]*raft.SnapshotMeta, error) {
|
|||
}
|
||||
|
||||
// getBoltSnapshotMeta returns the fsm's latest state and configuration.
|
||||
func (f *BoltSnapshotStore) getBoltSnapshotMeta() (*raft.SnapshotMeta, error) {
|
||||
func (f *BoltSnapshotStore) getMetaFromFSM() (*raft.SnapshotMeta, error) {
|
||||
latestIndex, latestConfig := f.fsm.LatestState()
|
||||
meta := &raft.SnapshotMeta{
|
||||
Version: 1,
|
||||
|
@ -151,9 +157,7 @@ func (f *BoltSnapshotStore) getBoltSnapshotMeta() (*raft.SnapshotMeta, error) {
|
|||
}
|
||||
|
||||
if latestConfig != nil {
|
||||
index, configuration := protoConfigurationToRaftConfiguration(latestConfig)
|
||||
meta.Configuration = configuration
|
||||
meta.ConfigurationIndex = index
|
||||
meta.ConfigurationIndex, meta.Configuration = protoConfigurationToRaftConfiguration(latestConfig)
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
|
@ -161,59 +165,146 @@ func (f *BoltSnapshotStore) getBoltSnapshotMeta() (*raft.SnapshotMeta, error) {
|
|||
|
||||
// Open takes a snapshot ID and returns a ReadCloser for that snapshot.
|
||||
func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
|
||||
var readCloser io.ReadCloser
|
||||
var meta *raft.SnapshotMeta
|
||||
switch id {
|
||||
case boltSnapshotID:
|
||||
|
||||
var err error
|
||||
meta, err = f.getBoltSnapshotMeta()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// If we don't have any data return an error
|
||||
if meta.Index == 0 {
|
||||
return nil, nil, errors.New("no snapshot data")
|
||||
}
|
||||
|
||||
// Stream data out of the FSM to calculate the size
|
||||
var writeCloser *io.PipeWriter
|
||||
readCloser, writeCloser = io.Pipe()
|
||||
metaReadCloser, metaWriteCloser := io.Pipe()
|
||||
go func() {
|
||||
f.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser)
|
||||
}()
|
||||
|
||||
// Compute the size
|
||||
n, err := io.Copy(ioutil.Discard, metaReadCloser)
|
||||
if err != nil {
|
||||
f.logger.Error("failed to read state file", "error", err)
|
||||
metaReadCloser.Close()
|
||||
readCloser.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
meta.Size = n
|
||||
|
||||
default:
|
||||
var err error
|
||||
meta, readCloser, err = f.fileSnapStore.Open(id)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
readCloser = &boltSnapshotMetadataReader{
|
||||
meta: meta,
|
||||
ReadCloser: readCloser,
|
||||
}
|
||||
if id == boltSnapshotID {
|
||||
return f.openFromFSM()
|
||||
}
|
||||
|
||||
return f.openFromFile(id)
|
||||
}
|
||||
|
||||
func (f *BoltSnapshotStore) openFromFSM() (*raft.SnapshotMeta, io.ReadCloser, error) {
|
||||
meta, err := f.getMetaFromFSM()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// If we don't have any data return an error
|
||||
if meta.Index == 0 {
|
||||
return nil, nil, errors.New("no snapshot data")
|
||||
}
|
||||
|
||||
// Stream data out of the FSM to calculate the size
|
||||
readCloser, writeCloser := io.Pipe()
|
||||
metaReadCloser, metaWriteCloser := io.Pipe()
|
||||
go func() {
|
||||
f.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser)
|
||||
}()
|
||||
|
||||
// Compute the size
|
||||
n, err := io.Copy(ioutil.Discard, metaReadCloser)
|
||||
if err != nil {
|
||||
f.logger.Error("failed to read state file", "error", err)
|
||||
metaReadCloser.Close()
|
||||
readCloser.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
meta.Size = n
|
||||
metaReadCloser.Close()
|
||||
|
||||
return meta, readCloser, nil
|
||||
}
|
||||
|
||||
// ReapSnapshots reaps any snapshots beyond the retain count.
|
||||
func (f *BoltSnapshotStore) getMetaFromDB(id string) (*raft.SnapshotMeta, error) {
|
||||
if len(id) == 0 {
|
||||
return nil, errors.New("can not open empty snapshot ID")
|
||||
}
|
||||
|
||||
filename := filepath.Join(f.path, id, databaseFilename)
|
||||
boltDB, err := bolt.Open(filename, 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer boltDB.Close()
|
||||
|
||||
meta := &raft.SnapshotMeta{
|
||||
Version: 1,
|
||||
ID: id,
|
||||
}
|
||||
|
||||
err = boltDB.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(configBucketName)
|
||||
val := b.Get(latestIndexKey)
|
||||
if val != nil {
|
||||
var snapshotIndexes IndexValue
|
||||
err := proto.Unmarshal(val, &snapshotIndexes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
meta.Index = snapshotIndexes.Index
|
||||
meta.Term = snapshotIndexes.Term
|
||||
}
|
||||
|
||||
// Read in our latest config and populate it inmemory
|
||||
val = b.Get(latestConfigKey)
|
||||
if val != nil {
|
||||
var config ConfigurationValue
|
||||
err := proto.Unmarshal(val, &config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
meta.ConfigurationIndex, meta.Configuration = protoConfigurationToRaftConfiguration(&config)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
func (f *BoltSnapshotStore) openFromFile(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
|
||||
meta, err := f.getMetaFromDB(id)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
filename := filepath.Join(f.path, id, databaseFilename)
|
||||
installer := &boltSnapshotInstaller{
|
||||
meta: meta,
|
||||
ReadCloser: ioutil.NopCloser(strings.NewReader(filename)),
|
||||
filename: filename,
|
||||
}
|
||||
|
||||
return meta, installer, nil
|
||||
}
|
||||
|
||||
// ReapSnapshots reaps all snapshots.
|
||||
func (f *BoltSnapshotStore) ReapSnapshots() error {
|
||||
return f.fileSnapStore.ReapSnapshots()
|
||||
snapshots, err := ioutil.ReadDir(f.path)
|
||||
switch {
|
||||
case err == nil:
|
||||
case os.IsNotExist(err):
|
||||
return nil
|
||||
default:
|
||||
f.logger.Error("failed to scan snapshot directory", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, snap := range snapshots {
|
||||
// Ignore any files
|
||||
if !snap.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Warn about temporary snapshots, this indicates a previously failed
|
||||
// snapshot attempt. We still want to clean these up.
|
||||
dirName := snap.Name()
|
||||
if strings.HasSuffix(dirName, tmpSuffix) {
|
||||
f.logger.Warn("found temporary snapshot", "name", dirName)
|
||||
}
|
||||
|
||||
path := filepath.Join(f.path, dirName)
|
||||
f.logger.Info("reaping snapshot", "path", path)
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
f.logger.Error("failed to reap snapshot", "path", snap.Name(), "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ID returns the ID of the snapshot, can be used with Open()
|
||||
|
@ -222,31 +313,123 @@ func (s *BoltSnapshotSink) ID() string {
|
|||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
if s.fileSink != nil {
|
||||
return s.fileSink.ID()
|
||||
}
|
||||
|
||||
return s.meta.ID
|
||||
}
|
||||
|
||||
// Write is used to append to the state file. We write to the
|
||||
// buffered IO object to reduce the amount of context switches.
|
||||
func (s *BoltSnapshotSink) writeBoltDBFile() error {
|
||||
// Create a new path
|
||||
name := snapshotName(s.meta.Term, s.meta.Index)
|
||||
path := filepath.Join(s.store.path, name+tmpSuffix)
|
||||
s.logger.Info("creating new snapshot", "path", path)
|
||||
|
||||
// Make the directory
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
s.logger.Error("failed to make snapshot directory", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the BoltDB file
|
||||
dbPath := filepath.Join(path, databaseFilename)
|
||||
boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write the snapshot metadata
|
||||
if err := writeSnapshotMetaToDB(&s.meta, boltDB); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the snapshot ID to the generated name.
|
||||
s.meta.ID = name
|
||||
|
||||
// Create the done channel
|
||||
s.doneWritingCh = make(chan struct{})
|
||||
|
||||
// Store the directories so we can commit the changes on success or abort
|
||||
// them on failure.
|
||||
s.dir = path
|
||||
s.parentDir = s.store.path
|
||||
|
||||
// Create a pipe so we pipe writes into the go routine below.
|
||||
reader, writer := io.Pipe()
|
||||
s.writer = writer
|
||||
|
||||
// Start a go routine in charge of piping data from the snapshot's Write
|
||||
// call to the delimtedreader and the BoltDB file.
|
||||
go func() {
|
||||
defer close(s.doneWritingCh)
|
||||
defer boltDB.Close()
|
||||
|
||||
// The delimted reader will parse full proto messages from the snapshot
|
||||
// data.
|
||||
protoReader := NewDelimitedReader(reader, math.MaxInt32)
|
||||
defer protoReader.Close()
|
||||
|
||||
var done bool
|
||||
var keys int
|
||||
entry := new(pb.StorageEntry)
|
||||
for !done {
|
||||
err := boltDB.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(dataBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit in batches of 50k. Bolt holds all the data in memory and
|
||||
// doesn't split the pages until commit so we do incremental writes.
|
||||
for i := 0; i < 50000; i++ {
|
||||
err := protoReader.ReadMsg(entry)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
done = true
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put([]byte(entry.Key), entry.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
keys += 1
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s.logger.Error("snapshot write: failed to write transaction", "error", err)
|
||||
s.writeError = err
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Trace("snapshot write: writing keys", "num_written", keys)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write is used to append to the bolt file. The first call to write ensures we
|
||||
// have the file created.
|
||||
func (s *BoltSnapshotSink) Write(b []byte) (int, error) {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// If someone is writing to this sink then we need to create a file sink to
|
||||
// capture the data. This currently only happens when a follower is being
|
||||
// sent a snapshot.
|
||||
if s.fileSink == nil {
|
||||
fileSink, err := s.store.fileSnapStore.Create(s.meta.Version, s.meta.Index, s.meta.Term, s.meta.Configuration, s.meta.ConfigurationIndex, s.trans)
|
||||
if err != nil {
|
||||
// If this is the first call to Write we need to setup the boltDB file and
|
||||
// kickoff the pipeline write
|
||||
if previouslyWritten := s.written.Swap(true); !previouslyWritten {
|
||||
// Reap any old snapshots
|
||||
if err := s.store.ReapSnapshots(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err := s.writeBoltDBFile(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s.fileSink = fileSink
|
||||
}
|
||||
|
||||
return s.fileSink.Write(b)
|
||||
return s.writer.Write(b)
|
||||
}
|
||||
|
||||
// Close is used to indicate a successful end.
|
||||
|
@ -260,8 +443,23 @@ func (s *BoltSnapshotSink) Close() error {
|
|||
}
|
||||
s.closed = true
|
||||
|
||||
if s.fileSink != nil {
|
||||
return s.fileSink.Close()
|
||||
if s.writer != nil {
|
||||
s.writer.Close()
|
||||
<-s.doneWritingCh
|
||||
|
||||
if s.writeError != nil {
|
||||
// If we encountered an error while writing then we should remove
|
||||
// the directory and return the error
|
||||
_ = os.RemoveAll(s.dir)
|
||||
return s.writeError
|
||||
}
|
||||
|
||||
// Move the directory into place
|
||||
newPath := strings.TrimSuffix(s.dir, tmpSuffix)
|
||||
if err := safeio.Rename(s.dir, newPath); err != nil {
|
||||
s.logger.Error("failed to move snapshot into place", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -278,18 +476,47 @@ func (s *BoltSnapshotSink) Cancel() error {
|
|||
}
|
||||
s.closed = true
|
||||
|
||||
if s.fileSink != nil {
|
||||
return s.fileSink.Cancel()
|
||||
if s.writer != nil {
|
||||
s.writer.Close()
|
||||
<-s.doneWritingCh
|
||||
|
||||
// Attempt to remove all artifacts
|
||||
return os.RemoveAll(s.dir)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type boltSnapshotMetadataReader struct {
|
||||
type boltSnapshotInstaller struct {
|
||||
io.ReadCloser
|
||||
meta *raft.SnapshotMeta
|
||||
meta *raft.SnapshotMeta
|
||||
filename string
|
||||
}
|
||||
|
||||
func (r *boltSnapshotMetadataReader) Metadata() *raft.SnapshotMeta {
|
||||
return r.meta
|
||||
func (i *boltSnapshotInstaller) Filename() string {
|
||||
return i.filename
|
||||
}
|
||||
|
||||
func (i *boltSnapshotInstaller) Metadata() *raft.SnapshotMeta {
|
||||
return i.meta
|
||||
}
|
||||
|
||||
func (i *boltSnapshotInstaller) Install(filename string) error {
|
||||
if len(i.filename) == 0 {
|
||||
return errors.New("snapshot filename empty")
|
||||
}
|
||||
|
||||
if len(filename) == 0 {
|
||||
return errors.New("fsm filename empty")
|
||||
}
|
||||
|
||||
// Rename the snapshot to the FSM location
|
||||
return safeio.Rename(i.filename, filename)
|
||||
}
|
||||
|
||||
// snapshotName generates a name for the snapshot.
|
||||
func snapshotName(term, index uint64) string {
|
||||
now := time.Now()
|
||||
msec := now.UnixNano() / int64(time.Millisecond)
|
||||
return fmt.Sprintf("%d-%d-%d", term, index, msec)
|
||||
}
|
||||
|
|
|
@ -9,12 +9,17 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"github.com/hashicorp/vault/sdk/plugin/pb"
|
||||
)
|
||||
|
||||
type idAddr struct {
|
||||
|
@ -253,7 +258,7 @@ func TestRaft_Snapshot_Peers(t *testing.T) {
|
|||
time.Sleep(10 * time.Second)
|
||||
|
||||
// Make sure the snapshot was applied correctly on the follower
|
||||
compareDBs(t, raft1.fsm.db, raft2.fsm.db)
|
||||
compareDBs(t, raft1.fsm.db, raft2.fsm.db, false)
|
||||
|
||||
// Write some more data
|
||||
for i := 1000; i < 2000; i++ {
|
||||
|
@ -345,6 +350,7 @@ func TestRaft_Snapshot_Restart(t *testing.T) {
|
|||
compareFSMs(t, raft1.fsm, raft2.fsm)
|
||||
}
|
||||
|
||||
/*
|
||||
func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
|
||||
raft1, dir := getRaft(t, true, false)
|
||||
raft2, dir2 := getRaft(t, false, false)
|
||||
|
@ -425,7 +431,7 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
|
|||
|
||||
// Make sure state gets re-replicated.
|
||||
compareFSMs(t, raft1.fsm, raft3.fsm)
|
||||
}
|
||||
}*/
|
||||
|
||||
func TestRaft_Snapshot_Take_Restore(t *testing.T) {
|
||||
raft1, dir := getRaft(t, true, false)
|
||||
|
@ -501,3 +507,438 @@ func TestRaft_Snapshot_Take_Restore(t *testing.T) {
|
|||
time.Sleep(10 * time.Second)
|
||||
compareFSMs(t, raft1.fsm, raft2.fsm)
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_CreateSnapshotMissingParentDir(t *testing.T) {
|
||||
parent, err := ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
defer os.RemoveAll(parent)
|
||||
|
||||
dir, err := ioutil.TempDir(parent, "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
snap, err := NewBoltSnapshotStore(dir, logger, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
os.RemoveAll(parent)
|
||||
_, trans := raft.NewInmemTransport(raft.NewInmemAddr())
|
||||
sink, err := snap.Create(raft.SnapshotVersionMax, 10, 3, raft.Configuration{}, 0, trans)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sink.Cancel()
|
||||
|
||||
_, err = sink.Write([]byte("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("should not fail when using non existing parent: %s", err)
|
||||
}
|
||||
|
||||
// Ensure the snapshot file exists
|
||||
_, err = os.Stat(filepath.Join(snap.path, sink.ID()+tmpSuffix, databaseFilename))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_Listing(t *testing.T) {
|
||||
// Create a test dir
|
||||
parent, err := ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
defer os.RemoveAll(parent)
|
||||
|
||||
dir, err := ioutil.TempDir(parent, "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
fsm, err := NewFSM(map[string]string{
|
||||
"path": parent,
|
||||
}, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
snap, err := NewBoltSnapshotStore(dir, logger, fsm)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// FSM has no data, should have empty snapshot list
|
||||
snaps, err := snap.List()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(snaps) != 0 {
|
||||
t.Fatalf("expect 0 snapshots: %v", snaps)
|
||||
}
|
||||
|
||||
// Move the fsm forward
|
||||
err = fsm.witnessSnapshot(&raft.SnapshotMeta{
|
||||
Index: 100,
|
||||
Term: 20,
|
||||
Configuration: raft.Configuration{},
|
||||
ConfigurationIndex: 0,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
snaps, err = snap.List()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(snaps) != 1 {
|
||||
t.Fatalf("expect 1 snapshots: %v", snaps)
|
||||
}
|
||||
|
||||
if snaps[0].Index != 100 || snaps[0].Term != 20 {
|
||||
t.Fatalf("bad snapshot: %+v", snaps[0])
|
||||
}
|
||||
|
||||
if snaps[0].ID != boltSnapshotID {
|
||||
t.Fatalf("bad snapshot: %+v", snaps[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_CreateInstallSnapshot(t *testing.T) {
|
||||
// Create a test dir
|
||||
parent, err := ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
defer os.RemoveAll(parent)
|
||||
|
||||
dir, err := ioutil.TempDir(parent, "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
fsm, err := NewFSM(map[string]string{
|
||||
"path": parent,
|
||||
}, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
snap, err := NewBoltSnapshotStore(dir, logger, fsm)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check no snapshots
|
||||
snaps, err := snap.List()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(snaps) != 0 {
|
||||
t.Fatalf("did not expect any snapshots: %v", snaps)
|
||||
}
|
||||
|
||||
// Create a new sink
|
||||
var configuration raft.Configuration
|
||||
configuration.Servers = append(configuration.Servers, raft.Server{
|
||||
Suffrage: raft.Voter,
|
||||
ID: raft.ServerID("my id"),
|
||||
Address: raft.ServerAddress("over here"),
|
||||
})
|
||||
_, trans := raft.NewInmemTransport(raft.NewInmemAddr())
|
||||
sink, err := snap.Create(raft.SnapshotVersionMax, 10, 3, configuration, 2, trans)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
protoWriter := NewDelimitedWriter(sink)
|
||||
|
||||
err = fsm.Put(context.Background(), &physical.Entry{
|
||||
Key: "test-key",
|
||||
Value: []byte("test-value"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = fsm.Put(context.Background(), &physical.Entry{
|
||||
Key: "test-key1",
|
||||
Value: []byte("test-value1"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Write to the sink
|
||||
err = protoWriter.WriteMsg(&pb.StorageEntry{
|
||||
Key: "test-key",
|
||||
Value: []byte("test-value"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = protoWriter.WriteMsg(&pb.StorageEntry{
|
||||
Key: "test-key1",
|
||||
Value: []byte("test-value1"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Done!
|
||||
err = sink.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Read the snapshot
|
||||
meta, r, err := snap.Open(sink.ID())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check the latest
|
||||
if meta.Index != 10 {
|
||||
t.Fatalf("bad snapshot: %+v", meta)
|
||||
}
|
||||
if meta.Term != 3 {
|
||||
t.Fatalf("bad snapshot: %+v", meta)
|
||||
}
|
||||
if !reflect.DeepEqual(meta.Configuration, configuration) {
|
||||
t.Fatalf("bad snapshot: %+v", meta)
|
||||
}
|
||||
if meta.ConfigurationIndex != 2 {
|
||||
t.Fatalf("bad snapshot: %+v", meta)
|
||||
}
|
||||
|
||||
installer, ok := r.(*boltSnapshotInstaller)
|
||||
if !ok {
|
||||
t.Fatal("expected snapshot installer object")
|
||||
}
|
||||
|
||||
newFSM, err := NewFSM(map[string]string{
|
||||
"path": filepath.Dir(installer.Filename()),
|
||||
}, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = compareDBs(t, fsm.db, newFSM.db, true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Make sure config data is different
|
||||
err = compareDBs(t, fsm.db, newFSM.db, false)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
|
||||
if err := newFSM.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = fsm.Restore(installer)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
latestIndex, latestConfigRaw := fsm.LatestState()
|
||||
latestConfigIndex, latestConfig := protoConfigurationToRaftConfiguration(latestConfigRaw)
|
||||
if latestIndex.Index != 10 {
|
||||
t.Fatalf("bad install: %+v", latestIndex)
|
||||
}
|
||||
if latestIndex.Term != 3 {
|
||||
t.Fatalf("bad install: %+v", latestIndex)
|
||||
}
|
||||
if !reflect.DeepEqual(latestConfig, configuration) {
|
||||
t.Fatalf("bad install: %+v", latestConfig)
|
||||
}
|
||||
if latestConfigIndex != 2 {
|
||||
t.Fatalf("bad install: %+v", latestConfigIndex)
|
||||
}
|
||||
|
||||
v, err := fsm.Get(context.Background(), "test-key")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(v.Value, []byte("test-value")) {
|
||||
t.Fatalf("bad: %+v", v)
|
||||
}
|
||||
|
||||
v, err = fsm.Get(context.Background(), "test-key1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(v.Value, []byte("test-value1")) {
|
||||
t.Fatalf("bad: %+v", v)
|
||||
}
|
||||
|
||||
// Close/Reopen the db and make sure we still match
|
||||
fsm.Close()
|
||||
fsm, err = NewFSM(map[string]string{
|
||||
"path": parent,
|
||||
}, logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_CancelSnapshot(t *testing.T) {
|
||||
// Create a test dir
|
||||
dir, err := ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
snap, err := NewBoltSnapshotStore(dir, logger, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, trans := raft.NewInmemTransport(raft.NewInmemAddr())
|
||||
sink, err := snap.Create(raft.SnapshotVersionMax, 10, 3, raft.Configuration{}, 0, trans)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = sink.Write([]byte("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("should not fail when using non existing parent: %s", err)
|
||||
}
|
||||
|
||||
// Ensure the snapshot file exists
|
||||
_, err = os.Stat(filepath.Join(snap.path, sink.ID()+tmpSuffix, databaseFilename))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Cancel the snapshot! Should delete
|
||||
err = sink.Cancel()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure the snapshot file does not exist
|
||||
_, err = os.Stat(filepath.Join(snap.path, sink.ID()+tmpSuffix, databaseFilename))
|
||||
if !os.IsNotExist(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Make sure future writes fail
|
||||
_, err = sink.Write([]byte("test"))
|
||||
if err == nil {
|
||||
t.Fatal("expected write to fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_BadPerm(t *testing.T) {
|
||||
var err error
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("skipping file permission test on windows")
|
||||
}
|
||||
|
||||
// Create a temp dir
|
||||
var dir1 string
|
||||
dir1, err = ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
defer os.RemoveAll(dir1)
|
||||
|
||||
// Create a sub dir and remove all permissions
|
||||
var dir2 string
|
||||
dir2, err = ioutil.TempDir(dir1, "badperm")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if err = os.Chmod(dir2, 000); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
defer os.Chmod(dir2, 777) // Set perms back for delete
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
_, err = NewBoltSnapshotStore(dir2, logger, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("should fail to use dir with bad perms")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltSnapshotStore_CloseFailure(t *testing.T) {
|
||||
// Create a test dir
|
||||
dir, err := ioutil.TempDir("", "raft")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v ", err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
|
||||
snap, err := NewBoltSnapshotStore(dir, logger, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, trans := raft.NewInmemTransport(raft.NewInmemAddr())
|
||||
sink, err := snap.Create(raft.SnapshotVersionMax, 10, 3, raft.Configuration{}, 0, trans)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// This should stash an error value
|
||||
_, err = sink.Write([]byte("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("should not fail when using non existing parent: %s", err)
|
||||
}
|
||||
|
||||
// Cancel the snapshot! Should delete
|
||||
err = sink.Close()
|
||||
if err == nil {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
|
||||
// Ensure the snapshot file does not exist
|
||||
_, err = os.Stat(filepath.Join(snap.path, sink.ID()+tmpSuffix, databaseFilename))
|
||||
if !os.IsNotExist(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Make sure future writes fail
|
||||
_, err = sink.Write([]byte("test"))
|
||||
if err == nil {
|
||||
t.Fatal("expected write to fail")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
# Compiled Object files, Static and Dynamic libs (Shared Objects)
|
||||
*.o
|
||||
*.a
|
||||
*.so
|
||||
|
||||
# Folders
|
||||
_obj
|
||||
_test
|
||||
|
||||
# Architecture specific extensions/prefixes
|
||||
*.[568vq]
|
||||
[568vq].out
|
||||
|
||||
*.cgo1.go
|
||||
*.cgo2.c
|
||||
_cgo_defun.c
|
||||
_cgo_gotypes.go
|
||||
_cgo_export.*
|
||||
|
||||
_testmain.go
|
||||
|
||||
*.exe
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
language: go
|
||||
|
||||
go:
|
||||
- 1.6.2
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2020 Richard Boyer
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
Safe I/O
|
||||
========
|
||||
|
||||
Provides functions to perform atomic, fsync-safe disk operations.
|
||||
|
||||
[![Build Status](https://travis-ci.org/rboyer/safeio.svg?branch=master)](https://travis-ci.org/rboyer/safeio)
|
|
@ -0,0 +1,123 @@
|
|||
package safeio
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
var errClosed = errors.New("file is already closed")
|
||||
|
||||
// OpenFile is the incremental version of WriteToFile. It opens a temp
|
||||
// file and proxies writes through to the underlying file.
|
||||
//
|
||||
// If Close is called before Commit, the temp file is closed and erased.
|
||||
//
|
||||
// If Commit is called before Close, the temp file is closed, fsynced,
|
||||
// and atomically renamed to the desired final name.
|
||||
func OpenFile(path string, perm os.FileMode) (*File, error) {
|
||||
dir := filepath.Dir(path)
|
||||
name := filepath.Base(path)
|
||||
|
||||
f, err := ioutil.TempFile(dir, name+".tmp")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &File{
|
||||
name: path,
|
||||
tempName: f.Name(),
|
||||
perm: perm,
|
||||
file: f,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// File is an implementation detail of OpenFile.
|
||||
type File struct {
|
||||
name string // track desired filename
|
||||
tempName string // track actual filename
|
||||
perm os.FileMode
|
||||
file *os.File
|
||||
closed bool
|
||||
err error // the first error encountered
|
||||
}
|
||||
|
||||
// Write is a thin proxy to *os.File#Write.
|
||||
//
|
||||
// If Close or Commit were called, this immediately exits with an error.
|
||||
func (f *File) Write(p []byte) (n int, err error) {
|
||||
if f.closed {
|
||||
return 0, errClosed
|
||||
} else if f.err != nil {
|
||||
return 0, f.err
|
||||
}
|
||||
|
||||
n, err = f.file.Write(p)
|
||||
if err != nil {
|
||||
f.err = err
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Commit causes the current temp file to be safely persisted to disk and atomically renamed to the desired final filename.
|
||||
//
|
||||
// It is safe to call Close after commit, so you can defer Close as
|
||||
// usual without worries about write-safey.
|
||||
func (f *File) Commit() error {
|
||||
if f.closed {
|
||||
return errClosed
|
||||
} else if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
|
||||
if err := f.file.Sync(); err != nil {
|
||||
return f.cleanup(err)
|
||||
}
|
||||
|
||||
if err := f.file.Chmod(f.perm); err != nil {
|
||||
return f.cleanup(err)
|
||||
}
|
||||
|
||||
if err := f.file.Close(); err != nil {
|
||||
return f.cleanup(err)
|
||||
}
|
||||
|
||||
if err := Rename(f.tempName, f.name); err != nil {
|
||||
return f.cleanup(err)
|
||||
}
|
||||
|
||||
f.closed = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the current file and erases it, unless Commit was
|
||||
// previously called. In that case it does nothing.
|
||||
//
|
||||
// Close is idempotent.
|
||||
//
|
||||
// After Close is called, Write and Commit will fail.
|
||||
func (f *File) Close() error {
|
||||
if !f.closed {
|
||||
_ = f.cleanup(nil)
|
||||
f.closed = true
|
||||
}
|
||||
return f.err
|
||||
}
|
||||
|
||||
func (f *File) cleanup(err error) error {
|
||||
_ = f.file.Close()
|
||||
_ = os.Remove(f.tempName)
|
||||
|
||||
if f.err == nil {
|
||||
f.err = err
|
||||
}
|
||||
return f.err
|
||||
}
|
||||
|
||||
// setErr is only used during testing to simulate os.File errors
|
||||
func (f *File) setErr(err error) {
|
||||
f.err = err
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
module github.com/rboyer/safeio
|
||||
|
||||
go 1.14
|
||||
|
||||
require github.com/stretchr/testify v1.4.0
|
|
@ -0,0 +1,11 @@
|
|||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|
@ -0,0 +1,96 @@
|
|||
// Package safeio provides functions to perform atomic, fsync-safe disk
|
||||
// operations.
|
||||
package safeio
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// WriteToFile consumes the provided io.Reader and writes it to a temp
|
||||
// file in the provided directory.
|
||||
func WriteToFile(src io.Reader, path string, perm os.FileMode) (written int64, err error) {
|
||||
tempName, written, err := writeToTempFile(src, path, perm)
|
||||
|
||||
if err == nil {
|
||||
err = Rename(tempName, path)
|
||||
}
|
||||
|
||||
return written, err
|
||||
}
|
||||
|
||||
// writeToTempFile consumes the provided io.Reader and writes it to a
|
||||
// temp file in the same directory as path.
|
||||
func writeToTempFile(src io.Reader, path string, perm os.FileMode) (tempName string, written int64, err error) {
|
||||
dir := filepath.Dir(path)
|
||||
name := filepath.Base(path)
|
||||
|
||||
f, err := ioutil.TempFile(dir, name+".tmp")
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
tempName = f.Name()
|
||||
|
||||
cleanup := func(written int64, err error) (string, int64, error) {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(tempName)
|
||||
return "", written, err
|
||||
}
|
||||
|
||||
if err = f.Chmod(perm); err != nil {
|
||||
return cleanup(0, err)
|
||||
}
|
||||
|
||||
written, err = io.Copy(f, src)
|
||||
if err != nil {
|
||||
return cleanup(written, err)
|
||||
}
|
||||
|
||||
if err := f.Sync(); err != nil {
|
||||
return cleanup(written, err)
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
return cleanup(written, err)
|
||||
}
|
||||
|
||||
return tempName, written, nil
|
||||
}
|
||||
|
||||
// Remove is just like os.Remove, except this also calls sync on the
|
||||
// parent directory.
|
||||
func Remove(fn string) error {
|
||||
err := os.Remove(fn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fsync the dir
|
||||
return syncParentDir(fn)
|
||||
}
|
||||
|
||||
// Rename renames the file using os.Rename and fsyncs the NEW parent
|
||||
// directory. It should only be used if both oldname and newname are in
|
||||
// the same directory.
|
||||
func Rename(oldname, newname string) error {
|
||||
err := os.Rename(oldname, newname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fsync the dir
|
||||
return syncParentDir(newname)
|
||||
}
|
||||
|
||||
func syncParentDir(name string) error {
|
||||
f, err := os.Open(filepath.Dir(name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return f.Sync()
|
||||
}
|
|
@ -736,6 +736,8 @@ github.com/prometheus/common/model
|
|||
github.com/prometheus/procfs
|
||||
github.com/prometheus/procfs/internal/fs
|
||||
github.com/prometheus/procfs/internal/util
|
||||
# github.com/rboyer/safeio v0.2.1
|
||||
github.com/rboyer/safeio
|
||||
# github.com/ryanuber/columnize v2.1.0+incompatible
|
||||
github.com/ryanuber/columnize
|
||||
# github.com/ryanuber/go-glob v1.0.0
|
||||
|
|
Loading…
Reference in New Issue