Upgrade raft library (#9170)

* Upgrade raft library

* Update vendor

* Update physical/raft/snapshot_test.go

Co-authored-by: Calvin Leung Huang <cleung2010@gmail.com>

* Update physical/raft/snapshot_test.go

Co-authored-by: Calvin Leung Huang <cleung2010@gmail.com>
Authored by Brian Kassouf on 2020-06-08 16:34:20 -07:00, committed by GitHub
parent 77dfab2b62
commit 3b4ba9d1fb
24 changed files with 300 additions and 144 deletions

go.mod

@ -70,7 +70,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/nomad/api v0.0.0-20191220223628-edc62acd919d
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
github.com/hashicorp/raft v1.1.3-0.20200501224250-c95aa91e604e
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
github.com/hashicorp/vault-plugin-auth-alicloud v0.5.5
github.com/hashicorp/vault-plugin-auth-azure v0.5.5

go.sum

@ -471,6 +471,10 @@ github.com/hashicorp/nomad/api v0.0.0-20191220223628-edc62acd919d/go.mod h1:WKCL
github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c=
github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.3-0.20200501224250-c95aa91e604e h1:hMRRBhY9cayPJzEgNGNAl74TJ0rwY3Csbr43ogjKh1I=
github.com/hashicorp/raft v1.1.3-0.20200501224250-c95aa91e604e/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab h1:WzGMwlO1DvaC93SvVOBOKtn+nXGEDXapyJuaRV3/VaY=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic=

physical/raft/raft.go

@ -615,13 +615,8 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
if err := raft.BootstrapCluster(raftConfig, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *bootstrapConfig); err != nil {
return err
}
// If we are the only node we should start as the leader.
if len(bootstrapConfig.Servers) == 1 {
opts.StartAsLeader = true
}
}
raftConfig.StartAsLeader = opts.StartAsLeader
// Setup the Raft store.
b.fsm.SetNoopRestore(true)
@ -670,6 +665,30 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
if err != nil {
return err
}
// If we are expecting to start as leader, wait until we win the election.
// This should happen quickly since there is only one node in the cluster.
// StartAsLeader is only set during init, recovery mode, storage migration,
// and tests.
if opts.StartAsLeader {
for {
if raftObj.State() == raft.Leader {
break
}
select {
case <-ctx.Done():
future := raftObj.Shutdown()
if future.Error() != nil {
return errwrap.Wrapf("shutdown while waiting for leadership: {{err}}", future.Error())
}
return errors.New("shutdown while waiting for leadership")
case <-time.After(10 * time.Millisecond):
}
}
}
b.raft = raftObj
b.raftNotifyCh = raftNotifyCh
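For reference, the wait-until-leader behavior added to SetupCluster above can be expressed as a standalone helper. This is an illustrative sketch, not code from this commit; the package and helper names are hypothetical.

```go
package raftutil

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/raft"
)

// waitForLeadership polls until this node becomes leader or ctx is cancelled,
// mirroring the loop added to SetupCluster above.
func waitForLeadership(ctx context.Context, r *raft.Raft) error {
	for {
		if r.State() == raft.Leader {
			return nil
		}
		select {
		case <-ctx.Done():
			if future := r.Shutdown(); future.Error() != nil {
				return fmt.Errorf("shutdown while waiting for leadership: %w", future.Error())
			}
			return errors.New("shutdown while waiting for leadership")
		case <-time.After(10 * time.Millisecond):
		}
	}
}
```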

physical/raft/snapshot_test.go

@ -148,8 +148,8 @@ func TestRaft_Snapshot_Index(t *testing.T) {
// Get index
index, _ := raft.fsm.LatestState()
if index.Term != 1 {
t.Fatalf("unexpected term, got %d expected 1", index.Term)
if index.Term != 2 {
t.Fatalf("unexpected term, got %d expected 2", index.Term)
}
if index.Index != 3 {
t.Fatalf("unexpected index, got %d expected 3", index.Term)
@ -168,8 +168,8 @@ func TestRaft_Snapshot_Index(t *testing.T) {
// Get index
index, _ = raft.fsm.LatestState()
if index.Term != 1 {
t.Fatalf("unexpected term, got %d expected 1", index.Term)
if index.Term != 2 {
t.Fatalf("unexpected term, got %d expected 2", index.Term)
}
if index.Index != 103 {
t.Fatalf("unexpected index, got %d expected 103", index.Term)
@ -216,7 +216,7 @@ func TestRaft_Snapshot_Index(t *testing.T) {
if meta.Index != 203 {
t.Fatalf("unexpected snapshot index %d", meta.Index)
}
if meta.Term != 1 {
if meta.Term != 2 {
t.Fatalf("unexpected snapshot term %d", meta.Term)
}
}

vendor/github.com/hashicorp/raft/.travis.yml

@ -8,7 +8,10 @@ go:
- 1.12
- tip
install: make deps
install:
- make deps
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin latest
script:
- make integ

vendor/github.com/hashicorp/raft/CHANGELOG.md

@ -1,17 +1,27 @@
# UNRELEASED
IMPROVEMENTS
* Remove `StartAsLeader` configuration option [[GH-386](https://github.com/hashicorp/raft/pull/386)]
# 1.1.2 (January 17th, 2020)
FEATURES
* Improve FSM apply performance through batching. Implementing the `BatchingFSM` interface enables this new feature [[GH-364](https://github.com/hashicorp/raft/pull/364)]
* Add ability to obtain Raft configuration before Raft starts with GetConfiguration [[GH-369](https://github.com/hashicorp/raft/pull/369)]
IMPROVEMENTS
* Remove lint violations and add a `make` rule for running the linter.
* Replace logger with hclog [[GH-360](https://github.com/hashicorp/raft/pull/360)]
* Read latest configuration independently from main loop [[GH-379](https://github.com/hashicorp/raft/pull/379)]
BUG FIXES
* Export the leader field in LeaderObservation [[GH-357](https://github.com/hashicorp/raft/pull/357)]
* Fix snapshot to not attempt to truncate a negative range [[GH-358](https://github.com/hashicorp/raft/pull/358)]
* Check for shutdown in inmemPipeline before sending RPCs [[GH-276](https://github.com/hashicorp/raft/pull/276)]
# 1.1.1 (July 23rd, 2019)
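For context on the batching improvement listed above: an FSM opts in by also implementing `BatchingFSM` alongside the usual `FSM` methods. A minimal sketch of that shape, assuming a hypothetical key/value FSM that is not part of this commit:

```go
package fsmexample

import (
	"io"
	"sync"

	"github.com/hashicorp/raft"
)

// kvFSM is a hypothetical FSM used only to show the shape of a BatchingFSM
// implementation; it is not part of this commit.
type kvFSM struct {
	mu    sync.Mutex
	state map[string][]byte
}

// Apply handles a single committed entry (required by raft.FSM).
func (f *kvFSM) Apply(l *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Decode l.Data and mutate f.state here.
	return nil
}

// ApplyBatch handles a slice of committed entries at once (raft.BatchingFSM).
// It must return exactly one response per entry, in the same order.
func (f *kvFSM) ApplyBatch(logs []*raft.Log) []interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	resps := make([]interface{}, len(logs))
	for i, l := range logs {
		if l.Type != raft.LogCommand {
			continue // non-command entries still get a nil response
		}
		// Decode l.Data and mutate f.state here.
		resps[i] = nil
	}
	return resps
}

// Snapshot and Restore are stubbed; a real FSM must implement both.
func (f *kvFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (f *kvFSM) Restore(rc io.ReadCloser) error      { return rc.Close() }
```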

vendor/github.com/hashicorp/raft/Makefile

@ -1,9 +1,23 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
ENV = $(shell go env GOPATH)
GO_VERSION = $(shell go version)
GOLANG_CI_VERSION = v1.19.0
# Look for versions prior to 1.10 which have a different fmt output
# and don't lint with gofmt against them.
ifneq (,$(findstring go version go1.8, $(GO_VERSION)))
FMT=
else ifneq (,$(findstring go version go1.9, $(GO_VERSION)))
FMT=
else
FMT=--enable gofmt
endif
TEST_RESULTS_DIR?=/tmp/test-results
test:
go test $(TESTARGS) -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -tags batchtest -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race .
integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
@ -22,15 +36,22 @@ ci.integ: ci.test
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest .
fuzz:
go test $(TESTARGS) -timeout=500s ./fuzzy
go test $(TESTARGS) -timeout=500s -tags batchtest ./fuzzy
go test $(TESTARGS) -timeout=20m ./fuzzy
go test $(TESTARGS) -timeout=20m -tags batchtest ./fuzzy
deps:
go get -t -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
lint:
gofmt -s -w .
golangci-lint run -c .golangci-lint.yml $(FMT) .
dep-linter:
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(ENV)/bin $(GOLANG_CI_VERSION)
cov:
INTEG_TESTS=yes gocov test github.com/hashicorp/raft | gocov-html > /tmp/coverage.html
open /tmp/coverage.html
.PHONY: test cov integ deps
.PHONY: test cov integ deps dep-linter lint

vendor/github.com/hashicorp/raft/api.go

@ -7,11 +7,11 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
hclog "github.com/hashicorp/go-hclog"
)
const (
@ -139,6 +139,10 @@ type Raft struct {
// the log/snapshot.
configurations configurations
// Holds a copy of the latest configuration which can be read
// independently from main loop.
latestConfiguration atomic.Value
// RPC chan comes from the transport layer
rpcCh <-chan RPC
@ -288,37 +292,36 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
// expect data to be there and it's not. By refusing, we force them
// to show intent to start a cluster fresh by explicitly doing a
// bootstrap, rather than quietly fire up a fresh cluster here.
hasState, err := HasExistingState(logs, stable, snaps)
if err != nil {
if hasState, err := HasExistingState(logs, stable, snaps); err != nil {
return fmt.Errorf("failed to check for existing state: %v", err)
}
if !hasState {
} else if !hasState {
return fmt.Errorf("refused to recover cluster with no initial state, this is probably an operator error")
}
// Attempt to restore any snapshots we find, newest to oldest.
var snapshotIndex uint64
var snapshotTerm uint64
snapshots, err := snaps.List()
var (
snapshotIndex uint64
snapshotTerm uint64
snapshots, err = snaps.List()
)
if err != nil {
return fmt.Errorf("failed to list snapshots: %v", err)
}
for _, snapshot := range snapshots {
if !conf.NoSnapshotRestoreOnStart {
_, source, err := snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
var source io.ReadCloser
_, source, err = snaps.Open(snapshot.ID)
if err != nil {
// Skip this one and try the next. We will detect if we
// couldn't open any snapshots.
continue
}
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
if err != nil {
// Same here, skip and try the next one.
continue
}
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
if err != nil {
// Same here, skip and try the next one.
continue
}
snapshotIndex = snapshot.Index
@ -341,7 +344,7 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
}
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
var entry Log
if err := logs.GetLog(index, &entry); err != nil {
if err = logs.GetLog(index, &entry); err != nil {
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == LogCommand {
@ -362,10 +365,10 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
if err := snapshot.Persist(sink); err != nil {
if err = snapshot.Persist(sink); err != nil {
return fmt.Errorf("failed to persist snapshot: %v", err)
}
if err := sink.Close(); err != nil {
if err = sink.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
@ -382,6 +385,23 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil
}
// GetConfiguration returns the configuration of the Raft cluster without
// starting a Raft instance or connecting to the cluster.
// This function has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true
r, err := NewRaft(conf, fsm, logs, stable, snaps, trans)
if err != nil {
return Configuration{}, err
}
future := r.GetConfiguration()
if err = future.Error(); err != nil {
return Configuration{}, err
}
return future.Configuration(), nil
}
// HasExistingState returns true if the server has any existing state (logs,
// knowledge of a current term, or any snapshots).
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) {
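The new package-level `GetConfiguration` lets callers inspect the persisted peer set without starting the background goroutines. A usage sketch; the wrapper function and package are illustrative, not part of the commit:

```go
package raftutil

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// inspectPeers prints the persisted Raft configuration without starting the
// background goroutines; the stores are whatever the application would
// normally pass to raft.NewRaft.
func inspectPeers(conf *raft.Config, fsm raft.FSM, logs raft.LogStore,
	stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) error {

	cfg, err := raft.GetConfiguration(conf, fsm, logs, stable, snaps, trans)
	if err != nil {
		return err
	}
	for _, srv := range cfg.Servers {
		fmt.Printf("id=%s addr=%s suffrage=%v\n", srv.ID, srv.Address, srv.Suffrage)
	}
	return nil
}
```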
@ -483,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
leaderCh: make(chan bool, 1),
localID: localID,
localAddr: localAddr,
logger: logger,
@ -507,13 +527,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Initialize as a follower.
r.setState(Follower)
// Start as leader if specified. This should only be used
// for testing purposes.
if conf.StartAsLeader {
r.setState(Leader)
r.setLeader(r.localAddr)
}
// Restore the current term and the last log.
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)
@ -542,6 +555,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)
if conf.skipStartup {
return r, nil
}
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
@ -585,18 +601,17 @@ func (r *Raft) restoreSnapshot() error {
r.setLastSnapshot(snapshot.Index, snapshot.Term)
// Update the configuration
var conf Configuration
var index uint64
if snapshot.Version > 0 {
r.configurations.committed = snapshot.Configuration
r.configurations.committedIndex = snapshot.ConfigurationIndex
r.configurations.latest = snapshot.Configuration
r.configurations.latestIndex = snapshot.ConfigurationIndex
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
configuration := decodePeers(snapshot.Peers, r.trans)
r.configurations.committed = configuration
r.configurations.committedIndex = snapshot.Index
r.configurations.latest = configuration
r.configurations.latestIndex = snapshot.Index
conf = decodePeers(snapshot.Peers, r.trans)
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)
// Success!
return nil
@ -728,19 +743,14 @@ func (r *Raft) VerifyLeader() Future {
}
}
// GetConfiguration returns the latest configuration and its associated index
// currently in use. This may not yet be committed. This must not be called on
// the main thread (which can access the information directly).
// GetConfiguration returns the latest configuration. This may not yet be
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
select {
case <-r.shutdownCh:
configReq.respond(ErrRaftShutdown)
return configReq
case r.configurationsCh <- configReq:
return configReq
}
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
return configReq
}
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
@ -942,10 +952,17 @@ func (r *Raft) State() RaftState {
return r.getState()
}
// LeaderCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. The channel is not buffered,
// and does not block on writes.
// LeaderCh is used to get a channel which delivers signals on acquiring or
// losing leadership. It sends true if we become the leader, and false if we
// lose it.
//
// Receivers can expect to receive a notification only if a leadership
// transition has occurred.
//
// If receivers aren't ready for the signal, signals may be dropped and only
// the latest leadership transition will be observed. For example, if a
// receiver sees two subsequent `true` values, it can deduce that leadership
// was lost and regained while it was processing the first transition.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}
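Because `leaderCh` is now buffered with size 1 and written via `overrideNotifyBool`, a slow receiver observes only the most recent transition. A sketch of a typical consumer loop; the wrapper function and logging are illustrative:

```go
package raftutil

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership logs leadership changes until stopCh is closed. Because the
// channel is buffered (size 1) and stale values are overridden, a slow
// receiver sees only the latest transition, not every intermediate flip.
func watchLeadership(r *raft.Raft, stopCh <-chan struct{}) {
	for {
		select {
		case isLeader := <-r.LeaderCh():
			if isLeader {
				log.Println("gained leadership")
			} else {
				log.Println("lost leadership")
			}
		case <-stopCh:
			return
		}
	}
}
```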

vendor/github.com/hashicorp/raft/config.go

@ -178,10 +178,6 @@ type Config struct {
// step down as leader.
LeaderLeaseTimeout time.Duration
// StartAsLeader forces Raft to start in the leader state. This should
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool
// The unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.
@ -206,10 +202,13 @@ type Config struct {
// NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// FSM on start. This is useful if your FSM recovers from other mechanisms
// than raft snapshotting. Snapshot metadata will still be used to initalize
// than raft snapshotting. Snapshot metadata will still be used to initialize
// raft's configuration and index values. This is used in NewRaft and
// RestoreCluster.
NoSnapshotRestoreOnStart bool
// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
// DefaultConfig returns a Config with usable defaults.

vendor/github.com/hashicorp/raft/discard_snapshot.go

@ -42,7 +42,7 @@ func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, er
return nil, nil, fmt.Errorf("open is not supported")
}
// Write returns successfully with the lenght of the input byte slice
// Write returns successfully with the length of the input byte slice
// to satisfy the WriteCloser interface
func (d *DiscardSnapshotSink) Write(b []byte) (int, error) {
return len(b), nil

vendor/github.com/hashicorp/raft/file_snapshot.go

@ -5,7 +5,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/hashicorp/go-hclog"
"hash"
"hash/crc64"
"io"
@ -16,6 +15,8 @@ import (
"sort"
"strings"
"time"
hclog "github.com/hashicorp/go-hclog"
)
const (
@ -32,6 +33,10 @@ type FileSnapshotStore struct {
path string
retain int
logger hclog.Logger
// noSync, if true, skips crash-safe file fsync api calls.
// It's a private field, only used in testing
noSync bool
}
type snapMetaSlice []*fileSnapshotMeta
@ -44,6 +49,8 @@ type FileSnapshotSink struct {
parentDir string
meta fileSnapshotMeta
noSync bool
stateFile *os.File
stateHash hash.Hash64
buffered *bufio.Writer
@ -172,6 +179,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
logger: f.logger,
dir: path,
parentDir: f.path,
noSync: f.noSync,
meta: fileSnapshotMeta{
SnapshotMeta: SnapshotMeta{
Version: version,
@ -414,7 +422,7 @@ func (s *FileSnapshotSink) Close() error {
return err
}
if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
parentFH, err := os.Open(s.parentDir)
defer parentFH.Close()
if err != nil {
@ -462,8 +470,10 @@ func (s *FileSnapshotSink) finalize() error {
}
// Sync to force fsync to disk
if err := s.stateFile.Sync(); err != nil {
return err
if !s.noSync {
if err := s.stateFile.Sync(); err != nil {
return err
}
}
// Get the file size
@ -487,9 +497,11 @@ func (s *FileSnapshotSink) finalize() error {
// writeMeta is used to write out the metadata we have.
func (s *FileSnapshotSink) writeMeta() error {
var err error
// Open the meta file
metaPath := filepath.Join(s.dir, metaFilePath)
fh, err := os.Create(metaPath)
var fh *os.File
fh, err = os.Create(metaPath)
if err != nil {
return err
}
@ -500,7 +512,7 @@ func (s *FileSnapshotSink) writeMeta() error {
// Write out as JSON
enc := json.NewEncoder(buffered)
if err := enc.Encode(&s.meta); err != nil {
if err = enc.Encode(&s.meta); err != nil {
return err
}
@ -508,8 +520,10 @@ func (s *FileSnapshotSink) writeMeta() error {
return err
}
if err = fh.Sync(); err != nil {
return err
if !s.noSync {
if err = fh.Sync(); err != nil {
return err
}
}
return nil

vendor/github.com/hashicorp/raft/future.go

@ -84,9 +84,10 @@ func (e errorFuture) Index() uint64 {
// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
err error
errCh chan error
responded bool
err error
errCh chan error
responded bool
ShutdownCh chan struct{}
}
func (d *deferError) init() {
@ -103,7 +104,11 @@ func (d *deferError) Error() error {
if d.errCh == nil {
panic("waiting for response on nil channel")
}
d.err = <-d.errCh
select {
case d.err = <-d.errCh:
case <-d.ShutdownCh:
d.err = ErrRaftShutdown
}
return d.err
}

vendor/github.com/hashicorp/raft/inmem_snapshot.go

@ -90,9 +90,9 @@ func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, erro
// Write appends the given bytes to the snapshot contents
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
written, err := io.Copy(s.contents, bytes.NewReader(p))
s.meta.Size += written
return int(written), err
written, err := s.contents.Write(p)
s.meta.Size += int64(written)
return written, err
}
// Close updates the Size and is otherwise a no-op

vendor/github.com/hashicorp/raft/inmem_transport.go

@ -24,7 +24,7 @@ type inmemPipeline struct {
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownLock sync.RWMutex
}
type inmemPipelineInflight struct {
@ -63,7 +63,7 @@ func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (Se
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
return NewInmemTransportWithTimeout(addr, 500*time.Millisecond)
}
// SetHeartbeatHandler is used to set optional fast-path for
@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
}
// Send the RPC over
respCh := make(chan RPCResponse)
respCh := make(chan RPCResponse, 1)
req := RPC{
Command: args,
Reader: r,
@ -314,6 +314,17 @@ func (i *inmemPipeline) AppendEntries(args *AppendEntriesRequest, resp *AppendEn
Command: args,
RespChan: respCh,
}
// Check if we have already been shut down; otherwise the random choice
// made by the select statement below might pick consumerCh even though
// shutdownCh was closed.
i.shutdownLock.RLock()
shutdown := i.shutdown
i.shutdownLock.RUnlock()
if shutdown {
return nil, ErrPipelineShutdown
}
select {
case i.peer.consumerCh <- rpc:
case <-timeout:

vendor/github.com/hashicorp/raft/net_transport.go

@ -319,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil {
n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err)
n.logger.Warn("unable to get address for server, using fallback address", "id", id, "fallback", target, "error", err)
} else {
return serverAddressOverride
}

vendor/github.com/hashicorp/raft/raft.go

@ -364,7 +364,7 @@ func (r *Raft) runLeader() {
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
// Notify that we are the leader
asyncNotifyBool(r.leaderCh, true)
overrideNotifyBool(r.leaderCh, true)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
@ -420,7 +420,7 @@ func (r *Raft) runLeader() {
r.leaderLock.Unlock()
// Notify that we are not the leader
asyncNotifyBool(r.leaderCh, false)
overrideNotifyBool(r.leaderCh, false)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
@ -623,8 +623,7 @@ func (r *Raft) leaderLoop() {
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
@ -983,6 +982,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
fsm := &restoreFuture{ID: sink.ID()}
fsm.ShutdownCh = r.shutdownCh
fsm.init()
select {
case r.fsmMutateCh <- fsm:
@ -1043,8 +1043,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
r.dispatchLogs([]*logFuture{&future.logFuture})
index := future.Index()
r.configurations.latest = configuration
r.configurations.latestIndex = index
r.setLatestConfiguration(configuration, index)
r.leaderState.commitment.setConfiguration(configuration)
r.startStopReplication()
}
@ -1329,8 +1328,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
return
}
if entry.Index <= r.configurations.latestIndex {
r.configurations.latest = r.configurations.committed
r.configurations.latestIndex = r.configurations.committedIndex
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
@ -1365,8 +1363,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
@ -1383,15 +1380,11 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = DecodeConfiguration(entry.Data)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
r.configurations.committed = r.configurations.latest
r.configurations.committedIndex = r.configurations.latestIndex
r.configurations.latest = decodePeers(entry.Data, r.trans)
r.configurations.latestIndex = entry.Index
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
}
}
@ -1459,7 +1452,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate)
r.logger.Warn("duplicate requestVote from", "candidate", candidate)
resp.Granted = true
}
return
@ -1584,6 +1577,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
future.ShutdownCh = r.shutdownCh
future.init()
select {
case r.fsmMutateCh <- future:
@ -1606,10 +1600,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)
// Restore the peer set
r.configurations.latest = reqConfiguration
r.configurations.latestIndex = reqConfigurationIndex
r.configurations.committed = reqConfiguration
r.configurations.committedIndex = reqConfigurationIndex
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)
// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
@ -1742,13 +1734,13 @@ func (r *Raft) lookupServer(id ServerID) *Server {
return nil
}
// pickServer returns the follower that is most up to date. Because it accesses
// leaderstate, it should only be called from the leaderloop.
// pickServer returns the follower that is most up to date and participating in quorum.
// Because it accesses leaderstate, it should only be called from the leaderloop.
func (r *Raft) pickServer() *Server {
var pick *Server
var current uint64
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
if server.ID == r.localID || server.Suffrage != Voter {
continue
}
state, ok := r.leaderState.replState[server.ID]
@ -1796,3 +1788,30 @@ func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.candidateFromLeadershipTransfer = true
rpc.Respond(&TimeoutNowResponse{}, nil)
}
// setLatestConfiguration stores the latest configuration and updates a copy of it.
func (r *Raft) setLatestConfiguration(c Configuration, i uint64) {
r.configurations.latest = c
r.configurations.latestIndex = i
r.latestConfiguration.Store(c.Clone())
}
// setCommittedConfiguration stores the committed configuration.
func (r *Raft) setCommittedConfiguration(c Configuration, i uint64) {
r.configurations.committed = c
r.configurations.committedIndex = i
}
// getLatestConfiguration reads the configuration from a copy of the main
// configuration, which means it can be accessed independently from the main
// loop.
func (r *Raft) getLatestConfiguration() Configuration {
// this switch catches the case where this is called without having set
// a configuration previously.
switch c := r.latestConfiguration.Load().(type) {
case Configuration:
return c
default:
return Configuration{}
}
}

vendor/github.com/hashicorp/raft/snapshot.go

@ -146,6 +146,7 @@ func (r *Raft) takeSnapshot() (string, error) {
// We have to use the future here to safely get this information since
// it is owned by the main thread.
configReq := &configurationsFuture{}
configReq.ShutdownCh = r.shutdownCh
configReq.init()
select {
case r.configurationsCh <- configReq:

vendor/github.com/hashicorp/raft/testing.go

@ -2,6 +2,7 @@ package raft
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -276,19 +277,14 @@ func (c *cluster) Close() {
// or a timeout occurs. It is possible to set a filter to look for specific
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made.
func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan struct{} {
func (c *cluster) WaitEventChan(ctx context.Context, filter FilterFn) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
var timeoutCh <-chan time.Time
if timeout > 0 {
timeoutCh = time.After(timeout)
}
for {
select {
case <-timeoutCh:
case <-ctx.Done():
return
case o, ok := <-c.observationCh:
if !ok || filter == nil || filter(&o) {
return
@ -304,11 +300,13 @@ func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan s
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made or a test failure is signaled.
func (c *cluster) WaitEvent(filter FilterFn, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()
case <-c.WaitEventChan(filter, timeout):
case <-eventCh:
}
}
@ -319,7 +317,9 @@ func (c *cluster) WaitForReplication(fsmLength int) {
CHECK:
for {
ch := c.WaitEventChan(nil, c.conf.CommitTimeout)
ctx, cancel := context.WithTimeout(context.Background(), c.conf.CommitTimeout)
defer cancel()
ch := c.WaitEventChan(ctx, nil)
select {
case <-c.failedCh:
c.t.FailNow()
@ -415,6 +415,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()
@ -422,7 +425,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
case <-limitCh:
c.FailNowf("timeout waiting for stable %s state", s)
case <-c.WaitEventChan(filter, 0):
case <-eventCh:
c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C:
@ -805,5 +808,6 @@ func FileSnapTest(t *testing.T) (string, *FileSnapshotStore) {
if err != nil {
t.Fatalf("err: %v", err)
}
snap.noSync = true
return dir, snap
}

vendor/github.com/hashicorp/raft/util.go

@ -13,7 +13,7 @@ import (
)
func init() {
// Ensure we use a high-entropy seed for the psuedo-random generator
// Ensure we use a high-entropy seed for the pseudo-random generator
rand.Seed(newSeed())
}
@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) {
}
}
// overrideNotifyBool is used to notify on a bool channel, overriding any
// existing value that has not yet been received.
// ch must be a 1-item buffered channel.
//
// This method does not support multiple concurrent calls.
func overrideNotifyBool(ch chan bool, v bool) {
select {
case ch <- v:
// value sent, all done
case <-ch:
// channel had an old value
select {
case ch <- v:
default:
panic("race: channel was sent concurrently")
}
}
}
// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)
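To illustrate the override semantics, here is a hypothetical test sketch written as if inside the raft package (the helper is unexported); the test name is illustrative:

```go
package raft

import "testing"

// TestOverrideNotifyBool_KeepsLatest is a hypothetical sketch: an undelivered
// value is replaced rather than queued, so receivers see only the latest one.
func TestOverrideNotifyBool_KeepsLatest(t *testing.T) {
	ch := make(chan bool, 1) // must be a 1-item buffered channel

	overrideNotifyBool(ch, true)  // buffered, nobody has received it yet
	overrideNotifyBool(ch, false) // replaces the stale true

	if v := <-ch; v {
		t.Fatalf("expected latest value false, got %v", v)
	}
	select {
	case v := <-ch:
		t.Fatalf("expected no further values, got %v", v)
	default:
	}
}
```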

api/client.go

@ -50,7 +50,8 @@ const EnvVaultInsecure = "VAULT_SKIP_VERIFY"
// returns an optional string duration to be used for response wrapping (e.g.
// "15s", or simply "15"). The path will not begin with "/v1/" or "v1/" or "/",
// however, end-of-path forward slashes are not trimmed, so must match your
// called path precisely.
// called path precisely. Response wrapping will only be used when the return
// value is not the empty string.
type WrappingLookupFunc func(operation, path string) string
// Config is used to configure the creation of the client.
@ -547,7 +548,7 @@ func (c *Client) SetOutputCurlString(curl bool) {
}
// CurrentWrappingLookupFunc sets a lookup function that returns desired wrap TTLs
// for a given operation and path
// for a given operation and path.
func (c *Client) CurrentWrappingLookupFunc() WrappingLookupFunc {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
@ -556,7 +557,7 @@ func (c *Client) CurrentWrappingLookupFunc() WrappingLookupFunc {
}
// SetWrappingLookupFunc sets a lookup function that returns desired wrap TTLs
// for a given operation and path
// for a given operation and path.
func (c *Client) SetWrappingLookupFunc(lookupFunc WrappingLookupFunc) {
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
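On the client side, a custom lookup function can be installed with `SetWrappingLookupFunc`; returning the empty string disables response wrapping for that request. A sketch with illustrative paths and TTLs:

```go
package main

import (
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Return "" to skip response wrapping; any other value is used as the
	// wrap TTL for that operation/path.
	client.SetWrappingLookupFunc(func(operation, path string) string {
		if operation == "PUT" && path == "secret/data/bootstrap" {
			return "15m"
		}
		return ""
	})

	log.Println("wrapping lookup func set:", client.CurrentWrappingLookupFunc() != nil)
}
```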

api/logical.go

@ -21,8 +21,9 @@ var (
// changed
DefaultWrappingTTL = "5m"
// The default function used if no other function is set, which honors the
// env var and wraps `sys/wrapping/wrap`
// The default function used if no other function is set. It honors the env
// var to set the wrap TTL. The default wrap TTL will apply when writing
// to `sys/wrapping/wrap` when the env var is not set.
DefaultWrappingLookupFunc = func(operation, path string) string {
if os.Getenv(EnvVaultWrapTTL) != "" {
return os.Getenv(EnvVaultWrapTTL)

api/ssh_agent.go

@ -60,6 +60,7 @@ type SSHVerifyResponse struct {
type SSHHelperConfig struct {
VaultAddr string `hcl:"vault_addr"`
SSHMountPoint string `hcl:"ssh_mount_point"`
Namespace string `hcl:"namespace"`
CACert string `hcl:"ca_cert"`
CAPath string `hcl:"ca_path"`
AllowedCidrList string `hcl:"allowed_cidr_list"`
@ -123,6 +124,11 @@ func (c *SSHHelperConfig) NewClient() (*Client, error) {
return nil, err
}
// Configure namespace
if c.Namespace != "" {
client.SetNamespace(c.Namespace)
}
return client, nil
}
@ -155,6 +161,7 @@ func ParseSSHHelperConfig(contents string) (*SSHHelperConfig, error) {
valid := []string{
"vault_addr",
"ssh_mount_point",
"namespace",
"ca_cert",
"ca_path",
"allowed_cidr_list",

sdk/helper/certutil/helpers.go

@ -464,6 +464,7 @@ func ValidateKeyTypeLength(keyType string, keyBits int) error {
case "rsa":
switch keyBits {
case 2048:
case 3072:
case 4096:
case 8192:
default:
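A quick illustration of the newly accepted 3072-bit case; the import path assumes the function lives in Vault's `sdk/helper/certutil` package:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/vault/sdk/helper/certutil"
)

func main() {
	// 3072-bit RSA keys are now accepted alongside 2048, 4096, and 8192.
	for _, bits := range []int{2048, 3072, 4096, 3000} {
		err := certutil.ValidateKeyTypeLength("rsa", bits)
		fmt.Printf("rsa/%d: %v\n", bits, err)
	}
}
```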

vendor/modules.txt

@ -398,7 +398,7 @@ github.com/hashicorp/logutils
# github.com/hashicorp/nomad/api v0.0.0-20191220223628-edc62acd919d
github.com/hashicorp/nomad/api
github.com/hashicorp/nomad/api/contexts
# github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
# github.com/hashicorp/raft v1.1.3-0.20200501224250-c95aa91e604e
github.com/hashicorp/raft
# github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
github.com/hashicorp/raft-snapshot