Make Raft trailing logs and snapshot timing reloadable (#10129)

* WIP reloadable raft config

* Pre-define new raft gauges

* Update go-metrics to change gauge reset behaviour

* Update raft to pull in new metric and reloadable config

* Add snapshot persistence timing and installSnapshot to our 'protected' list as they can be infrequent but are important

* Update telemetry docs

* Update config and telemetry docs

* Add note to oldestLogAge on when it is visible

* Add changelog entry

* Update website/content/docs/agent/options.mdx

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>

Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com>
This commit is contained in:
Paul Banks 2021-05-04 15:36:53 +01:00 committed by GitHub
parent eb84a856c4
commit d47eea3a3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 805 additions and 258 deletions

4
.changelog/10129.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:improvement
raft: allow reloading of raft trailing logs and snapshot timing to allow recovery from some [replication failure modes](https://github.com/hashicorp/consul/issues/9609).
telemetry: add metrics and documentation for [monitoring for replication issues](https://consul.io/docs/agent/telemetry#raft-replication-capacity-issues).
```

View File

@ -3639,10 +3639,13 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
} }
cc := consul.ReloadableConfig{ cc := consul.ReloadableConfig{
RPCRateLimit: newCfg.RPCRateLimit, RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst, RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient, RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap, ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap,
RaftSnapshotThreshold: newCfg.RaftSnapshotThreshold,
RaftSnapshotInterval: newCfg.RaftSnapshotInterval,
RaftTrailingLogs: newCfg.RaftTrailingLogs,
} }
if err := a.delegate.ReloadConfig(cc); err != nil { if err := a.delegate.ReloadConfig(cc); err != nil {
return err return err

View File

@ -659,8 +659,11 @@ type RPCConfig struct {
// ReloadableConfig is the configuration that is passed to ReloadConfig when // ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded. // application config is reloaded.
type ReloadableConfig struct { type ReloadableConfig struct {
RPCRateLimit rate.Limit RPCRateLimit rate.Limit
RPCMaxBurst int RPCMaxBurst int
RPCMaxConnsPerClient int RPCMaxConnsPerClient int
ConfigEntryBootstrap []structs.ConfigEntry ConfigEntryBootstrap []structs.ConfigEntry
RaftSnapshotThreshold int
RaftSnapshotInterval time.Duration
RaftTrailingLogs int
} }

View File

@ -1387,6 +1387,13 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
// ReloadConfig is used to have the Server do an online reload of // ReloadConfig is used to have the Server do an online reload of
// relevant configuration information // relevant configuration information
func (s *Server) ReloadConfig(config ReloadableConfig) error { func (s *Server) ReloadConfig(config ReloadableConfig) error {
// Reload raft config first before updating any other state since it could
// error if the new config is invalid.
raftCfg := computeRaftReloadableConfig(config)
if err := s.raft.ReloadConfig(raftCfg); err != nil {
return err
}
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
s.rpcConnLimiter.SetConfig(connlimit.Config{ s.rpcConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: config.RPCMaxConnsPerClient, MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
@ -1401,6 +1408,33 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error {
return nil return nil
} }
// computeRaftReloadableConfig works out the correct reloadable config for raft.
// We reload raft even if nothing has changed since it's cheap and simpler than
// trying to work out if it's different from the current raft config. This
// function is separate to make it cheap to table test thoroughly without a full
// raft instance.
func computeRaftReloadableConfig(config ReloadableConfig) raft.ReloadableConfig {
	// We use the raw defaults _not_ the current values so that you can reload
	// back to a zero value having previously started Consul with a custom value
	// for one of these fields.
	defaultConf := DefaultConfig()
	raftCfg := raft.ReloadableConfig{
		TrailingLogs:      defaultConf.RaftConfig.TrailingLogs,
		SnapshotInterval:  defaultConf.RaftConfig.SnapshotInterval,
		SnapshotThreshold: defaultConf.RaftConfig.SnapshotThreshold,
	}
	// Only override the defaults with values the operator actually set. Guard
	// with > 0 rather than != 0 so a (nonsensical) negative int from config is
	// treated as "unset" instead of wrapping to a huge uint64 when converted.
	if config.RaftSnapshotThreshold > 0 {
		raftCfg.SnapshotThreshold = uint64(config.RaftSnapshotThreshold)
	}
	if config.RaftSnapshotInterval > 0 {
		raftCfg.SnapshotInterval = config.RaftSnapshotInterval
	}
	if config.RaftTrailingLogs > 0 {
		raftCfg.TrailingLogs = uint64(config.RaftTrailingLogs)
	}
	return raftCfg
}
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() { func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1) atomic.StoreInt32(&s.readyForConsistentReads, 1)

View File

@ -14,6 +14,7 @@ import (
"github.com/google/tcpproxy" "github.com/google/tcpproxy"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
@ -1466,6 +1467,9 @@ func TestServer_ReloadConfig(t *testing.T) {
c.Build = "1.5.0" c.Build = "1.5.0"
c.RPCRateLimit = 500 c.RPCRateLimit = 500
c.RPCMaxBurst = 5000 c.RPCMaxBurst = 5000
// Set one raft param to be non-default in the initial config, others are
// default.
c.RaftConfig.TrailingLogs = 1234
}) })
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s.Shutdown() defer s.Shutdown()
@ -1480,6 +1484,14 @@ func TestServer_ReloadConfig(t *testing.T) {
RPCRateLimit: 1000, RPCRateLimit: 1000,
RPCMaxBurst: 10000, RPCMaxBurst: 10000,
ConfigEntryBootstrap: []structs.ConfigEntry{entryInit}, ConfigEntryBootstrap: []structs.ConfigEntry{entryInit},
// Reset the custom one to default by removing it from the config file (it will
// be a zero value here).
RaftTrailingLogs: 0,
// Set a different Raft param to something custom now
RaftSnapshotThreshold: 4321,
// Leave other raft fields default
} }
require.NoError(t, s.ReloadConfig(rc)) require.NoError(t, s.ReloadConfig(rc))
@ -1496,6 +1508,98 @@ func TestServer_ReloadConfig(t *testing.T) {
limiter = s.rpcLimiter.Load().(*rate.Limiter) limiter = s.rpcLimiter.Load().(*rate.Limiter)
require.Equal(t, rate.Limit(1000), limiter.Limit()) require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst()) require.Equal(t, 10000, limiter.Burst())
// Check raft config
defaults := DefaultConfig()
got := s.raft.ReloadableConfig()
require.Equal(t, uint64(4321), got.SnapshotThreshold,
"should have be reloaded to new value")
require.Equal(t, defaults.RaftConfig.SnapshotInterval, got.SnapshotInterval,
"should have remained the default interval")
require.Equal(t, defaults.RaftConfig.TrailingLogs, got.TrailingLogs,
"should have reloaded to default trailing_logs")
// Now check that updating each of those raft fields separately works correctly
// too.
}
// TestServer_computeRaftReloadableConfig table-tests that zero-valued fields on
// ReloadableConfig fall back to the raft defaults from DefaultConfig(), while
// non-zero fields override them. It exercises computeRaftReloadableConfig
// directly so no running raft instance is needed.
func TestServer_computeRaftReloadableConfig(t *testing.T) {
// Capture the raft defaults once; every "unset" field must resolve to these.
defaults := DefaultConfig().RaftConfig
cases := []struct {
name string
rc ReloadableConfig
want raft.ReloadableConfig
}{
{
// This case is the common path - reload is called with a ReloadableConfig
// populated from the RuntimeConfig which has zero values for the fields.
// On startup we selectively pick non-zero runtime config fields to
// override defaults so we need to do the same.
name: "Still defaults",
rc: ReloadableConfig{},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: defaults.TrailingLogs,
},
},
{
// One field set: only that field should deviate from the defaults.
name: "Threshold set",
rc: ReloadableConfig{
RaftSnapshotThreshold: 123456,
},
want: raft.ReloadableConfig{
SnapshotThreshold: 123456,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: defaults.TrailingLogs,
},
},
{
name: "interval set",
rc: ReloadableConfig{
RaftSnapshotInterval: 13 * time.Minute,
},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: 13 * time.Minute,
TrailingLogs: defaults.TrailingLogs,
},
},
{
name: "trailing logs set",
rc: ReloadableConfig{
RaftTrailingLogs: 78910,
},
want: raft.ReloadableConfig{
SnapshotThreshold: defaults.SnapshotThreshold,
SnapshotInterval: defaults.SnapshotInterval,
TrailingLogs: 78910,
},
},
{
// All three set at once: no default should leak through.
name: "all set",
rc: ReloadableConfig{
RaftSnapshotThreshold: 123456,
RaftSnapshotInterval: 13 * time.Minute,
RaftTrailingLogs: 78910,
},
want: raft.ReloadableConfig{
SnapshotThreshold: 123456,
SnapshotInterval: 13 * time.Minute,
TrailingLogs: 78910,
},
},
}
for _, tc := range cases {
// Shadow the loop variable (pre-Go 1.22 capture idiom for subtests).
tc := tc
t.Run(tc.name, func(t *testing.T) {
got := computeRaftReloadableConfig(tc.rc)
require.Equal(t, tc.want, got)
})
}
} }
func TestServer_RPC_RateLimit(t *testing.T) { func TestServer_RPC_RateLimit(t *testing.T) {

View File

@ -175,6 +175,19 @@ func registerWithGRPC(b grpcresolver.Builder) {
// getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends // getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends
// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. // all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics.
func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) { func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
// package within it. In the meantime, we're going to define a few here because they're key to monitoring Consul.
raftGauges := []prometheus.GaugeDefinition{
{
Name: []string{"raft", "fsm", "lastRestoreDuration"},
Help: "This measures how long the last FSM restore (from disk or leader) took.",
},
{
Name: []string{"raft", "leader", "oldestLogAge"},
Help: "This measures how old the oldest log in the leader's log store is.",
},
}
// Build slice of slices for all gauge definitions // Build slice of slices for all gauge definitions
var gauges = [][]prometheus.GaugeDefinition{ var gauges = [][]prometheus.GaugeDefinition{
cache.Gauges, cache.Gauges,
@ -185,7 +198,9 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
usagemetrics.Gauges, usagemetrics.Gauges,
consul.ReplicationGauges, consul.ReplicationGauges,
Gauges, Gauges,
raftGauges,
} }
// Flatten definitions // Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique? // NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
var gaugeDefs []prometheus.GaugeDefinition var gaugeDefs []prometheus.GaugeDefinition
@ -252,6 +267,14 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
Name: []string{"raft", "leader", "lastContact"}, Name: []string{"raft", "leader", "lastContact"},
Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.", Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.",
}, },
{
Name: []string{"raft", "snapshot", "persist"},
Help: "Measures the time it takes raft to write a new snapshot to disk.",
},
{
Name: []string{"raft", "rpc", "installSnapshot"},
Help: "Measures the time it takes the raft leader to install a snapshot on a follower that is catching up after being down or has just joined the cluster.",
},
} }
var summaries = [][]prometheus.SummaryDefinition{ var summaries = [][]prometheus.SummaryDefinition{

4
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/Microsoft/go-winio v0.4.3 // indirect github.com/Microsoft/go-winio v0.4.3 // indirect
github.com/NYTimes/gziphandler v1.0.1 github.com/NYTimes/gziphandler v1.0.1
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/go-metrics v0.3.6 github.com/armon/go-metrics v0.3.7
github.com/armon/go-radix v1.0.0 github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.25.41 github.com/aws/aws-sdk-go v1.25.41
github.com/coredns/coredns v1.1.2 github.com/coredns/coredns v1.1.2
@ -52,7 +52,7 @@ require (
github.com/hashicorp/mdns v1.0.4 // indirect github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/memberlist v0.2.3 github.com/hashicorp/memberlist v0.2.3
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.2.0 github.com/hashicorp/raft v1.3.0
github.com/hashicorp/raft-autopilot v0.1.2 github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5 github.com/hashicorp/serf v0.9.5

7
go.sum
View File

@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
github.com/armon/go-metrics v0.3.6 h1:x/tmtOF9cDBoXH7XoAGOz2qqm1DknFD1590XmD/DUJ8= github.com/armon/go-metrics v0.3.7 h1:c/oCtWzYpboy6+6f6LjXRlyW7NwA2SWf+a9KMlHq/bM=
github.com/armon/go-metrics v0.3.6/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= github.com/armon/go-metrics v0.3.7/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@ -279,8 +279,9 @@ github.com/hashicorp/memberlist v0.2.3/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.3.0 h1:Wox4J4R7J2FOJLtTa6hdk0VJfiNUSP32pYoYR738bkE=
github.com/hashicorp/raft v1.3.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs= github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw= github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=

View File

@ -5,7 +5,6 @@ package prometheus
import ( import (
"fmt" "fmt"
"log" "log"
"math"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
@ -31,17 +30,16 @@ type PrometheusOpts struct {
Expiration time.Duration Expiration time.Duration
Registerer prometheus.Registerer Registerer prometheus.Registerer
// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to // Gauges, Summaries, and Counters allow us to pre-declare metrics by giving
// the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be // their Name, Help, and ConstLabels to the PrometheusSink when it is created.
// deleted when their expiry is reached. // Metrics declared in this way will be initialized at zero and will not be
// - Gauges and Summaries will be set to NaN when they expire. // deleted or altered when their expiry is reached.
// - Counters continue to Collect their last known value. //
// Ex: // Ex: PrometheusOpts{
// PrometheusOpts{
// Expiration: 10 * time.Second, // Expiration: 10 * time.Second,
// Gauges: []GaugeDefinition{ // Gauges: []GaugeDefinition{
// { // {
// Name: []string{ "application", "component", "measurement"}, // Name: []string{ "application", "component", "measurement"},
// Help: "application_component_measurement provides an example of how to declare static metrics", // Help: "application_component_measurement provides an example of how to declare static metrics",
// ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, }, // ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
// }, // },
@ -139,21 +137,24 @@ func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) {
// logic to clean up ephemeral metrics if their value haven't been set for a // logic to clean up ephemeral metrics if their value haven't been set for a
// duration exceeding our allowed expiration time. // duration exceeding our allowed expiration time.
func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) { func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
p.collectAtTime(c, time.Now())
}
// collectAtTime allows internal testing of the expiry based logic here without
// mocking clocks or making tests timing sensitive.
func (p *PrometheusSink) collectAtTime(c chan<- prometheus.Metric, t time.Time) {
expire := p.expiration != 0 expire := p.expiration != 0
now := time.Now()
p.gauges.Range(func(k, v interface{}) bool { p.gauges.Range(func(k, v interface{}) bool {
if v == nil { if v == nil {
return true return true
} }
g := v.(*gauge) g := v.(*gauge)
lastUpdate := g.updatedAt lastUpdate := g.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) { if expire && lastUpdate.Add(p.expiration).Before(t) {
if g.canDelete { if g.canDelete {
p.gauges.Delete(k) p.gauges.Delete(k)
return true return true
} }
// We have not observed the gauge this interval so we don't know its value.
g.Set(math.NaN())
} }
g.Collect(c) g.Collect(c)
return true return true
@ -164,13 +165,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
} }
s := v.(*summary) s := v.(*summary)
lastUpdate := s.updatedAt lastUpdate := s.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) { if expire && lastUpdate.Add(p.expiration).Before(t) {
if s.canDelete { if s.canDelete {
p.summaries.Delete(k) p.summaries.Delete(k)
return true return true
} }
// We have observed nothing in this interval.
s.Observe(math.NaN())
} }
s.Collect(c) s.Collect(c)
return true return true
@ -181,12 +180,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
} }
count := v.(*counter) count := v.(*counter)
lastUpdate := count.updatedAt lastUpdate := count.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) { if expire && lastUpdate.Add(p.expiration).Before(t) {
if count.canDelete { if count.canDelete {
p.counters.Delete(k) p.counters.Delete(k)
return true return true
} }
// Counters remain at their previous value when not observed, so we do not set it to NaN.
} }
count.Collect(c) count.Collect(c)
return true return true

View File

@ -1,5 +1,21 @@
# UNRELEASED # UNRELEASED
# 1.3.0 (April 22nd, 2021)
IMPROVEMENTS
* Added metrics for `oldestLogAge` and `lastRestoreDuration` to monitor capacity issues that can cause unrecoverable cluster failure [[GH-452](https://github.com/hashicorp/raft/pull/452)][[GH-454](https://github.com/hashicorp/raft/pull/454/files)]
* Made `TrailingLogs`, `SnapshotInterval` and `SnapshotThreshold` reloadable at runtime using a new `ReloadConfig` method. This allows recovery from cases where there are not enough logs retained for followers to catchup after a restart. [[GH-444](https://github.com/hashicorp/raft/pull/444)]
* Inclusify the repository by switching to main [[GH-446](https://github.com/hashicorp/raft/pull/446)]
* Add option for a buffered `ApplyCh` if `MaxAppendEntries` is enabled [[GH-445](https://github.com/hashicorp/raft/pull/445)]
* Add string to `LogType` for more human readable debugging [[GH-442](https://github.com/hashicorp/raft/pull/442)]
* Extract fuzzy testing into its own module [[GH-459](https://github.com/hashicorp/raft/pull/459)]
BUG FIXES
* Update LogCache `StoreLogs()` to capture an error that would previously cause a panic [[GH-460](https://github.com/hashicorp/raft/pull/460)]
# 1.2.0 (October 5th, 2020)
IMPROVEMENTS IMPROVEMENTS
* Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)] * Remove `StartAsLeader` configuration option [[GH-364](https://github.com/hashicorp/raft/pull/386)]
@ -85,4 +101,4 @@ v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. Th
# 0.1.0 (September 29th, 2017) # 0.1.0 (September 29th, 2017)
v0.1.0 is the original stable version of the library that was in master and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0. v0.1.0 is the original stable version of the library that was in main and has been maintained with no breaking API changes. This was in use by Consul prior to version 0.7.0.

View File

@ -16,28 +16,28 @@ endif
TEST_RESULTS_DIR?=/tmp/test-results TEST_RESULTS_DIR?=/tmp/test-results
test: test:
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race . GOTRACEBACK=all go test $(TESTARGS) -timeout=180s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race . GOTRACEBACK=all go test $(TESTARGS) -timeout=180s -tags batchtest -race .
integ: test integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ . INTEG_TESTS=yes go test $(TESTARGS) -timeout=60s -run=Integ .
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -tags batchtest -run=Integ . INTEG_TESTS=yes go test $(TESTARGS) -timeout=60s -tags batchtest -run=Integ .
ci.test-norace: ci.test-norace:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -tags batchtest gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -tags batchtest
ci.test: ci.test:
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race . gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -race .
gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=60s -race -tags batchtest . gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-test.xml -- -timeout=180s -race -tags batchtest .
ci.integ: ci.test ci.integ: ci.test
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ . INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=60s -run=Integ .
INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=25s -run=Integ -tags batchtest . INTEG_TESTS=yes gotestsum --format=short-verbose --junitfile $(TEST_RESULTS_DIR)/gotestsum-report-integ.xml -- -timeout=60s -run=Integ -tags batchtest .
fuzz: fuzz:
go test $(TESTARGS) -timeout=20m ./fuzzy cd ./fuzzy && go test $(TESTARGS) -timeout=20m .
go test $(TESTARGS) -timeout=20m -tags batchtest ./fuzzy cd ./fuzzy && go test $(TESTARGS) -timeout=20m -tags batchtest .
deps: deps:
go get -t -d -v ./... go get -t -d -v ./...

View File

@ -28,16 +28,21 @@ To prevent complications with cgo, the primary backend `MDBStore` is in a separa
called [raft-mdb](http://github.com/hashicorp/raft-mdb). That is the recommended implementation called [raft-mdb](http://github.com/hashicorp/raft-mdb). That is the recommended implementation
for the `LogStore` and `StableStore`. for the `LogStore` and `StableStore`.
A pure Go backend using [BoltDB](https://github.com/boltdb/bolt) is also available called A pure Go backend using [Bbolt](https://github.com/etcd-io/bbolt) is also available called
[raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as a `LogStore` [raft-boltdb](https://github.com/hashicorp/raft-boltdb). It can also be used as a `LogStore`
and `StableStore`. and `StableStore`.
## Community Contributed Examples
[Raft gRPC Example](https://github.com/Jille/raft-grpc-example) - Utilizing the Raft repository with gRPC
## Tagged Releases ## Tagged Releases
As of September 2017, HashiCorp will start using tags for this library to clearly indicate As of September 2017, HashiCorp will start using tags for this library to clearly indicate
major version updates. We recommend you vendor your application's dependency on this library. major version updates. We recommend you vendor your application's dependency on this library.
* v0.1.0 is the original stable version of the library that was in master and has been maintained * v0.1.0 is the original stable version of the library that was in main and has been maintained
with no breaking API changes. This was in use by Consul prior to version 0.7.0. with no breaking API changes. This was in use by Consul prior to version 0.7.0.
* v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version * v1.0.0 takes the changes that were staged in the library-v2-stage-one branch. This version
@ -104,4 +109,3 @@ greatly sacrificing performance.
In terms of performance, Raft is comparable to Paxos. Assuming stable leadership, In terms of performance, Raft is comparable to Paxos. Assuming stable leadership,
committing a log entry requires a single round trip to half of the cluster. committing a log entry requires a single round trip to half of the cluster.
Thus performance is bound by disk I/O and network latency. Thus performance is bound by disk I/O and network latency.

View File

@ -81,8 +81,15 @@ type Raft struct {
// be committed and applied to the FSM. // be committed and applied to the FSM.
applyCh chan *logFuture applyCh chan *logFuture
// Configuration provided at Raft initialization // conf stores the current configuration to use. This is the most recent one
conf Config // provided. All reads of config values should use the config() helper method
// to read this safely.
conf atomic.Value
// confReloadMu ensures that only one thread can reload config at once since
// we need to read-modify-write the atomic. It is NOT necessary to hold this
// for any other operation e.g. reading config using config().
confReloadMu sync.Mutex
// FSM is the client state machine to apply commands to // FSM is the client state machine to apply commands to
fsm FSM fsm FSM
@ -199,7 +206,7 @@ type Raft struct {
// server. Any further attempts to bootstrap will return an error that can be // server. Any further attempts to bootstrap will return an error that can be
// safely ignored. // safely ignored.
// //
// One sane approach is to bootstrap a single server with a configuration // One approach is to bootstrap a single server with a configuration
// listing just itself as a Voter, then invoke AddVoter() on it to add other // listing just itself as a Voter, then invoke AddVoter() on it to add other
// servers to the cluster. // servers to the cluster.
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
@ -316,6 +323,12 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
continue continue
} }
// Note this is the one place we call fsm.Restore without the
// fsmRestoreAndMeasure wrapper since this function should only be called to
// reset state on disk and the FSM passed will not be used for a running
// server instance. If the same process will eventually become a Raft peer
// then it will call NewRaft and restore again from disk then which will
// report metrics.
err = fsm.Restore(source) err = fsm.Restore(source)
// Close the source after the restore has completed // Close the source after the restore has completed
source.Close() source.Close()
@ -385,9 +398,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil return nil
} }
// GetConfiguration returns the configuration of the Raft cluster without // GetConfiguration returns the persisted configuration of the Raft cluster
// starting a Raft instance or connecting to the cluster // without starting a Raft instance or connecting to the cluster. This function
// This function has identical behavior to Raft.GetConfiguration // has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore, func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) { snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true conf.skipStartup = true
@ -486,7 +499,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// Make sure we have a valid server address and ID. // Make sure we have a valid server address and ID.
protocolVersion := conf.ProtocolVersion protocolVersion := conf.ProtocolVersion
localAddr := ServerAddress(trans.LocalAddr()) localAddr := trans.LocalAddr()
localID := conf.LocalID localID := conf.LocalID
// TODO (slackpad) - When we deprecate protocol version 2, remove this // TODO (slackpad) - When we deprecate protocol version 2, remove this
@ -495,11 +508,16 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address") return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
} }
// Buffer applyCh to MaxAppendEntries if the option is enabled
applyCh := make(chan *logFuture)
if conf.BatchApplyCh {
applyCh = make(chan *logFuture, conf.MaxAppendEntries)
}
// Create Raft struct. // Create Raft struct.
r := &Raft{ r := &Raft{
protocolVersion: protocolVersion, protocolVersion: protocolVersion,
applyCh: make(chan *logFuture), applyCh: applyCh,
conf: *conf,
fsm: fsm, fsm: fsm,
fsmMutateCh: make(chan interface{}, 128), fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture), fsmSnapshotCh: make(chan *reqSnapshotFuture),
@ -524,6 +542,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
} }
r.conf.Store(*conf)
// Initialize as a follower. // Initialize as a follower.
r.setState(Follower) r.setState(Follower)
@ -577,23 +597,23 @@ func (r *Raft) restoreSnapshot() error {
// Try to load in order of newest to oldest // Try to load in order of newest to oldest
for _, snapshot := range snapshots { for _, snapshot := range snapshots {
if !r.conf.NoSnapshotRestoreOnStart { if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID) _, source, err := r.snapshots.Open(snapshot.ID)
if err != nil { if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err) r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
continue continue
} }
err = r.fsm.Restore(source) if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
// Close the source after the restore has completed source.Close()
source.Close()
if err != nil {
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err) r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue continue
} }
source.Close()
r.logger.Info("restored from snapshot", "id", snapshot.ID) r.logger.Info("restored from snapshot", "id", snapshot.ID)
} }
// Update the lastApplied so we don't replay old logs // Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index) r.setLastApplied(snapshot.Index)
@ -624,6 +644,45 @@ func (r *Raft) restoreSnapshot() error {
return nil return nil
} }
func (r *Raft) config() Config {
return r.conf.Load().(Config)
}
// ReloadConfig updates the configuration of a running raft node. If the new
// configuration is invalid an error is returned and no changes made to the
// instance. All fields will be copied from rc into the new configuration, even
// if they are zero valued.
func (r *Raft) ReloadConfig(rc ReloadableConfig) error {
r.confReloadMu.Lock()
defer r.confReloadMu.Unlock()
// Load the current config (note we are under a lock so it can't be changed
// between this read and a later Store).
oldCfg := r.config()
// Set the reloadable fields
newCfg := rc.apply(oldCfg)
if err := ValidateConfig(&newCfg); err != nil {
return err
}
r.conf.Store(newCfg)
return nil
}
// ReloadableConfig returns the current state of the reloadable fields in Raft's
// configuration. This is useful for programs to discover the current state for
// reporting to users or tests. It is safe to call from any goroutine. It is
// intended for reporting and testing purposes primarily; external
// synchronization would be required to safely use this in a read-modify-write
// pattern for reloadable configuration options.
func (r *Raft) ReloadableConfig() ReloadableConfig {
cfg := r.config()
var rc ReloadableConfig
rc.fromConfig(cfg)
return rc
}
// BootstrapCluster is equivalent to non-member BootstrapCluster but can be // BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This // called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster with an // should only be called at the beginning of time for the cluster with an

View File

@ -23,7 +23,7 @@ type commitment struct {
startIndex uint64 startIndex uint64
} }
// newCommitment returns an commitment struct that notifies the provided // newCommitment returns a commitment struct that notifies the provided
// channel when log entries have been committed. A new commitment struct is // channel when log entries have been committed. A new commitment struct is
// created each time this server becomes leader for a particular term. // created each time this server becomes leader for a particular term.
// 'configuration' is the servers in the cluster. // 'configuration' is the servers in the cluster.

View File

@ -151,25 +151,36 @@ type Config struct {
// an inconsistent log. // an inconsistent log.
MaxAppendEntries int MaxAppendEntries int
// BatchApplyCh indicates whether we should buffer applyCh
// to size MaxAppendEntries. This enables batch log commitment,
// but breaks the timeout guarantee on Apply. Specifically,
// a log can be added to the applyCh buffer but not actually be
// processed until after the specified timeout.
BatchApplyCh bool
// If we are a member of a cluster, and RemovePeer is invoked for the // If we are a member of a cluster, and RemovePeer is invoked for the
// local node, then we forget all peers and transition into the follower state. // local node, then we forget all peers and transition into the follower state.
// If ShutdownOnRemove is is set, we additional shutdown Raft. Otherwise, // If ShutdownOnRemove is set, we additional shutdown Raft. Otherwise,
// we can become a leader of a cluster containing only this node. // we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool ShutdownOnRemove bool
// TrailingLogs controls how many logs we leave after a snapshot. This is // TrailingLogs controls how many logs we leave after a snapshot. This is used
// used so that we can quickly replay logs on a follower instead of being // so that we can quickly replay logs on a follower instead of being forced to
// forced to send an entire snapshot. // send an entire snapshot. The value passed here is the initial setting used.
// This can be tuned during operation using ReloadConfig.
TrailingLogs uint64 TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot. // SnapshotInterval controls how often we check if we should perform a
// We randomly stagger between this value and 2x this value to avoid the entire // snapshot. We randomly stagger between this value and 2x this value to avoid
// cluster from performing a snapshot at once. // the entire cluster from performing a snapshot at once. The value passed
// here is the initial setting used. This can be tuned during operation using
// ReloadConfig.
SnapshotInterval time.Duration SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before // SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can // we perform a snapshot. This is to prevent excessive snapshotting by
// just replay a small set of logs. // replaying a small set of logs instead. The value passed here is the initial
// setting used. This can be tuned during operation using ReloadConfig.
SnapshotThreshold uint64 SnapshotThreshold uint64
// LeaderLeaseTimeout is used to control how long the "lease" lasts // LeaderLeaseTimeout is used to control how long the "lease" lasts
@ -178,7 +189,7 @@ type Config struct {
// step down as leader. // step down as leader.
LeaderLeaseTimeout time.Duration LeaderLeaseTimeout time.Duration
// The unique ID for this server across all time. When running with // LocalID is a unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network // ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport. // address of your transport.
LocalID ServerID LocalID ServerID
@ -192,25 +203,65 @@ type Config struct {
// Defaults to os.Stderr. // Defaults to os.Stderr.
LogOutput io.Writer LogOutput io.Writer
// LogLevel represents a log level. If a no matching string is specified, // LogLevel represents a log level. If the value does not match a known
// hclog.NoLevel is assumed. // logging level hclog.NoLevel is used.
LogLevel string LogLevel string
// Logger is a user-provided hc-log logger. If nil, a logger writing to // Logger is a user-provided logger. If nil, a logger writing to
// LogOutput with LogLevel is used. // LogOutput with LogLevel is used.
Logger hclog.Logger Logger hclog.Logger
// NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the // NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// FSM on start. This is useful if your FSM recovers from other mechanisms // FSM on start. This is useful if your FSM recovers from other mechanisms
// than raft snapshotting. Snapshot metadata will still be used to initialize // than raft snapshotting. Snapshot metadata will still be used to initialize
// raft's configuration and index values. This is used in NewRaft and // raft's configuration and index values.
// RestoreCluster.
NoSnapshotRestoreOnStart bool NoSnapshotRestoreOnStart bool
// skipStartup allows NewRaft() to bypass all background work goroutines // skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool skipStartup bool
} }
// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
// Reconfiguring some fields is potentially dangerous so we should only
// selectively enable it for fields where that is allowed.
type ReloadableConfig struct {
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here updates the setting at runtime
// which will take effect as soon as the next snapshot completes and truncation
// occurs.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
}
// apply sets the reloadable fields on the passed Config to the values in
// `ReloadableConfig`. It returns a copy of Config with the fields from this
// ReloadableConfig set.
func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
return to
}
// fromConfig copies the reloadable fields from the passed Config.
func (rc *ReloadableConfig) fromConfig(from Config) {
rc.TrailingLogs = from.TrailingLogs
rc.SnapshotInterval = from.SnapshotInterval
rc.SnapshotThreshold = from.SnapshotThreshold
}
// DefaultConfig returns a Config with usable defaults. // DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
@ -238,20 +289,20 @@ func ValidateConfig(config *Config) error {
} }
if config.ProtocolVersion < protocolMin || if config.ProtocolVersion < protocolMin ||
config.ProtocolVersion > ProtocolVersionMax { config.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("Protocol version %d must be >= %d and <= %d", return fmt.Errorf("ProtocolVersion %d must be >= %d and <= %d",
config.ProtocolVersion, protocolMin, ProtocolVersionMax) config.ProtocolVersion, protocolMin, ProtocolVersionMax)
} }
if len(config.LocalID) == 0 { if len(config.LocalID) == 0 {
return fmt.Errorf("LocalID cannot be empty") return fmt.Errorf("LocalID cannot be empty")
} }
if config.HeartbeatTimeout < 5*time.Millisecond { if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("Heartbeat timeout is too low") return fmt.Errorf("HeartbeatTimeout is too low")
} }
if config.ElectionTimeout < 5*time.Millisecond { if config.ElectionTimeout < 5*time.Millisecond {
return fmt.Errorf("Election timeout is too low") return fmt.Errorf("ElectionTimeout is too low")
} }
if config.CommitTimeout < time.Millisecond { if config.CommitTimeout < time.Millisecond {
return fmt.Errorf("Commit timeout is too low") return fmt.Errorf("CommitTimeout is too low")
} }
if config.MaxAppendEntries <= 0 { if config.MaxAppendEntries <= 0 {
return fmt.Errorf("MaxAppendEntries must be positive") return fmt.Errorf("MaxAppendEntries must be positive")
@ -260,16 +311,16 @@ func ValidateConfig(config *Config) error {
return fmt.Errorf("MaxAppendEntries is too large") return fmt.Errorf("MaxAppendEntries is too large")
} }
if config.SnapshotInterval < 5*time.Millisecond { if config.SnapshotInterval < 5*time.Millisecond {
return fmt.Errorf("Snapshot interval is too low") return fmt.Errorf("SnapshotInterval is too low")
} }
if config.LeaderLeaseTimeout < 5*time.Millisecond { if config.LeaderLeaseTimeout < 5*time.Millisecond {
return fmt.Errorf("Leader lease timeout is too low") return fmt.Errorf("LeaderLeaseTimeout is too low")
} }
if config.LeaderLeaseTimeout > config.HeartbeatTimeout { if config.LeaderLeaseTimeout > config.HeartbeatTimeout {
return fmt.Errorf("Leader lease timeout cannot be larger than heartbeat timeout") return fmt.Errorf("LeaderLeaseTimeout cannot be larger than heartbeat timeout")
} }
if config.ElectionTimeout < config.HeartbeatTimeout { if config.ElectionTimeout < config.HeartbeatTimeout {
return fmt.Errorf("Election timeout must be equal or greater than Heartbeat Timeout") return fmt.Errorf("ElectionTimeout must be equal or greater than Heartbeat Timeout")
} }
return nil return nil
} }

View File

@ -181,17 +181,17 @@ func checkConfiguration(configuration Configuration) error {
var voters int var voters int
for _, server := range configuration.Servers { for _, server := range configuration.Servers {
if server.ID == "" { if server.ID == "" {
return fmt.Errorf("Empty ID in configuration: %v", configuration) return fmt.Errorf("empty ID in configuration: %v", configuration)
} }
if server.Address == "" { if server.Address == "" {
return fmt.Errorf("Empty address in configuration: %v", server) return fmt.Errorf("empty address in configuration: %v", server)
} }
if idSet[server.ID] { if idSet[server.ID] {
return fmt.Errorf("Found duplicate ID in configuration: %v", server.ID) return fmt.Errorf("found duplicate ID in configuration: %v", server.ID)
} }
idSet[server.ID] = true idSet[server.ID] = true
if addressSet[server.Address] { if addressSet[server.Address] {
return fmt.Errorf("Found duplicate address in configuration: %v", server.Address) return fmt.Errorf("found duplicate address in configuration: %v", server.Address)
} }
addressSet[server.Address] = true addressSet[server.Address] = true
if server.Suffrage == Voter { if server.Suffrage == Voter {
@ -199,7 +199,7 @@ func checkConfiguration(configuration Configuration) error {
} }
} }
if voters == 0 { if voters == 0 {
return fmt.Errorf("Need at least one voter in configuration: %v", configuration) return fmt.Errorf("need at least one voter in configuration: %v", configuration)
} }
return nil return nil
} }
@ -209,7 +209,7 @@ func checkConfiguration(configuration Configuration) error {
// that it can be unit tested easily. // that it can be unit tested easily.
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) { func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
if change.prevIndex > 0 && change.prevIndex != currentIndex { if change.prevIndex > 0 && change.prevIndex != currentIndex {
return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex) return Configuration{}, fmt.Errorf("configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
} }
configuration := current.Clone() configuration := current.Clone()

View File

@ -175,16 +175,13 @@ func (r *Raft) runFSM() {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err)) req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return return
} }
defer source.Close()
// Attempt to restore // Attempt to restore
start := time.Now() if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
if err := r.fsm.Restore(source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err)) req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
return return
} }
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
// Update the last index and term // Update the last index and term
lastIndex = meta.Index lastIndex = meta.Index
@ -233,3 +230,17 @@ func (r *Raft) runFSM() {
} }
} }
} }
// fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
// and report timing metrics. The caller is still responsible for calling Close
// on the source in all cases.
func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
start := time.Now()
if err := fsm.Restore(source); err != nil {
return err
}
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"},
float32(time.Since(start).Milliseconds()))
return nil
}

View File

@ -9,12 +9,13 @@ import (
// Future is used to represent an action that may occur in the future. // Future is used to represent an action that may occur in the future.
type Future interface { type Future interface {
// Error blocks until the future arrives and then // Error blocks until the future arrives and then returns the error status
// returns the error status of the future. // of the future. This may be called any number of times - all calls will
// This may be called any number of times - all // return the same value, however is not OK to call this method twice
// calls will return the same value. // concurrently on the same Future instance.
// Note that it is not OK to call this method // Error will only return generic errors related to raft, such
// twice concurrently on the same Future instance. // as ErrLeadershipLost, or ErrRaftShutdown. Some operations, such as
// ApplyLog, may also return errors from other methods.
Error() error Error() error
} }
@ -32,9 +33,11 @@ type IndexFuture interface {
type ApplyFuture interface { type ApplyFuture interface {
IndexFuture IndexFuture
// Response returns the FSM response as returned // Response returns the FSM response as returned by the FSM.Apply method. This
// by the FSM.Apply method. This must not be called // must not be called until after the Error method has returned.
// until after the Error method has returned. // Note that if FSM.Apply returns an error, it will be returned by Response,
// and not by the Error method, so it is always important to check Response
// for errors from the FSM.
Response() interface{} Response() interface{}
} }

View File

@ -4,10 +4,7 @@ go 1.12
require ( require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/boltdb/bolt v1.3.1 // indirect
github.com/hashicorp/go-hclog v0.9.1 github.com/hashicorp/go-hclog v0.9.1
github.com/hashicorp/go-msgpack v0.5.5 github.com/hashicorp/go-msgpack v0.5.5
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/stretchr/testify v1.3.0 github.com/stretchr/testify v1.3.0
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect
) )

View File

@ -2,8 +2,6 @@ github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -22,8 +20,6 @@ github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCS
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@ -41,5 +37,3 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U=
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -1,5 +1,12 @@
package raft package raft
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
)
// LogType describes various types of log entries. // LogType describes various types of log entries.
type LogType uint8 type LogType uint8
@ -33,6 +40,26 @@ const (
LogConfiguration LogConfiguration
) )
// String returns LogType as a human readable string.
func (lt LogType) String() string {
switch lt {
case LogCommand:
return "LogCommand"
case LogNoop:
return "LogNoop"
case LogAddPeerDeprecated:
return "LogAddPeerDeprecated"
case LogRemovePeerDeprecated:
return "LogRemovePeerDeprecated"
case LogBarrier:
return "LogBarrier"
case LogConfiguration:
return "LogConfiguration"
default:
return fmt.Sprintf("%d", lt)
}
}
// Log entries are replicated to all members of the Raft cluster // Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine. // and form the heart of the replicated state machine.
type Log struct { type Log struct {
@ -62,6 +89,19 @@ type Log struct {
// trouble, so gating extension behavior via some flag in the client // trouble, so gating extension behavior via some flag in the client
// program is also a good idea. // program is also a good idea.
Extensions []byte Extensions []byte
// AppendedAt stores the time the leader first appended this log to it's
// LogStore. Followers will observe the leader's time. It is not used for
// coordination or as part of the replication protocol at all. It exists only
// to provide operational information for example how many seconds worth of
// logs are present on the leader which might impact follower's ability to
// catch up after restoring a large snapshot. We should never rely on this
// being in the past when appending on a follower or reading a log back since
// the clock skew can mean a follower could see a log with a future timestamp.
// In general too the leader is not required to persist the log before
// delivering to followers although the current implementation happens to do
// this.
AppendedAt time.Time
} }
// LogStore is used to provide an interface for storing // LogStore is used to provide an interface for storing
@ -85,3 +125,52 @@ type LogStore interface {
// DeleteRange deletes a range of log entries. The range is inclusive. // DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error DeleteRange(min, max uint64) error
} }
func oldestLog(s LogStore) (Log, error) {
var l Log
// We might get unlucky and have a truncate right between getting first log
// index and fetching it so keep trying until we succeed or hard fail.
var lastFailIdx uint64
var lastErr error
for {
firstIdx, err := s.FirstIndex()
if err != nil {
return l, err
}
if firstIdx == 0 {
return l, ErrLogNotFound
}
if firstIdx == lastFailIdx {
// Got same index as last time around which errored, don't bother trying
// to fetch it again just return the error.
return l, lastErr
}
err = s.GetLog(firstIdx, &l)
if err == nil {
// We found the oldest log, break the loop
break
}
// We failed, keep trying to see if there is a new firstIndex
lastFailIdx = firstIdx
lastErr = err
}
return l, nil
}
func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(interval):
// In error case emit 0 as the age
ageMs := float32(0.0)
l, err := oldestLog(s)
if err == nil && !l.AppendedAt.IsZero() {
ageMs = float32(time.Since(l.AppendedAt).Milliseconds())
}
metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs)
case <-stopCh:
return
}
}
}

View File

@ -51,14 +51,17 @@ func (c *LogCache) StoreLog(log *Log) error {
} }
func (c *LogCache) StoreLogs(logs []*Log) error { func (c *LogCache) StoreLogs(logs []*Log) error {
// Insert the logs into the ring buffer err := c.store.StoreLogs(logs)
// Insert the logs into the ring buffer, but only on success
if err != nil {
return fmt.Errorf("unable to store logs within log store, err: %q", err)
}
c.l.Lock() c.l.Lock()
for _, l := range logs { for _, l := range logs {
c.cache[l.Index%uint64(len(c.cache))] = l c.cache[l.Index%uint64(len(c.cache))] = l
} }
c.l.Unlock() c.l.Unlock()
return nil
return c.store.StoreLogs(logs)
} }
func (c *LogCache) FirstIndex() (uint64, error) { func (c *LogCache) FirstIndex() (uint64, error) {

View File

@ -5,13 +5,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/hashicorp/go-hclog"
"io" "io"
"net" "net"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
) )
@ -122,7 +122,6 @@ type StreamLayer interface {
type netConn struct { type netConn struct {
target ServerAddress target ServerAddress
conn net.Conn conn net.Conn
r *bufio.Reader
w *bufio.Writer w *bufio.Writer
dec *codec.Decoder dec *codec.Decoder
enc *codec.Encoder enc *codec.Encoder
@ -344,12 +343,10 @@ func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
netConn := &netConn{ netConn := &netConn{
target: target, target: target,
conn: conn, conn: conn,
r: bufio.NewReader(conn), dec: codec.NewDecoder(bufio.NewReader(conn), &codec.MsgpackHandle{}),
w: bufio.NewWriter(conn), w: bufio.NewWriter(conn),
} }
// Setup encoder/decoders
netConn.dec = codec.NewDecoder(netConn.r, &codec.MsgpackHandle{})
netConn.enc = codec.NewEncoder(netConn.w, &codec.MsgpackHandle{}) netConn.enc = codec.NewEncoder(netConn.w, &codec.MsgpackHandle{})
// Done // Done

View File

@ -2,6 +2,7 @@ package raft
import ( import (
"sync/atomic" "sync/atomic"
"time"
) )
// Observation is sent along the given channel to observers when an event occurs. // Observation is sent along the given channel to observers when an event occurs.
@ -27,6 +28,12 @@ type PeerObservation struct {
Peer Server Peer Server
} }
// FailedHeartbeatObservation is sent when a node fails to heartbeat with the leader
type FailedHeartbeatObservation struct {
PeerID ServerID
LastContact time.Time
}
// nextObserverId is used to provide a unique ID for each observer to aid in // nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration. // deregistration.
var nextObserverID uint64 var nextObserverID uint64

View File

@ -15,7 +15,8 @@ import (
) )
const ( const (
minCheckInterval = 10 * time.Millisecond minCheckInterval = 10 * time.Millisecond
oldestLogGaugeInterval = 10 * time.Second
) )
var ( var (
@ -29,7 +30,7 @@ var (
// responses. // responses.
func (r *Raft) getRPCHeader() RPCHeader { func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{ return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion, ProtocolVersion: r.config().ProtocolVersion,
} }
} }
@ -56,7 +57,7 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
// currently what we want, and in general support one version back. We // currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol // may need to revisit this policy depending on how future protocol
// changes evolve. // changes evolve.
if header.ProtocolVersion < r.conf.ProtocolVersion-1 { if header.ProtocolVersion < r.config().ProtocolVersion-1 {
return ErrUnsupportedProtocol return ErrUnsupportedProtocol
} }
@ -151,7 +152,7 @@ func (r *Raft) runFollower() {
didWarn := false didWarn := false
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader()) r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)
for r.getState() == Follower { for r.getState() == Follower {
select { select {
@ -187,11 +188,12 @@ func (r *Raft) runFollower() {
case <-heartbeatTimer: case <-heartbeatTimer:
// Restart the heartbeat timer // Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout) hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)
// Check if we have had a successful contact // Check if we have had a successful contact
lastContact := r.LastContact() lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout { if time.Now().Sub(lastContact) < hbTimeout {
continue continue
} }
@ -228,7 +230,8 @@ func (r *Raft) runFollower() {
// called on the main thread, and only makes sense in the follower state. // called on the main thread, and only makes sense in the follower state.
func (r *Raft) liveBootstrap(configuration Configuration) error { func (r *Raft) liveBootstrap(configuration Configuration) error {
// Use the pre-init API to make the static updates. // Use the pre-init API to make the static updates.
err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots, cfg := r.config()
err := BootstrapCluster(&cfg, r.logs, r.stable, r.snapshots,
r.trans, configuration) r.trans, configuration)
if err != nil { if err != nil {
return err return err
@ -260,7 +263,7 @@ func (r *Raft) runCandidate() {
// otherwise. // otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }() defer func() { r.candidateFromLeadershipTransfer = false }()
electionTimer := randomTimeout(r.conf.ElectionTimeout) electionTimer := randomTimeout(r.config().ElectionTimeout)
// Tally the votes, need a simple majority // Tally the votes, need a simple majority
grantedVotes := 0 grantedVotes := 0
@ -344,10 +347,7 @@ func (r *Raft) setLeadershipTransferInProgress(v bool) {
func (r *Raft) getLeadershipTransferInProgress() bool { func (r *Raft) getLeadershipTransferInProgress() bool {
v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress) v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress)
if v == 1 { return v == 1
return true
}
return false
} }
func (r *Raft) setupLeaderState() { func (r *Raft) setupLeaderState() {
@ -370,8 +370,13 @@ func (r *Raft) runLeader() {
// Notify that we are the leader // Notify that we are the leader
overrideNotifyBool(r.leaderCh, true) overrideNotifyBool(r.leaderCh, true)
// Store the notify chan. It's not reloadable so shouldn't change before the
// defer below runs, but this makes sure we always notify the same chan if
// ever for both gaining and loosing leadership.
notify := r.config().NotifyCh
// Push to the notify channel if given // Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil { if notify != nil {
select { select {
case notify <- true: case notify <- true:
case <-r.shutdownCh: case <-r.shutdownCh:
@ -382,8 +387,14 @@ func (r *Raft) runLeader() {
// leaderloop. // leaderloop.
r.setupLeaderState() r.setupLeaderState()
// Run a background go-routine to emit metrics on log age
stopCh := make(chan struct{})
go emitLogStoreMetrics(r.logs, []string{"raft", "leader"}, oldestLogGaugeInterval, stopCh)
// Cleanup state on step down // Cleanup state on step down
defer func() { defer func() {
close(stopCh)
// Since we were the leader previously, we update our // Since we were the leader previously, we update our
// last contact time when we step down, so that we are not // last contact time when we step down, so that we are not
// reporting a last contact time from before we were the // reporting a last contact time from before we were the
@ -427,7 +438,7 @@ func (r *Raft) runLeader() {
overrideNotifyBool(r.leaderCh, false) overrideNotifyBool(r.leaderCh, false)
// Push to the notify channel if given // Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil { if notify != nil {
select { select {
case notify <- false: case notify <- false:
case <-r.shutdownCh: case <-r.shutdownCh:
@ -548,7 +559,9 @@ func (r *Raft) leaderLoop() {
// only a single peer (ourself) and replicating to an undefined set // only a single peer (ourself) and replicating to an undefined set
// of peers. // of peers.
stepDown := false stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout) // This is only used for the first lease check, we reload lease below
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)
for r.getState() == Leader { for r.getState() == Leader {
select { select {
@ -583,7 +596,7 @@ func (r *Raft) leaderLoop() {
// the stopCh and doneCh. // the stopCh and doneCh.
go func() { go func() {
select { select {
case <-time.After(r.conf.ElectionTimeout): case <-time.After(r.config().ElectionTimeout):
close(stopCh) close(stopCh)
err := fmt.Errorf("leadership transfer timeout") err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error()) r.logger.Debug(err.Error())
@ -680,7 +693,7 @@ func (r *Raft) leaderLoop() {
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady))) metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown { if stepDown {
if r.conf.ShutdownOnRemove { if r.config().ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down") r.logger.Info("removed ourself, shutting down")
r.Shutdown() r.Shutdown()
} else { } else {
@ -751,7 +764,7 @@ func (r *Raft) leaderLoop() {
// Group commit, gather all the ready commits // Group commit, gather all the ready commits
ready := []*logFuture{newLog} ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP: GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ { for i := 0; i < r.config().MaxAppendEntries; i++ {
select { select {
case newLog := <-r.applyCh: case newLog := <-r.applyCh:
ready = append(ready, newLog) ready = append(ready, newLog)
@ -776,7 +789,7 @@ func (r *Raft) leaderLoop() {
// Next check interval should adjust for the last node we've // Next check interval should adjust for the last node we've
// contacted, without going negative // contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff checkInterval := r.config().LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval { if checkInterval < minCheckInterval {
checkInterval = minCheckInterval checkInterval = minCheckInterval
} }
@ -872,6 +885,11 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Track contacted nodes, we can always contact ourself // Track contacted nodes, we can always contact ourself
contacted := 0 contacted := 0
// Store lease timeout for this one check invocation as we need to refer to it
// in the loop and would be confusing if it ever becomes reloadable and
// changes between iterations below.
leaseTimeout := r.config().LeaderLeaseTimeout
// Check each follower // Check each follower
var maxDiff time.Duration var maxDiff time.Duration
now := time.Now() now := time.Now()
@ -883,14 +901,14 @@ func (r *Raft) checkLeaderLease() time.Duration {
} }
f := r.leaderState.replState[server.ID] f := r.leaderState.replState[server.ID]
diff := now.Sub(f.LastContact()) diff := now.Sub(f.LastContact())
if diff <= r.conf.LeaderLeaseTimeout { if diff <= leaseTimeout {
contacted++ contacted++
if diff > maxDiff { if diff > maxDiff {
maxDiff = diff maxDiff = diff
} }
} else { } else {
// Log at least once at high value, then debug. Otherwise it gets very verbose. // Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout { if diff <= 3*leaseTimeout {
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff) r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else { } else {
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff) r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
@ -1080,6 +1098,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
lastIndex++ lastIndex++
applyLog.log.Index = lastIndex applyLog.log.Index = lastIndex
applyLog.log.Term = term applyLog.log.Term = term
applyLog.log.AppendedAt = now
logs[idx] = &applyLog.log logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog) r.leaderState.inflight.PushBack(applyLog)
} }
@ -1131,7 +1150,11 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
} }
} }
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries) // Store maxAppendEntries for this call in case it ever becomes reloadable. We
// need to use the same value for all lines here to get the expected result.
maxAppendEntries := r.config().MaxAppendEntries
batch := make([]*commitTuple, 0, maxAppendEntries)
// Apply all the preceding logs // Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ { for idx := lastApplied + 1; idx <= index; idx++ {
@ -1156,9 +1179,9 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
batch = append(batch, preparedLog) batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM // If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries { if len(batch) >= maxAppendEntries {
applyBatch(batch) applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries) batch = make([]*commitTuple, 0, maxAppendEntries)
} }
case futureOk: case futureOk:
@ -1282,7 +1305,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
} }
// Save the current leader // Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader))) r.setLeader(r.trans.DecodePeer(a.Leader))
// Verify the last log entry // Verify the last log entry
if a.PrevLogEntry > 0 { if a.PrevLogEntry > 0 {
@ -1542,7 +1565,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
} }
// Save the current leader // Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(req.Leader))) r.setLeader(r.trans.DecodePeer(req.Leader))
// Create a new snapshot // Create a new snapshot
var reqConfiguration Configuration var reqConfiguration Configuration
@ -1738,16 +1761,6 @@ func (r *Raft) setState(state RaftState) {
} }
} }
// LookupServer looks up a server by ServerID.
func (r *Raft) lookupServer(id ServerID) *Server {
for _, server := range r.configurations.latest.Servers {
if server.ID != r.localID {
return &server
}
}
return nil
}
// pickServer returns the follower that is most up to date and participating in quorum. // pickServer returns the follower that is most up to date and participating in quorum.
// Because it accesses leaderstate, it should only be called from the leaderloop. // Because it accesses leaderstate, it should only be called from the leaderloop.
func (r *Raft) pickServer() *Server { func (r *Raft) pickServer() *Server {

View File

@ -161,7 +161,7 @@ RPC:
// raft commits stop flowing naturally. The actual heartbeats // raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the // can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282. // follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout): case <-randomTimeout(r.config().CommitTimeout):
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx) shouldStop = r.replicateTo(s, lastLogIdx)
} }
@ -373,7 +373,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
// Wait for the next heartbeat interval or forced notify // Wait for the next heartbeat interval or forced notify
select { select {
case <-s.notifyCh: case <-s.notifyCh:
case <-randomTimeout(r.conf.HeartbeatTimeout / 10): case <-randomTimeout(r.config().HeartbeatTimeout / 10):
case <-stopCh: case <-stopCh:
return return
} }
@ -381,6 +381,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
start := time.Now() start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil { if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err) r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: s.peer.ID, LastContact: s.LastContact()})
failures++ failures++
select { select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)): case <-time.After(backoff(failureWait, failures, maxFailureScale)):
@ -447,7 +448,7 @@ SEND:
case <-s.triggerCh: case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx) shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
case <-randomTimeout(r.conf.CommitTimeout): case <-randomTimeout(r.config().CommitTimeout):
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx) shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
} }
@ -562,9 +563,12 @@ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error
// setNewLogs is used to setup the logs which should be appended for a request. // setNewLogs is used to setup the logs which should be appended for a request.
func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error { func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
// Append up to MaxAppendEntries or up to the lastIndex // Append up to MaxAppendEntries or up to the lastIndex. we need to use a
req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries) // consistent value for maxAppendEntries in the lines below in case it ever
maxIndex := min(nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex) // becomes reloadable.
maxAppendEntries := r.config().MaxAppendEntries
req.Entries = make([]*Log, 0, maxAppendEntries)
maxIndex := min(nextIndex+uint64(maxAppendEntries)-1, lastIndex)
for i := nextIndex; i <= maxIndex; i++ { for i := nextIndex; i <= maxIndex; i++ {
oldLog := new(Log) oldLog := new(Log)
if err := r.logs.GetLog(i, oldLog); err != nil { if err := r.logs.GetLog(i, oldLog); err != nil {

View File

@ -69,7 +69,7 @@ type SnapshotSink interface {
func (r *Raft) runSnapshots() { func (r *Raft) runSnapshots() {
for { for {
select { select {
case <-randomTimeout(r.conf.SnapshotInterval): case <-randomTimeout(r.config().SnapshotInterval):
// Check if we should snapshot // Check if we should snapshot
if !r.shouldSnapshot() { if !r.shouldSnapshot() {
continue continue
@ -113,7 +113,7 @@ func (r *Raft) shouldSnapshot() bool {
// Compare the delta to the threshold // Compare the delta to the threshold
delta := lastIdx - lastSnap delta := lastIdx - lastSnap
return delta >= r.conf.SnapshotThreshold return delta >= r.config().SnapshotThreshold
} }
// takeSnapshot is used to take a new snapshot. This must only be called from // takeSnapshot is used to take a new snapshot. This must only be called from
@ -219,7 +219,11 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// Check if we have enough logs to truncate // Check if we have enough logs to truncate
lastLogIdx, _ := r.getLastLog() lastLogIdx, _ := r.getLastLog()
if lastLogIdx <= r.conf.TrailingLogs {
// Use a consistent value for trailingLogs for the duration of this method
// call to avoid surprising behaviour.
trailingLogs := r.config().TrailingLogs
if lastLogIdx <= trailingLogs {
return nil return nil
} }
@ -227,7 +231,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
// back from the head, which ever is further back. This ensures // back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs // at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed. // after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs) maxLog := min(snapIdx, lastLogIdx-trailingLogs)
if minLog > maxLog { if minLog > maxLog {
r.logger.Info("no logs to truncate") r.logger.Info("no logs to truncate")

View File

@ -11,6 +11,6 @@ fi
# Generate the tag. # Generate the tag.
echo "==> Tagging version $VERSION..." echo "==> Tagging version $VERSION..."
git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION" git commit --allow-empty -a --gpg-sign=348FFC4C -m "Release v$VERSION"
git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" master git tag -a -m "Version $VERSION" -s -u 348FFC4C "v${VERSION}" main
exit 0 exit 0

View File

@ -27,7 +27,7 @@ func inmemConfig(t *testing.T) *Config {
conf.ElectionTimeout = 50 * time.Millisecond conf.ElectionTimeout = 50 * time.Millisecond
conf.LeaderLeaseTimeout = 50 * time.Millisecond conf.LeaderLeaseTimeout = 50 * time.Millisecond
conf.CommitTimeout = 5 * time.Millisecond conf.CommitTimeout = 5 * time.Millisecond
conf.Logger = newTestLeveledLogger(t) conf.Logger = newTestLogger(t)
return conf return conf
} }
@ -144,9 +144,6 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
} }
if a.prefix != "" { if a.prefix != "" {
l := a.prefix + ": " + string(d) l := a.prefix + ": " + string(d)
if testing.Verbose() {
fmt.Printf("testLoggerAdapter verbose: %s\n", l)
}
a.t.Log(l) a.t.Log(l)
return len(l), nil return len(l), nil
} }
@ -156,27 +153,21 @@ func (a *testLoggerAdapter) Write(d []byte) (int, error) {
} }
func newTestLogger(t *testing.T) hclog.Logger { func newTestLogger(t *testing.T) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{ return newTestLoggerWithPrefix(t, "")
Output: &testLoggerAdapter{t: t},
Level: hclog.DefaultLevel,
})
} }
// newTestLoggerWithPrefix returns a Logger that can be used in tests. prefix will
// be added as the name of the logger.
//
// If tests are run with -v (verbose mode, or -json which implies verbose) the
// log output will go to stderr directly.
// If tests are run in regular "quiet" mode, logs will be sent to t.Log so that
// the logs only appear when a test fails.
func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger { func newTestLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{ if testing.Verbose() {
Output: &testLoggerAdapter{t: t, prefix: prefix}, return hclog.New(&hclog.LoggerOptions{Name: prefix})
Level: hclog.DefaultLevel, }
})
}
func newTestLeveledLogger(t *testing.T) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{
Name: "",
Output: &testLoggerAdapter{t: t},
})
}
func newTestLeveledLoggerWithPrefix(t *testing.T, prefix string) hclog.Logger {
return hclog.New(&hclog.LoggerOptions{ return hclog.New(&hclog.LoggerOptions{
Name: prefix, Name: prefix,
Output: &testLoggerAdapter{t: t, prefix: prefix}, Output: &testLoggerAdapter{t: t, prefix: prefix},
@ -243,8 +234,8 @@ func (c *cluster) Failf(format string, args ...interface{}) {
// other goroutines created during the test. Calling FailNowf does not stop // other goroutines created during the test. Calling FailNowf does not stop
// those other goroutines. // those other goroutines.
func (c *cluster) FailNowf(format string, args ...interface{}) { func (c *cluster) FailNowf(format string, args ...interface{}) {
c.logger.Error(fmt.Sprintf(format, args...)) c.t.Helper()
c.t.FailNow() c.t.Fatalf(format, args...)
} }
// Close shuts down the cluster and cleans up. // Close shuts down the cluster and cleans up.
@ -264,7 +255,7 @@ func (c *cluster) Close() {
for _, f := range futures { for _, f := range futures {
if err := f.Error(); err != nil { if err := f.Error(); err != nil {
c.FailNowf("shutdown future err: %v", err) c.t.Fatalf("shutdown future err: %v", err)
} }
} }
@ -325,7 +316,7 @@ CHECK:
c.t.FailNow() c.t.FailNow()
case <-limitCh: case <-limitCh:
c.FailNowf("timeout waiting for replication") c.t.Fatalf("timeout waiting for replication")
case <-ch: case <-ch:
for _, fsmRaw := range c.fsms { for _, fsmRaw := range c.fsms {
@ -423,14 +414,14 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.FailNow() c.t.FailNow()
case <-limitCh: case <-limitCh:
c.FailNowf("timeout waiting for stable %s state", s) c.t.Fatalf("timeout waiting for stable %s state", s)
case <-eventCh: case <-eventCh:
c.logger.Debug("resetting stability timeout") c.logger.Debug("resetting stability timeout")
case t, ok := <-timer.C: case t, ok := <-timer.C:
if !ok { if !ok {
c.FailNowf("timer channel errored") c.t.Fatalf("timer channel errored")
} }
c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
@ -442,9 +433,10 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
// Leader waits for the cluster to elect a leader and stay in a stable state. // Leader waits for the cluster to elect a leader and stay in a stable state.
func (c *cluster) Leader() *Raft { func (c *cluster) Leader() *Raft {
c.t.Helper()
leaders := c.GetInState(Leader) leaders := c.GetInState(Leader)
if len(leaders) != 1 { if len(leaders) != 1 {
c.FailNowf("expected one leader: %v", leaders) c.t.Fatalf("expected one leader: %v", leaders)
} }
return leaders[0] return leaders[0]
} }
@ -455,7 +447,7 @@ func (c *cluster) Followers() []*Raft {
expFollowers := len(c.rafts) - 1 expFollowers := len(c.rafts) - 1
followers := c.GetInState(Follower) followers := c.GetInState(Follower)
if len(followers) != expFollowers { if len(followers) != expFollowers {
c.FailNowf("timeout waiting for %d followers (followers are %v)", expFollowers, followers) c.t.Fatalf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
} }
return followers return followers
} }
@ -551,7 +543,7 @@ func (c *cluster) EnsureLeader(t *testing.T, expect ServerAddress) {
} }
} }
if fail { if fail {
c.FailNowf("at least one peer has the wrong notion of leader") t.Fatalf("at least one peer has the wrong notion of leader")
} }
} }
@ -572,7 +564,7 @@ CHECK:
if len(first.logs) != len(fsm.logs) { if len(first.logs) != len(fsm.logs) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("FSM log length mismatch: %d %d", t.Fatalf("FSM log length mismatch: %d %d",
len(first.logs), len(fsm.logs)) len(first.logs), len(fsm.logs))
} else { } else {
goto WAIT goto WAIT
@ -583,7 +575,7 @@ CHECK:
if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 { if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("FSM log mismatch at index %d", idx) t.Fatalf("FSM log mismatch at index %d", idx)
} else { } else {
goto WAIT goto WAIT
} }
@ -592,7 +584,7 @@ CHECK:
if len(first.configurations) != len(fsm.configurations) { if len(first.configurations) != len(fsm.configurations) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("FSM configuration length mismatch: %d %d", t.Fatalf("FSM configuration length mismatch: %d %d",
len(first.logs), len(fsm.logs)) len(first.logs), len(fsm.logs))
} else { } else {
goto WAIT goto WAIT
@ -603,7 +595,7 @@ CHECK:
if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) { if !reflect.DeepEqual(first.configurations[idx], fsm.configurations[idx]) {
fsm.Unlock() fsm.Unlock()
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx]) t.Fatalf("FSM configuration mismatch at index %d: %v, %v", idx, first.configurations[idx], fsm.configurations[idx])
} else { } else {
goto WAIT goto WAIT
} }
@ -626,7 +618,7 @@ WAIT:
func (c *cluster) getConfiguration(r *Raft) Configuration { func (c *cluster) getConfiguration(r *Raft) Configuration {
future := r.GetConfiguration() future := r.GetConfiguration()
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
c.FailNowf("failed to get configuration: %v", err) c.t.Fatalf("failed to get configuration: %v", err)
return Configuration{} return Configuration{}
} }
@ -647,7 +639,7 @@ CHECK:
otherSet := c.getConfiguration(raft) otherSet := c.getConfiguration(raft)
if !reflect.DeepEqual(peerSet, otherSet) { if !reflect.DeepEqual(peerSet, otherSet) {
if time.Now().After(limit) { if time.Now().After(limit) {
c.FailNowf("peer mismatch: %+v %+v", peerSet, otherSet) t.Fatalf("peer mismatch: %+v %+v", peerSet, otherSet)
} else { } else {
goto WAIT goto WAIT
} }
@ -700,7 +692,7 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
for i := 0; i < opts.Peers; i++ { for i := 0; i < opts.Peers; i++ {
dir, err := ioutil.TempDir("", "raft") dir, err := ioutil.TempDir("", "raft")
if err != nil { if err != nil {
c.FailNowf("err: %v", err) t.Fatalf("err: %v", err)
} }
store := NewInmemStore() store := NewInmemStore()
@ -750,23 +742,23 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
peerConf := opts.Conf peerConf := opts.Conf
peerConf.LocalID = configuration.Servers[i].ID peerConf.LocalID = configuration.Servers[i].ID
peerConf.Logger = newTestLeveledLoggerWithPrefix(t, string(configuration.Servers[i].ID)) peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
if opts.Bootstrap { if opts.Bootstrap {
err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration) err := BootstrapCluster(peerConf, logs, store, snap, trans, configuration)
if err != nil { if err != nil {
c.FailNowf("BootstrapCluster failed: %v", err) t.Fatalf("BootstrapCluster failed: %v", err)
} }
} }
raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans) raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, trans)
if err != nil { if err != nil {
c.FailNowf("NewRaft failed: %v", err) t.Fatalf("NewRaft failed: %v", err)
} }
raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) raft.RegisterObserver(NewObserver(c.observationCh, false, nil))
if err != nil { if err != nil {
c.FailNowf("RegisterObserver failed: %v", err) t.Fatalf("RegisterObserver failed: %v", err)
} }
c.rafts = append(c.rafts, raft) c.rafts = append(c.rafts, raft)
} }

4
vendor/modules.txt vendored
View File

@ -32,7 +32,7 @@ github.com/NYTimes/gziphandler
github.com/StackExchange/wmi github.com/StackExchange/wmi
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e # github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/circbuf github.com/armon/circbuf
# github.com/armon/go-metrics v0.3.6 # github.com/armon/go-metrics v0.3.7
github.com/armon/go-metrics github.com/armon/go-metrics
github.com/armon/go-metrics/circonus github.com/armon/go-metrics/circonus
github.com/armon/go-metrics/datadog github.com/armon/go-metrics/datadog
@ -481,7 +481,7 @@ github.com/hashicorp/mdns
github.com/hashicorp/memberlist github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 # github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.2.0 # github.com/hashicorp/raft v1.3.0
github.com/hashicorp/raft github.com/hashicorp/raft
# github.com/hashicorp/raft-autopilot v0.1.2 # github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-autopilot github.com/hashicorp/raft-autopilot

View File

@ -1762,29 +1762,50 @@ bind_addr = "{{ GetPrivateInterfaces | include \"network\" \"10.0.0.0/8\" | attr
- `raft_protocol` ((#raft_protocol)) Equivalent to the [`-raft-protocol` - `raft_protocol` ((#raft_protocol)) Equivalent to the [`-raft-protocol`
command-line flag](#_raft_protocol). command-line flag](#_raft_protocol).
- `raft_snapshot_threshold` ((#\_raft_snapshot_threshold)) This controls - `raft_snapshot_threshold` ((#\_raft_snapshot_threshold)) This controls the
the minimum number of raft commit entries between snapshots that are saved to disk. minimum number of raft commit entries between snapshots that are saved to
This is a low-level parameter that should rarely need to be changed. Very busy disk. This is a low-level parameter that should rarely need to be changed.
clusters experiencing excessive disk IO may increase this value to reduce disk Very busy clusters experiencing excessive disk IO may increase this value to
IO, and minimize the chances of all servers taking snapshots at the same time. reduce disk IO, and minimize the chances of all servers taking snapshots at
the same time. Increasing this trades off disk IO for disk space since the log
will grow much larger and the space in the raft.db file can't be reclaimed
till the next snapshot. Servers may take longer to recover from crashes or
failover if this is increased significantly as more logs will need to be
replayed. In Consul 1.1.0 and later this defaults to 16384, and in prior
versions it was set to 8192.
Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
server a `SIGHUP` to allow tuning snapshot activity without a rolling restart
in emergencies.
- `raft_snapshot_interval` ((#\_raft_snapshot_interval)) This controls how often
servers check if they need to save a snapshot to disk. This is a low-level
parameter that should rarely need to be changed. Very busy clusters
experiencing excessive disk IO may increase this value to reduce disk IO, and
minimize the chances of all servers taking snapshots at the same time.
Increasing this trades off disk IO for disk space since the log will grow much Increasing this trades off disk IO for disk space since the log will grow much
larger and the space in the raft.db file can't be reclaimed till the next snapshot. larger and the space in the raft.db file can't be reclaimed till the next
Servers may take longer to recover from crashes or failover if this is increased snapshot. Servers may take longer to recover from crashes or failover if this
significantly as more logs will need to be replayed. In Consul 1.1.0 and later is increased significantly as more logs will need to be replayed. In Consul
this defaults to 16384, and in prior versions it was set to 8192. 1.1.0 and later this defaults to `30s`, and in prior versions it was set to
`5s`.
Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
server a `SIGHUP` to allow tuning snapshot activity without a rolling restart
in emergencies.
- `raft_snapshot_interval` ((#\_raft_snapshot_interval)) This controls how often servers check if they - `raft_trailing_logs` - This controls how many log entries are left in the log
need to save a snapshot to disk. This is a low-level parameter that should rarely need to be changed. Very busy clusters experiencing excessive disk IO may increase this value to reduce disk IO, and minimize the chances of all servers taking snapshots at the same time. Increasing this trades store on disk after a snapshot is made. This should only be adjusted when
off disk IO for disk space since the log will grow much larger and the space in the raft.db file can't be reclaimed till the next snapshot. Servers may take longer to recover from crashes or failover if this is increased significantly as more logs will need to be replayed. In Consul 1.1.0 and later this defaults to `30s`, and in prior versions it was set to `5s`. followers cannot catch up to the leader due to a very large snapshot size
and high write throughput causing log truncation before an snapshot can be
- `raft_trailing_logs` - This controls how many fully installed on a follower. If you need to use this to recover a cluster,
log entries are left in the log store on disk after a snapshot is made. This should consider reducing write throughput or the amount of data stored on Consul as
only be adjusted when followers cannot catch up to the leader due to a very large it is likely under a load it is not designed to handle. The default value is
snapshot size that and high write throughput causing log truncation before an snapshot 10000 which is suitable for all normal workloads. Added in Consul 1.5.3.
can be fully installed. If you need to use this to recover a cluster, consider
reducing write throughput or the amount of data stored on Consul as it is likely Since Consul 1.10.0 this can be reloaded using `consul reload` or sending the
under a load it is not designed to handle. The default value is 10000 which is server a `SIGHUP` to allow recovery without downtime when followers can't keep
suitable for all normal workloads. Added in Consul 1.5.3. up.
- `reap` This controls Consul's automatic reaping of child processes, - `reap` This controls Consul's automatic reaping of child processes,
which is useful if Consul is running as PID 1 in a Docker container. If this isn't which is useful if Consul is running as PID 1 in a Docker container. If this isn't
@ -2292,6 +2313,13 @@ items which are reloaded include:
- Log level - Log level
- [Metric Prefix Filter](#telemetry-prefix_filter) - [Metric Prefix Filter](#telemetry-prefix_filter)
- [Node Metadata](#node_meta) - [Node Metadata](#node_meta)
- Some Raft options (since Consul 1.10.0)
- [`raft_snapshot_threshold`](#_raft_snapshot_threshold)
- [`raft_snapshot_interval`](#_raft_snapshot_interval)
- [`raft_trailing_logs`](#_raft_trailing_logs)
- These can be important in certain outage situations so being able to control
them without a restart provides a recovery path that doesn't involve
downtime. They generally shouldn't be changed otherwise.
- [RPC rate limiting](#limits) - [RPC rate limiting](#limits)
- [HTTP Maximum Connections per Client](#http_max_conns_per_client) - [HTTP Maximum Connections per Client](#http_max_conns_per_client)
- Services - Services

View File

@ -11,7 +11,7 @@ description: >-
The Consul agent collects various runtime metrics about the performance of The Consul agent collects various runtime metrics about the performance of
different libraries and subsystems. These metrics are aggregated on a ten different libraries and subsystems. These metrics are aggregated on a ten
second (10s) interval and are retained for one minute. An _interval_ is the period of time between instances of data being collected and aggregated. second (10s) interval and are retained for one minute. An _interval_ is the period of time between instances of data being collected and aggregated.
When telemetry is being streamed to an external metrics store, the interval is defined to be that store's flush interval. When telemetry is being streamed to an external metrics store, the interval is defined to be that store's flush interval.
@ -96,7 +96,7 @@ These are some metrics emitted that can help you understand the health of your c
**Why it's important:** Autopilot can expose the overall health of your cluster with a simple boolean. **Why it's important:** Autopilot can expose the overall health of your cluster with a simple boolean.
**What to look for:** Alert if `healthy` is 0. Some other indicators of an unhealthy cluster would be: **What to look for:** Alert if `healthy` is 0. Some other indicators of an unhealthy cluster would be:
- `consul.raft.commitTime` - This can help reflect the speed of state store - `consul.raft.commitTime` - This can help reflect the speed of state store
changes being performed by the agent. If this number is rising, the server may changes being performed by the agent. If this number is rising, the server may
be experiencing an issue due to degraded resources on the host. be experiencing an issue due to degraded resources on the host.
@ -147,6 +147,109 @@ you will need to apply a function such as InfluxDB's [`non_negative_difference()
Sudden large changes to the `consul.client.rpc` metrics (greater than 50% deviation from baseline). Sudden large changes to the `consul.client.rpc` metrics (greater than 50% deviation from baseline).
`consul.client.rpc.exceeded` or `consul.client.rpc.failed` count > 0, as it implies that an agent is being rate-limited or fails to make an RPC request to a Consul server `consul.client.rpc.exceeded` or `consul.client.rpc.failed` count > 0, as it implies that an agent is being rate-limited or fails to make an RPC request to a Consul server
### Raft Replication Capacity Issues
| Metric Name | Description | Unit | Type |
| :--------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------- | :------ |
| `consul.raft.fsm.lastRestoreDuration` | Measures the time taken to restore the FSM from a snapshot on an agent restart or from the leader calling installSnapshot. This is a gauge that holds its value since most servers only restore during restarts which are typically infrequent. | ms | gauge |
| `consul.raft.leader.oldestLogAge` | The number of milliseconds since the _oldest_ log in the leader's log store was written. This can be important for replication health where write rate is high and the snapshot is large as followers may be unable to recover from a restart if restoring takes longer than the minimum value for the current leader. Compare this with `consul.raft.fsm.lastRestoreDuration` and `consul.raft.rpc.installSnapshot` to monitor. In normal usage this gauge value will grow linearly over time until a snapshot completes on the leader and the log is truncated. | ms | gauge |
| `consul.raft.rpc.installSnapshot` | Measures the time taken to process the installSnapshot RPC call. This metric should only be seen on agents which are currently in the follower state. | ms | timer |
**Why they're important:** These metrics allow operators to monitor the health
and capacity of raft replication on servers. **When Consul is handling large
amounts of data and high write throughput** it is possible for the cluster to
get into the following state:
* Write throughput is high (say 500 commits per second or more) and constant
* The leader is writing out a large snapshot every minute or so
* The snapshot is large enough that it takes considerable time to restore from
disk on a restart or from the leader if a follower gets behind
* Disk IO available allows the leader to write a snapshot faster than it can be
restored from disk on a follower
Under these conditions, a follower after a restart may be unable to catch up on
replication and become a voter again since it takes longer to restore from disk
or the leader than the leader takes to write a new snapshot and truncate its
logs. Servers retain
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) (default
`10240`) log entries even if their snapshot was more recent. On a leader
processing 500 commits/second, that is only about 20 seconds worth of logs.
Assuming the leader is able to write out a snapshot and truncate the logs in
less than 20 seconds, there will only be 20 seconds worth of "recent" logs
available on the leader right after the leader has taken a snapshot and never
more than about 80 seconds worth assuming it is taking a snapshot and truncating
logs every 60 seconds.
In this state, followers must be able to restore a snapshot into memory and
resume replication in under 80 seconds otherwise they will never be able to
rejoin the cluster until write rates reduce. If they take more than 20 seconds
then there will be a chance that they are unlucky with timing when they restart
and have to download a snapshot again from the servers one or more times. If
they take 50 seconds or more then they will likely fail to catch up more often
than they succeed and will remain non-voters for some time until they happen to
complete the restore just before the leader truncates its logs.
In the worst case, the follower will be left continually downloading snapshots
from the leader which are always too old to use by the time they are restored.
This can put additional strain on the leader transferring large snapshots
repeatedly as well as reduce the fault tolerance and serving capacity of the
cluster.
Since Consul 1.5.3
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) has been
configurable. Increasing it allows the leader to retain more logs and give
followers more time to restore and catch up. The tradeoff is potentially
slower appends which eventually might affect write throughput and latency
negatively so setting it arbitrarily high is not recommended. Before Consul
1.10.0 it required a rolling restart to change this configuration on the leader
though and since no followers could restart without losing health this could
mean losing cluster availability and needing to recover the cluster from a loss
of quorum.
Since Consul 1.10.0
[`raft_trailing_logs`](/docs/agent/options#_raft_trailing_logs) is now
reloadable with `consul reload` or `SIGHUP` allowing operators to increase this
without the leader restarting or losing leadership allowing the cluster to be
recovered gracefully.
Monitoring these metrics can help avoid or diagnose this state.
**What to look for:**
`consul.raft.leader.oldestLogAge` should look like a saw-tooth wave increasing
linearly with time until the leader takes a snapshot and then jumping down as
the oldest logs are truncated. The lowest point on that line should remain
comfortably higher (i.e. 2x or more) than the time it takes to restore a
snapshot.
There are two ways a snapshot can be restored on a follower: from disk on
startup or from the leader during an `installSnapshot` RPC. The leader only
sends an `installSnapshot` RPC if the follower is new and has no state, or if
its state is too old for it to catch up with the leader's logs.
`consul.raft.fsm.lastRestoreDuration` shows the time it took to restore from
either source the last time it happened. Most of the time this is when the
server was started. It's a gauge that will always show the last restore duration
(in Consul 1.10.0 and later) however long ago that was.
`consul.raft.rpc.installSnapshot` is the timing information from the leader's
perspective when it installs a new snapshot on a follower. It includes the time
spent transferring the data as well as the follower restoring it. Since these
events are typically infrequent, you may need to graph the last value observed,
for example using `max_over_time` with a large range in Prometheus. While the
restore part will also be reflected in `lastRestoreDuration`, it can be useful
to observe this too since the logs need to be able to cover this entire
operation including the snapshot delivery to ensure followers can always catch
up safely.
Graphing `consul.raft.leader.oldestLogAge` on the same axes as the other two
metrics here can help see at a glance if restore times are creeping dangerously
close to the limit of what the leader is retaining at the current write rate.
Note that if servers don't restart often, then the snapshot could have grown
significantly since the last restore happened so last restore times might not
reflect what would happen if an agent restarts now.
## Metrics Reference ## Metrics Reference
This is a full list of metrics emitted by Consul. This is a full list of metrics emitted by Consul.
@ -215,41 +318,43 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.cache.fetch_success` | Counts the number of successful fetches by the cache. | counter | counter | | `consul.cache.fetch_success` | Counts the number of successful fetches by the cache. | counter | counter |
| `consul.cache.fetch_error` | Counts the number of failed fetches by the cache. | counter | counter | | `consul.cache.fetch_error` | Counts the number of failed fetches by the cache. | counter | counter |
| `consul.cache.evict_expired` | Counts the number of expired entries that are evicted. | counter | counter | | `consul.cache.evict_expired` | Counts the number of expired entries that are evicted. | counter | counter |
| `consul.raft.fsm.snapshot` | Measures the time taken by the FSM to record the current state for the snapshot. | ms | timer |
| `consul.raft.fsm.apply` | The number of logs committed since the last interval. | commit logs / interval | counter |
| `consul.raft.commitNumLogs` | Measures the count of logs processed for application to the FSM in a single batch. | logs | gauge |
| `consul.raft.fsm.enqueue` | Measures the amount of time to enqueue a batch of logs for the FSM to apply. | ms | timer |
| `consul.raft.fsm.restore` | Measures the time taken by the FSM to restore its state from a snapshot. | ms | timer |
| `consul.raft.snapshot.create` | Measures the time taken to initialize the snapshot process. | ms | timer |
| `consul.raft.snapshot.persist` | Measures the time taken to dump the current snapshot taken by the Consul agent to the disk. | ms | timer |
| `consul.raft.snapshot.takeSnapshot` | Measures the total time involved in taking the current snapshot (creating one and persisting it) by the Consul agent. | ms | timer |
| `consul.raft.replication.heartbeat` | Measures the time taken to invoke appendEntries on a peer, so that it doesn't time out on a periodic basis. | ms | timer |
| `consul.serf.snapshot.appendLine` | Measures the time taken by the Consul agent to append an entry into the existing log. | ms | timer |
| `consul.serf.snapshot.compact` | Measures the time taken by the Consul agent to compact a log. This operation occurs only when the snapshot becomes large enough to justify the compaction . | ms | timer |
| `consul.raft.applied_index` | Represents the raft applied index. | index | gauge | | `consul.raft.applied_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.last_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.state.leader` | Increments whenever a Consul server becomes a leader. If there are frequent leadership changes this may be indication that the servers are overloaded and aren't meeting the soft real-time requirements for Raft, or that there are networking problems between the servers. | leadership transitions / interval | counter |
| `consul.raft.state.candidate` | Increments whenever a Consul server starts an election. If this increments without a leadership change occurring it could indicate that a single server is overloaded or is experiencing network connectivity issues. | election attempts / interval | counter |
| `consul.raft.apply` | Counts the number of Raft transactions occurring over the interval, which is a general indicator of the write load on the Consul servers. | raft transactions / interval | counter | | `consul.raft.apply` | Counts the number of Raft transactions occurring over the interval, which is a general indicator of the write load on the Consul servers. | raft transactions / interval | counter |
| `consul.raft.barrier` | Counts the number of times the agent has started the barrier i.e the number of times it has issued a blocking call, to ensure that the agent has all the pending operations that were queued, to be applied to the agent's FSM. | blocks / interval | counter | | `consul.raft.barrier` | Counts the number of times the agent has started the barrier i.e the number of times it has issued a blocking call, to ensure that the agent has all the pending operations that were queued, to be applied to the agent's FSM. | blocks / interval | counter |
| `consul.raft.verify_leader` | Counts the number of times an agent checks whether it is still the leader or not | checks / interval | Counter | | `consul.raft.commitNumLogs` | Measures the count of logs processed for application to the FSM in a single batch. | logs | gauge |
| `consul.raft.restore` | Counts the number of times the restore operation has been performed by the agent. Here, restore refers to the action of raft consuming an external snapshot to restore its state. | operation invoked / interval | counter |
| `consul.raft.commitTime` | Measures the time it takes to commit a new entry to the Raft log on the leader. | ms | timer | | `consul.raft.commitTime` | Measures the time it takes to commit a new entry to the Raft log on the leader. | ms | timer |
| `consul.raft.fsm.lastRestoreDuration` | Measures the time taken to restore the FSM from a snapshot on an agent restart or from the leader calling installSnapshot. This is a gauge that holds its value since most servers only restore during restarts which are typically infrequent. | ms | gauge |
| `consul.raft.fsm.snapshot` | Measures the time taken by the FSM to record the current state for the snapshot. | ms | timer |
| `consul.raft.fsm.apply` | The number of logs committed since the last interval. | commit logs / interval | counter |
| `consul.raft.fsm.enqueue` | Measures the amount of time to enqueue a batch of logs for the FSM to apply. | ms | timer |
| `consul.raft.fsm.restore` | Measures the time taken by the FSM to restore its state from a snapshot. | ms | timer |
| `consul.raft.last_index` | Represents the raft applied index. | index | gauge |
| `consul.raft.leader.dispatchLog` | Measures the time it takes for the leader to write log entries to disk. | ms | timer | | `consul.raft.leader.dispatchLog` | Measures the time it takes for the leader to write log entries to disk. | ms | timer |
| `consul.raft.leader.dispatchNumLogs` | Measures the number of logs committed to disk in a batch. | logs | gauge | | `consul.raft.leader.dispatchNumLogs` | Measures the number of logs committed to disk in a batch. | logs | gauge |
| `consul.raft.leader.lastContact` | Measures the time since the leader was last able to contact the follower nodes when checking its leader lease. It can be used as a measure for how stable the Raft timing is and how close the leader is to timing out its lease. The lease timeout is 500 ms times the [`raft_multiplier` configuration](/docs/agent/options#raft_multiplier), so this telemetry value should not be getting close to that configured value, otherwise the Raft timing is marginal and might need to be tuned, or more powerful servers might be needed. See the [Server Performance](/docs/install/performance) guide for more details. | ms | timer |
| `consul.raft.leader.oldestLogAge` | The number of milliseconds since the _oldest_ log in the leader's log store was written. This can be important for replication health where write rate is high and the snapshot is large as followers may be unable to recover from a restart if restoring takes longer than the minimum value for the current leader. Compare this with `consul.raft.fsm.lastRestoreDuration` and `consul.raft.rpc.installSnapshot` to monitor. In normal usage this gauge value will grow linearly over time until a snapshot completes on the leader and the log is truncated. Note: this metric won't be emitted until the leader writes a snapshot. After an upgrade to Consul 1.10.0 it won't be emitted until the oldest log was written after the upgrade. | ms | gauge |
| `consul.raft.replication.heartbeat` | Measures the time taken to invoke appendEntries on a peer, so that it doesn't time out on a periodic basis. | ms | timer |
| `consul.raft.replication.appendEntries` | Measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers. | ms | timer | | `consul.raft.replication.appendEntries` | Measures the time it takes to replicate log entries to followers. This is a general indicator of the load pressure on the Consul servers, as well as the performance of the communication between the servers. | ms | timer |
| `consul.raft.state.follower` | Counts the number of times an agent has entered the follower mode. This happens when a new agent joins the cluster or after the end of a leader election. | follower state entered / interval | counter | | `consul.raft.replication.appendEntries.rpc` | Measures the time taken by the append entries RFC, to replicate the log entries of a leader agent onto its follower agent(s) | ms | timer |
| `consul.raft.transistion.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receive no heartbeat messages from the last known leader. | timeouts / interval | counter | | `consul.raft.replication.appendEntries.logs` | Measures the number of logs replicated to an agent, to bring it up to speed with the leader's logs. | logs appended/ interval | counter |
| `consul.raft.restore` | Counts the number of times the restore operation has been performed by the agent. Here, restore refers to the action of raft consuming an external snapshot to restore its state. | operation invoked / interval | counter |
| `consul.raft.restoreUserSnapshot` | Measures the time taken by the agent to restore the FSM state from a user's snapshot | ms | timer | | `consul.raft.restoreUserSnapshot` | Measures the time taken by the agent to restore the FSM state from a user's snapshot | ms | timer |
| `consul.raft.rpc.processHeartBeat` | Measures the time taken to process a heartbeat request. | ms | timer |
| `consul.raft.rpc.appendEntries` | Measures the time taken to process an append entries RPC call from an agent. | ms | timer | | `consul.raft.rpc.appendEntries` | Measures the time taken to process an append entries RPC call from an agent. | ms | timer |
| `consul.raft.rpc.appendEntries.storeLogs` | Measures the time taken to add any outstanding logs for an agent, since the last appendEntries was invoked | ms | timer | | `consul.raft.rpc.appendEntries.storeLogs` | Measures the time taken to add any outstanding logs for an agent, since the last appendEntries was invoked | ms | timer |
| `consul.raft.rpc.appendEntries.processLogs` | Measures the time taken to process the outstanding log entries of an agent. | ms | timer | | `consul.raft.rpc.appendEntries.processLogs` | Measures the time taken to process the outstanding log entries of an agent. | ms | timer |
| `consul.raft.rpc.requestVote` | Measures the time taken to process the request vote RPC call. | ms | timer |
| `consul.raft.rpc.installSnapshot` | Measures the time taken to process the installSnapshot RPC call. This metric should only be seen on agents which are currently in the follower state. | ms | timer | | `consul.raft.rpc.installSnapshot` | Measures the time taken to process the installSnapshot RPC call. This metric should only be seen on agents which are currently in the follower state. | ms | timer |
| `consul.raft.replication.appendEntries.rpc` | Measures the time taken by the append entries RFC, to replicate the log entries of a leader agent onto its follower agent(s) | ms | timer | | `consul.raft.rpc.processHeartBeat` | Measures the time taken to process a heartbeat request. | ms | timer |
| `consul.raft.replication.appendEntries.logs` | Measures the number of logs replicated to an agent, to bring it up to speed with the leader's logs. | logs appended/ interval | counter | | `consul.raft.rpc.requestVote` | Measures the time taken to process the request vote RPC call. | ms | timer |
| `consul.raft.leader.lastContact` | This will only be emitted by the Raft leader and measures the time since the leader was last able to contact the follower nodes when checking its leader lease. It can be used as a measure for how stable the Raft timing is and how close the leader is to timing out its lease.The lease timeout is 500 ms times the [`raft_multiplier` configuration](/docs/agent/options#raft_multiplier), so this telemetry value should not be getting close to that configured value, otherwise the Raft timing is marginal and might need to be tuned, or more powerful servers might be needed. See the [Server Performance](/docs/install/performance) guide for more details. | ms | timer | | `consul.raft.snapshot.create` | Measures the time taken to initialize the snapshot process. | ms | timer |
| `consul.raft.snapshot.persist` | Measures the time taken to dump the current snapshot taken by the Consul agent to the disk. | ms | timer |
| `consul.raft.snapshot.takeSnapshot` | Measures the total time involved in taking the current snapshot (creating one and persisting it) by the Consul agent. | ms | timer |
| `consul.serf.snapshot.appendLine` | Measures the time taken by the Consul agent to append an entry into the existing log. | ms | timer |
| `consul.serf.snapshot.compact` | Measures the time taken by the Consul agent to compact a log. This operation occurs only when the snapshot becomes large enough to justify the compaction . | ms | timer |
| `consul.raft.state.candidate` | Increments whenever a Consul server starts an election. If this increments without a leadership change occurring it could indicate that a single server is overloaded or is experiencing network connectivity issues. | election attempts / interval | counter |
| `consul.raft.state.leader` | Increments whenever a Consul server becomes a leader. If there are frequent leadership changes this may be indication that the servers are overloaded and aren't meeting the soft real-time requirements for Raft, or that there are networking problems between the servers. | leadership transitions / interval | counter |
| `consul.raft.state.follower` | Counts the number of times an agent has entered the follower mode. This happens when a new agent joins the cluster or after the end of a leader election. | follower state entered / interval | counter |
| `consul.raft.transistion.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receiving no heartbeat messages from the last known leader. | timeouts / interval | counter |
| `consul.raft.verify_leader` | Counts the number of times an agent checks whether it is still the leader or not | checks / interval | Counter |
| `consul.rpc.accept_conn` | Increments when a server accepts an RPC connection. | connections | counter | | `consul.rpc.accept_conn` | Increments when a server accepts an RPC connection. | connections | counter |
| `consul.catalog.register` | Measures the time it takes to complete a catalog register operation. | ms | timer | | `consul.catalog.register` | Measures the time it takes to complete a catalog register operation. | ms | timer |
| `consul.catalog.deregister` | Measures the time it takes to complete a catalog deregister operation. | ms | timer | | `consul.catalog.deregister` | Measures the time it takes to complete a catalog deregister operation. | ms | timer |