Raft: use a larger initial heartbeat/election timeout (#15042)
parent 90b12f1386
commit c5928c1d15

Changelog entry (new file):

````diff
@@ -0,0 +1,3 @@
+```release-note:improvement
+storage/raft: Use larger timeouts at startup to reduce likelihood of inducing elections.
+```
````
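In outline: when a raft node starts up and was not told to start as leader, Vault now triples its election and heartbeat timeouts, so a node that merely restarted is far less likely to call an election before hearing from the existing leader. Once the node sees an election resolve (it becomes candidate or leader) or an initial randomized timeout elapses, the normal timeouts are restored through the raft library's hot-reload API. The hunks below implement this, bump github.com/hashicorp/raft to pick up the needed APIs, and give the test helpers more headroom.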
go.mod:

```diff
@@ -87,7 +87,7 @@ require (
 	github.com/hashicorp/golang-lru v0.5.4
 	github.com/hashicorp/hcl v1.0.1-vault-3
 	github.com/hashicorp/nomad/api v0.0.0-20211006193434-215bf04bc650
-	github.com/hashicorp/raft v1.3.3
+	github.com/hashicorp/raft v1.3.9
 	github.com/hashicorp/raft-autopilot v0.1.3
 	github.com/hashicorp/raft-boltdb/v2 v2.0.0-20210421194847-a7e34179d62c
 	github.com/hashicorp/raft-snapshot v1.0.4
```
go.sum:

```diff
@@ -938,8 +938,8 @@ github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcb
 github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
 github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
 github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
-github.com/hashicorp/raft v1.3.3 h1:Xr6DSHC5cIM8kzxu+IgoT/+MeNeUNeWin3ie6nlSrMg=
-github.com/hashicorp/raft v1.3.3/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
+github.com/hashicorp/raft v1.3.9 h1:9yuo1aR0bFTr1cw7pj3S2Bk6MhJCsnr2NAxvIBrP2x4=
+github.com/hashicorp/raft v1.3.9/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
 github.com/hashicorp/raft-autopilot v0.1.3 h1:Y+5jWKTFABJhCrpVwGpGjti2LzwQSzivoqd2wM6JWGw=
 github.com/hashicorp/raft-autopilot v0.1.3/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
```
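The raft upgrade from v1.3.3 to v1.3.9 appears to be what supplies the APIs this commit relies on: the `raft.ReadCloserWrapper` interface used in the FSM restore path below, and the `HeartbeatTimeout`/`ElectionTimeout` fields on `raft.ReloadableConfig` that let the timeouts be lowered again without restarting the node (my reading of the diff; the commit itself does not state this).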
Test helpers — the polling loops that wait for a leader or an active node get more headroom:

```diff
@@ -212,7 +212,7 @@ func deriveStableActiveCore(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore {
 	activeCore := DeriveActiveCore(t, cluster)
 	minDuration := time.NewTimer(3 * time.Second)
 
-	for i := 0; i < 30; i++ {
+	for i := 0; i < 60; i++ {
 		leaderResp, err := activeCore.Client.Sys().Leader()
 		if err != nil {
 			t.Fatal(err)
@@ -238,7 +238,7 @@ func deriveStableActiveCore(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore {
 
 func DeriveActiveCore(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore {
 	t.Helper()
-	for i := 0; i < 20; i++ {
+	for i := 0; i < 60; i++ {
 		for _, core := range cluster.Cores {
 			leaderResp, err := core.Client.Sys().Leader()
 			if err != nil {
@@ -329,7 +329,7 @@ func WaitForNCoresSealed(t testing.T, cluster *vault.TestCluster, n int) {
 
 func WaitForActiveNode(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore {
 	t.Helper()
-	for i := 0; i < 30; i++ {
+	for i := 0; i < 60; i++ {
 		for _, core := range cluster.Cores {
 			if standby, _ := core.Core.Standby(); !standby {
 				return core
@@ -564,7 +564,7 @@ func WaitForRaftApply(t testing.T, core *vault.TestClusterCore, index uint64) {
 
 // AwaitLeader waits for one of the cluster's nodes to become leader.
 func AwaitLeader(t testing.T, cluster *vault.TestCluster) (int, error) {
-	timeout := time.Now().Add(30 * time.Second)
+	timeout := time.Now().Add(60 * time.Second)
 	for {
 		if time.Now().After(timeout) {
 			break
```
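The iteration counts and the `AwaitLeader` deadline are doubled, presumably so these helpers don't give up while a first election deliberately running with 3x timeouts is still settling.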
FSM snapshot restore — `Restore` must now unwrap the reader the raft library hands it:

```diff
@@ -784,7 +784,15 @@ func (f *FSM) Restore(r io.ReadCloser) error {
 	snapshotInstaller, ok := r.(*boltSnapshotInstaller)
 	if !ok {
-		return errors.New("expected snapshot installer object")
+		wrapper, ok := r.(raft.ReadCloserWrapper)
+		if !ok {
+			return fmt.Errorf("expected ReadCloserWrapper object, got: %T", r)
+		}
+		snapshotInstallerRaw := wrapper.WrappedReadCloser()
+		snapshotInstaller, ok = snapshotInstallerRaw.(*boltSnapshotInstaller)
+		if !ok {
+			return fmt.Errorf("expected snapshot installer object, got: %T", snapshotInstallerRaw)
+		}
 	}
 
 	f.l.Lock()
```
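Newer hashicorp/raft versions wrap the snapshot reader before handing it to the FSM (so the library can observe restore progress), which means the concrete `*boltSnapshotInstaller` now sits one level down. A minimal sketch of the unwrap idiom, assuming only the `raft.ReadCloserWrapper` interface used in the hunk above — `unwrapReadCloser`, `main`, and the dummy reader are my illustration, not Vault's code:

```go
package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/hashicorp/raft"
)

// unwrapReadCloser peels raft.ReadCloserWrapper layers off r until it
// reaches the innermost ReadCloser (in Vault's case, the snapshot installer).
func unwrapReadCloser(r io.ReadCloser) io.ReadCloser {
	for {
		w, ok := r.(raft.ReadCloserWrapper)
		if !ok {
			return r
		}
		r = w.WrappedReadCloser()
	}
}

func main() {
	// A plain ReadCloser is returned unchanged; a wrapped one is unwrapped.
	rc := io.NopCloser(strings.NewReader("snapshot bytes"))
	fmt.Printf("innermost reader: %T\n", unwrapReadCloser(rc))
}
```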
Raft backend — a new import and a startup trace log:

```diff
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -676,6 +677,7 @@ func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
 	// scheduler.
 	config.BatchApplyCh = true
 
+	b.logger.Trace("applying raft config", "inputs", b.conf)
 	return nil
 }
 
```
`SetupCluster` scales the first election's timeouts when the node is on a real network and does not start as leader:

```diff
@@ -758,6 +760,7 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
 		return false
 	}
 
+	var initialTimeoutMultiplier time.Duration
 	switch {
 	case opts.TLSKeyring == nil && listenerIsNil(opts.ClusterListener):
 		// If we don't have a provided network we use an in-memory one.
@@ -769,6 +772,19 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
 	case listenerIsNil(opts.ClusterListener):
 		return errors.New("no cluster listener provided")
 	default:
+		initialTimeoutMultiplier = 3
+		if !opts.StartAsLeader {
+			electionTimeout, heartbeatTimeout := raftConfig.ElectionTimeout, raftConfig.HeartbeatTimeout
+			// Use bigger values for first election
+			raftConfig.ElectionTimeout *= initialTimeoutMultiplier
+			raftConfig.HeartbeatTimeout *= initialTimeoutMultiplier
+			b.logger.Trace("using larger timeouts for raft at startup",
+				"initial_election_timeout", raftConfig.ElectionTimeout,
+				"initial_heartbeat_timeout", raftConfig.HeartbeatTimeout,
+				"normal_election_timeout", electionTimeout,
+				"normal_heartbeat_timeout", heartbeatTimeout)
+		}
+
 		// Set the local address and localID in the streaming layer and the raft config.
 		streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener)
 		if err != nil {
```
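To make the arithmetic concrete, here is a small self-contained sketch of the scale-then-restore pattern (my illustration under assumptions, not Vault's code): with `raft.DefaultConfig()`'s 1s defaults, the 3x multiplier means a restarted follower waits roughly 3s before calling an election, and the hot-reload step later divides the values back down.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/raft"
)

const initialTimeoutMultiplier = 3 // same factor the commit uses

func main() {
	cfg := raft.DefaultConfig()
	normalElection, normalHeartbeat := cfg.ElectionTimeout, cfg.HeartbeatTimeout

	// At startup, give the (possibly still-forming) cluster extra slack
	// before this node calls an election.
	cfg.ElectionTimeout *= initialTimeoutMultiplier
	cfg.HeartbeatTimeout *= initialTimeoutMultiplier

	fmt.Printf("startup: election=%s heartbeat=%s (normal: %s/%s)\n",
		cfg.ElectionTimeout, cfg.HeartbeatTimeout,
		normalElection, normalHeartbeat)

	// Once a leader is known, the commit restores the normal values via
	// raft's hot-reload API (shown in the next hunk), roughly:
	rc := raft.ReloadableConfig{
		TrailingLogs:      cfg.TrailingLogs,
		SnapshotInterval:  cfg.SnapshotInterval,
		SnapshotThreshold: cfg.SnapshotThreshold,
		HeartbeatTimeout:  cfg.HeartbeatTimeout / initialTimeoutMultiplier,
		ElectionTimeout:   cfg.ElectionTimeout / initialTimeoutMultiplier,
	}
	fmt.Printf("after reload: election=%s heartbeat=%s\n",
		rc.ElectionTimeout, rc.HeartbeatTimeout)
}
```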
After the raft object is created, the normal timeouts are restored — immediately for a single-node cluster, otherwise once the node sees an election outcome or an initial randomized timeout fires:

```diff
@@ -902,6 +918,63 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
 	// Close the init channel to signal setup has been completed
 	close(b.raftInitCh)
 
+	reloadConfig := func() {
+		newCfg := raft.ReloadableConfig{
+			TrailingLogs:      raftConfig.TrailingLogs,
+			SnapshotInterval:  raftConfig.SnapshotInterval,
+			SnapshotThreshold: raftConfig.SnapshotThreshold,
+			HeartbeatTimeout:  raftConfig.HeartbeatTimeout / initialTimeoutMultiplier,
+			ElectionTimeout:   raftConfig.ElectionTimeout / initialTimeoutMultiplier,
+		}
+		err := raftObj.ReloadConfig(newCfg)
+		if err != nil {
+			b.logger.Error("failed to reload raft config to set lower timeouts", "error", err)
+		} else {
+			b.logger.Trace("reloaded raft config to set lower timeouts", "config", fmt.Sprintf("%#v", newCfg))
+		}
+	}
+	confFuture := raftObj.GetConfiguration()
+	numServers := 0
+	if err := confFuture.Error(); err != nil {
+		// This should probably never happen, but just in case we'll log the error.
+		// We'll default in this case to the multi-node behaviour.
+		b.logger.Error("failed to read raft configuration", "error", err)
+	} else {
+		clusterConf := confFuture.Configuration()
+		numServers = len(clusterConf.Servers)
+	}
+	if initialTimeoutMultiplier != 0 {
+		if numServers == 1 {
+			reloadConfig()
+		} else {
+			go func() {
+				ticker := time.NewTicker(50 * time.Millisecond)
+				// Emulate the random timeout used in Raft lib, to ensure that
+				// if all nodes are brought up simultaneously, they don't all
+				// call for an election at once.
+				extra := time.Duration(rand.Int63()) % raftConfig.HeartbeatTimeout
+				timeout := time.NewTimer(raftConfig.HeartbeatTimeout + extra)
+				for {
+					select {
+					case <-ticker.C:
+						switch raftObj.State() {
+						case raft.Candidate, raft.Leader:
+							b.logger.Trace("triggering raft config reload due to being candidate or leader")
+							reloadConfig()
+							return
+						case raft.Shutdown:
+							return
+						}
+					case <-timeout.C:
+						b.logger.Trace("triggering raft config reload due to initial timeout")
+						reloadConfig()
+						return
+					}
+				}
+			}()
+		}
+	}
+
 	b.logger.Trace("finished setting up raft cluster")
 	return nil
 }
```
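Design note (my reading of the hunk above): a single-node cluster can never lose an election to a peer, so it reverts immediately; in a multi-node cluster each node polls its own raft state every 50ms and reverts as soon as it is candidate or leader, with a jittered fallback timer — mirroring raft's own randomized election timeout — so simultaneously restarted nodes don't all revert and stampede into elections at once. Since `initialTimeoutMultiplier` stays 0 on the in-memory (test) network path, the reload is skipped there, which also keeps `reloadConfig` from dividing by zero.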
Autopilot tests — the test cluster pins the raft performance multiplier:

```diff
@@ -42,6 +42,9 @@ func TestRaft_Autopilot_Stabilization_And_State(t *testing.T) {
 		DisableFollowerJoins: true,
 		InmemCluster:         true,
 		EnableAutopilot:      true,
+		PhysicalFactoryConfig: map[string]interface{}{
+			"performance_multiplier": "5",
+		},
 	})
 	defer cluster.Cleanup()
 
```
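`performance_multiplier` is the raft storage backend's knob for scaling the library's base timing parameters; pinning it to 5 here presumably keeps election timing predictable for the stabilization assertions regardless of the new startup scaling.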
```diff
@@ -55,6 +58,23 @@ func TestRaft_Autopilot_Stabilization_And_State(t *testing.T) {
 	require.Equal(t, "alive", state.Servers["core-0"].NodeStatus)
 	require.Equal(t, "leader", state.Servers["core-0"].Status)
 
+	writeConfig := func(config map[string]interface{}, expectError bool) {
+		resp, err := client.Logical().Write("sys/storage/raft/autopilot/configuration", config)
+		if expectError {
+			require.Error(t, err)
+			return
+		}
+		require.NoError(t, err)
+		require.Nil(t, resp)
+	}
+
+	writableConfig := map[string]interface{}{
+		"last_contact_threshold":    "5s",
+		"max_trailing_logs":         100,
+		"server_stabilization_time": "10s",
+	}
+	writeConfig(writableConfig, false)
+
 	config, err := client.Sys().RaftAutopilotConfiguration()
 	require.NoError(t, err)
 
```
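Outside the test harness, the same autopilot endpoint can be exercised with the plain API client. A hypothetical standalone example (assumes a reachable Vault with `VAULT_ADDR`/`VAULT_TOKEN` set; the field values are the ones the test uses):

```go
package main

import (
	"fmt"
	"log"

	vaultapi "github.com/hashicorp/vault/api"
)

func main() {
	// DefaultConfig reads VAULT_ADDR; NewClient picks up VAULT_TOKEN.
	client, err := vaultapi.NewClient(vaultapi.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Write the autopilot configuration, as the test's writeConfig helper does.
	_, err = client.Logical().Write("sys/storage/raft/autopilot/configuration",
		map[string]interface{}{
			"last_contact_threshold":    "5s",
			"max_trailing_logs":         100,
			"server_stabilization_time": "10s",
		})
	if err != nil {
		log.Fatal(err)
	}

	// Read it back through the typed helper the test also uses.
	cfg, err := client.Sys().RaftAutopilotConfiguration()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("autopilot configuration: %+v\n", cfg)
}
```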
```diff
@@ -126,6 +146,25 @@ func TestRaft_Autopilot_Stabilization_And_State(t *testing.T) {
 	state, err = client.Sys().RaftAutopilotState()
 	require.NoError(t, err)
 	require.Equal(t, []string{"core-0", "core-1", "core-2"}, state.Voters)
+
+	// Now make sure that after we seal and unseal a node, the current leader
+	// remains leader, and that the cluster becomes healthy again.
+	leader := state.Leader
+	testhelpers.EnsureCoreSealed(t, cluster.Cores[1])
+	time.Sleep(10 * time.Second)
+	testhelpers.EnsureCoreUnsealed(t, cluster, cluster.Cores[1])
+
+	deadline := time.Now().Add(2 * time.Minute)
+	for time.Now().Before(deadline) {
+		state, err = client.Sys().RaftAutopilotState()
+		require.NoError(t, err)
+		if state.Healthy && state.Leader == leader {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	require.Equal(t, true, state.Healthy)
+	require.Equal(t, leader, state.Leader)
 }
 
 func TestRaft_Autopilot_Configuration(t *testing.T) {
```
Test logging is bumped from debug to trace:

```diff
@@ -14,7 +14,7 @@ import (
 type testFunc func(t *testing.T, logger hclog.Logger, storage teststorage.ReusableStorage, basePort int)
 
 func testVariousBackends(t *testing.T, tf testFunc, basePort int, includeRaft bool) {
-	logger := logging.NewVaultLogger(hclog.Debug).Named(t.Name())
+	logger := logging.NewVaultLogger(hclog.Trace).Named(t.Name())
 
 	t.Run("inmem", func(t *testing.T) {
 		t.Parallel()
```