Add additional raft metrics (#5628)

* Add documentation for new raft metrics
* Revendor raft from master
This commit is contained in:
Freddy 2019-04-09 16:09:22 -06:00 committed by GitHub
parent 4a73702460
commit 4fa4cffd41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 124 additions and 30 deletions

2
go.mod
View File

@ -74,7 +74,7 @@ require (
github.com/hashicorp/mdns v1.0.1 // indirect
github.com/hashicorp/memberlist v0.1.3
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c
github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1
github.com/hashicorp/serf v0.8.2
github.com/hashicorp/vault v0.10.3

4
go.sum
View File

@ -186,8 +186,8 @@ github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG67
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c h1:V0ncQQu4St5edf3p566gGAU8jxvrWdjny9dIFl2aV/s=
github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472 h1:9EPzHJ1bJFaFbGOz3UV3DDFmGYANr+SF+eapmiK5zV4=
github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1 h1:LHTrLUnNkk+2YkO5EMG49q0lHdR9AZhDbCpu0+M3e0E=
github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=

17
vendor/github.com/hashicorp/raft/CHANGELOG.md generated vendored Normal file
View File

@ -0,0 +1,17 @@
# UNRELEASED
IMPROVEMENTS
* InMemTransport: Add timeout for sending a message [[GH-313](https://github.com/hashicorp/raft/pull/313)]
* ensure 'make deps' downloads test dependencies like testify [[GH-310](https://github.com/hashicorp/raft/pull/310)]
* Clarifies function of CommitTimeout [[GH-309](https://github.com/hashicorp/raft/pull/309)]
# 1.0.0 (October 3rd, 2017)
v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version manages server identities using a UUID, so introduces some breaking API changes. It also versions the Raft protocol, and requires some special steps when interoperating with Raft servers running older versions of the library (see the detailed comment in config.go about version compatibility). You can reference https://github.com/hashicorp/consul/pull/2222 for an idea of what was required to port Consul to these new interfaces.
# 0.1.0 (September 29th, 2017)
v0.1.0 is the original stable version of the library that was in master and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0.

View File

@ -10,7 +10,7 @@ fuzz:
go test -timeout=300s ./fuzzy
deps:
go get -d -v ./...
go get -t -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
cov:

View File

@ -34,7 +34,7 @@ and `StableStore`.
## Tagged Releases
As of September 2017, Hashicorp will start using tags for this library to clearly indicate
As of September 2017, HashiCorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained

View File

@ -164,7 +164,7 @@ type Raft struct {
// configuration on all the Voter servers. There is no need to bootstrap
// Nonvoter and Staging servers.
//
// One sane approach is to boostrap a single server with a configuration
// One sane approach is to bootstrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,

View File

@ -9,7 +9,7 @@ import (
// replication goroutines report in newly written entries with Match(), and
// this notifies on commitCh when the commit index has advanced.
type commitment struct {
// protectes matchIndexes and commitIndex
// protects matchIndexes and commitIndex
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}

View File

@ -115,7 +115,7 @@ type configurationChangeRequest struct {
// prior one has been committed).
//
// One downside to storing just two configurations is that if you try to take a
// snahpsot when your state machine hasn't yet applied the committedIndex, we
// snapshot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the
@ -198,7 +198,7 @@ func nextConfiguration(current Configuration, currentIndex uint64, change config
// TODO: barf on new address?
newServer := Server{
// TODO: This should add the server as Staging, to be automatically
// promoted to Voter later. However, the promoton to Voter is not yet
// promoted to Voter later. However, the promotion to Voter is not yet
// implemented, and doing so is not trivial with the way the leader loop
// coordinates with the replication goroutines today. So, for now, the
// server will have a vote right away, and the Promote case below is

View File

@ -1,6 +1,7 @@
package raft
import (
"errors"
"sync"
)
@ -106,7 +107,11 @@ func (i *InmemStore) Set(key []byte, val []byte) error {
func (i *InmemStore) Get(key []byte) ([]byte, error) {
i.l.RLock()
defer i.l.RUnlock()
return i.kv[string(key)], nil
val := i.kv[string(key)]
if val == nil {
return nil, errors.New("not found")
}
return val, nil
}
// SetUint64 implements the StableStore interface.

View File

@ -43,9 +43,11 @@ type InmemTransport struct {
timeout time.Duration
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
// NewInmemTransportWithTimeout is used to initialize a new transport and
// generates a random local address if none is specified. The given timeout
// will be used to decide how long to wait for a connected peer to process the
// RPCs that we're sending it. See also Connect() and Consumer().
func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (ServerAddress, *InmemTransport) {
if string(addr) == "" {
addr = NewInmemAddr()
}
@ -53,11 +55,17 @@ func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
consumerCh: make(chan RPC, 16),
localAddr: addr,
peers: make(map[ServerAddress]*InmemTransport),
timeout: 50 * time.Millisecond,
timeout: timeout,
}
return addr, trans
}
// NewInmemTransport is used to initialize a new transport
// and generates a random local address if none is specified
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) {
return NewInmemTransportWithTimeout(addr, 50*time.Millisecond)
}
// SetHeartbeatHandler is used to set optional fast-path for
// heartbeats, not supported for this transport.
func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) {
@ -76,16 +84,15 @@ func (i *InmemTransport) LocalAddr() ServerAddress {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
i.RLock()
i.Lock()
defer i.Unlock()
peer, ok := i.peers[target]
i.RUnlock()
if !ok {
return nil, fmt.Errorf("failed to connect to peer: %v", target)
}
pipeline := newInmemPipeline(i, peer, target)
i.Lock()
i.pipelines = append(i.pipelines, pipeline)
i.Unlock()
return pipeline, nil
}
@ -140,11 +147,17 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
// Send the RPC over
respCh := make(chan RPCResponse)
peer.consumerCh <- RPC{
req := RPC{
Command: args,
Reader: r,
RespChan: respCh,
}
select {
case peer.consumerCh <- req:
case <-time.After(timeout):
err = fmt.Errorf("send timed out")
return
}
// Wait for a response
select {

View File

@ -461,16 +461,38 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress {
// listen is used to handling incoming connections.
func (n *NetworkTransport) listen() {
const baseDelay = 5 * time.Millisecond
const maxDelay = 1 * time.Second
var loopDelay time.Duration
for {
// Accept incoming connections
conn, err := n.stream.Accept()
if err != nil {
if n.IsShutdown() {
return
if loopDelay == 0 {
loopDelay = baseDelay
} else {
loopDelay *= 2
}
if loopDelay > maxDelay {
loopDelay = maxDelay
}
if !n.IsShutdown() {
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
}
select {
case <-n.shutdownCh:
return
case <-time.After(loopDelay):
continue
}
n.logger.Printf("[ERR] raft-net: Failed to accept connection: %v", err)
continue
}
// No error, reset loop delay
loopDelay = 0
n.logger.Printf("[DEBUG] raft-net: %v accepted connection from: %v", n.LocalAddr(), conn.RemoteAddr())
// Handle the connection in dedicated routine

View File

@ -520,6 +520,9 @@ func (r *Raft) leaderLoop() {
}
}
var numProcessed int
start := time.Now()
for {
e := r.leaderState.inflight.Front()
if e == nil {
@ -532,10 +535,19 @@ func (r *Raft) leaderLoop() {
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
r.processLogs(idx, commitLog)
r.leaderState.inflight.Remove(e)
numProcessed++
}
// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed))
if stepDown {
if r.conf.ShutdownOnRemove {
r.logger.Printf("[INFO] raft: Removed ourself, shutting down")
@ -848,7 +860,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
logs := make([]*Log, len(applyLogs))
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
@ -879,10 +894,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
}
}
// processLogs is used to apply all the committed entires that haven't been
// processLogs is used to apply all the committed entries that haven't been
// applied up to the given index limit.
// This can be called from both leaders and followers.
// Followers call this from AppendEntires, for n entires at a time, and always
// Followers call this from AppendEntries, for n entries at a time, and always
// pass future=nil.
// Leaders call this once per inflight when entries are committed. They pass
// the future from inflights.
@ -899,7 +914,6 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
// Get the log, either from the future or from our log store
if future != nil && future.log.Index == idx {
r.processLog(&future.log, future)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {

View File

@ -31,7 +31,7 @@ type followerReplication struct {
peer Server
// commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successsful
// leader's commit index can advance. It is updated on successful
// AppendEntries responses.
commitment *commitment
@ -137,7 +137,12 @@ RPC:
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout): // TODO: what is this?
// This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}

2
vendor/modules.txt vendored
View File

@ -264,7 +264,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v0.0.0-20180817181211-da92cfe76e0c
# github.com/hashicorp/raft v1.0.1-0.20190409200437-d9fe23f7d472
github.com/hashicorp/raft
# github.com/hashicorp/raft-boltdb v0.0.0-20150201200839-d1e82c1ec3f1
github.com/hashicorp/raft-boltdb

View File

@ -379,6 +379,18 @@ These metrics are used to monitor the health of the Consul servers.
<td>commit logs / interval</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.raft.commitNumLogs`</td>
<td>This metric measures the count of logs processed for application to the FSM in a single batch.</td>
<td>logs</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.raft.fsm.enqueue`</td>
<td>This metric measures the amount of time to enqueue a batch of logs for the FSM to apply.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.raft.fsm.restore`</td>
<td>This metric measures the time taken by the FSM to restore its state from a snapshot.</td>
@ -470,6 +482,12 @@ These metrics are used to monitor the health of the Consul servers.
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.raft.leader.dispatchNumLogs`</td>
<td>This metric measures the number of logs committed to disk in a batch.</td>
<td>logs</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.raft.replication.appendEntries`</td>
<td>This measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers.</td>