chore: update raft to v1.2.0 (#8822)

This commit is contained in:
Mike Morris 2020-10-08 15:07:10 -04:00 committed by GitHub
parent 141eb60f06
commit 4ae98cde2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 141 additions and 59 deletions

2
go.mod
View File

@ -54,7 +54,7 @@ require (
github.com/hashicorp/hil v0.0.0-20160711231837-1e86c6b523c5
github.com/hashicorp/memberlist v0.2.2
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.1.2
github.com/hashicorp/raft v1.2.0
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086

4
go.sum
View File

@ -284,8 +284,8 @@ github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c=
github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=

View File

@ -1,3 +1,22 @@
# UNRELEASED
IMPROVEMENTS
* Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)]
* Allow futures to react to `Shutdown()` to prevent a deadlock with `takeSnapshot()` [[GH-390](https://github.com/hashicorp/raft/pull/390)]
* Prevent non-voters from becoming eligible for leadership elections [[GH-398](https://github.com/hashicorp/raft/pull/398)]
* Remove an unneeded `io.Copy` from snapshot writes [[GH-399](https://github.com/hashicorp/raft/pull/399)]
* Log decoded candidate address in `duplicate requestVote` warning [[GH-400](https://github.com/hashicorp/raft/pull/400)]
* Prevent starting a TCP transport when IP address is `nil` [[GH-403](https://github.com/hashicorp/raft/pull/403)]
* Reject leadership transfer requests when in candidate state to prevent indefinite blocking while unable to elect a leader [[GH-413](https://github.com/hashicorp/raft/pull/413)]
* Add labels for metric metadata to reduce cardinality of metric names [[GH-409](https://github.com/hashicorp/raft/pull/409)]
* Add peers metric [[GH-413](https://github.com/hashicorp/raft/pull/431)]
BUG FIXES
* Make `LeaderCh` always deliver the latest leadership transition [[GH-384](https://github.com/hashicorp/raft/pull/384)]
* Handle updating an existing peer in `startStopReplication` [[GH-419](https://github.com/hashicorp/raft/pull/419)]
# 1.1.2 (January 17th, 2020)
FEATURES

View File

@ -16,8 +16,8 @@ endif
TEST_RESULTS_DIR?=/tmp/test-results
test:
go test $(TESTARGS) -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -tags batchtest -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race .
integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .

View File

@ -1,4 +1,4 @@
raft [![Build Status](https://travis-ci.org/hashicorp/raft.png)](https://travis-ci.org/hashicorp/raft) [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft)
raft [![CircleCI](https://circleci.com/gh/hashicorp/raft.svg?style=svg)](https://circleci.com/gh/hashicorp/raft)
====
raft is a [Go](http://www.golang.org) library that manages a replicated

View File

@ -503,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
leaderCh: make(chan bool, 1),
localID: localID,
localAddr: localAddr,
logger: logger,
@ -527,13 +527,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Initialize as a follower.
r.setState(Follower)
// Start as leader if specified. This should only be used
// for testing purposes.
if conf.StartAsLeader {
r.setState(Leader)
r.setLeader(r.localAddr)
}
// Restore the current term and the last log.
r.setCurrentTerm(currentTerm)
r.setLastLog(lastLog.Index, lastLog.Term)
@ -959,10 +952,17 @@ func (r *Raft) State() RaftState {
return r.getState()
}
// LeaderCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it. The channel is not buffered,
// and does not block on writes.
// LeaderCh is used to get a channel which delivers signals on acquiring or
// losing leadership. It sends true if we become the leader, and false if we
// lose it.
//
// Receivers can expect to receive a notification only if leadership
// transition has occured.
//
// If receivers aren't ready for the signal, signals may drop and only the
// latest leadership transition. For example, if a receiver receives subsequent
// `true` values, they may deduce that leadership was lost and regained while
// the the receiver was processing first leadership transition.
func (r *Raft) LeaderCh() <-chan bool {
return r.leaderCh
}

View File

@ -178,10 +178,6 @@ type Config struct {
// step down as leader.
LeaderLeaseTimeout time.Duration
// StartAsLeader forces Raft to start in the leader state. This should
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool
// The unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.

View File

@ -5,7 +5,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/hashicorp/go-hclog"
"hash"
"hash/crc64"
"io"
@ -16,6 +15,8 @@ import (
"sort"
"strings"
"time"
hclog "github.com/hashicorp/go-hclog"
)
const (
@ -32,6 +33,10 @@ type FileSnapshotStore struct {
path string
retain int
logger hclog.Logger
// noSync, if true, skips crash-safe file fsync api calls.
// It's a private field, only used in testing
noSync bool
}
type snapMetaSlice []*fileSnapshotMeta
@ -44,6 +49,8 @@ type FileSnapshotSink struct {
parentDir string
meta fileSnapshotMeta
noSync bool
stateFile *os.File
stateHash hash.Hash64
buffered *bufio.Writer
@ -172,6 +179,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
logger: f.logger,
dir: path,
parentDir: f.path,
noSync: f.noSync,
meta: fileSnapshotMeta{
SnapshotMeta: SnapshotMeta{
Version: version,
@ -414,7 +422,7 @@ func (s *FileSnapshotSink) Close() error {
return err
}
if runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
parentFH, err := os.Open(s.parentDir)
defer parentFH.Close()
if err != nil {
@ -462,8 +470,10 @@ func (s *FileSnapshotSink) finalize() error {
}
// Sync to force fsync to disk
if err := s.stateFile.Sync(); err != nil {
return err
if !s.noSync {
if err := s.stateFile.Sync(); err != nil {
return err
}
}
// Get the file size
@ -510,8 +520,10 @@ func (s *FileSnapshotSink) writeMeta() error {
return err
}
if err = fh.Sync(); err != nil {
return err
if !s.noSync {
if err = fh.Sync(); err != nil {
return err
}
}
return nil

View File

@ -84,9 +84,10 @@ func (e errorFuture) Index() uint64 {
// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
err error
errCh chan error
responded bool
err error
errCh chan error
responded bool
ShutdownCh chan struct{}
}
func (d *deferError) init() {
@ -103,7 +104,11 @@ func (d *deferError) Error() error {
if d.errCh == nil {
panic("waiting for response on nil channel")
}
d.err = <-d.errCh
select {
case d.err = <-d.errCh:
case <-d.ShutdownCh:
d.err = ErrRaftShutdown
}
return d.err
}

View File

@ -90,9 +90,9 @@ func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, erro
// Write appends the given bytes to the snapshot contents
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
written, err := io.Copy(s.contents, bytes.NewReader(p))
s.meta.Size += written
return int(written), err
written, err := s.contents.Write(p)
s.meta.Size += int64(written)
return written, err
}
// Close updates the Size and is otherwise a no-op

View File

@ -63,7 +63,7 @@ func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (Se
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
return NewInmemTransportWithTimeout(addr, 500*time.Millisecond)
}
// SetHeartbeatHandler is used to set optional fast-path for
@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
}
// Send the RPC over
respCh := make(chan RPCResponse)
respCh := make(chan RPCResponse, 1)
req := RPC{
Command: args,
Reader: r,

View File

@ -319,7 +319,7 @@ func (n *NetworkTransport) getProviderAddressOrFallback(id ServerID, target Serv
if n.serverAddressProvider != nil {
serverAddressOverride, err := n.serverAddressProvider.ServerAddr(id)
if err != nil {
n.logger.Warn("unable to get address for sever, using fallback address", "id", id, "fallback", target, "error", err)
n.logger.Warn("unable to get address for server, using fallback address", "id", id, "fallback", target, "error", err)
} else {
return serverAddressOverride
}

View File

@ -311,6 +311,10 @@ func (r *Raft) runCandidate() {
// Reject any restores since we are not the leader
r.respond(ErrNotLeader)
case r := <-r.leadershipTransferCh:
// Reject any operations since we are not the leader
r.respond(ErrNotLeader)
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
@ -364,7 +368,7 @@ func (r *Raft) runLeader() {
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
// Notify that we are the leader
asyncNotifyBool(r.leaderCh, true)
overrideNotifyBool(r.leaderCh, true)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
@ -420,7 +424,7 @@ func (r *Raft) runLeader() {
r.leaderLock.Unlock()
// Notify that we are not the leader
asyncNotifyBool(r.leaderCh, false)
overrideNotifyBool(r.leaderCh, false)
// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
@ -469,10 +473,13 @@ func (r *Raft) startStopReplication() {
if server.ID == r.localID {
continue
}
inConfig[server.ID] = true
if _, ok := r.leaderState.replState[server.ID]; !ok {
s, ok := r.leaderState.replState[server.ID]
if !ok {
r.logger.Info("added peer, starting replication", "peer", server.ID)
s := &followerReplication{
s = &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
@ -485,10 +492,14 @@ func (r *Raft) startStopReplication() {
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
r.observe(PeerObservation{Peer: server, Removed: false})
} else if ok && s.peer.Address != server.Address {
r.logger.Info("updating peer", "peer", server.ID)
s.peer = server
}
}
@ -504,6 +515,9 @@ func (r *Raft) startStopReplication() {
delete(r.leaderState.replState, serverID)
r.observe(PeerObservation{Peer: repl.peer, Removed: true})
}
// Update peers metric
metrics.SetGauge([]string{"raft", "peers"}, float32(len(r.configurations.latest.Servers)))
}
// configurationChangeChIfStable returns r.configurationChangeCh if it's safe
@ -982,6 +996,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
fsm := &restoreFuture{ID: sink.ID()}
fsm.ShutdownCh = r.shutdownCh
fsm.init()
select {
case r.fsmMutateCh <- fsm:
@ -1451,7 +1466,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn("duplicate requestVote from", "candidate", req.Candidate)
r.logger.Warn("duplicate requestVote from", "candidate", candidate)
resp.Granted = true
}
return
@ -1576,6 +1591,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
future.ShutdownCh = r.shutdownCh
future.init()
select {
case r.fsmMutateCh <- future:
@ -1732,13 +1748,13 @@ func (r *Raft) lookupServer(id ServerID) *Server {
return nil
}
// pickServer returns the follower that is most up to date. Because it accesses
// leaderstate, it should only be called from the leaderloop.
// pickServer returns the follower that is most up to date and participating in quorum.
// Because it accesses leaderstate, it should only be called from the leaderloop.
func (r *Raft) pickServer() *Server {
var pick *Server
var current uint64
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
if server.ID == r.localID || server.Suffrage != Voter {
continue
}
state, ok := r.leaderState.replState[server.ID]

View File

@ -326,6 +326,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
s.failures++
return false, err
}
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
// Check for a newer term, stop running
@ -386,6 +389,9 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
} else {
s.setLastContact()
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
@ -572,6 +578,10 @@ func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64
// appendStats is used to emit stats about an AppendEntries invocation.
func appendStats(peer string, start time.Time, logs float32) {
labels := []metrics.Label{{Name: "peer_id", Value: peer}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "appendEntries", "rpc"}, start, labels)
metrics.IncrCounterWithLabels([]string{"raft", "replication", "appendEntries", "logs"}, logs, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}

View File

@ -146,6 +146,7 @@ func (r *Raft) takeSnapshot() (string, error) {
// We have to use the future here to safely get this information since
// it is owned by the main thread.
configReq := &configurationsFuture{}
configReq.ShutdownCh = r.shutdownCh
configReq.init()
select {
case r.configurationsCh <- configReq:

View File

@ -81,7 +81,7 @@ func newTCPTransport(bindAddr string,
list.Close()
return nil, errNotTCP
}
if addr.IP.IsUnspecified() {
if addr.IP == nil || addr.IP.IsUnspecified() {
list.Close()
return nil, errNotAdvertisable
}

View File

@ -2,6 +2,7 @@ package raft
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -276,19 +277,14 @@ func (c *cluster) Close() {
// or a timeout occurs. It is possible to set a filter to look for specific
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made.
func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan struct{} {
func (c *cluster) WaitEventChan(ctx context.Context, filter FilterFn) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
var timeoutCh <-chan time.Time
if timeout > 0 {
timeoutCh = time.After(timeout)
}
for {
select {
case <-timeoutCh:
case <-ctx.Done():
return
case o, ok := <-c.observationCh:
if !ok || filter == nil || filter(&o) {
return
@ -304,11 +300,13 @@ func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan s
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made or a test failure is signaled.
func (c *cluster) WaitEvent(filter FilterFn, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()
case <-c.WaitEventChan(filter, timeout):
case <-eventCh:
}
}
@ -319,7 +317,9 @@ func (c *cluster) WaitForReplication(fsmLength int) {
CHECK:
for {
ch := c.WaitEventChan(nil, c.conf.CommitTimeout)
ctx, cancel := context.WithTimeout(context.Background(), c.conf.CommitTimeout)
defer cancel()
ch := c.WaitEventChan(ctx, nil)
select {
case <-c.failedCh:
c.t.FailNow()
@ -415,6 +415,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()
@ -422,7 +425,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
case <-limitCh:
c.FailNowf("timeout waiting for stable %s state", s)
case <-c.WaitEventChan(filter, 0):
case <-eventCh:
c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C:
@ -805,5 +808,6 @@ func FileSnapTest(t *testing.T) (string, *FileSnapshotStore) {
if err != nil {
t.Fatalf("err: %v", err)
}
snap.noSync = true
return dir, snap
}

View File

@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) {
}
}
// overrideNotifyBool is used to notify on a bool channel
// but override existing value if value is present.
// ch must be 1-item buffered channel.
//
// This method does not support multiple concurrent calls.
func overrideNotifyBool(ch chan bool, v bool) {
select {
case ch <- v:
// value sent, all done
case <-ch:
// channel had an old value
select {
case ch <- v:
default:
panic("race: channel was sent concurrently")
}
}
}
// Decode reverses the encode operation on a byte slice input.
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)

2
vendor/modules.txt vendored
View File

@ -276,7 +276,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.1.2
# github.com/hashicorp/raft v1.2.0
github.com/hashicorp/raft
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/raft-boltdb