Improve raft write performance by utilizing FSM Batching (#7527)

* Start benchmark work

* Add batching FSM function

* dedupe some code

* Update dependency on chunking FSM

* fix raft external tests

* fix go.mod

* Add batching test

* uncomment test

* update raft deps

* update vendor

* Update physical/raft/fsm.go

Co-Authored-By: Michel Vocks <michelvocks@gmail.com>

* Update physical/raft/fsm.go
Committed by Brian Kassouf on 2019-10-14 09:25:07 -06:00 (via GitHub)
parent cbde4d4357
commit 1167fad704
30 changed files with 899 additions and 399 deletions
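
The batching support this commit relies on was added upstream in hashicorp/raft (GH-364) as a `BatchingFSM` interface that extends `FSM` with an `ApplyBatch` method. The sketch below is inferred from the signatures that appear in this diff rather than copied from the library, so treat the wrapper package name and doc comments as illustrative:

// Illustrative sketch only: the method set is inferred from this commit's
// ApplyBatch implementations; see github.com/hashicorp/raft for the
// authoritative definition.
package raftsketch

import "github.com/hashicorp/raft"

// BatchingFSM extends raft.FSM with a batch apply hook. The raft library
// hands ApplyBatch a group of committed logs and expects exactly one
// response per log, in the same order.
type BatchingFSM interface {
	ApplyBatch([]*raft.Log) []interface{}

	raft.FSM
}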

go.mod

@@ -56,7 +56,7 @@ require (
 	github.com/hashicorp/go-memdb v1.0.2
 	github.com/hashicorp/go-msgpack v0.5.5
 	github.com/hashicorp/go-multierror v1.0.0
-	github.com/hashicorp/go-raftchunking v0.6.2
+	github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a
 	github.com/hashicorp/go-rootcerts v1.0.1
 	github.com/hashicorp/go-sockaddr v1.0.2
 	github.com/hashicorp/go-syslog v1.0.0
@@ -64,7 +64,7 @@ require (
 	github.com/hashicorp/golang-lru v0.5.3
 	github.com/hashicorp/hcl v1.0.0
 	github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf
-	github.com/hashicorp/raft v1.1.1
+	github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
 	github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
 	github.com/hashicorp/vault-plugin-auth-alicloud v0.5.2-0.20190814210027-93970f08f2ec
 	github.com/hashicorp/vault-plugin-auth-azure v0.5.2-0.20190814210035-08e00d801115

go.sum

@@ -73,6 +73,7 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
 github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
 github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc=
 github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
@@ -291,6 +292,8 @@ github.com/hashicorp/go-plugin v1.0.1 h1:4OtAfUGbnKC6yS48p0CtMX2oFYtzFZVv6rok3cR
 github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY=
 github.com/hashicorp/go-raftchunking v0.6.2 h1:imj6CVkwXj6VzgXZQvzS+fSrkbFCzlJ2t00F3PacnuU=
 github.com/hashicorp/go-raftchunking v0.6.2/go.mod h1:cGlg3JtDy7qy6c/3Bu660Mic1JF+7lWqIwCFSb08fX0=
+github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a h1:FmnBDwGwlTgugDGbVxwV8UavqSMACbGrUpfc98yFLR4=
+github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a/go.mod h1:xbXnmKqX9/+RhPkJ4zrEx4738HacP72aaUPlT2RZ4sU=
 github.com/hashicorp/go-retryablehttp v0.5.3 h1:QlWt0KvWT0lq8MFppF9tsJGF+ynG7ztc2KIPhzRGk7s=
 github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
 github.com/hashicorp/go-retryablehttp v0.5.4 h1:1BZvpawXoJCWX6pNtow9+rpEj+3itIlutiqnntI6jOE=
@@ -330,6 +333,8 @@ github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf/go.mod h1:BDng
 github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
 github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
 github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
+github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4=
+github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
 github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab h1:WzGMwlO1DvaC93SvVOBOKtn+nXGEDXapyJuaRV3/VaY=
 github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic=

physical/raft/fsm.go

@@ -48,7 +48,7 @@ var (
 var _ physical.Backend = (*FSM)(nil)
 var _ physical.Transactional = (*FSM)(nil)
 var _ raft.FSM = (*FSM)(nil)
-var _ raft.ConfigurationStore = (*FSM)(nil)
+var _ raft.BatchingFSM = (*FSM)(nil)

 type restoreCallback func(context.Context) error
@@ -75,7 +75,6 @@ type FSM struct {
 	l          sync.RWMutex
 	path       string
 	logger     log.Logger
-	permitPool *physical.PermitPool

 	noopRestore bool
 	db          *bolt.DB
@@ -88,7 +87,7 @@ type FSM struct {
 	// additional state in the backend.
 	storeLatestState bool

-	chunker *raftchunking.ChunkingConfigurationStore
+	chunker *raftchunking.ChunkingBatchingFSM
 }

 // NewFSM constructs a FSM using the given directory
@@ -159,9 +158,8 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
 	}

 	f := &FSM{
 		path:   conf["path"],
 		logger: logger,
-		permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),

 		db:         boltDB,
 		latestTerm: latestTerm,
@@ -170,7 +168,7 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
 		storeLatestState: storeLatestState,
 	}

-	f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{
+	f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
 		f:   f,
 		ctx: context.Background(),
 	})
@@ -245,9 +243,6 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
 func (f *FSM) Delete(ctx context.Context, path string) error {
 	defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())

-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -260,9 +255,6 @@ func (f *FSM) Delete(ctx context.Context, path string) error {
 func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 	defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())

-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -287,9 +279,6 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
 	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())

-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -324,9 +313,6 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
 func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 	defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())

-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -340,9 +326,6 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
 	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())

-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -374,9 +357,6 @@ func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
 // Transaction writes all the operations in the provided transaction to the bolt
 // file.
 func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
-	f.permitPool.Acquire()
-	defer f.permitPool.Release()
-
 	f.l.RLock()
 	defer f.l.RUnlock()
@@ -404,27 +384,51 @@ func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error
 	return err
 }

-// Apply will apply a log value to the FSM. This is called from the raft
+// ApplyBatch will apply a set of logs to the FSM. This is called from the raft
 // library.
-func (f *FSM) Apply(log *raft.Log) interface{} {
-	command := &LogData{}
-	err := proto.Unmarshal(log.Data, command)
-	if err != nil {
-		f.logger.Error("error proto unmarshaling log data", "error", err)
-		panic("error proto unmarshaling log data")
+func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
+	if len(logs) == 0 {
+		return []interface{}{}
 	}

-	f.l.RLock()
-	defer f.l.RUnlock()
+	// Do the unmarshalling first so we don't hold locks
+	var latestConfiguration *ConfigurationValue
+	commands := make([]interface{}, 0, len(logs))
+	for _, log := range logs {
+		switch log.Type {
+		case raft.LogCommand:
+			command := &LogData{}
+			err := proto.Unmarshal(log.Data, command)
+			if err != nil {
+				f.logger.Error("error proto unmarshaling log data", "error", err)
+				panic("error proto unmarshaling log data")
+			}
+			commands = append(commands, command)
+
+		case raft.LogConfiguration:
+			configuration := raft.DecodeConfiguration(log.Data)
+			config := raftConfigurationToProtoConfiguration(log.Index, configuration)
+			commands = append(commands, config)
+
+			// Update the latest configuration the fsm has received; we will
+			// store this after it has been committed to storage.
+			latestConfiguration = config
+
+		default:
+			panic(fmt.Sprintf("got unexpected log type: %d", log.Type))
+		}
+	}

 	// Only advance latest pointer if this log has a higher index value than
 	// what we have seen in the past.
 	var logIndex []byte
+	var err error
 	latestIndex, _ := f.LatestState()
-	if latestIndex.Index < log.Index {
+	lastLog := logs[len(logs)-1]
+	if latestIndex.Index < lastLog.Index {
 		logIndex, err = proto.Marshal(&IndexValue{
-			Term:  log.Term,
-			Index: log.Index,
+			Term:  lastLog.Term,
+			Index: lastLog.Index,
 		})
 		if err != nil {
 			f.logger.Error("unable to marshal latest index", "error", err)
@@ -432,29 +436,46 @@ func (f *FSM) Apply(log *raft.Log) interface{} {
 		}
 	}

+	f.l.RLock()
+	defer f.l.RUnlock()
+
 	err = f.db.Update(func(tx *bolt.Tx) error {
 		b := tx.Bucket(dataBucketName)
-		for _, op := range command.Operations {
-			var err error
-			switch op.OpType {
-			case putOp:
-				err = b.Put([]byte(op.Key), op.Value)
-			case deleteOp:
-				err = b.Delete([]byte(op.Key))
-			case restoreCallbackOp:
-				if f.restoreCb != nil {
-					// Kick off the restore callback function in a go routine
-					go f.restoreCb(context.Background())
+		for _, commandRaw := range commands {
+			switch command := commandRaw.(type) {
+			case *LogData:
+				for _, op := range command.Operations {
+					var err error
+					switch op.OpType {
+					case putOp:
+						err = b.Put([]byte(op.Key), op.Value)
+					case deleteOp:
+						err = b.Delete([]byte(op.Key))
+					case restoreCallbackOp:
+						if f.restoreCb != nil {
+							// Kick off the restore callback function in a go routine
+							go f.restoreCb(context.Background())
+						}
+					default:
+						return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
+					}
+					if err != nil {
+						return err
+					}
+				}
+			case *ConfigurationValue:
+				b := tx.Bucket(configBucketName)
+				configBytes, err := proto.Marshal(command)
+				if err != nil {
+					return err
+				}
+				if err := b.Put(latestConfigKey, configBytes); err != nil {
+					return err
 				}
-			default:
-				return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
-			}
-			if err != nil {
-				return err
 			}
 		}

+		// TODO: benchmark so we can know how much time this adds
 		if f.storeLatestState && len(logIndex) > 0 {
 			b := tx.Bucket(configBucketName)
 			err = b.Put(latestIndexKey, logIndex)
@@ -472,13 +493,32 @@ func (f *FSM) Apply(log *raft.Log) interface{} {

 	// If we advanced the latest value, update the in-memory representation too.
 	if len(logIndex) > 0 {
-		atomic.StoreUint64(f.latestTerm, log.Term)
-		atomic.StoreUint64(f.latestIndex, log.Index)
+		atomic.StoreUint64(f.latestTerm, lastLog.Term)
+		atomic.StoreUint64(f.latestIndex, lastLog.Index)
 	}

-	return &FSMApplyResponse{
-		Success: true,
+	// If one or more configuration changes were processed, store the latest one.
+	if latestConfiguration != nil {
+		f.latestConfig.Store(latestConfiguration)
 	}
+
+	// Build the responses. The logs array is used here to ensure we reply to
+	// all command values; even if they are not of the types we expect. This
+	// should future proof this function from more log types being provided.
+	resp := make([]interface{}, len(logs))
+	for i := range logs {
+		resp[i] = &FSMApplyResponse{
+			Success: true,
+		}
+	}
+
+	return resp
+}
+
+// Apply will apply a log value to the FSM. This is called from the raft
+// library.
+func (f *FSM) Apply(log *raft.Log) interface{} {
+	return f.ApplyBatch([]*raft.Log{log})[0]
 }

 type writeErrorCloser interface {
@@ -609,61 +649,6 @@ func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
 // Release doesn't do anything.
 func (s *noopSnapshotter) Release() {}

-// StoreConfig satisfies the raft.ConfigurationStore interface and persists the
-// latest raft server configuration to the bolt file.
-func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) {
-	f.l.RLock()
-	defer f.l.RUnlock()
-
-	var indexBytes []byte
-	latestIndex, _ := f.LatestState()
-	// Only write the new index if we are advancing the pointer
-	if index > latestIndex.Index {
-		latestIndex.Index = index
-
-		var err error
-		indexBytes, err = proto.Marshal(latestIndex)
-		if err != nil {
-			f.logger.Error("unable to marshal latest index", "error", err)
-			panic(fmt.Sprintf("unable to marshal latest index: %v", err))
-		}
-	}
-
-	protoConfig := raftConfigurationToProtoConfiguration(index, configuration)
-	configBytes, err := proto.Marshal(protoConfig)
-	if err != nil {
-		f.logger.Error("unable to marshal config", "error", err)
-		panic(fmt.Sprintf("unable to marshal config: %v", err))
-	}
-
-	if f.storeLatestState {
-		err = f.db.Update(func(tx *bolt.Tx) error {
-			b := tx.Bucket(configBucketName)
-			err := b.Put(latestConfigKey, configBytes)
-			if err != nil {
-				return err
-			}
-
-			// TODO: benchmark so we can know how much time this adds
-			if len(indexBytes) > 0 {
-				err = b.Put(latestIndexKey, indexBytes)
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		})
-		if err != nil {
-			f.logger.Error("unable to store latest configuration", "error", err)
-			panic(fmt.Sprintf("unable to store latest configuration: %v", err))
-		}
-	}
-
-	f.witnessIndex(latestIndex)
-	f.latestConfig.Store(protoConfig)
-}
-
 // raftConfigurationToProtoConfiguration converts a raft configuration object to
 // a proto value.
 func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
@@ -722,9 +707,6 @@ func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error
 		Value: b,
 	}

-	f.f.permitPool.Acquire()
-	defer f.f.permitPool.Release()
-
 	f.f.l.RLock()
 	defer f.f.l.RUnlock()
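
The performance win in ApplyBatch above comes from applying a whole batch of raft logs inside a single BoltDB Update transaction (one fsync) instead of one transaction per log. The standalone sketch below is hypothetical and not part of the commit; it only illustrates that effect with the same github.com/boltdb/bolt package the FSM uses (the file name and key names are made up):

// Hypothetical demonstration, not part of the commit: BoltDB commits one
// fsync per Update, so writing N keys in one transaction is far cheaper
// than N single-key transactions.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("batch-demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	bucket := []byte("data")
	if err := db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(bucket)
		return err
	}); err != nil {
		log.Fatal(err)
	}

	const n = 256

	// One transaction (and one fsync) per key, like the old per-log Apply path.
	start := time.Now()
	for i := 0; i < n; i++ {
		if err := db.Update(func(tx *bolt.Tx) error {
			return tx.Bucket(bucket).Put([]byte(fmt.Sprintf("one-%d", i)), []byte("v"))
		}); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println("per-key transactions:  ", time.Since(start))

	// All keys in a single transaction, like ApplyBatch does for a batch of logs.
	start = time.Now()
	if err := db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)
		for i := 0; i < n; i++ {
			if err := b.Put([]byte(fmt.Sprintf("batch-%d", i)), []byte("v")); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("single batched transaction:", time.Since(start))
}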

physical/raft/fsm_test.go (new file)

@@ -0,0 +1,129 @@
package raft

import (
	"context"
	fmt "fmt"
	"io/ioutil"
	"math/rand"
	"os"
	"testing"

	proto "github.com/golang/protobuf/proto"
	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

func getFSM(t testing.TB) (*FSM, string) {
	raftDir, err := ioutil.TempDir("", "vault-raft-")
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("raft dir: %s", raftDir)

	logger := hclog.New(&hclog.LoggerOptions{
		Name:  "raft",
		Level: hclog.Trace,
	})

	fsm, err := NewFSM(map[string]string{
		"path": raftDir,
	}, logger)
	if err != nil {
		t.Fatal(err)
	}

	return fsm, raftDir
}

func TestFSM_Batching(t *testing.T) {
	fsm, dir := getFSM(t)
	defer os.RemoveAll(dir)

	var index uint64
	var term uint64 = 1

	getLog := func(i uint64) (int, *raft.Log) {
		if rand.Intn(10) >= 8 {
			term += 1
			return 0, &raft.Log{
				Index: i,
				Term:  term,
				Type:  raft.LogConfiguration,
				Data: raft.EncodeConfiguration(raft.Configuration{
					Servers: []raft.Server{
						raft.Server{
							Address: raft.ServerAddress("test"),
							ID:      raft.ServerID("test"),
						},
					},
				}),
			}
		}

		command := &LogData{
			Operations: make([]*LogOperation, rand.Intn(10)),
		}

		for j := range command.Operations {
			command.Operations[j] = &LogOperation{
				OpType: putOp,
				Key:    fmt.Sprintf("key-%d-%d", i, j),
				Value:  []byte(fmt.Sprintf("value-%d-%d", i, j)),
			}
		}

		commandBytes, err := proto.Marshal(command)
		if err != nil {
			t.Fatal(err)
		}

		return len(command.Operations), &raft.Log{
			Index: i,
			Term:  term,
			Type:  raft.LogCommand,
			Data:  commandBytes,
		}
	}

	totalKeys := 0
	for i := 0; i < 100; i++ {
		batchSize := rand.Intn(64)
		batch := make([]*raft.Log, batchSize)
		for j := 0; j < batchSize; j++ {
			var keys int
			index++
			keys, batch[j] = getLog(index)
			totalKeys += keys
		}

		resp := fsm.ApplyBatch(batch)
		if len(resp) != batchSize {
			t.Fatalf("incorrect response length: got %d expected %d", len(resp), batchSize)
		}

		for _, r := range resp {
			if _, ok := r.(*FSMApplyResponse); !ok {
				t.Fatal("bad response type")
			}
		}
	}

	keys, err := fsm.List(context.Background(), "")
	if err != nil {
		t.Fatal(err)
	}

	if len(keys) != totalKeys {
		t.Fatalf("incorrect number of keys: got %d expected %d", len(keys), totalKeys)
	}

	latestIndex, latestConfig := fsm.LatestState()
	if latestIndex.Index != index {
		t.Fatalf("bad latest index: got %d expected %d", latestIndex.Index, index)
	}
	if latestIndex.Term != term {
		t.Fatalf("bad latest term: got %d expected %d", latestIndex.Term, term)
	}

	if latestConfig == nil && term > 1 {
		t.Fatal("config wasn't updated")
	}
}


@@ -12,6 +12,7 @@ import (
 	"sync"
 	"time"

+	metrics "github.com/armon/go-metrics"
 	proto "github.com/golang/protobuf/proto"
 	"github.com/hashicorp/errwrap"
 	log "github.com/hashicorp/go-hclog"
@@ -94,6 +95,9 @@ type RaftBackend struct {
 	// serverAddressProvider is used to map server IDs to addresses.
 	serverAddressProvider raft.ServerAddressProvider
+
+	// permitPool is used to limit the number of concurrent storage calls.
+	permitPool *physical.PermitPool
 }

 // EnsurePath is used to make sure a path exists
@@ -201,6 +205,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
 		snapStore:  snap,
 		dataDir:    path,
 		localID:    localID,
+		permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
 	}, nil
 }
@@ -344,6 +349,9 @@ func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
 		config.TrailingLogs = uint64(trailingLogs)
 	}

+	config.NoSnapshotRestoreOnStart = true
+	config.MaxAppendEntries = 64
+
 	return nil
 }
@@ -708,6 +716,7 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho
 // Delete inserts an entry in the log to delete the given path
 func (b *RaftBackend) Delete(ctx context.Context, path string) error {
+	defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())
 	command := &LogData{
 		Operations: []*LogOperation{
 			&LogOperation{
@@ -716,6 +725,8 @@ func (b *RaftBackend) Delete(ctx context.Context, path string) error {
 			},
 		},
 	}

+	b.permitPool.Acquire()
+	defer b.permitPool.Release()
+
 	b.l.RLock()
 	err := b.applyLog(ctx, command)
@@ -725,15 +736,20 @@ func (b *RaftBackend) Delete(ctx context.Context, path string) error {

 // Get returns the value corresponding to the given path from the fsm
 func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, error) {
+	defer metrics.MeasureSince([]string{"raft-storage", "get"}, time.Now())
 	if b.fsm == nil {
 		return nil, errors.New("raft: fsm not configured")
 	}

+	b.permitPool.Acquire()
+	defer b.permitPool.Release()
+
 	return b.fsm.Get(ctx, path)
 }

 // Put inserts an entry in the log for the put operation
 func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
+	defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())
 	command := &LogData{
 		Operations: []*LogOperation{
 			&LogOperation{
@@ -744,6 +760,9 @@ func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
 		},
 	}

+	b.permitPool.Acquire()
+	defer b.permitPool.Release()
+
 	b.l.RLock()
 	err := b.applyLog(ctx, command)
 	b.l.RUnlock()
@@ -752,16 +771,21 @@ func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {

 // List enumerates all the items under the prefix from the fsm
 func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) {
+	defer metrics.MeasureSince([]string{"raft-storage", "list"}, time.Now())
 	if b.fsm == nil {
 		return nil, errors.New("raft: fsm not configured")
 	}

+	b.permitPool.Acquire()
+	defer b.permitPool.Release()
+
 	return b.fsm.List(ctx, prefix)
 }

 // Transaction applies all the given operations into a single log and
 // applies it.
 func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
+	defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())
 	command := &LogData{
 		Operations: make([]*LogOperation, len(txns)),
 	}
@@ -782,6 +806,9 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry
 		command.Operations[i] = op
 	}

+	b.permitPool.Acquire()
+	defer b.permitPool.Release()
+
 	b.l.RLock()
 	err := b.applyLog(ctx, command)
 	b.l.RUnlock()


@@ -2,6 +2,7 @@ package rafttests

 import (
 	"bytes"
+	"crypto/md5"
 	"fmt"
 	"io/ioutil"
 	"net/http"
@@ -11,6 +12,7 @@ import (
 	"time"

 	"github.com/hashicorp/go-cleanhttp"
+	uuid "github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/vault/api"
 	"github.com/hashicorp/vault/helper/testhelpers"
 	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
@@ -20,7 +22,7 @@ import (
 	"golang.org/x/net/http2"
 )

-func raftCluster(t *testing.T) *vault.TestCluster {
+func raftCluster(t testing.TB) *vault.TestCluster {
 	var conf vault.CoreConfig
 	var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
 	teststorage.RaftBackendSetup(&conf, &opts)
@@ -731,3 +733,32 @@ func TestRaft_SnapshotAPI_DifferentCluster(t *testing.T) {
 			testhelpers.WaitForNCoresSealed(t, cluster2, 3)
 		}
 	}
 }
+
+func BenchmarkRaft_SingleNode(b *testing.B) {
+	cluster := raftCluster(b)
+	defer cluster.Cleanup()
+
+	leaderClient := cluster.Cores[0].Client
+
+	bench := func(b *testing.B, dataSize int) {
+		data, err := uuid.GenerateRandomBytes(dataSize)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		testName := b.Name()
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			key := fmt.Sprintf("secret/%x", md5.Sum([]byte(fmt.Sprintf("%s-%d", testName, i))))
+			_, err := leaderClient.Logical().Write(key, map[string]interface{}{
+				"test": data,
+			})
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	}
+
+	b.Run("256b", func(b *testing.B) { bench(b, 25) })
+}


@@ -11,6 +11,7 @@ import (

 var _ raft.FSM = (*ChunkingFSM)(nil)
 var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil)
+var _ raft.BatchingFSM = (*ChunkingBatchingFSM)(nil)

 type ChunkingSuccess struct {
 	Response interface{}
@@ -28,6 +29,11 @@ type ChunkingFSM struct {
 	lastTerm uint64
 }

+type ChunkingBatchingFSM struct {
+	*ChunkingFSM
+	underlyingBatchingFSM raft.BatchingFSM
+}
+
 type ChunkingConfigurationStore struct {
 	*ChunkingFSM
 	underlyingConfigurationStore raft.ConfigurationStore
@@ -44,6 +50,20 @@ func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM {
 	return ret
 }

+func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM {
+	ret := &ChunkingBatchingFSM{
+		ChunkingFSM: &ChunkingFSM{
+			underlying: underlying,
+			store:      store,
+		},
+		underlyingBatchingFSM: underlying,
+	}
+	if store == nil {
+		ret.ChunkingFSM.store = NewInmemChunkStorage()
+	}
+	return ret
+}
+
 func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore {
 	ret := &ChunkingConfigurationStore{
 		ChunkingFSM: &ChunkingFSM{
@@ -58,14 +78,7 @@ func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store Chu
 	return ret
 }

-// Apply applies the log, handling chunking as needed. The return value will
-// either be an error or whatever is returned from the underlying Apply.
-func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
-	// Not chunking or wrong type, pass through
-	if l.Type != raft.LogCommand || l.Extensions == nil {
-		return c.underlying.Apply(l)
-	}
-
+func (c *ChunkingFSM) applyChunk(l *raft.Log) (*raft.Log, error) {
 	if l.Term != c.lastTerm {
 		// Term has changed. A raft library client that was applying chunks
 		// should get an error that it's no longer the leader and bail, and
@@ -73,7 +86,7 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
 		// chunking operation automatically, which will be under a different
 		// opnum. So it should be safe in this case to clear the map.
 		if err := c.store.RestoreChunks(nil); err != nil {
-			return err
+			return nil, err
 		}
 		c.lastTerm = l.Term
 	}
@@ -81,7 +94,7 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
 	// Get chunk info from extensions
 	var ci types.ChunkInfo
 	if err := proto.Unmarshal(l.Extensions, &ci); err != nil {
-		return errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err)
+		return nil, errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err)
 	}

 	// Store the current chunk and find out if all chunks have arrived
@@ -93,19 +106,20 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
 		Data: l.Data,
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}

 	if !done {
-		return nil
+		return nil, nil
 	}

 	// All chunks are here; get the full set and clear storage of the op
 	chunks, err := c.store.FinalizeOp(ci.OpNum)
 	if err != nil {
-		return err
+		return nil, err
 	}

 	finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize)
 	for _, chunk := range chunks {
 		finalData = append(finalData, chunk.Data...)
 	}
@@ -119,7 +133,27 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
 		Extensions: ci.NextExtensions,
 	}

-	return ChunkingSuccess{Response: c.underlying.Apply(logToApply)}
+	return logToApply, nil
+}
+
+// Apply applies the log, handling chunking as needed. The return value will
+// either be an error or whatever is returned from the underlying Apply.
+func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
+	// Not chunking or wrong type, pass through
+	if l.Type != raft.LogCommand || l.Extensions == nil {
+		return c.underlying.Apply(l)
+	}
+
+	logToApply, err := c.applyChunk(l)
+	if err != nil {
+		return err
+	}
+
+	if logToApply != nil {
+		return ChunkingSuccess{Response: c.underlying.Apply(logToApply)}
+	}
+
+	return nil
 }

 func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) {
@@ -157,3 +191,68 @@ func (c *ChunkingFSM) RestoreState(state *State) error {
 func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) {
 	c.underlyingConfigurationStore.StoreConfiguration(index, configuration)
 }
+
+// ApplyBatch applies the logs, handling chunking as needed. The return value
+// will be an array containing an error or whatever is returned from the
+// underlying Apply for each log.
+func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{} {
+	// responses has a response for each log; their slice index should match.
+	responses := make([]interface{}, len(logs))
+
+	// sentLogs keeps track of which logs we sent. The key is the raft Index
+	// associated with the log and the value is true if this is a finalized set
+	// of chunks.
+	sentLogs := make(map[uint64]bool)
+
+	// sendLogs is the subset of logs that we need to pass onto the underlying
+	// FSM.
+	sendLogs := make([]*raft.Log, 0, len(logs))
+
+	for i, l := range logs {
+		// Not chunking or wrong type, pass through
+		if l.Type != raft.LogCommand || l.Extensions == nil {
+			sendLogs = append(sendLogs, l)
+			sentLogs[l.Index] = false
+			continue
+		}
+
+		logToApply, err := c.applyChunk(l)
+		if err != nil {
+			responses[i] = err
+			continue
+		}
+
+		if logToApply != nil {
+			sendLogs = append(sendLogs, logToApply)
+			sentLogs[l.Index] = true
+		}
+	}
+
+	// Send remaining logs to the underlying FSM.
+	var sentResponses []interface{}
+	if len(sendLogs) > 0 {
+		sentResponses = c.underlyingBatchingFSM.ApplyBatch(sendLogs)
+	}
+
+	var sentCounter int
+	for j, l := range logs {
+		// If the response is already set we errored above and should continue
+		// onto the next.
+		if responses[j] != nil {
+			continue
+		}
+
+		var resp interface{}
+		if chunked, ok := sentLogs[l.Index]; ok {
+			resp = sentResponses[sentCounter]
+			if chunked {
+				resp = ChunkingSuccess{Response: sentResponses[sentCounter]}
+			}
+			sentCounter++
+		}
+
+		responses[j] = resp
+	}
+
+	return responses
+}


@@ -6,7 +6,7 @@ require (
 	github.com/go-test/deep v1.0.2
 	github.com/golang/protobuf v1.3.1
 	github.com/hashicorp/errwrap v1.0.0
-	github.com/hashicorp/raft v1.1.1
+	github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
 	github.com/kr/pretty v0.1.0
 	github.com/mitchellh/copystructure v1.0.0
 )


@@ -30,6 +30,8 @@ github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCO
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
 github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
+github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4=
+github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=


@@ -1,5 +1,18 @@
 # UNRELEASED

+FEATURES
+
+* Improve FSM apply performance through batching. Implementing the `BatchingFSM` interface enables this new feature [[GH-364](https://github.com/hashicorp/raft/pull/364)]
+
+IMPROVEMENTS
+
+* Replace logger with hclog [[GH-360](https://github.com/hashicorp/raft/pull/360)]
+
+BUG FIXES
+
+* Export the leader field in LeaderObservation [[GH-357](https://github.com/hashicorp/raft/pull/357)]
+* Fix snapshot to not attempt to truncate a negative range [[GH-358](https://github.com/hashicorp/raft/pull/358)]
+
 # 1.1.1 (July 23rd, 2019)

 FEATURES


@@ -2,22 +2,28 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
 TEST_RESULTS_DIR?=/tmp/test-results

 test:
-	go test -timeout=60s -race .
+	go test $(TESTARGS) -timeout=60s -race .
+	go test $(TESTARGS) -timeout=60s -tags batchtest -race .

 integ: test
-	INTEG_TESTS=yes go test -timeout=25s -run=Integ .
+	INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
+	INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ .

 ci.test-norace:
 	gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s
+	gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest

 ci.test:
 	gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race .
+	gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest .

 ci.integ: ci.test
 	INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ .
+	INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest .

 fuzz:
-	go test -timeout=300s ./fuzzy
+	go test $(TESTARGS) -timeout=500s ./fuzzy
+	go test $(TESTARGS) -timeout=500s -tags batchtest ./fuzzy

 deps:
 	go get -t -d -v ./...


@@ -234,7 +234,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
 		entry.Data = encodePeers(configuration, trans)
 	} else {
 		entry.Type = LogConfiguration
-		entry.Data = encodeConfiguration(configuration)
+		entry.Data = EncodeConfiguration(configuration)
 	}
 	if err := logs.StoreLog(entry); err != nil {
 		return fmt.Errorf("failed to append configuration entry to log: %v", err)
@@ -528,13 +528,14 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 	for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
 		var entry Log
 		if err := r.logs.GetLog(index, &entry); err != nil {
-			r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", index, err))
+			r.logger.Error("failed to get log", "index", index, "error", err)
 			panic(err)
 		}
 		r.processConfigurationLogEntry(&entry)
 	}
-	r.logger.Info(fmt.Sprintf("Initial configuration (index=%d): %+v",
-		r.configurations.latestIndex, r.configurations.latest.Servers))
+	r.logger.Info("initial configuration",
+		"index", r.configurations.latestIndex,
+		"servers", hclog.Fmt("%+v", r.configurations.latest.Servers))

 	// Setup a heartbeat fast-path to avoid head-of-line
 	// blocking where possible. It MUST be safe for this
@@ -554,7 +555,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 func (r *Raft) restoreSnapshot() error {
 	snapshots, err := r.snapshots.List()
 	if err != nil {
-		r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err))
+		r.logger.Error("failed to list snapshots", "error", err)
 		return err
 	}
@@ -563,7 +564,7 @@ func (r *Raft) restoreSnapshot() error {
 	if !r.conf.NoSnapshotRestoreOnStart {
 		_, source, err := r.snapshots.Open(snapshot.ID)
 		if err != nil {
-			r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapshot.ID, err))
+			r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
 			continue
 		}
@@ -571,11 +572,11 @@ func (r *Raft) restoreSnapshot() error {
 		// Close the source after the restore has completed
 		source.Close()
 		if err != nil {
-			r.logger.Error(fmt.Sprintf("Failed to restore snapshot %v: %v", snapshot.ID, err))
+			r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
 			continue
 		}

-		r.logger.Info(fmt.Sprintf("Restored from snapshot %v", snapshot.ID))
+		r.logger.Info("restored from snapshot", "id", snapshot.ID)
 	}
 	// Update the lastApplied so we don't replay old logs
 	r.setLastApplied(snapshot.Index)
@@ -1013,7 +1014,7 @@ func (r *Raft) Stats() map[string]string {
 	future := r.GetConfiguration()
 	if err := future.Error(); err != nil {
-		r.logger.Warn(fmt.Sprintf("could not get configuration for Stats: %v", err))
+		r.logger.Warn("could not get configuration for stats", "error", err)
 	} else {
 		configuration := future.Configuration()
 		s["latest_configuration_index"] = toString(future.Index())


@@ -35,7 +35,7 @@ type AppendEntriesRequest struct {
 	LeaderCommitIndex uint64
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -59,7 +59,7 @@ type AppendEntriesResponse struct {
 	NoRetryBackoff bool
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -83,7 +83,7 @@ type RequestVoteRequest struct {
 	LeadershipTransfer bool
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *RequestVoteRequest) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -104,7 +104,7 @@ type RequestVoteResponse struct {
 	Granted bool
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -136,7 +136,7 @@ type InstallSnapshotRequest struct {
 	Size int64
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -150,7 +150,7 @@ type InstallSnapshotResponse struct {
 	Success bool
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -161,7 +161,7 @@ type TimeoutNowRequest struct {
 	RPCHeader
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
@@ -171,7 +171,7 @@ type TimeoutNowResponse struct {
 	RPCHeader
 }

-// See WithRPCHeader.
+// GetRPCHeader - See WithRPCHeader.
 func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }


@@ -8,8 +8,8 @@ import (
 	"github.com/hashicorp/go-hclog"
 )

-// These are the versions of the protocol (which includes RPC messages as
-// well as Raft-specific log entries) that this server can _understand_. Use
+// ProtocolVersion is the version of the protocol (which includes RPC messages
+// as well as Raft-specific log entries) that this server can _understand_. Use
 // the ProtocolVersion member of the Config object to control the version of
 // the protocol to use when _speaking_ to other servers. Note that depending on
 // the protocol version being spoken, some otherwise understood RPC messages
@@ -88,13 +88,15 @@ import (
 type ProtocolVersion int

 const (
+	// ProtocolVersionMin is the minimum protocol version
 	ProtocolVersionMin ProtocolVersion = 0
-	ProtocolVersionMax = 3
+	// ProtocolVersionMax is the maximum protocol version
+	ProtocolVersionMax = 3
 )

-// These are versions of snapshots that this server can _understand_. Currently,
-// it is always assumed that this server generates the latest version, though
-// this may be changed in the future to include a configurable version.
+// SnapshotVersion is the version of snapshots that this server can understand.
+// Currently, it is always assumed that the server generates the latest version,
+// though this may be changed in the future to include a configurable version.
 //
 // Version History
 //
@@ -112,8 +114,10 @@ const (
 type SnapshotVersion int

 const (
+	// SnapshotVersionMin is the minimum snapshot version
 	SnapshotVersionMin SnapshotVersion = 0
-	SnapshotVersionMax = 1
+	// SnapshotVersionMax is the maximum snapshot version
+	SnapshotVersionMax = 1
 )

 // Config provides any necessary configuration for the Raft server.


@@ -342,9 +342,9 @@ func decodePeers(buf []byte, trans Transport) Configuration {
 	}
 }

-// encodeConfiguration serializes a Configuration using MsgPack, or panics on
+// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
 // errors.
-func encodeConfiguration(configuration Configuration) []byte {
+func EncodeConfiguration(configuration Configuration) []byte {
 	buf, err := encodeMsgPack(configuration)
 	if err != nil {
 		panic(fmt.Errorf("failed to encode configuration: %v", err))
@@ -352,9 +352,9 @@ func encodeConfiguration(configuration Configuration) []byte {
 	return buf.Bytes()
 }

-// decodeConfiguration deserializes a Configuration using MsgPack, or panics on
+// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on
 // errors.
-func decodeConfiguration(buf []byte) Configuration {
+func DecodeConfiguration(buf []byte) Configuration {
 	var configuration Configuration
 	if err := decodeMsgPack(buf, &configuration); err != nil {
 		panic(fmt.Errorf("failed to decode configuration: %v", err))


@@ -12,6 +12,11 @@ import (
 // suitable for testing.
 type DiscardSnapshotStore struct{}

+// DiscardSnapshotSink is used to fulfill the SnapshotSink interface
+// while always discarding the . This is useful for when the log
+// should be truncated but no snapshot should be retained. This
+// should never be used for production use, and is only suitable
+// for testing.
 type DiscardSnapshotSink struct{}

 // NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore.
@@ -19,31 +24,41 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore {
 	return &DiscardSnapshotStore{}
 }

+// Create returns a valid type implementing the SnapshotSink which
+// always discards the snapshot.
 func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
 	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
 	return &DiscardSnapshotSink{}, nil
 }

+// List returns successfully with a nil for []*SnapshotMeta.
 func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error) {
 	return nil, nil
 }

+// Open returns an error since the DiscardSnapshotStore does not
+// support opening snapshots.
 func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
 	return nil, nil, fmt.Errorf("open is not supported")
 }

+// Write returns successfully with the lenght of the input byte slice
+// to satisfy the WriteCloser interface
 func (d *DiscardSnapshotSink) Write(b []byte) (int, error) {
 	return len(b), nil
 }

+// Close returns a nil error
 func (d *DiscardSnapshotSink) Close() error {
 	return nil
 }

+// ID returns "discard" for DiscardSnapshotSink
 func (d *DiscardSnapshotSink) ID() string {
 	return "discard"
 }

+// Cancel returns successfully with a nil error
 func (d *DiscardSnapshotSink) Cancel() error {
 	return nil
 }


@@ -5,11 +5,11 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/hashicorp/go-hclog"
 	"hash"
 	"hash/crc64"
 	"io"
 	"io/ioutil"
-	"log"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -31,7 +31,7 @@ const (
 type FileSnapshotStore struct {
 	path   string
 	retain int
-	logger *log.Logger
+	logger hclog.Logger
 }

 type snapMetaSlice []*fileSnapshotMeta
@@ -39,7 +39,7 @@ type snapMetaSlice []*fileSnapshotMeta
 // FileSnapshotSink implements SnapshotSink with a file.
 type FileSnapshotSink struct {
 	store     *FileSnapshotStore
-	logger    *log.Logger
+	logger    hclog.Logger
 	dir       string
 	parentDir string
 	meta      fileSnapshotMeta
@@ -76,12 +76,16 @@ func (b *bufferedFile) Close() error {
 // NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based
 // on a base directory. The `retain` parameter controls how many
 // snapshots are retained. Must be at least 1.
-func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error) {
+func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) {
 	if retain < 1 {
 		return nil, fmt.Errorf("must retain at least one snapshot")
 	}
 	if logger == nil {
-		logger = log.New(os.Stderr, "", log.LstdFlags)
+		logger = hclog.New(&hclog.LoggerOptions{
+			Name:   "snapshot",
+			Output: hclog.DefaultOutput,
+			Level:  hclog.DefaultLevel,
+		})
 	}

 	// Ensure our path exists
@@ -111,7 +115,11 @@ func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSn
 	if logOutput == nil {
 		logOutput = os.Stderr
 	}
-	return NewFileSnapshotStoreWithLogger(base, retain, log.New(logOutput, "", log.LstdFlags))
+	return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{
+		Name:   "snapshot",
+		Output: logOutput,
+		Level:  hclog.DefaultLevel,
+	}))
 }

 // testPermissions tries to touch a file in our path to see if it works.
@@ -150,11 +158,11 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
 	// Create a new path
 	name := snapshotName(term, index)
 	path := filepath.Join(f.path, name+tmpSuffix)
-	f.logger.Printf("[INFO] snapshot: Creating new snapshot at %s", path)
+	f.logger.Info("creating new snapshot", "path", path)

 	// Make the directory
 	if err := os.MkdirAll(path, 0755); err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to make snapshot directory: %v", err)
+		f.logger.Error("failed to make snapshot directly", "error", err)
 		return nil, err
 	}
@@ -180,7 +188,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
 	// Write out the meta data
 	if err := sink.writeMeta(); err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err)
+		f.logger.Error("failed to write metadata", "error", err)
 		return nil, err
 	}
@@ -188,7 +196,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
 	statePath := filepath.Join(path, stateFilePath)
 	fh, err := os.Create(statePath)
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to create state file: %v", err)
+		f.logger.Error("failed to create state file", "error", err)
 		return nil, err
 	}
 	sink.stateFile = fh
@@ -209,7 +217,7 @@ func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) {
 	// Get the eligible snapshots
 	snapshots, err := f.getSnapshots()
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
+		f.logger.Error("failed to get snapshots", "error", err)
 		return nil, err
 	}
@@ -228,7 +236,7 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
 	// Get the eligible snapshots
 	snapshots, err := ioutil.ReadDir(f.path)
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to scan snapshot dir: %v", err)
+		f.logger.Error("failed to scan snapshot directory", "error", err)
 		return nil, err
 	}
@@ -243,20 +251,20 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
 		// Ignore any temporary snapshots
 		dirName := snap.Name()
 		if strings.HasSuffix(dirName, tmpSuffix) {
-			f.logger.Printf("[WARN] snapshot: Found temporary snapshot: %v", dirName)
+			f.logger.Warn("found temporary snapshot", "name", dirName)
 			continue
 		}

 		// Try to read the meta data
 		meta, err := f.readMeta(dirName)
 		if err != nil {
-			f.logger.Printf("[WARN] snapshot: Failed to read metadata for %v: %v", dirName, err)
+			f.logger.Warn("failed to read metadata", "name", dirName, "error", err)
 			continue
 		}

 		// Make sure we can understand this version.
 		if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax {
-			f.logger.Printf("[WARN] snapshot: Snapshot version for %v not supported: %d", dirName, meta.Version)
+			f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version)
 			continue
 		}
@@ -297,7 +305,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
 	// Get the metadata
 	meta, err := f.readMeta(id)
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to get meta data to open snapshot: %v", err)
+		f.logger.Error("failed to get meta data to open snapshot", "error", err)
 		return nil, nil, err
 	}
@@ -305,7 +313,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
 	statePath := filepath.Join(f.path, id, stateFilePath)
 	fh, err := os.Open(statePath)
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to open state file: %v", err)
+		f.logger.Error("failed to open state file", "error", err)
 		return nil, nil, err
 	}
@@ -315,7 +323,7 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
 	// Compute the hash
 	_, err = io.Copy(stateHash, fh)
 	if err != nil {
-		f.logger.Printf("[ERR] snapshot: Failed to read state file: %v", err)
+		f.logger.Error("failed to read state file", "error", err)
 		fh.Close()
 		return nil, nil, err
 	}
@@ -323,15 +331,14 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
// Verify the hash // Verify the hash
computed := stateHash.Sum(nil) computed := stateHash.Sum(nil)
if bytes.Compare(meta.CRC, computed) != 0 { if bytes.Compare(meta.CRC, computed) != 0 {
f.logger.Printf("[ERR] snapshot: CRC checksum failed (stored: %v computed: %v)", f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed)
meta.CRC, computed)
fh.Close() fh.Close()
return nil, nil, fmt.Errorf("CRC mismatch") return nil, nil, fmt.Errorf("CRC mismatch")
} }
// Seek to the start // Seek to the start
if _, err := fh.Seek(0, 0); err != nil { if _, err := fh.Seek(0, 0); err != nil {
f.logger.Printf("[ERR] snapshot: State file seek failed: %v", err) f.logger.Error("state file seek failed", "error", err)
fh.Close() fh.Close()
return nil, nil, err return nil, nil, err
} }
@ -349,15 +356,15 @@ func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error
func (f *FileSnapshotStore) ReapSnapshots() error { func (f *FileSnapshotStore) ReapSnapshots() error {
snapshots, err := f.getSnapshots() snapshots, err := f.getSnapshots()
if err != nil { if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err) f.logger.Error("failed to get snapshots", "error", err)
return err return err
} }
for i := f.retain; i < len(snapshots); i++ { for i := f.retain; i < len(snapshots); i++ {
path := filepath.Join(f.path, snapshots[i].ID) path := filepath.Join(f.path, snapshots[i].ID)
f.logger.Printf("[INFO] snapshot: reaping snapshot %v", path) f.logger.Info("reaping snapshot", "path", path)
if err := os.RemoveAll(path); err != nil { if err := os.RemoveAll(path); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to reap snapshot %v: %v", path, err) f.logger.Error("failed to reap snapshot", "path", path, "error", err)
return err return err
} }
} }
@ -386,9 +393,9 @@ func (s *FileSnapshotSink) Close() error {
// Close the open handles // Close the open handles
if err := s.finalize(); err != nil { if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) s.logger.Error("failed to finalize snapshot", "error", err)
if delErr := os.RemoveAll(s.dir); delErr != nil { if delErr := os.RemoveAll(s.dir); delErr != nil {
s.logger.Printf("[ERR] snapshot: Failed to delete temporary snapshot directory at path %v: %v", s.dir, delErr) s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr)
return delErr return delErr
} }
return err return err
@ -396,27 +403,27 @@ func (s *FileSnapshotSink) Close() error {
// Write out the meta data // Write out the meta data
if err := s.writeMeta(); err != nil { if err := s.writeMeta(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err) s.logger.Error("failed to write metadata", "error", err)
return err return err
} }
// Move the directory into place // Move the directory into place
newPath := strings.TrimSuffix(s.dir, tmpSuffix) newPath := strings.TrimSuffix(s.dir, tmpSuffix)
if err := os.Rename(s.dir, newPath); err != nil { if err := os.Rename(s.dir, newPath); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to move snapshot into place: %v", err) s.logger.Error("failed to move snapshot into place", "error", err)
return err return err
} }
if runtime.GOOS != "windows" { //skipping fsync for directory entry edits on Windows, only needed for *nix style file systems if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
parentFH, err := os.Open(s.parentDir) parentFH, err := os.Open(s.parentDir)
defer parentFH.Close() defer parentFH.Close()
if err != nil { if err != nil {
s.logger.Printf("[ERR] snapshot: Failed to open snapshot parent directory %v, error: %v", s.parentDir, err) s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err)
return err return err
} }
if err = parentFH.Sync(); err != nil { if err = parentFH.Sync(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed syncing parent directory %v, error: %v", s.parentDir, err) s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err)
return err return err
} }
} }
@ -439,7 +446,7 @@ func (s *FileSnapshotSink) Cancel() error {
// Close the open handles // Close the open handles
if err := s.finalize(); err != nil { if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err) s.logger.Error("failed to finalize snapshot", "error", err)
return err return err
} }

View File

@ -31,6 +31,26 @@ type FSM interface {
Restore(io.ReadCloser) error Restore(io.ReadCloser) error
} }
// BatchingFSM extends the FSM interface to add an ApplyBatch function. This can
// optionally be implemented by clients to enable multiple logs to be applied to
// the FSM in batches. Up to MaxAppendEntries could be sent in a batch.
type BatchingFSM interface {
// ApplyBatch is invoked once a batch of log entries has been committed and
// are ready to be applied to the FSM. ApplyBatch will take in an array of
// log entries. These log entries will be in the order they were committed,
// will not have gaps, and could be of a few log types. Clients should check
// the log type prior to attempting to decode the data attached. Presently
// the LogCommand and LogConfiguration types will be sent.
//
// The returned slice must be the same length as the input and each response
// should correlate to the log at the same index of the input. The returned
// values will be made available in the ApplyFuture returned by the Raft.Apply
// method if that method was called on the same Raft node as the FSM.
ApplyBatch([]*Log) []interface{}
FSM
}
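For context, a minimal sketch (not part of this diff, and assuming the updated raft dependency is vendored) of a client FSM that satisfies BatchingFSM; the kvFSM type and its "key=value" payload encoding are hypothetical.

package example

import (
	"errors"
	"io"
	"strings"
	"sync"

	"github.com/hashicorp/raft"
)

// kvFSM is a toy key/value state machine used only to illustrate the
// BatchingFSM contract.
type kvFSM struct {
	mu   sync.Mutex
	data map[string]string
}

var _ raft.BatchingFSM = (*kvFSM)(nil)

func (f *kvFSM) Apply(l *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.data == nil {
		f.data = make(map[string]string)
	}
	// Assume commands are encoded as "key=value" purely for illustration.
	if parts := strings.SplitN(string(l.Data), "=", 2); len(parts) == 2 {
		f.data[parts[0]] = parts[1]
	}
	return nil
}

// ApplyBatch applies the committed logs in order and returns exactly one
// response per input log, checking the log type before decoding.
func (f *kvFSM) ApplyBatch(logs []*raft.Log) []interface{} {
	resps := make([]interface{}, len(logs))
	for i, l := range logs {
		if l.Type == raft.LogCommand {
			resps[i] = f.Apply(l)
		}
	}
	return resps
}

func (f *kvFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, errors.New("not implemented") }

func (f *kvFSM) Restore(rc io.ReadCloser) error {
	rc.Close()
	return errors.New("not implemented")
}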
// FSMSnapshot is returned by an FSM in response to a Snapshot // FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent // It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply. // calls to Apply.
@ -49,7 +69,10 @@ type FSMSnapshot interface {
func (r *Raft) runFSM() { func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64 var lastIndex, lastTerm uint64
commit := func(req *commitTuple) { batchingFSM, batchingEnabled := r.fsm.(BatchingFSM)
configStore, configStoreEnabled := r.fsm.(ConfigurationStore)
commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change // Apply the log if a command or config change
var resp interface{} var resp interface{}
// Make sure we send a response // Make sure we send a response
@ -68,15 +91,14 @@ func (r *Raft) runFSM() {
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
case LogConfiguration: case LogConfiguration:
configStore, ok := r.fsm.(ConfigurationStore) if !configStoreEnabled {
if !ok {
// Return early to avoid incrementing the index and term for // Return early to avoid incrementing the index and term for
// an unimplemented operation. // an unimplemented operation.
return return
} }
start := time.Now() start := time.Now()
configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data)) configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data))
metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start) metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start)
} }
@ -85,6 +107,67 @@ func (r *Raft) runFSM() {
lastTerm = req.log.Term lastTerm = req.log.Term
} }
commitBatch := func(reqs []*commitTuple) {
if !batchingEnabled {
for _, ct := range reqs {
commitSingle(ct)
}
return
}
// Only send LogCommand and LogConfiguration log types. LogBarrier types
// will not be sent to the FSM.
shouldSend := func(l *Log) bool {
switch l.Type {
case LogCommand, LogConfiguration:
return true
}
return false
}
var lastBatchIndex, lastBatchTerm uint64
sendLogs := make([]*Log, 0, len(reqs))
for _, req := range reqs {
if shouldSend(req.log) {
sendLogs = append(sendLogs, req.log)
}
lastBatchIndex = req.log.Index
lastBatchTerm = req.log.Term
}
var responses []interface{}
if len(sendLogs) > 0 {
start := time.Now()
responses = batchingFSM.ApplyBatch(sendLogs)
metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start)
metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs)))
// Ensure we get the expected responses
if len(sendLogs) != len(responses) {
panic("invalid number of responses")
}
}
// Update the indexes
lastIndex = lastBatchIndex
lastTerm = lastBatchTerm
var i int
for _, req := range reqs {
var resp interface{}
// If the log was sent to the FSM, retrieve the response.
if shouldSend(req.log) {
resp = responses[i]
i++
}
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
}
restore := func(req *restoreFuture) { restore := func(req *restoreFuture) {
// Open the snapshot // Open the snapshot
meta, source, err := r.snapshots.Open(req.ID) meta, source, err := r.snapshots.Open(req.ID)
@ -132,8 +215,8 @@ func (r *Raft) runFSM() {
select { select {
case ptr := <-r.fsmMutateCh: case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) { switch req := ptr.(type) {
case *commitTuple: case []*commitTuple:
commit(req) commitBatch(req)
case *restoreFuture: case *restoreFuture:
restore(req) restore(req)

View File

@ -183,14 +183,13 @@ type userSnapshotFuture struct {
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) { func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil { if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available") return nil, nil, fmt.Errorf("no snapshot available")
} else {
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
} }
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
} }
// userRestoreFuture is used for waiting on a user-triggered restore of an // userRestoreFuture is used for waiting on a user-triggered restore of an

View File

@ -100,10 +100,12 @@ func (s *InmemSnapshotSink) Close() error {
return nil return nil
} }
// ID returns the ID of the SnapshotMeta
func (s *InmemSnapshotSink) ID() string { func (s *InmemSnapshotSink) ID() string {
return s.meta.ID return s.meta.ID
} }
// Cancel returns successfully with a nil error
func (s *InmemSnapshotSink) Cancel() error { func (s *InmemSnapshotSink) Cancel() error {
return nil return nil
} }

View File

@ -10,12 +10,12 @@ const (
// LogNoop is used to assert leadership. // LogNoop is used to assert leadership.
LogNoop LogNoop
// LogAddPeer is used to add a new peer. This should only be used with // LogAddPeerDeprecated is used to add a new peer. This should only be used with
// older protocol versions designed to be compatible with unversioned // older protocol versions designed to be compatible with unversioned
// Raft servers. See comments in config.go for details. // Raft servers. See comments in config.go for details.
LogAddPeerDeprecated LogAddPeerDeprecated
// LogRemovePeer is used to remove an existing peer. This should only be // LogRemovePeerDeprecated is used to remove an existing peer. This should only be
// used with older protocol versions designed to be compatible with // used with older protocol versions designed to be compatible with
// unversioned Raft servers. See comments in config.go for details. // unversioned Raft servers. See comments in config.go for details.
LogRemovePeerDeprecated LogRemovePeerDeprecated

View File

@ -5,8 +5,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/hashicorp/go-hclog"
"io" "io"
"log"
"net" "net"
"os" "os"
"sync" "sync"
@ -66,7 +66,7 @@ type NetworkTransport struct {
heartbeatFn func(RPC) heartbeatFn func(RPC)
heartbeatFnLock sync.Mutex heartbeatFnLock sync.Mutex
logger *log.Logger logger hclog.Logger
maxPool int maxPool int
@ -92,7 +92,7 @@ type NetworkTransportConfig struct {
// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC // ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
ServerAddressProvider ServerAddressProvider ServerAddressProvider ServerAddressProvider
Logger *log.Logger Logger hclog.Logger
// Dialer // Dialer
Stream StreamLayer Stream StreamLayer
@ -105,6 +105,7 @@ type NetworkTransportConfig struct {
Timeout time.Duration Timeout time.Duration
} }
// ServerAddressProvider provides the target address to use when establishing a connection to invoke an RPC // ServerAddressProvider provides the target address to use when establishing a connection to invoke an RPC
type ServerAddressProvider interface { type ServerAddressProvider interface {
ServerAddr(id ServerID) (ServerAddress, error) ServerAddr(id ServerID) (ServerAddress, error)
} }
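For context, a hypothetical ServerAddressProvider implementation (not part of this diff) backed by a static map; real deployments would typically resolve addresses from a catalog or service-discovery layer instead.

package example

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// staticAddressProvider resolves server IDs to addresses from a fixed map.
type staticAddressProvider struct {
	addrs map[raft.ServerID]raft.ServerAddress
}

func (p *staticAddressProvider) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
	addr, ok := p.addrs[id]
	if !ok {
		// Returning an error makes the transport fall back to the address
		// stored in the Raft configuration.
		return "", fmt.Errorf("no address known for server %q", id)
	}
	return addr, nil
}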
@ -148,7 +149,11 @@ func NewNetworkTransportWithConfig(
config *NetworkTransportConfig, config *NetworkTransportConfig,
) *NetworkTransport { ) *NetworkTransport {
if config.Logger == nil { if config.Logger == nil {
config.Logger = log.New(os.Stderr, "", log.LstdFlags) config.Logger = hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: hclog.DefaultOutput,
Level: hclog.DefaultLevel,
})
} }
trans := &NetworkTransport{ trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn), connPool: make(map[ServerAddress][]*netConn),
@ -182,7 +187,11 @@ func NewNetworkTransport(
if logOutput == nil { if logOutput == nil {
logOutput = os.Stderr logOutput = os.Stderr
} }
logger := log.New(logOutput, "", log.LstdFlags) logger := hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: logOutput,
Level: hclog.DefaultLevel,
})
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config) return NewNetworkTransportWithConfig(config)
} }
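For context, a hypothetical caller-side sketch (not part of this diff) of building the transport through NetworkTransportConfig with the new hclog.Logger field; the newTransport helper, pool size, and timeout values are illustrative.

package example

import (
	"os"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

// newTransport wires an existing StreamLayer into a NetworkTransport using
// the config-based constructor and a caller-owned hclog.Logger.
func newTransport(stream raft.StreamLayer) *raft.NetworkTransport {
	config := &raft.NetworkTransportConfig{
		Stream:  stream,
		MaxPool: 3,
		Timeout: 10 * time.Second,
		Logger: hclog.New(&hclog.LoggerOptions{
			Name:   "raft-net",
			Output: os.Stderr,
			Level:  hclog.Info,
		}),
	}
	return raft.NewNetworkTransportWithConfig(config)
}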
@ -195,7 +204,7 @@ func NewNetworkTransportWithLogger(
stream StreamLayer, stream StreamLayer,
maxPool int, maxPool int,
timeout time.Duration, timeout time.Duration,
logger *log.Logger, logger hclog.Logger,
) *NetworkTransport { ) *NetworkTransport {
config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger} config := &NetworkTransportConfig{Stream: stream, MaxPool: maxPool, Timeout: timeout, Logger: logger}
return NewNetworkTransportWithConfig(config) return NewNetworkTransportWithConfig(config)
@ -310,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
if n.serverAddressProvider != nil { if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id) serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil { if err != nil {
n.logger.Printf("[WARN] raft: Unable to get address for server id %v, using fallback address %v: %v", id, target, err) n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err)
} else { } else {
return serverAddressOverride return serverAddressOverride
} }
@ -486,7 +495,7 @@ func (n *NetworkTransport) listen() {
} }
if !n.IsShutdown() { if !n.IsShutdown() {
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err) n.logger.Error("failed to accept connection", "error", err)
} }
select { select {
@ -499,7 +508,7 @@ func (n *NetworkTransport) listen() {
// No error, reset loop delay // No error, reset loop delay
loopDelay = 0 loopDelay = 0
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr()) n.logger.Debug("accepted connection", "local-address", n.LocalAddr(), "remote-address", conn.RemoteAddr().String())
// Handle the connection in dedicated routine // Handle the connection in dedicated routine
go n.handleConn(n.getStreamContext(), conn) go n.handleConn(n.getStreamContext(), conn)
@ -519,19 +528,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
for { for {
select { select {
case <-connCtx.Done(): case <-connCtx.Done():
n.logger.Println("[DEBUG] raft-net: stream layer is closed") n.logger.Debug("stream layer is closed")
return return
default: default:
} }
if err := n.handleCommand(r, dec, enc); err != nil { if err := n.handleCommand(r, dec, enc); err != nil {
if err != io.EOF { if err != io.EOF {
n.logger.Printf("[ERR] raft-net: Failed to decode incoming command: %v", err) n.logger.Error("failed to decode incoming command", "error", err)
} }
return return
} }
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
n.logger.Printf("[ERR] raft-net: Failed to flush response: %v", err) n.logger.Error("failed to flush response", "error", err)
return return
} }
} }

View File

@ -18,7 +18,7 @@ type Observation struct {
// LeaderObservation is used for the data when leadership changes. // LeaderObservation is used for the data when leadership changes.
type LeaderObservation struct { type LeaderObservation struct {
leader ServerAddress Leader ServerAddress
} }
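For context, a hypothetical sketch (not part of this diff) of consuming leadership observations now that the Leader field is exported; the watchLeader helper is illustrative only.

package example

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// watchLeader registers an observer that only passes leadership changes and
// prints the newly exported Leader field.
func watchLeader(r *raft.Raft) {
	ch := make(chan raft.Observation, 1)
	filter := func(o *raft.Observation) bool {
		_, ok := o.Data.(raft.LeaderObservation)
		return ok
	}
	r.RegisterObserver(raft.NewObserver(ch, false, filter))
	go func() {
		for o := range ch {
			lo := o.Data.(raft.LeaderObservation)
			fmt.Println("new leader:", lo.Leader)
		}
	}()
}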
// PeerObservation is sent to observers when peers change. // PeerObservation is sent to observers when peers change.

View File

@ -9,6 +9,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
) )
@ -94,7 +96,7 @@ func (r *Raft) setLeader(leader ServerAddress) {
r.leader = leader r.leader = leader
r.leaderLock.Unlock() r.leaderLock.Unlock()
if oldLeader != leader { if oldLeader != leader {
r.observe(LeaderObservation{leader: leader}) r.observe(LeaderObservation{Leader: leader})
} }
} }
@ -147,7 +149,7 @@ func (r *Raft) run() {
// runFollower runs the FSM for a follower. // runFollower runs the FSM for a follower.
func (r *Raft) runFollower() { func (r *Raft) runFollower() {
didWarn := false didWarn := false
r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader())) r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
@ -209,7 +211,7 @@ func (r *Raft) runFollower() {
didWarn = true didWarn = true
} }
} else { } else {
r.logger.Warn(fmt.Sprintf("Heartbeat timeout from %q reached, starting election", lastLeader)) r.logger.Warn("heartbeat timeout reached, starting election", "last-leader", lastLeader)
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
r.setState(Candidate) r.setState(Candidate)
return return
@ -245,7 +247,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
// runCandidate runs the FSM for a candidate. // runCandidate runs the FSM for a candidate.
func (r *Raft) runCandidate() { func (r *Raft) runCandidate() {
r.logger.Info(fmt.Sprintf("%v entering Candidate state in term %v", r, r.getCurrentTerm()+1)) r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1)
metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1) metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1)
// Start vote for us, and set a timeout // Start vote for us, and set a timeout
@ -263,7 +265,7 @@ func (r *Raft) runCandidate() {
// Tally the votes, need a simple majority // Tally the votes, need a simple majority
grantedVotes := 0 grantedVotes := 0
votesNeeded := r.quorumSize() votesNeeded := r.quorumSize()
r.logger.Debug(fmt.Sprintf("Votes needed: %d", votesNeeded)) r.logger.Debug("votes", "needed", votesNeeded)
for r.getState() == Candidate { for r.getState() == Candidate {
select { select {
@ -273,7 +275,7 @@ func (r *Raft) runCandidate() {
case vote := <-voteCh: case vote := <-voteCh:
// Check if the term is greater than ours, bail // Check if the term is greater than ours, bail
if vote.Term > r.getCurrentTerm() { if vote.Term > r.getCurrentTerm() {
r.logger.Debug("Newer term discovered, fallback to follower") r.logger.Debug("newer term discovered, fallback to follower")
r.setState(Follower) r.setState(Follower)
r.setCurrentTerm(vote.Term) r.setCurrentTerm(vote.Term)
return return
@ -282,13 +284,12 @@ func (r *Raft) runCandidate() {
// Check if the vote is granted // Check if the vote is granted
if vote.Granted { if vote.Granted {
grantedVotes++ grantedVotes++
r.logger.Debug(fmt.Sprintf("Vote granted from %s in term %v. Tally: %d", r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
vote.voterID, vote.Term, grantedVotes))
} }
// Check if we've become the leader // Check if we've become the leader
if grantedVotes >= votesNeeded { if grantedVotes >= votesNeeded {
r.logger.Info(fmt.Sprintf("Election won. Tally: %d", grantedVotes)) r.logger.Info("election won", "tally", grantedVotes)
r.setState(Leader) r.setState(Leader)
r.setLeader(r.localAddr) r.setLeader(r.localAddr)
return return
@ -359,7 +360,7 @@ func (r *Raft) setupLeaderState() {
// runLeader runs the FSM for a leader. Do the setup here and drop into // runLeader runs the FSM for a leader. Do the setup here and drop into
// the leaderLoop for the hot loop. // the leaderLoop for the hot loop.
func (r *Raft) runLeader() { func (r *Raft) runLeader() {
r.logger.Info(fmt.Sprintf("%v entering Leader state", r)) r.logger.Info("entering leader state", "leader", r)
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1) metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
// Notify that we are the leader // Notify that we are the leader
@ -470,7 +471,7 @@ func (r *Raft) startStopReplication() {
} }
inConfig[server.ID] = true inConfig[server.ID] = true
if _, ok := r.leaderState.replState[server.ID]; !ok { if _, ok := r.leaderState.replState[server.ID]; !ok {
r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID)) r.logger.Info("added peer, starting replication", "peer", server.ID)
s := &followerReplication{ s := &followerReplication{
peer: server, peer: server,
commitment: r.leaderState.commitment, commitment: r.leaderState.commitment,
@ -497,7 +498,7 @@ func (r *Raft) startStopReplication() {
continue continue
} }
// Replicate up to lastIdx and stop // Replicate up to lastIdx and stop
r.logger.Info(fmt.Sprintf("Removed peer %v, stopping replication after %v", serverID, lastIdx)) r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx)
repl.stopCh <- lastIdx repl.stopCh <- lastIdx
close(repl.stopCh) close(repl.stopCh)
delete(r.leaderState.replState, serverID) delete(r.leaderState.replState, serverID)
@ -618,6 +619,8 @@ func (r *Raft) leaderLoop() {
commitIndex := r.leaderState.commitment.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex) r.setCommitIndex(commitIndex)
// New configuration has been committed, set it as the committed
// value.
if r.configurations.latestIndex > oldCommitIndex && if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex { r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest r.configurations.committed = r.configurations.latest
@ -627,40 +630,48 @@ func (r *Raft) leaderLoop() {
} }
} }
var numProcessed int
start := time.Now() start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
for { // Pull all inflight logs that are committed off the queue.
e := r.leaderState.inflight.Front() for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
if e == nil {
break
}
commitLog := e.Value.(*logFuture) commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index idx := commitLog.log.Index
if idx > commitIndex { if idx > commitIndex {
// Don't go past the committed index
break break
} }
// Measure the commit time // Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
r.processLogs(idx, commitLog) // Process the group
if len(groupReady) != 0 {
r.processLogs(lastIdxInGroup, groupFutures)
r.leaderState.inflight.Remove(e) for _, e := range groupReady {
numProcessed++ r.leaderState.inflight.Remove(e)
}
} }
// Measure the time to enqueue batch of logs for FSM to apply // Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start) metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued // Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed)) metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown { if stepDown {
if r.conf.ShutdownOnRemove { if r.conf.ShutdownOnRemove {
r.logger.Info("Removed ourself, shutting down") r.logger.Info("removed ourself, shutting down")
r.Shutdown() r.Shutdown()
} else { } else {
r.logger.Info("Removed ourself, transitioning to follower") r.logger.Info("removed ourself, transitioning to follower")
r.setState(Follower) r.setState(Follower)
} }
} }
@ -672,7 +683,7 @@ func (r *Raft) leaderLoop() {
} else if v.votes < v.quorumSize { } else if v.votes < v.quorumSize {
// Early return, means there must be a new leader // Early return, means there must be a new leader
r.logger.Warn("New leader elected, stepping down") r.logger.Warn("new leader elected, stepping down")
r.setState(Follower) r.setState(Follower)
delete(r.leaderState.notify, v) delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState { for _, repl := range r.leaderState.replState {
@ -867,9 +878,9 @@ func (r *Raft) checkLeaderLease() time.Duration {
} else { } else {
// Log at least once at high value, then debug. Otherwise it gets very verbose. // Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout { if diff <= 3*r.conf.LeaderLeaseTimeout {
r.logger.Warn(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else { } else {
r.logger.Debug(fmt.Sprintf("Failed to contact %v in %v", server.ID, diff)) r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
} }
} }
metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond)) metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond))
@ -879,7 +890,7 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Verify we can contact a quorum // Verify we can contact a quorum
quorum := r.quorumSize() quorum := r.quorumSize()
if contacted < quorum { if contacted < quorum {
r.logger.Warn("Failed to contact quorum of nodes, stepping down") r.logger.Warn("failed to contact quorum of nodes, stepping down")
r.setState(Follower) r.setState(Follower)
metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1) metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1)
} }
@ -967,7 +978,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
if err := sink.Close(); err != nil { if err := sink.Close(); err != nil {
return fmt.Errorf("failed to close snapshot: %v", err) return fmt.Errorf("failed to close snapshot: %v", err)
} }
r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) r.logger.Info("copied to local snapshot", "bytes", n)
// Restore the snapshot into the FSM. If this fails we are in a // Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out. // bad state so we panic to take ourselves out.
@ -991,7 +1002,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex) r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term) r.setLastSnapshot(lastIndex, term)
r.logger.Info(fmt.Sprintf("Restored user snapshot (index %d)", lastIndex)) r.logger.Info("restored user snapshot", "index", latestIndex)
return nil return nil
} }
@ -1005,8 +1016,11 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
return return
} }
r.logger.Info(fmt.Sprintf("Updating configuration with %s (%v, %v) to %+v", r.logger.Info("updating configuration",
future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers)) "command", future.req.command,
"server-id", future.req.serverID,
"server-addr", future.req.serverAddress,
"servers", hclog.Fmt("%+v", configuration.Servers))
// In pre-ID compatibility mode we translate all configuration changes // In pre-ID compatibility mode we translate all configuration changes
// in to an old remove peer message, which can handle all supported // in to an old remove peer message, which can handle all supported
@ -1023,7 +1037,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
} else { } else {
future.log = Log{ future.log = Log{
Type: LogConfiguration, Type: LogConfiguration,
Data: encodeConfiguration(configuration), Data: EncodeConfiguration(configuration),
} }
} }
@ -1059,7 +1073,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
// Write the log entry locally // Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil { if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error(fmt.Sprintf("Failed to commit logs: %v", err)) r.logger.Error("failed to commit logs", "error", err)
for _, applyLog := range applyLogs { for _, applyLog := range applyLogs {
applyLog.respond(err) applyLog.respond(err)
} }
@ -1081,72 +1095,88 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
// applied up to the given index limit. // applied up to the given index limit.
// This can be called from both leaders and followers. // This can be called from both leaders and followers.
// Followers call this from AppendEntries, for n entries at a time, and always // Followers call this from AppendEntries, for n entries at a time, and always
// pass future=nil. // pass futures=nil.
// Leaders call this once per inflight when entries are committed. They pass // Leaders call this when entries are committed. They pass the futures from any
// the future from inflights. // inflight logs.
func (r *Raft) processLogs(index uint64, future *logFuture) { func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
// Reject logs we've applied already // Reject logs we've applied already
lastApplied := r.getLastApplied() lastApplied := r.getLastApplied()
if index <= lastApplied { if index <= lastApplied {
r.logger.Warn(fmt.Sprintf("Skipping application of old log: %d", index)) r.logger.Warn("skipping application of old log", "index", index)
return return
} }
applyBatch := func(batch []*commitTuple) {
select {
case r.fsmMutateCh <- batch:
case <-r.shutdownCh:
for _, cl := range batch {
if cl.future != nil {
cl.future.respond(ErrRaftShutdown)
}
}
}
}
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Apply all the preceding logs // Apply all the preceding logs
for idx := r.getLastApplied() + 1; idx <= index; idx++ { for idx := lastApplied + 1; idx <= index; idx++ {
var preparedLog *commitTuple
// Get the log, either from the future or from our log store // Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx { future, futureOk := futures[idx]
r.processLog(&future.log, future) if futureOk {
preparedLog = r.prepareLog(&future.log, future)
} else { } else {
l := new(Log) l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil { if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at %d: %v", idx, err)) r.logger.Error("failed to get log", "index", idx, "error", err)
panic(err) panic(err)
} }
r.processLog(l, nil) preparedLog = r.prepareLog(l, nil)
} }
// Update the lastApplied index and term switch {
r.setLastApplied(idx) case preparedLog != nil:
// If we have a log ready to send to the FSM add it to the batch.
// The FSM thread will respond to the future.
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
}
case futureOk:
// Invoke the future if given.
future.respond(nil)
}
} }
// If there are any remaining logs in the batch apply them
if len(batch) != 0 {
applyBatch(batch)
}
// Update the lastApplied index and term
r.setLastApplied(index)
} }
// processLog is invoked to process the application of a single committed log entry. // processLog is invoked to process the application of a single committed log entry.
func (r *Raft) processLog(l *Log, future *logFuture) { func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple {
switch l.Type { switch l.Type {
case LogBarrier: case LogBarrier:
// Barrier is handled by the FSM // Barrier is handled by the FSM
fallthrough fallthrough
case LogCommand: case LogCommand:
// Forward to the fsm handler return &commitTuple{l, future}
select {
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
}
}
// Return so that the future is only responded to
// by the FSM handler when the application is done
return
case LogConfiguration: case LogConfiguration:
// Only support this with the v2 configuration format // Only support this with the v2 configuration format
if r.protocolVersion > 2 { if r.protocolVersion > 2 {
// Forward to the fsm handler return &commitTuple{l, future}
select {
case r.fsmMutateCh <- &commitTuple{l, future}:
case <-r.shutdownCh:
if future != nil {
future.respond(ErrRaftShutdown)
}
}
// Return so that the future is only responded to
// by the FSM handler when the application is done
return
} }
case LogAddPeerDeprecated: case LogAddPeerDeprecated:
case LogRemovePeerDeprecated: case LogRemovePeerDeprecated:
@ -1157,10 +1187,7 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
panic(fmt.Errorf("unrecognized log type: %#v", l)) panic(fmt.Errorf("unrecognized log type: %#v", l))
} }
// Invoke the future if given return nil
if future != nil {
future.respond(nil)
}
} }
// processRPC is called to handle an incoming RPC request. This must only be // processRPC is called to handle an incoming RPC request. This must only be
@ -1181,7 +1208,8 @@ func (r *Raft) processRPC(rpc RPC) {
case *TimeoutNowRequest: case *TimeoutNowRequest:
r.timeoutNow(rpc, cmd) r.timeoutNow(rpc, cmd)
default: default:
r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command)) r.logger.Error("got unexpected command",
"command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command")) rpc.Respond(nil, fmt.Errorf("unexpected command"))
} }
} }
@ -1204,7 +1232,7 @@ func (r *Raft) processHeartbeat(rpc RPC) {
case *AppendEntriesRequest: case *AppendEntriesRequest:
r.appendEntries(rpc, cmd) r.appendEntries(rpc, cmd)
default: default:
r.logger.Error(fmt.Sprintf("Expected heartbeat, got command: %#v", rpc.Command)) r.logger.Error("expected heartbeat, got", "command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command")) rpc.Respond(nil, fmt.Errorf("unexpected command"))
} }
} }
@ -1254,8 +1282,10 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} else { } else {
var prevLog Log var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil { if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn(fmt.Sprintf("Failed to get previous log: %d %v (last: %d)", r.logger.Warn("failed to get previous log",
a.PrevLogEntry, err, lastIdx)) "previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true resp.NoRetryBackoff = true
return return
} }
@ -1263,8 +1293,9 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} }
if a.PrevLogTerm != prevLogTerm { if a.PrevLogTerm != prevLogTerm {
r.logger.Warn(fmt.Sprintf("Previous log term mis-match: ours: %d remote: %d", r.logger.Warn("previous log term mis-match",
prevLogTerm, a.PrevLogTerm)) "ours", prevLogTerm,
"remote", a.PrevLogTerm)
resp.NoRetryBackoff = true resp.NoRetryBackoff = true
return return
} }
@ -1284,14 +1315,17 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} }
var storeEntry Log var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil { if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn(fmt.Sprintf("Failed to get log entry %d: %v", r.logger.Warn("failed to get log entry",
entry.Index, err)) "index", entry.Index,
"error", err)
return return
} }
if entry.Term != storeEntry.Term { if entry.Term != storeEntry.Term {
r.logger.Warn(fmt.Sprintf("Clearing log suffix from %d to %d", entry.Index, lastLogIdx)) r.logger.Warn("clearing log suffix",
"from", entry.Index,
"to", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil { if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error(fmt.Sprintf("Failed to clear log suffix: %v", err)) r.logger.Error("failed to clear log suffix", "error", err)
return return
} }
if entry.Index <= r.configurations.latestIndex { if entry.Index <= r.configurations.latestIndex {
@ -1306,7 +1340,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
if n := len(newEntries); n > 0 { if n := len(newEntries); n > 0 {
// Append the new entries // Append the new entries
if err := r.logs.StoreLogs(newEntries); err != nil { if err := r.logs.StoreLogs(newEntries); err != nil {
r.logger.Error(fmt.Sprintf("Failed to append to logs: %v", err)) r.logger.Error("failed to append to logs", "error", err)
// TODO: leaving r.getLastLog() in the wrong // TODO: leaving r.getLastLog() in the wrong
// state if there was a truncation above // state if there was a truncation above
return return
@ -1351,7 +1385,7 @@ func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration { if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodeConfiguration(entry.Data) r.configurations.latest = DecodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index r.configurations.latestIndex = entry.Index
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest r.configurations.committed = r.configurations.latest
@ -1389,8 +1423,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// vote! // vote!
candidate := r.trans.DecodePeer(req.Candidate) candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v", r.logger.Warn("rejecting vote request since we have a leader",
candidate, leader)) "from", candidate,
"leader", leader)
return return
} }
@ -1402,7 +1437,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Increase the term if we see a newer one // Increase the term if we see a newer one
if req.Term > r.getCurrentTerm() { if req.Term > r.getCurrentTerm() {
// Ensure transition to follower // Ensure transition to follower
r.logger.Debug("lost leadership because received a requestvote with newer term") r.logger.Debug("lost leadership because received a requestVote with a newer term")
r.setState(Follower) r.setState(Follower)
r.setCurrentTerm(req.Term) r.setCurrentTerm(req.Term)
resp.Term = req.Term resp.Term = req.Term
@ -1411,20 +1446,20 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Check if we have voted yet // Check if we have voted yet
lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm) lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
if err != nil && err.Error() != "not found" { if err != nil && err.Error() != "not found" {
r.logger.Error(fmt.Sprintf("Failed to get last vote term: %v", err)) r.logger.Error("failed to get last vote term", "error", err)
return return
} }
lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand) lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
if err != nil && err.Error() != "not found" { if err != nil && err.Error() != "not found" {
r.logger.Error(fmt.Sprintf("Failed to get last vote candidate: %v", err)) r.logger.Error("failed to get last vote candidate", "error", err)
return return
} }
// Check if we've voted in this election before // Check if we've voted in this election before
if lastVoteTerm == req.Term && lastVoteCandBytes != nil { if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info(fmt.Sprintf("Duplicate RequestVote for same term: %d", req.Term)) r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn(fmt.Sprintf("Duplicate RequestVote from candidate: %s", req.Candidate)) r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate)
resp.Granted = true resp.Granted = true
} }
return return
@ -1433,20 +1468,24 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
// Reject if their term is older // Reject if their term is older
lastIdx, lastTerm := r.getLastEntry() lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm { if lastTerm > req.LastLogTerm {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last term is greater (%d, %d)", r.logger.Warn("rejecting vote request since our last term is greater",
candidate, lastTerm, req.LastLogTerm)) "candidate", candidate,
"last-term", lastTerm,
"last-candidate-term", req.LastLogTerm)
return return
} }
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex { if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since our last index is greater (%d, %d)", r.logger.Warn("rejecting vote request since our last index is greater",
candidate, lastIdx, req.LastLogIndex)) "candidate", candidate,
"last-index", lastIdx,
"last-candidate-index", req.LastLogIndex)
return return
} }
// Persist a vote for safety // Persist a vote for safety
if err := r.persistVote(req.Term, req.Candidate); err != nil { if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error(fmt.Sprintf("Failed to persist vote: %v", err)) r.logger.Error("failed to persist vote", "error", err)
return return
} }
@ -1481,8 +1520,9 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Ignore an older term // Ignore an older term
if req.Term < r.getCurrentTerm() { if req.Term < r.getCurrentTerm() {
r.logger.Info(fmt.Sprintf("Ignoring installSnapshot request with older term of %d vs currentTerm %d", r.logger.Info("ignoring installSnapshot request with older term than current term",
req.Term, r.getCurrentTerm())) "request-term", req.Term,
"current-term", r.getCurrentTerm())
return return
} }
@ -1501,7 +1541,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
var reqConfiguration Configuration var reqConfiguration Configuration
var reqConfigurationIndex uint64 var reqConfigurationIndex uint64
if req.SnapshotVersion > 0 { if req.SnapshotVersion > 0 {
reqConfiguration = decodeConfiguration(req.Configuration) reqConfiguration = DecodeConfiguration(req.Configuration)
reqConfigurationIndex = req.ConfigurationIndex reqConfigurationIndex = req.ConfigurationIndex
} else { } else {
reqConfiguration = decodePeers(req.Peers, r.trans) reqConfiguration = decodePeers(req.Peers, r.trans)
@ -1511,7 +1551,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm, sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
reqConfiguration, reqConfigurationIndex, r.trans) reqConfiguration, reqConfigurationIndex, r.trans)
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to create snapshot to install: %v", err)) r.logger.Error("failed to create snapshot to install", "error", err)
rpcErr = fmt.Errorf("failed to create snapshot: %v", err) rpcErr = fmt.Errorf("failed to create snapshot: %v", err)
return return
} }
@ -1520,7 +1560,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
n, err := io.Copy(sink, rpc.Reader) n, err := io.Copy(sink, rpc.Reader)
if err != nil { if err != nil {
sink.Cancel() sink.Cancel()
r.logger.Error(fmt.Sprintf("Failed to copy snapshot: %v", err)) r.logger.Error("failed to copy snapshot", "error", err)
rpcErr = err rpcErr = err
return return
} }
@ -1528,18 +1568,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Check that we received it all // Check that we received it all
if n != req.Size { if n != req.Size {
sink.Cancel() sink.Cancel()
r.logger.Error(fmt.Sprintf("Failed to receive whole snapshot: %d / %d", n, req.Size)) r.logger.Error("failed to receive whole snapshot",
"received", hclog.Fmt("%d / %d", n, req.Size))
rpcErr = fmt.Errorf("short read") rpcErr = fmt.Errorf("short read")
return return
} }
// Finalize the snapshot // Finalize the snapshot
if err := sink.Close(); err != nil { if err := sink.Close(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to finalize snapshot: %v", err)) r.logger.Error("failed to finalize snapshot", "error", err)
rpcErr = err rpcErr = err
return return
} }
r.logger.Info(fmt.Sprintf("Copied %d bytes to local snapshot", n)) r.logger.Info("copied to local snapshot", "bytes", n)
// Restore snapshot // Restore snapshot
future := &restoreFuture{ID: sink.ID()} future := &restoreFuture{ID: sink.ID()}
@ -1553,7 +1594,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Wait for the restore to happen // Wait for the restore to happen
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to restore snapshot: %v", err)) r.logger.Error("failed to restore snapshot", "error", err)
rpcErr = err rpcErr = err
return return
} }
@ -1572,7 +1613,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Compact logs, continue even if this fails // Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil { if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error(fmt.Sprintf("Failed to compact logs: %v", err)) r.logger.Error("failed to compact logs", "error", err)
} }
r.logger.Info("Installed remote snapshot") r.logger.Info("Installed remote snapshot")
@ -1622,7 +1663,9 @@ func (r *Raft) electSelf() <-chan *voteResult {
resp := &voteResult{voterID: peer.ID} resp := &voteResult{voterID: peer.ID}
err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse)
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to make RequestVote RPC to %v: %v", peer, err)) r.logger.Error("failed to make requestVote RPC",
"target", peer,
"error", err)
resp.Term = req.Term resp.Term = req.Term
resp.Granted = false resp.Granted = false
} }
@ -1636,7 +1679,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
if server.ID == r.localID { if server.ID == r.localID {
// Persist a vote for ourselves // Persist a vote for ourselves
if err := r.persistVote(req.Term, req.Candidate); err != nil { if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error(fmt.Sprintf("Failed to persist vote : %v", err)) r.logger.Error("failed to persist vote", "error", err)
return nil return nil
} }
// Include our own vote // Include our own vote

View File

@ -100,7 +100,7 @@ func (s *followerReplication) notifyAll(leader bool) {
s.notifyLock.Unlock() s.notifyLock.Unlock()
// Submit our votes // Submit our votes
for v, _ := range n { for v := range n {
v.vote(leader) v.vote(leader)
} }
} }
@ -182,7 +182,7 @@ PIPELINE:
// to standard mode on failure. // to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil { if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported { if err != ErrPipelineReplicationNotSupported {
r.logger.Error(fmt.Sprintf("Failed to start pipeline replication to %s: %s", s.peer, err)) r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
} }
} }
goto RPC goto RPC
@ -215,7 +215,7 @@ START:
// Make the RPC call // Make the RPC call
start = time.Now() start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error(fmt.Sprintf("Failed to AppendEntries to %v: %v", s.peer, err)) r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++ s.failures++
return return
} }
@ -245,7 +245,7 @@ START:
} else { } else {
s.failures++ s.failures++
} }
r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, atomic.LoadUint64(&s.nextIndex))) r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
} }
CHECK_MORE: CHECK_MORE:
@ -272,7 +272,7 @@ SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop { if stop, err := r.sendLatestSnapshot(s); stop {
return true return true
} else if err != nil { } else if err != nil {
r.logger.Error(fmt.Sprintf("Failed to send snapshot to %v: %v", s.peer, err)) r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return return
} }
@ -286,7 +286,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
// Get the snapshots // Get the snapshots
snapshots, err := r.snapshots.List() snapshots, err := r.snapshots.List()
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to list snapshots: %v", err)) r.logger.Error("failed to list snapshots", "error", err)
return false, err return false, err
} }
@ -299,7 +299,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
snapID := snapshots[0].ID snapID := snapshots[0].ID
meta, snapshot, err := r.snapshots.Open(snapID) meta, snapshot, err := r.snapshots.Open(snapID)
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to open snapshot %v: %v", snapID, err)) r.logger.Error("failed to open snapshot", "id", snapID, "error", err)
return false, err return false, err
} }
defer snapshot.Close() defer snapshot.Close()
@ -314,7 +314,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
LastLogTerm: meta.Term, LastLogTerm: meta.Term,
Peers: meta.Peers, Peers: meta.Peers,
Size: meta.Size, Size: meta.Size,
Configuration: encodeConfiguration(meta.Configuration), Configuration: EncodeConfiguration(meta.Configuration),
ConfigurationIndex: meta.ConfigurationIndex, ConfigurationIndex: meta.ConfigurationIndex,
} }
@ -322,7 +322,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
start := time.Now() start := time.Now()
var resp InstallSnapshotResponse var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil { if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Error(fmt.Sprintf("Failed to install snapshot %v: %v", snapID, err)) r.logger.Error("failed to install snapshot", "id", snapID, "error", err)
s.failures++ s.failures++
return false, err return false, err
} }
@ -350,7 +350,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
s.notifyAll(true) s.notifyAll(true)
} else { } else {
s.failures++ s.failures++
r.logger.Warn(fmt.Sprintf("InstallSnapshot to %v rejected", s.peer)) r.logger.Warn("installSnapshot rejected to", "peer", s.peer)
} }
return false, nil return false, nil
} }
@ -377,7 +377,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
start := time.Now() start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error(fmt.Sprintf("Failed to heartbeat to %v: %v", s.peer.Address, err)) r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
failures++ failures++
select { select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)): case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@ -405,8 +405,8 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error {
defer pipeline.Close() defer pipeline.Close()
// Log start and stop of pipeline // Log start and stop of pipeline
r.logger.Info(fmt.Sprintf("pipelining replication to peer %v", s.peer)) r.logger.Info("pipelining replication", "peer", s.peer)
defer r.logger.Info(fmt.Sprintf("aborting pipeline replication to peer %v", s.peer)) defer r.logger.Info("aborting pipeline replication", "peer", s.peer)
// Create a shutdown and finish channel // Create a shutdown and finish channel
stopCh := make(chan struct{}) stopCh := make(chan struct{})
@ -467,7 +467,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u
// Pipeline the append entries // Pipeline the append entries
if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil { if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil {
r.logger.Error(fmt.Sprintf("Failed to pipeline AppendEntries to %v: %v", s.peer, err)) r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err)
return true return true
} }
@ -543,7 +543,7 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error
} else { } else {
var l Log var l Log
if err := r.logs.GetLog(nextIndex-1, &l); err != nil { if err := r.logs.GetLog(nextIndex-1, &l); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", nextIndex-1, err)) r.logger.Error("failed to get log", "index", nextIndex-1, "error", err)
return err return err
} }
@ -562,7 +562,7 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
for i := nextIndex; i <= maxIndex; i++ { for i := nextIndex; i <= maxIndex; i++ {
oldLog := new(Log) oldLog := new(Log)
if err := r.logs.GetLog(i, oldLog); err != nil { if err := r.logs.GetLog(i, oldLog); err != nil {
r.logger.Error(fmt.Sprintf("Failed to get log at index %d: %v", i, err)) r.logger.Error("failed to get log", "index", i, "error", err)
return err return err
} }
req.Entries = append(req.Entries, oldLog) req.Entries = append(req.Entries, oldLog)
@ -578,7 +578,7 @@ func appendStats(peer string, start time.Time, logs float32) {
// handleStaleTerm is used when a follower indicates that we have a stale term. // handleStaleTerm is used when a follower indicates that we have a stale term.
func (r *Raft) handleStaleTerm(s *followerReplication) { func (r *Raft) handleStaleTerm(s *followerReplication) {
r.logger.Error(fmt.Sprintf("peer %v has newer term, stopping replication", s.peer)) r.logger.Error("peer has newer term, stopping replication", "peer", s.peer)
s.notifyAll(false) // No longer leader s.notifyAll(false) // No longer leader
asyncNotifyCh(s.stepDown) asyncNotifyCh(s.stepDown)
} }
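The pattern across these replication.go hunks is uniform: each fmt.Sprintf-formatted message on the old logger becomes a structured hclog call, a fixed lowercase message followed by key/value pairs. A minimal sketch of that pattern, assuming the go-hclog package vendored here (the logger name, peer address, and error below are placeholder values, not taken from the diff):

package main

import (
	"errors"

	hclog "github.com/hashicorp/go-hclog"
)

func main() {
	// Illustrative logger; the raft library itself receives one through Config.Logger.
	logger := hclog.New(&hclog.LoggerOptions{
		Name:  "raft",
		Level: hclog.Info,
	})

	err := errors.New("connection refused")

	// Old style: logger.Error(fmt.Sprintf("Failed to heartbeat to %v: %v", peer, err))
	// New style: fixed message plus fields that structured log sinks can index.
	logger.Error("failed to heartbeat to", "peer", "10.0.0.2:8201", "error", err)
}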

vendor/github.com/hashicorp/raft/snapshot.go generated vendored

@ -77,14 +77,14 @@ func (r *Raft) runSnapshots() {
// Trigger a snapshot // Trigger a snapshot
if _, err := r.takeSnapshot(); err != nil { if _, err := r.takeSnapshot(); err != nil {
r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) r.logger.Error("failed to take snapshot", "error", err)
} }
case future := <-r.userSnapshotCh: case future := <-r.userSnapshotCh:
// User-triggered, run immediately // User-triggered, run immediately
id, err := r.takeSnapshot() id, err := r.takeSnapshot()
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err)) r.logger.Error("failed to take snapshot", "error", err)
} else { } else {
future.opener = func() (*SnapshotMeta, io.ReadCloser, error) { future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
return r.snapshots.Open(id) return r.snapshots.Open(id)
@ -107,7 +107,7 @@ func (r *Raft) shouldSnapshot() bool {
// Check the last log index // Check the last log index
lastIdx, err := r.logs.LastIndex() lastIdx, err := r.logs.LastIndex()
if err != nil { if err != nil {
r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err)) r.logger.Error("failed to get last log index", "error", err)
return false return false
} }
@ -172,7 +172,7 @@ func (r *Raft) takeSnapshot() (string, error) {
} }
// Create a new snapshot. // Create a new snapshot.
r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index)) r.logger.Info("starting snapshot up to", "index", snapReq.index)
start := time.Now() start := time.Now()
version := getSnapshotVersion(r.protocolVersion) version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans) sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
@ -202,7 +202,7 @@ func (r *Raft) takeSnapshot() (string, error) {
return "", err return "", err
} }
r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index)) r.logger.Info("snapshot complete up to", "index", snapReq.index)
return sink.ID(), nil return sink.ID(), nil
} }
@ -228,8 +228,12 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// after the snapshot to be removed. // after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs) maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)
// Log this if minLog > maxLog {
r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog)) r.logger.Info("no logs to truncate")
return nil
}
r.logger.Info("compacting logs", "from", minLog, "to", maxLog)
// Compact the logs // Compact the logs
if err := r.logs.DeleteRange(minLog, maxLog); err != nil { if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
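The guard added above covers the case where a previous compaction already removed everything this snapshot would allow us to drop, so DeleteRange is skipped rather than being handed an inverted range. A self-contained sketch of the bounds arithmetic, with all index values assumed for illustration and minU64 standing in for the package's private min helper:

package main

import "fmt"

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	const trailingLogs uint64 = 10240 // r.conf.TrailingLogs
	minLog := uint64(10241)           // FirstIndex(): older entries were already compacted away
	lastLogIdx := uint64(20480)       // LastIndex()
	snapIdx := uint64(10200)          // index covered by the snapshot just taken

	maxLog := minU64(snapIdx, lastLogIdx-trailingLogs) // min(10200, 10240) = 10200

	if minLog > maxLog {
		// 10241 > 10200: every remaining entry sits inside the trailing-log window,
		// so compactLogs now logs "no logs to truncate" and returns early.
		fmt.Println("no logs to truncate")
		return
	}
	fmt.Printf("would delete logs %d through %d\n", minLog, maxLog)
}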

vendor/github.com/hashicorp/raft/tcp_transport.go generated vendored

@ -2,8 +2,8 @@ package raft
import ( import (
"errors" "errors"
"github.com/hashicorp/go-hclog"
"io" "io"
"log"
"net" "net"
"time" "time"
) )
@ -40,7 +40,7 @@ func NewTCPTransportWithLogger(
advertise net.Addr, advertise net.Addr,
maxPool int, maxPool int,
timeout time.Duration, timeout time.Duration,
logger *log.Logger, logger hclog.Logger,
) (*NetworkTransport, error) { ) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport { return newTCPTransport(bindAddr, advertise, func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger) return NewNetworkTransportWithLogger(stream, maxPool, timeout, logger)
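Callers of NewTCPTransportWithLogger now pass an hclog.Logger rather than a *log.Logger. A hedged caller-side sketch; the bind address, nil advertise address, pool size, and timeout are placeholder values:

package main

import (
	"log"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/raft"
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{Name: "raft-net", Level: hclog.Debug})

	// The final argument used to be a *log.Logger; it is now an hclog.Logger.
	trans, err := raft.NewTCPTransportWithLogger("127.0.0.1:0", nil, 3, 10*time.Second, logger)
	if err != nil {
		log.Fatal(err)
	}
	defer trans.Close()
}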

vendor/github.com/hashicorp/raft/testing.go generated vendored

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"reflect" "reflect"
"sync" "sync"
@ -16,6 +15,10 @@ import (
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
) )
var (
userSnapshotErrorsOnNoData = true
)
// Return configurations optimized for in-memory // Return configurations optimized for in-memory
func inmemConfig(t *testing.T) *Config { func inmemConfig(t *testing.T) *Config {
conf := DefaultConfig() conf := DefaultConfig()
@ -151,12 +154,18 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
return len(d), nil return len(d), nil
} }
func newTestLogger(t *testing.T) *log.Logger { func newTestLogger(t *testing.T) hclog.Logger {
return log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds) return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t},
Level: hclog.DefaultLevel,
})
} }
func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger { func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds) return hclog.New(&hclog.LoggerOptions{
Output: &testLoggerAdapter{t: t, prefix: prefix},
Level: hclog.DefaultLevel,
})
} }
func newTestLeveledLogger(t *testing.T) hclog.Logger { func newTestLeveledLogger(t *testing.T) hclog.Logger {
@ -185,7 +194,7 @@ type cluster struct {
conf *Config conf *Config
propagateTimeout time.Duration propagateTimeout time.Duration
longstopTimeout time.Duration longstopTimeout time.Duration
logger *log.Logger logger hclog.Logger
startTime time.Time startTime time.Time
failedLock sync.Mutex failedLock sync.Mutex
@ -222,7 +231,7 @@ func (c *cluster) notifyFailed() {
// thread to block until all goroutines have completed in order to reliably // thread to block until all goroutines have completed in order to reliably
// fail tests using this function. // fail tests using this function.
func (c *cluster) Failf(format string, args ...interface{}) { func (c *cluster) Failf(format string, args ...interface{}) {
c.logger.Printf(format, args...) c.logger.Error(fmt.Sprintf(format, args...))
c.t.Fail() c.t.Fail()
c.notifyFailed() c.notifyFailed()
} }
@ -233,7 +242,7 @@ func (c *cluster) Failf(format string, args ...interface{}) {
// other goroutines created during the test. Calling FailNowf does not stop // other goroutines created during the test. Calling FailNowf does not stop
// those other goroutines. // those other goroutines.
func (c *cluster) FailNowf(format string, args ...interface{}) { func (c *cluster) FailNowf(format string, args ...interface{}) {
c.logger.Printf(format, args...) c.logger.Error(fmt.Sprintf(format, args...))
c.t.FailNow() c.t.FailNow()
} }
@ -254,7 +263,7 @@ func (c *cluster) Close() {
for _, f := range futures { for _, f := range futures {
if err := f.Error(); err != nil { if err := f.Error(); err != nil {
c.FailNowf("[ERR] shutdown future err: %v", err) c.FailNowf("shutdown future err: %v", err)
} }
} }
@ -316,7 +325,7 @@ CHECK:
c.t.FailNow() c.t.FailNow()
case <-limitCh: case <-limitCh:
c.FailNowf("[ERR] Timeout waiting for replication") c.FailNowf("timeout waiting for replication")
case <-ch: case <-ch:
for _, fsmRaw := range c.fsms { for _, fsmRaw := range c.fsms {
@ -354,7 +363,7 @@ func (c *cluster) pollState(s RaftState) ([]*Raft, uint64) {
// GetInState polls the state of the cluster and attempts to identify when it has // GetInState polls the state of the cluster and attempts to identify when it has
// settled into the given state. // settled into the given state.
func (c *cluster) GetInState(s RaftState) []*Raft { func (c *cluster) GetInState(s RaftState) []*Raft {
c.logger.Printf("[INFO] Starting stability test for raft state: %+v", s) c.logger.Info("starting stability test", "raft-state", s)
limitCh := time.After(c.longstopTimeout) limitCh := time.After(c.longstopTimeout)
// An election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout) // An election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout)
@ -411,17 +420,18 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.FailNow() c.t.FailNow()
case <-limitCh: case <-limitCh:
c.FailNowf("[ERR] Timeout waiting for stable %s state", s) c.FailNowf("timeout waiting for stable %s state", s)
case <-c.WaitEventChan(filter, 0): case <-c.WaitEventChan(filter, 0):
c.logger.Printf("[DEBUG] Resetting stability timeout") c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C: case t, ok := <-timer.C:
if !ok { if !ok {
c.FailNowf("[ERR] Timer channel errored") c.FailNowf("timer channel errored")
} }
c.logger.Printf("[INFO] Stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)) c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
return inState return inState
} }
} }
@ -431,7 +441,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
func (c *cluster) Leader() *Raft { func (c *cluster) Leader() *Raft {
leaders := c.GetInState(Leader) leaders := c.GetInState(Leader)
if len(leaders) != 1 { if len(leaders) != 1 {
c.FailNowf("[ERR] expected one leader: %v", leaders) c.FailNowf("expected one leader: %v", leaders)
} }
return leaders[0] return leaders[0]
} }
@ -442,14 +452,14 @@ func (c *cluster) Followers() []*Raft {
expFollowers := len(c.rafts) - 1 expFollowers := len(c.rafts) - 1
followers := c.GetInState(Follower) followers := c.GetInState(Follower)
if len(followers) != expFollowers { if len(followers) != expFollowers {
c.FailNowf("[ERR] timeout waiting for %d followers (followers are %v)", expFollowers, followers) c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
} }
return followers return followers
} }
// FullyConnect connects all the transports together. // FullyConnect connects all the transports together.
func (c *cluster) FullyConnect() { func (c *cluster) FullyConnect() {
c.logger.Printf("[DEBUG] Fully Connecting") c.logger.Debug("fully connecting")
for i, t1 := range c.trans { for i, t1 := range c.trans {
for j, t2 := range c.trans { for j, t2 := range c.trans {
if i != j { if i != j {
@ -462,7 +472,7 @@ func (c *cluster) FullyConnect() {
// Disconnect disconnects all transports from the given address. // Disconnect disconnects all transports from the given address.
func (c *cluster) Disconnect(a ServerAddress) { func (c *cluster) Disconnect(a ServerAddress) {
c.logger.Printf("[DEBUG] Disconnecting %v", a) c.logger.Debug("disconnecting", "address", a)
for _, t := range c.trans { for _, t := range c.trans {
if t.LocalAddr() == a { if t.LocalAddr() == a {
t.DisconnectAll() t.DisconnectAll()
@ -475,7 +485,7 @@ func (c *cluster) Disconnect(a ServerAddress) {
// Partition keeps the given list of addresses connected but isolates them // Partition keeps the given list of addresses connected but isolates them
// from the other members of the cluster. // from the other members of the cluster.
func (c *cluster) Partition(far []ServerAddress) { func (c *cluster) Partition(far []ServerAddress) {
c.logger.Printf("[DEBUG] Partitioning %v", far) c.logger.Debug("partitioning", "addresses", far)
// Gather the set of nodes on the "near" side of the partition (we // Gather the set of nodes on the "near" side of the partition (we
// will call the supplied list of nodes the "far" side). // will call the supplied list of nodes the "far" side).
@ -500,7 +510,7 @@ OUTER:
t.Disconnect(a) t.Disconnect(a)
} }
} else { } else {
for a, _ := range near { for a := range near {
t.Disconnect(a) t.Disconnect(a)
} }
} }
@ -530,15 +540,15 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) {
leader = "[none]" leader = "[none]"
} }
if expect == "" { if expect == "" {
c.logger.Printf("[ERR] Peer %s sees leader %v expected [none]", r, leader) c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", "[none]")
} else { } else {
c.logger.Printf("[ERR] Peer %s sees leader %v expected %v", r, leader, expect) c.logger.Error("peer sees incorrect leader", "peer", r, "leader", leader, "expected-leader", expect)
} }
fail = true fail = true
} }
} }
if fail { if fail {
c.FailNowf("[ERR] At least one peer has the wrong notion of leader") c.FailNowf("at least one peer has the wrong notion of leader")
} }
} }
@ -559,7 +569,7 @@ CHECK:
if len(first.logs) != len(fsm.logs) { if len(first.logs) != len(fsm.logs) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("[ERR] FSM log length mismatch: %d %d", c.FailNowf("FSM log length mismatch: %d %d",
len(first.logs), len(fsm.logs)) len(first.logs), len(fsm.logs))
} else { } else {
goto WAIT goto WAIT
@ -570,7 +580,7 @@ CHECK:
if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 { if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("[ERR] FSM log mismatch at index %d", idx) c.FailNowf("FSM log mismatch at index %d", idx)
} else { } else {
goto WAIT goto WAIT
} }
@ -579,7 +589,7 @@ CHECK:
if len(first.configurations) != len(fsm.configurations) { if len(first.configurations) != len(fsm.configurations) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("[ERR] FSM configuration length mismatch: %d %d", c.FailNowf("FSM configuration length mismatch: %d %d",
len(first.logs), len(fsm.logs)) len(first.logs), len(fsm.logs))
} else { } else {
goto WAIT goto WAIT
@ -590,7 +600,7 @@ CHECK:
if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) { if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("[ERR] FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
} else { } else {
goto WAIT goto WAIT
} }
@ -613,7 +623,7 @@ WAIT:
func (c *cluster) getConfiguration(r *Raft) Configuration { func (c *cluster) getConfiguration(r *Raft) Configuration {
future := r.GetConfiguration() future := r.GetConfiguration()
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
c.FailNowf("[ERR] failed to get configuration: %v", err) c.FailNowf("failed to get configuration: %v", err)
return Configuration{} return Configuration{}
} }
@ -634,7 +644,7 @@ CHECK:
otherSet := c.getConfiguration(raft) otherSet := c.getConfiguration(raft)
if !reflect.DeepEqual(peerSet, otherSet) { if !reflect.DeepEqual(peerSet, otherSet) {
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("[ERR] peer mismatch: %+v %+v", peerSet, otherSet) c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet)
} else { } else {
goto WAIT goto WAIT
} }
@ -687,7 +697,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
for i := 0; i < opts.Peers; i++ { for i := 0; i < opts.Peers; i++ {
dir, err := ioutil.TempDir("", "raft") dir, err := ioutil.TempDir("", "raft")
if err != nil { if err != nil {
c.FailNowf("[ERR] err: %v ", err) c.FailNowf("err: %v", err)
} }
store := NewInmemStore() store := NewInmemStore()
@ -742,18 +752,18 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
if opts.Bootstrap { if opts.Bootstrap {
err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
if err != nil { if err != nil {
c.FailNowf("[ERR] BootstrapCluster failed: %v", err) c.FailNowf("BootstrapCluster failed: %v", err)
} }
} }
raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
if err != nil { if err != nil {
c.FailNowf("[ERR] NewRaft failed: %v", err) c.FailNowf("NewRaft failed: %v", err)
} }
raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) raft.RegisterObserver(NewObserver(c.observationCh, false, nil))
if err != nil { if err != nil {
c.FailNowf("[ERR] RegisterObserver failed: %v", err) c.FailNowf("RegisterObserver failed: %v", err)
} }
c.rafts = append(c.rafts, raft) c.rafts = append(c.rafts, raft)
} }

vendor/github.com/hashicorp/raft/testing_batch.go generated vendored Normal file

@ -0,0 +1,29 @@
// +build batchtest
package raft
func init() {
userSnapshotErrorsOnNoData = false
}
// ApplyBatch enables MockFSM to satisfy the BatchingFSM interface. This
// function is gated by the batchtest build flag.
//
// NOTE: This is exposed for middleware testing purposes and is not a stable API
func (m *MockFSM) ApplyBatch(logs []*Log) []interface{} {
m.Lock()
defer m.Unlock()
ret := make([]interface{}, len(logs))
for i, log := range logs {
switch log.Type {
case LogCommand:
m.logs = append(m.logs, log.Data)
ret[i] = len(m.logs)
default:
ret[i] = nil
}
}
return ret
}
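ApplyBatch returns exactly one response per log, in order, with a nil slot for non-command entries; that is the contract the BatchingFSM interface adds so raft can hand the FSM a whole group of committed entries in one call. A rough sketch of a batching-aware dispatch built on that contract (applyAll is an illustrative helper, not the library's actual apply loop):

package raftbatch

import "github.com/hashicorp/raft"

// applyAll hands a group of committed logs to the FSM in a single call when it
// implements BatchingFSM, and falls back to per-entry Apply otherwise.
func applyAll(fsm raft.FSM, pending []*raft.Log) []interface{} {
	if batcher, ok := fsm.(raft.BatchingFSM); ok {
		// One call for the whole batch; the FSM must return len(pending) responses.
		return batcher.ApplyBatch(pending)
	}
	responses := make([]interface{}, len(pending))
	for i, l := range pending {
		if l.Type == raft.LogCommand {
			responses[i] = fsm.Apply(l)
		}
	}
	return responses
}

Building the tests with the batchtest tag turns MockFSM into a BatchingFSM and, through the init function above, also relaxes userSnapshotErrorsOnNoData.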

vendor/modules.txt vendored

@ -298,7 +298,7 @@ github.com/hashicorp/go-multierror
# github.com/hashicorp/go-plugin v1.0.1 # github.com/hashicorp/go-plugin v1.0.1
github.com/hashicorp/go-plugin github.com/hashicorp/go-plugin
github.com/hashicorp/go-plugin/internal/plugin github.com/hashicorp/go-plugin/internal/plugin
# github.com/hashicorp/go-raftchunking v0.6.2 # github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a
github.com/hashicorp/go-raftchunking github.com/hashicorp/go-raftchunking
github.com/hashicorp/go-raftchunking/types github.com/hashicorp/go-raftchunking/types
# github.com/hashicorp/go-retryablehttp v0.6.2 # github.com/hashicorp/go-retryablehttp v0.6.2
@ -330,7 +330,7 @@ github.com/hashicorp/hcl/json/token
# github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf # github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf
github.com/hashicorp/nomad/api github.com/hashicorp/nomad/api
github.com/hashicorp/nomad/api/contexts github.com/hashicorp/nomad/api/contexts
# github.com/hashicorp/raft v1.1.1 # github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
github.com/hashicorp/raft github.com/hashicorp/raft
# github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab # github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
github.com/hashicorp/raft-snapshot github.com/hashicorp/raft-snapshot