Add a test for server stabilization (#11128)

Nick Cabatoff, 2021-03-17 17:23:13 -04:00 (committed by GitHub)
parent 509c31604d
commit 411495514c
5 changed files with 195 additions and 43 deletions


@ -145,6 +145,8 @@ type RaftBackend struct {
// VAULT_RAFT_AUTOPILOT_DISABLE during startup and can't be updated once the
// node is up and running.
disableAutopilot bool
autopilotReconcileInterval time.Duration
}
// LeaderJoinInfo contains information required by a node to join itself as a
@ -385,7 +387,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
if err != nil {
return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err)
}
snap = newSnapshotStoreDelay(snap, delay)
snap = newSnapshotStoreDelay(snap, delay, logger)
}
maxEntrySize := defaultMaxEntrySize
@ -398,28 +400,40 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
maxEntrySize = uint64(i)
}
var reconcileInterval time.Duration
if interval := conf["autopilot_reconcile_interval"]; interval != "" {
interval, err := time.ParseDuration(interval)
if err != nil {
return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err)
}
reconcileInterval = interval
}
return &RaftBackend{
logger: logger,
fsm: fsm,
raftInitCh: make(chan struct{}),
conf: conf,
logStore: log,
stableStore: stable,
snapStore: snap,
dataDir: path,
localID: localID,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
maxEntrySize: maxEntrySize,
followerHeartbeatTicker: time.NewTicker(time.Second),
logger: logger,
fsm: fsm,
raftInitCh: make(chan struct{}),
conf: conf,
logStore: log,
stableStore: stable,
snapStore: snap,
dataDir: path,
localID: localID,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
maxEntrySize: maxEntrySize,
followerHeartbeatTicker: time.NewTicker(time.Second),
autopilotReconcileInterval: reconcileInterval,
}, nil
}
type snapshotStoreDelay struct {
logger log.Logger
wrapped raft.SnapshotStore
delay time.Duration
}
func (s snapshotStoreDelay) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
s.logger.Trace("delaying before creating snapshot", "delay", s.delay)
time.Sleep(s.delay)
return s.wrapped.Create(version, index, term, configuration, configurationIndex, trans)
}
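
Both duration knobs touched above arrive as plain strings in the conf map handed to NewRaftBackend: snapshot_delay wraps the snapshot store in snapshotStoreDelay, and autopilot_reconcile_interval is stored on the backend and later forwarded to raft-autopilot (second file below). A minimal sketch of constructing the backend with both set (hypothetical, not part of this commit; the path and node_id keys are assumed to be the backend's standard settings and all values are illustrative):

package main

import (
	"fmt"
	"io/ioutil"
	"os"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/physical/raft"
)

func main() {
	// Hypothetical conf map for NewRaftBackend; both duration values are parsed
	// with time.ParseDuration inside the constructor shown above.
	dir, err := ioutil.TempDir("", "vault-raft")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	conf := map[string]string{
		"path":                         dir,      // assumed standard key
		"node_id":                      "core-0", // assumed standard key
		"snapshot_delay":               "5s",     // delays SnapshotStore.Create via snapshotStoreDelay
		"autopilot_reconcile_interval": "1s",     // becomes autopilot.WithReconcileInterval
	}

	backend, err := raft.NewRaftBackend(conf, hclog.Default())
	if err != nil {
		panic(err)
	}
	fmt.Printf("raft backend created: %T\n", backend)
}
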
@ -434,8 +448,9 @@ func (s snapshotStoreDelay) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
var _ raft.SnapshotStore = &snapshotStoreDelay{}
func newSnapshotStoreDelay(snap raft.SnapshotStore, delay time.Duration) *snapshotStoreDelay {
func newSnapshotStoreDelay(snap raft.SnapshotStore, delay time.Duration, logger log.Logger) *snapshotStoreDelay {
return &snapshotStoreDelay{
logger: logger,
wrapped: snap,
delay: delay,
}
@ -666,6 +681,7 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
// Setup the raft config
raftConfig := raft.DefaultConfig()
raftConfig.SnapshotInterval = 5 * time.Second
if err := b.applyConfigSettings(raftConfig); err != nil {
return err
}
@ -774,6 +790,7 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
}
}
b.logger.Info("creating Raft", "config", fmt.Sprintf("%#v", raftConfig))
raftObj, err := raft.NewRaft(raftConfig, b.fsm.chunker, b.logStore, b.stableStore, b.snapStore, b.raftTransport)
b.fsm.SetNoopRestore(false)
if err != nil {


@ -670,13 +670,20 @@ func (b *RaftBackend) SetupAutopilot(ctx context.Context, storageConfig *Autopil
b.autopilotConfig.Merge(storageConfig)
// Create the autopilot instance
b.autopilot = autopilot.New(b.raft, newDelegate(b), autopilot.WithLogger(b.logger), autopilot.WithPromoter(b.autopilotPromoter()))
options := []autopilot.Option{
autopilot.WithLogger(b.logger),
autopilot.WithPromoter(b.autopilotPromoter()),
}
if b.autopilotReconcileInterval != 0 {
options = append(options, autopilot.WithReconcileInterval(b.autopilotReconcileInterval))
}
b.autopilot = autopilot.New(b.raft, newDelegate(b), options...)
b.followerStates = followerStates
b.followerHeartbeatTicker = time.NewTicker(1 * time.Second)
b.l.Unlock()
b.logger.Info("starting autopilot", "config", b.autopilotConfig)
b.logger.Info("starting autopilot", "config", b.autopilotConfig, "reconcile_interval", b.autopilotReconcileInterval)
b.autopilot.Start(ctx)
go b.startFollowerHeartbeatTracker()


@ -2,21 +2,23 @@ package rafttests
import (
"context"
"fmt"
"math"
"testing"
"time"
"github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/hashicorp/go-hclog"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/stretchr/testify/require"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/testhelpers"
"github.com/hashicorp/vault/helper/testhelpers/teststorage"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/vault"
"github.com/kr/pretty"
testingintf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
)
func TestRaft_Autopilot_Disable(t *testing.T) {
@ -208,3 +210,133 @@ func TestRaft_Autopilot_Configuration(t *testing.T) {
vault.TestWaitActive(t, leaderCore.Core)
configCheckFunc(config)
}
// TestRaft_Autopilot_Stabilization_Delay verifies that if a node takes a long
// time to become ready, it doesn't get promoted to voter until then.
func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
conf, opts := teststorage.ClusterSetup(nil, nil, teststorage.RaftBackendSetup)
conf.DisableAutopilot = false
opts.InmemClusterLayers = true
opts.KeepStandbysSealed = true
opts.SetupFunc = nil
timeToHealthyCore2 := 5 * time.Second
opts.PhysicalFactory = func(t testingintf.T, coreIdx int, logger hclog.Logger, conf map[string]interface{}) *vault.PhysicalBackendBundle {
config := map[string]interface{}{
"snapshot_threshold": "50",
"trailing_logs": "100",
"autopilot_reconcile_interval": "1s",
}
if coreIdx == 2 {
config["snapshot_delay"] = timeToHealthyCore2.String()
}
return teststorage.MakeRaftBackend(t, coreIdx, logger, config)
}
cluster := vault.NewTestCluster(t, conf, opts)
cluster.Start()
defer cluster.Cleanup()
testhelpers.WaitForActiveNode(t, cluster)
// Check that autopilot execution state is running
client := cluster.Cores[0].Client
state, err := client.Sys().RaftAutopilotState()
require.NoError(t, err)
require.NotNil(t, state)
require.Equal(t, true, state.Healthy)
require.Len(t, state.Servers, 1)
require.Equal(t, "core-0", state.Servers["core-0"].ID)
require.Equal(t, "alive", state.Servers["core-0"].NodeStatus)
require.Equal(t, "leader", state.Servers["core-0"].Status)
_, err = client.Logical().Write("sys/storage/raft/autopilot/configuration", map[string]interface{}{
"server_stabilization_time": "3s",
})
require.NoError(t, err)
config, err := client.Sys().RaftAutopilotConfiguration()
require.NoError(t, err)
// Wait for 110% of the stabilization time to add nodes
stabilizationKickOffWaitDuration := time.Duration(math.Ceil(1.1 * float64(config.ServerStabilizationTime)))
time.Sleep(stabilizationKickOffWaitDuration)
cli := cluster.Cores[0].Client
// Write more keys than snapshot_threshold
for i := 0; i < 250; i++ {
_, err := cli.Logical().Write(fmt.Sprintf("secret/%d", i), map[string]interface{}{
"test": "data",
})
if err != nil {
t.Fatal(err)
}
}
joinFunc := func(core *vault.TestClusterCore) {
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), []*raft.LeaderJoinInfo{
{
LeaderAPIAddr: client.Address(),
TLSConfig: cluster.Cores[0].TLSConfig,
Retry: true,
},
}, false)
require.NoError(t, err)
time.Sleep(1 * time.Second)
cluster.UnsealCore(t, core)
}
checkState := func(nodeID string, numServers int, allHealthy bool, healthy bool, suffrage string) {
state, err = client.Sys().RaftAutopilotState()
require.NoError(t, err)
require.Equal(t, allHealthy, state.Healthy)
require.Len(t, state.Servers, numServers)
require.Equal(t, healthy, state.Servers[nodeID].Healthy)
require.Equal(t, "alive", state.Servers[nodeID].NodeStatus)
require.Equal(t, suffrage, state.Servers[nodeID].Status)
}
joinFunc(cluster.Cores[1])
checkState("core-1", 2, false, false, "non-voter")
core2shouldBeHealthyAt := time.Now().Add(timeToHealthyCore2)
joinFunc(cluster.Cores[2])
checkState("core-2", 3, false, false, "non-voter")
stabilizationWaitDuration := time.Duration(1.25 * float64(config.ServerStabilizationTime))
deadline := time.Now().Add(stabilizationWaitDuration)
var core1healthy, core2healthy bool
for time.Now().Before(deadline) {
state, err := client.Sys().RaftAutopilotState()
require.NoError(t, err)
core1healthy = state.Servers["core-1"].Healthy
core2healthy = state.Servers["core-2"].Healthy
time.Sleep(1 * time.Second)
}
if !core1healthy || core2healthy {
t.Fatalf("expected health: core1=true and core2=false, got: core=%v, core2=%v", core1healthy, core2healthy)
}
time.Sleep(2 * time.Second) // wait for reconciliation
state, err = client.Sys().RaftAutopilotState()
require.NoError(t, err)
require.Equal(t, []string{"core-0", "core-1"}, state.Voters)
for time.Now().Before(core2shouldBeHealthyAt) {
state, err := client.Sys().RaftAutopilotState()
require.NoError(t, err)
core2healthy = state.Servers["core-2"].Healthy
time.Sleep(1 * time.Second)
t.Log("core-2 healthy:", core2healthy)
}
deadline = time.Now().Add(10 * time.Second)
for time.Now().Before(deadline) {
state, err = client.Sys().RaftAutopilotState()
if err != nil {
t.Fatal(err)
}
if strutil.EquivalentSlices(state.Voters, []string{"core-0", "core-1", "core-2"}) {
break
}
}
require.Equal(t, []string{"core-0", "core-1", "core-2"}, state.Voters)
}
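
The three polling loops at the end of this test share a shape: query RaftAutopilotState, sleep a second, stop at a deadline. A small helper along the following lines (purely illustrative, not part of this commit; it uses only imports the test already has) would capture that pattern:

// awaitAutopilotState is a hypothetical helper: it polls client.Sys().RaftAutopilotState()
// until pred returns true or the timeout elapses, and returns the last state observed.
func awaitAutopilotState(t *testing.T, client *api.Client, timeout time.Duration, pred func(*api.AutopilotState) bool) *api.AutopilotState {
	t.Helper()
	var state *api.AutopilotState
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		var err error
		state, err = client.Sys().RaftAutopilotState()
		require.NoError(t, err)
		if pred(state) {
			break
		}
		time.Sleep(time.Second)
	}
	return state
}

The final wait above could then read, for example:

state = awaitAutopilotState(t, client, 10*time.Second, func(s *api.AutopilotState) bool {
	return strutil.EquivalentSlices(s.Voters, []string{"core-0", "core-1", "core-2"})
})
require.Equal(t, []string{"core-0", "core-1", "core-2"}, state.Voters)
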


@ -5,29 +5,24 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/logical"
"io/ioutil"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/api"
credUserpass "github.com/hashicorp/vault/builtin/credential/userpass"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/testhelpers"
"github.com/hashicorp/vault/helper/testhelpers/teststorage"
vaulthttp "github.com/hashicorp/vault/http"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
vaultcluster "github.com/hashicorp/vault/vault/cluster"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
)
@ -55,20 +50,7 @@ func raftCluster(t testing.TB, ropts *RaftClusterOpts) *vault.TestCluster {
var opts = vault.TestClusterOptions{
HandlerFunc: vaulthttp.Handler,
}
opts.Logger = logging.NewVaultLogger(hclog.Trace).Named(t.Name())
if ropts.InmemCluster {
inmemCluster, err := vaultcluster.NewInmemLayerCluster("inmem-cluster", 3, hclog.New(&hclog.LoggerOptions{
Mutex: &sync.Mutex{},
Level: hclog.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}
opts.ClusterLayers = inmemCluster
}
opts.InmemClusterLayers = ropts.InmemCluster
opts.PhysicalFactoryConfig = ropts.PhysicalFactoryConfig
conf.DisablePerformanceStandby = ropts.DisablePerfStandby


@ -1081,6 +1081,9 @@ type TestClusterOptions struct {
// ClusterLayers are used to override the default cluster connection layer
ClusterLayers cluster.NetworkLayerSet
// InmemClusterLayers is a shorthand way of asking for ClusterLayers to be
// built using the inmem implementation.
InmemClusterLayers bool
// RaftAddressProvider is used to set the raft ServerAddressProvider on
// each core.
@ -1561,6 +1564,17 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
testCluster.pubKey = pubKey
testCluster.priKey = priKey
if opts != nil && opts.InmemClusterLayers {
if opts.ClusterLayers != nil {
t.Fatalf("cannot specify ClusterLayers when InmemClusterLayers is true")
}
inmemCluster, err := cluster.NewInmemLayerCluster("inmem-cluster", numCores, testCluster.Logger.Named("inmem-cluster"))
if err != nil {
t.Fatal(err)
}
opts.ClusterLayers = inmemCluster
}
// Create cores
testCluster.cleanupFuncs = []func(){}
cores := []*Core{}
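
A minimal sketch of the new shorthand from a test's point of view (hypothetical, not part of this commit; it mirrors the raft test above):

func TestInmemClusterLayersShorthand(t *testing.T) {
	// With InmemClusterLayers set, NewTestCluster builds the inmem
	// NetworkLayerSet itself; also setting ClusterLayers is now a fatal error.
	conf, opts := teststorage.ClusterSetup(nil, nil, teststorage.RaftBackendSetup)
	opts.InmemClusterLayers = true

	cluster := vault.NewTestCluster(t, conf, opts)
	cluster.Start()
	defer cluster.Cleanup()

	testhelpers.WaitForActiveNode(t, cluster)
}
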