raft: add support for using backend for ha_storage (#9193)
* raft: initial work on raft ha storage support * add note on join * add todo note * raft: add support for bootstrapping and joining existing nodes * raft: gate bootstrap join by reading leader api address from storage * raft: properly check for raft-only for certain conditionals * raft: add bootstrap to api and cli * raft: fix bootstrap cli command * raft: add test for setting up new cluster with raft HA * raft: extend TestRaft_HA_NewCluster to include inmem and consul backends * raft: add test for updating an existing cluster to use raft HA * raft: remove debug log lines, clean up verifyRaftPeers * raft: minor cleanup * raft: minor cleanup * Update physical/raft/raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/ha.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/ha.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/logical_system_raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * address feedback comments * address feedback comments * raft: refactor tls keyring logic * address feedback comments * Update vault/raft.go Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * Update vault/raft.go Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * address feedback comments * testing: fix import ordering * raft: rename var, cleanup comment line * docs: remove ha_storage restriction note on raft * docs: more raft HA interaction updates with migration and recovery mode * docs: update the raft join command * raft: update comments * raft: add missing isRaftHAOnly check for clearing out state set earlier * raft: update a few ha_storage config checks * Update command/operator_raft_bootstrap.go Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com> * raft: address feedback comments * raft: fix panic when checking for config.HAStorage.Type * Update vault/raft.go Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * Update website/pages/docs/commands/operator/raft.mdx Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> * raft: remove bootstrap cli command * Update vault/raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * Update vault/raft.go Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> * raft: address review feedback * raft: revert vendored sdk * raft: don't send applied index and node ID info if we're HA-only Co-authored-by: Brian Kassouf <briankassouf@users.noreply.github.com> Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com> Co-authored-by: Vishal Nayak <vishalnayak@users.noreply.github.com>
This commit is contained in:
parent
6bd17d7e91
commit
c45bdca0b3
|
@ -153,7 +153,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
|
|||
|
||||
if c.flagReset {
|
||||
if err := SetStorageMigration(from, false); err != nil {
|
||||
return errwrap.Wrapf("error reseting migration lock: {{err}}", err)
|
||||
return errwrap.Wrapf("error resetting migration lock: {{err}}", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
|
|||
}
|
||||
|
||||
if migrationStatus != nil {
|
||||
return fmt.Errorf("Storage migration in progress (started: %s).", migrationStatus.Start.Format(time.RFC3339))
|
||||
return fmt.Errorf("storage migration in progress (started: %s)", migrationStatus.Start.Format(time.RFC3339))
|
||||
}
|
||||
|
||||
switch config.StorageSource.Type {
|
||||
|
@ -259,7 +259,7 @@ func (c *OperatorMigrateCommand) createDestinationBackend(kind string, conf map[
|
|||
if err != nil {
|
||||
return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
|
||||
}
|
||||
if err := raftStorage.Bootstrap(context.Background(), []raft.Peer{
|
||||
if err := raftStorage.Bootstrap([]raft.Peer{
|
||||
{
|
||||
ID: raftStorage.NodeID(),
|
||||
Address: parsedClusterAddr.Host,
|
||||
|
|
|
@ -110,15 +110,12 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int {
|
|||
|
||||
args = f.Args()
|
||||
switch len(args) {
|
||||
case 0:
|
||||
// No-op: This is acceptable if we're using raft for HA-only
|
||||
case 1:
|
||||
leaderAPIAddr = strings.TrimSpace(args[0])
|
||||
default:
|
||||
c.UI.Error(fmt.Sprintf("Incorrect arguments (expected 1, got %d)", len(args)))
|
||||
return 1
|
||||
}
|
||||
|
||||
if len(leaderAPIAddr) == 0 {
|
||||
c.UI.Error("leader api address is required")
|
||||
c.UI.Error(fmt.Sprintf("Too many arguments (expected 0-1, got %d)", len(args)))
|
||||
return 1
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,14 @@ var enableFourClusterDev = func(c *ServerCommand, base *vault.CoreConfig, info m
|
|||
return 1
|
||||
}
|
||||
|
||||
const storageMigrationLock = "core/migration"
|
||||
const (
|
||||
storageMigrationLock = "core/migration"
|
||||
|
||||
// Even though there are more types than the ones below, the following consts
|
||||
// are declared internally for value comparison and reusability.
|
||||
storageTypeRaft = "raft"
|
||||
storageTypeConsul = "consul"
|
||||
)
|
||||
|
||||
type ServerCommand struct {
|
||||
*BaseCommand
|
||||
|
@ -453,7 +460,7 @@ func (c *ServerCommand) runRecoveryMode() int {
|
|||
c.UI.Error(fmt.Sprintf("Unknown storage type %s", config.Storage.Type))
|
||||
return 1
|
||||
}
|
||||
if config.Storage.Type == "raft" {
|
||||
if config.Storage.Type == storageTypeRaft || (config.HAStorage != nil && config.HAStorage.Type == storageTypeRaft) {
|
||||
if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
|
||||
config.ClusterAddr = envCA
|
||||
}
|
||||
|
@ -963,7 +970,7 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
|
||||
// Do any custom configuration needed per backend
|
||||
switch config.Storage.Type {
|
||||
case "consul":
|
||||
case storageTypeConsul:
|
||||
if config.ServiceRegistration == nil {
|
||||
// If Consul is configured for storage and service registration is unconfigured,
|
||||
// use Consul for service registration without requiring additional configuration.
|
||||
|
@ -973,7 +980,7 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
Config: config.Storage.Config,
|
||||
}
|
||||
}
|
||||
case "raft":
|
||||
case storageTypeRaft:
|
||||
if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" {
|
||||
config.ClusterAddr = envCA
|
||||
}
|
||||
|
@ -1145,6 +1152,7 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
SecureRandomReader: secureRandomReader,
|
||||
}
|
||||
if c.flagDev {
|
||||
coreConfig.EnableRaw = true
|
||||
coreConfig.DevToken = c.flagDevRootTokenID
|
||||
if c.flagDevLeasedKV {
|
||||
coreConfig.LogicalBackends["kv"] = vault.LeasedPassthroughBackendFactory
|
||||
|
@ -1175,24 +1183,26 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
// Initialize the separate HA storage backend, if it exists
|
||||
var ok bool
|
||||
if config.HAStorage != nil {
|
||||
if config.Storage.Type == "raft" {
|
||||
if config.Storage.Type == storageTypeRaft && config.HAStorage.Type == storageTypeRaft {
|
||||
c.UI.Error("Raft cannot be set both as 'storage' and 'ha_storage'. Setting 'storage' to 'raft' will automatically set it up for HA operations as well")
|
||||
return 1
|
||||
}
|
||||
|
||||
if config.Storage.Type == storageTypeRaft {
|
||||
c.UI.Error("HA storage cannot be declared when Raft is the storage type")
|
||||
return 1
|
||||
}
|
||||
|
||||
// TODO: Remove when Raft can server as the ha_storage backend.
|
||||
// See https://github.com/hashicorp/vault/issues/8206
|
||||
if config.HAStorage.Type == "raft" {
|
||||
c.UI.Error("Raft cannot be used as separate HA storage at this time")
|
||||
return 1
|
||||
}
|
||||
factory, exists := c.PhysicalBackends[config.HAStorage.Type]
|
||||
if !exists {
|
||||
c.UI.Error(fmt.Sprintf("Unknown HA storage type %s", config.HAStorage.Type))
|
||||
return 1
|
||||
|
||||
}
|
||||
habackend, err := factory(config.HAStorage.Config, c.logger)
|
||||
|
||||
namedHALogger := c.logger.Named("ha." + config.HAStorage.Type)
|
||||
allLoggers = append(allLoggers, namedHALogger)
|
||||
habackend, err := factory(config.HAStorage.Config, namedHALogger)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf(
|
||||
"Error initializing HA storage of type %s: %s", config.HAStorage.Type, err))
|
||||
|
@ -1211,10 +1221,13 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
coreConfig.RedirectAddr = config.HAStorage.RedirectAddr
|
||||
|
||||
// TODO: Check for raft and disableClustering case when Raft on HA
|
||||
// Storage support is added.
|
||||
disableClustering = config.HAStorage.DisableClustering
|
||||
|
||||
if config.HAStorage.Type == storageTypeRaft && disableClustering {
|
||||
c.UI.Error("Disable clustering cannot be set to true when Raft is the HA storage type")
|
||||
return 1
|
||||
}
|
||||
|
||||
if !disableClustering {
|
||||
coreConfig.ClusterAddr = config.HAStorage.ClusterAddr
|
||||
}
|
||||
|
@ -1223,7 +1236,7 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
coreConfig.RedirectAddr = config.Storage.RedirectAddr
|
||||
disableClustering = config.Storage.DisableClustering
|
||||
|
||||
if config.Storage.Type == "raft" && disableClustering {
|
||||
if (config.Storage.Type == storageTypeRaft) && disableClustering {
|
||||
c.UI.Error("Disable clustering cannot be set to true when Raft is the storage type")
|
||||
return 1
|
||||
}
|
||||
|
@ -1559,7 +1572,8 @@ CLUSTER_SYNTHESIS_COMPLETE:
|
|||
|
||||
// When the underlying storage is raft, kick off retry join if it was specified
|
||||
// in the configuration
|
||||
if config.Storage.Type == "raft" {
|
||||
// TODO: Should we also support retry_join for ha_storage?
|
||||
if config.Storage.Type == storageTypeRaft {
|
||||
if err := core.InitiateRetryJoin(context.Background()); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed to initiate raft retry join, %q", err.Error()))
|
||||
return 1
|
||||
|
|
|
@ -415,7 +415,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
|
|||
|
||||
addressProvider := &TestRaftServerAddressProvider{Cluster: cluster}
|
||||
|
||||
atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
|
||||
leader := cluster.Cores[0]
|
||||
|
||||
|
|
|
@ -130,9 +130,56 @@ func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger) *vault.Physi
|
|||
}
|
||||
}
|
||||
|
||||
type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions)
|
||||
// RaftHAFactory returns a PhysicalBackendBundle with raft set as the HABackend
|
||||
// and the physical.Backend provided in PhysicalBackendBundler as the storage
|
||||
// backend.
|
||||
func RaftHAFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
// Call the factory func to create the storage backend
|
||||
physFactory := SharedPhysicalFactory(f)
|
||||
bundle := physFactory(t, coreIdx, logger)
|
||||
|
||||
func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
// This can happen if a shared physical backend is called on a non-0th core.
|
||||
if bundle == nil {
|
||||
bundle = new(vault.PhysicalBackendBundle)
|
||||
}
|
||||
|
||||
raftDir := makeRaftDir(t)
|
||||
cleanupFunc := func() {
|
||||
os.RemoveAll(raftDir)
|
||||
}
|
||||
|
||||
nodeID := fmt.Sprintf("core-%d", coreIdx)
|
||||
conf := map[string]string{
|
||||
"path": raftDir,
|
||||
"node_id": nodeID,
|
||||
"performance_multiplier": "8",
|
||||
}
|
||||
|
||||
// Create and set the HA Backend
|
||||
raftBackend, err := raft.NewRaftBackend(conf, logger)
|
||||
if err != nil {
|
||||
bundle.Cleanup()
|
||||
t.Fatal(err)
|
||||
}
|
||||
bundle.HABackend = raftBackend.(physical.HABackend)
|
||||
|
||||
// Re-wrap the cleanup func
|
||||
bundleCleanup := bundle.Cleanup
|
||||
bundle.Cleanup = func() {
|
||||
if bundleCleanup != nil {
|
||||
bundleCleanup()
|
||||
}
|
||||
cleanupFunc()
|
||||
}
|
||||
|
||||
return bundle
|
||||
}
|
||||
}
|
||||
|
||||
type PhysicalBackendBundler func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle
|
||||
|
||||
func SharedPhysicalFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
if coreIdx == 0 {
|
||||
return f(t, logger)
|
||||
|
@ -141,6 +188,8 @@ func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.Physi
|
|||
}
|
||||
}
|
||||
|
||||
type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions)
|
||||
|
||||
func InmemBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
|
||||
opts.PhysicalFactory = SharedPhysicalFactory(MakeInmemBackend)
|
||||
}
|
||||
|
@ -166,6 +215,11 @@ func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
|
|||
}
|
||||
}
|
||||
|
||||
func RaftHASetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, bundler PhysicalBackendBundler) {
|
||||
opts.KeepStandbysSealed = true
|
||||
opts.PhysicalFactory = RaftHAFactory(bundler)
|
||||
}
|
||||
|
||||
func ClusterSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, setup ClusterSetupMutator) (*vault.CoreConfig, *vault.TestClusterOptions) {
|
||||
var localConf vault.CoreConfig
|
||||
if conf != nil {
|
||||
|
|
|
@ -5,12 +5,12 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
raftlib "github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// ReusableStorage is a physical backend that can be re-used across
|
||||
|
@ -59,8 +59,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica
|
|||
},
|
||||
|
||||
// No-op
|
||||
Cleanup: func(t testing.T, cluster *vault.TestCluster) {
|
||||
},
|
||||
Cleanup: func(t testing.T, cluster *vault.TestCluster) {},
|
||||
}
|
||||
|
||||
cleanup := func() {
|
||||
|
@ -74,10 +73,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica
|
|||
|
||||
// MakeReusableRaftStorage makes a physical raft backend that can be re-used
|
||||
// across multiple test clusters in sequence.
|
||||
func MakeReusableRaftStorage(
|
||||
t testing.T, logger hclog.Logger, numCores int,
|
||||
addressProvider raftlib.ServerAddressProvider,
|
||||
) (ReusableStorage, StorageCleanup) {
|
||||
func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int, addressProvider raftlib.ServerAddressProvider) (ReusableStorage, StorageCleanup) {
|
||||
|
||||
raftDirs := make([]string, numCores)
|
||||
for i := 0; i < numCores; i++ {
|
||||
|
@ -91,7 +87,7 @@ func MakeReusableRaftStorage(
|
|||
conf.DisablePerformanceStandby = true
|
||||
opts.KeepStandbysSealed = true
|
||||
opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider)
|
||||
return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider, false)
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -120,6 +116,49 @@ func CloseRaftStorage(t testing.T, cluster *vault.TestCluster, idx int) {
|
|||
}
|
||||
}
|
||||
|
||||
func MakeReusableRaftHAStorage(t testing.T, logger hclog.Logger, numCores int, bundle *vault.PhysicalBackendBundle) (ReusableStorage, StorageCleanup) {
|
||||
raftDirs := make([]string, numCores)
|
||||
for i := 0; i < numCores; i++ {
|
||||
raftDirs[i] = makeRaftDir(t)
|
||||
}
|
||||
|
||||
storage := ReusableStorage{
|
||||
Setup: func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
|
||||
opts.KeepStandbysSealed = true
|
||||
opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle {
|
||||
haBundle := makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], nil, true)
|
||||
|
||||
return &vault.PhysicalBackendBundle{
|
||||
Backend: bundle.Backend,
|
||||
HABackend: haBundle.HABackend,
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Close open files being used by raft.
|
||||
Cleanup: func(t testing.T, cluster *vault.TestCluster) {
|
||||
for _, core := range cluster.Cores {
|
||||
raftStorage := core.UnderlyingHAStorage.(*raft.RaftBackend)
|
||||
if err := raftStorage.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
cleanup := func() {
|
||||
if bundle.Cleanup != nil {
|
||||
bundle.Cleanup()
|
||||
}
|
||||
|
||||
for _, rd := range raftDirs {
|
||||
os.RemoveAll(rd)
|
||||
}
|
||||
}
|
||||
|
||||
return storage, cleanup
|
||||
}
|
||||
|
||||
func makeRaftDir(t testing.T) string {
|
||||
raftDir, err := ioutil.TempDir("", "vault-raft-")
|
||||
if err != nil {
|
||||
|
@ -129,10 +168,7 @@ func makeRaftDir(t testing.T) string {
|
|||
return raftDir
|
||||
}
|
||||
|
||||
func makeReusableRaftBackend(
|
||||
t testing.T, coreIdx int, logger hclog.Logger, raftDir string,
|
||||
addressProvider raftlib.ServerAddressProvider,
|
||||
) *vault.PhysicalBackendBundle {
|
||||
func makeReusableRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, raftDir string, addressProvider raftlib.ServerAddressProvider, ha bool) *vault.PhysicalBackendBundle {
|
||||
|
||||
nodeID := fmt.Sprintf("core-%d", coreIdx)
|
||||
conf := map[string]string{
|
||||
|
@ -146,9 +182,16 @@ func makeReusableRaftBackend(
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if addressProvider != nil {
|
||||
backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
|
||||
}
|
||||
|
||||
return &vault.PhysicalBackendBundle{
|
||||
Backend: backend,
|
||||
bundle := new(vault.PhysicalBackendBundle)
|
||||
|
||||
if ha {
|
||||
bundle.HABackend = backend.(physical.HABackend)
|
||||
} else {
|
||||
bundle.Backend = backend
|
||||
}
|
||||
return bundle
|
||||
}
|
||||
|
|
|
@ -144,6 +144,7 @@ func Handler(props *vault.HandlerProperties) http.Handler {
|
|||
mux.Handle("/v1/sys/rekey-recovery-key/init", handleRequestForwarding(core, handleSysRekeyInit(core, true)))
|
||||
mux.Handle("/v1/sys/rekey-recovery-key/update", handleRequestForwarding(core, handleSysRekeyUpdate(core, true)))
|
||||
mux.Handle("/v1/sys/rekey-recovery-key/verify", handleRequestForwarding(core, handleSysRekeyVerify(core, true)))
|
||||
mux.Handle("/v1/sys/storage/raft/bootstrap", handleSysRaftBootstrap(core))
|
||||
mux.Handle("/v1/sys/storage/raft/join", handleSysRaftJoin(core))
|
||||
for _, path := range injectDataIntoTopRoutes {
|
||||
mux.Handle(path, handleRequestForwarding(core, handleLogicalWithInjector(core)))
|
||||
|
|
|
@ -12,6 +12,25 @@ import (
|
|||
"github.com/hashicorp/vault/vault"
|
||||
)
|
||||
|
||||
func handleSysRaftBootstrap(core *vault.Core) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case "POST", "PUT":
|
||||
if core.Sealed() {
|
||||
respondError(w, http.StatusBadRequest, errors.New("node must be unsealed to bootstrap"))
|
||||
}
|
||||
|
||||
if err := core.RaftBootstrap(context.Background(), false); err != nil {
|
||||
respondError(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
|
||||
default:
|
||||
respondError(w, http.StatusBadRequest, nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func handleSysRaftJoin(core *vault.Core) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
|
@ -53,6 +72,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
|
|||
Retry: req.Retry,
|
||||
},
|
||||
}
|
||||
|
||||
joined, err := core.JoinRaftCluster(context.Background(), leaderInfos, req.NonVoter)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, err)
|
||||
|
|
|
@ -41,6 +41,8 @@ const EnvVaultRaftPath = "VAULT_RAFT_PATH"
|
|||
// Verify RaftBackend satisfies the correct interfaces
|
||||
var _ physical.Backend = (*RaftBackend)(nil)
|
||||
var _ physical.Transactional = (*RaftBackend)(nil)
|
||||
var _ physical.HABackend = (*RaftBackend)(nil)
|
||||
var _ physical.Lock = (*RaftLock)(nil)
|
||||
|
||||
var (
|
||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||
|
@ -68,6 +70,11 @@ type RaftBackend struct {
|
|||
// raft is the instance of raft we will operate on.
|
||||
raft *raft.Raft
|
||||
|
||||
// raftInitCh is used to block during HA lock acquisition if raft
|
||||
// has not been initialized yet, which can occur if raft is being
|
||||
// used for HA-only.
|
||||
raftInitCh chan struct{}
|
||||
|
||||
// raftNotifyCh is used to receive updates about leadership changes
|
||||
// regarding this node.
|
||||
raftNotifyCh chan bool
|
||||
|
@ -323,6 +330,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
|
|||
return &RaftBackend{
|
||||
logger: logger,
|
||||
fsm: fsm,
|
||||
raftInitCh: make(chan struct{}),
|
||||
conf: conf,
|
||||
logStore: log,
|
||||
stableStore: stable,
|
||||
|
@ -420,7 +428,7 @@ func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvid
|
|||
}
|
||||
|
||||
// Bootstrap prepares the given peers to be part of the raft cluster
|
||||
func (b *RaftBackend) Bootstrap(ctx context.Context, peers []Peer) error {
|
||||
func (b *RaftBackend) Bootstrap(peers []Peer) error {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
|
@ -532,6 +540,13 @@ func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error
|
|||
})
|
||||
}
|
||||
|
||||
func (b *RaftBackend) HasState() (bool, error) {
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
|
||||
}
|
||||
|
||||
// SetupCluster starts the raft cluster and enables the networking needed for
|
||||
// the raft nodes to communicate.
|
||||
func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
|
||||
|
@ -699,6 +714,10 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
|
|||
opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
|
||||
}
|
||||
|
||||
// Close the init channel to signal setup has been completed
|
||||
close(b.raftInitCh)
|
||||
|
||||
b.logger.Trace("finished setting up raft cluster")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -712,6 +731,9 @@ func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error
|
|||
b.l.Lock()
|
||||
future := b.raft.Shutdown()
|
||||
b.raft = nil
|
||||
|
||||
// If we're tearing down, then we need to recreate the raftInitCh
|
||||
b.raftInitCh = make(chan struct{})
|
||||
b.l.Unlock()
|
||||
|
||||
return future.Error()
|
||||
|
@ -1111,10 +1133,10 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// HAEnabled is the implemention of the HABackend interface
|
||||
// HAEnabled is the implementation of the HABackend interface
|
||||
func (b *RaftBackend) HAEnabled() bool { return true }
|
||||
|
||||
// HAEnabled is the implemention of the HABackend interface
|
||||
// HAEnabled is the implementation of the HABackend interface
|
||||
func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) {
|
||||
return &RaftLock{
|
||||
key: key,
|
||||
|
@ -1161,18 +1183,27 @@ func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-ch
|
|||
// Lock blocks until we become leader or are shutdown. It returns a channel that
|
||||
// is closed when we detect a loss of leadership.
|
||||
func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
||||
// If not initialized, block until it is
|
||||
if !l.b.Initialized() {
|
||||
select {
|
||||
case <-l.b.raftInitCh:
|
||||
case <-stopCh:
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
l.b.l.RLock()
|
||||
|
||||
// Ensure that we still have a raft instance after grabbing the read lock
|
||||
if l.b.raft == nil {
|
||||
l.b.l.RUnlock()
|
||||
return nil, errors.New("attempted to grab a lock on a nil raft backend")
|
||||
}
|
||||
|
||||
// Cache the notifyCh locally
|
||||
leaderNotifyCh := l.b.raftNotifyCh
|
||||
|
||||
// TODO: Remove when Raft can server as the ha_storage backend. The internal
|
||||
// raft pointer should not be nil here, but the nil check is a guard against
|
||||
// https://github.com/hashicorp/vault/issues/8206
|
||||
if l.b.raft == nil {
|
||||
return nil, errors.New("attempted to grab a lock on a nil raft backend")
|
||||
}
|
||||
|
||||
// Check to see if we are already leader.
|
||||
if l.b.raft.State() == raft.Leader {
|
||||
err := l.b.applyLog(context.Background(), &LogData{
|
||||
|
@ -1225,6 +1256,10 @@ func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|||
|
||||
// Unlock gives up leadership.
|
||||
func (l *RaftLock) Unlock() error {
|
||||
if l.b.raft == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return l.b.raft.LeadershipTransfer().Error()
|
||||
}
|
||||
|
||||
|
|
|
@ -36,8 +36,13 @@ func getRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str
|
|||
}
|
||||
|
||||
func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) {
|
||||
id, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Name: "raft",
|
||||
Name: fmt.Sprintf("raft-%s", id),
|
||||
Level: hclog.Trace,
|
||||
})
|
||||
logger.Info("raft dir", "dir", raftDir)
|
||||
|
@ -45,6 +50,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
|
|||
conf := map[string]string{
|
||||
"path": raftDir,
|
||||
"trailing_logs": "100",
|
||||
"node_id": id,
|
||||
}
|
||||
|
||||
if noStoreState {
|
||||
|
@ -58,7 +64,12 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
|
|||
backend := backendRaw.(*RaftBackend)
|
||||
|
||||
if bootstrap {
|
||||
err = backend.Bootstrap(context.Background(), []Peer{Peer{ID: backend.NodeID(), Address: backend.NodeID()}})
|
||||
err = backend.Bootstrap([]Peer{
|
||||
{
|
||||
ID: backend.NodeID(),
|
||||
Address: backend.NodeID(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ func addPeer(t *testing.T, leader, follower *RaftBackend) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = follower.Bootstrap(context.Background(), peers)
|
||||
err = follower.Bootstrap(peers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -314,4 +314,3 @@ func writeFile(t *testing.T, filename string, data []byte, perms os.FileMode) {
|
|||
t.Fatalf("Unable to write to file [%s]: %s", filename, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
"github.com/hashicorp/vault/helper/metricsutil"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/internalshared/reloadutil"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/certutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
|
@ -1455,7 +1454,7 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
|
|||
return false, err
|
||||
}
|
||||
|
||||
if err := c.startRaftStorage(ctx); err != nil {
|
||||
if err := c.startRaftBackend(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
@ -1714,7 +1713,7 @@ func (c *Core) sealInternal() error {
|
|||
return c.sealInternalWithOptions(true, false, true)
|
||||
}
|
||||
|
||||
func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft bool) error {
|
||||
func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, performCleanup bool) error {
|
||||
// Mark sealed, and if already marked return
|
||||
if swapped := atomic.CompareAndSwapUint32(c.sealed, 0, 1); !swapped {
|
||||
return nil
|
||||
|
@ -1796,10 +1795,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b
|
|||
|
||||
c.teardownReplicationResolverHandler()
|
||||
|
||||
// If the storage backend needs to be sealed
|
||||
if shutdownRaft {
|
||||
if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil {
|
||||
// Perform additional cleanup upon sealing.
|
||||
if performCleanup {
|
||||
if raftBackend := c.getRaftBackend(); raftBackend != nil {
|
||||
if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil {
|
||||
c.logger.Error("error stopping storage cluster", "error", err)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
package rafttests
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/api"
|
||||
"github.com/hashicorp/vault/helper/testhelpers"
|
||||
"github.com/hashicorp/vault/helper/testhelpers/teststorage"
|
||||
vaulthttp "github.com/hashicorp/vault/http"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
)
|
||||
|
||||
func TestRaft_HA_NewCluster(t *testing.T) {
|
||||
t.Run("file", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("no_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeFileBackend, false)
|
||||
})
|
||||
|
||||
t.Run("with_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeFileBackend, true)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("inmem", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("no_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeInmemBackend, false)
|
||||
})
|
||||
|
||||
t.Run("with_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeInmemBackend, true)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("consul", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("no_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeConsulBackend, false)
|
||||
})
|
||||
|
||||
t.Run("with_client_certs", func(t *testing.T) {
|
||||
testRaftHANewCluster(t, teststorage.MakeConsulBackend, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func testRaftHANewCluster(t *testing.T, bundler teststorage.PhysicalBackendBundler, addClientCerts bool) {
|
||||
var conf vault.CoreConfig
|
||||
var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
|
||||
|
||||
teststorage.RaftHASetup(&conf, &opts, bundler)
|
||||
cluster := vault.NewTestCluster(t, &conf, &opts)
|
||||
cluster.Start()
|
||||
defer cluster.Cleanup()
|
||||
|
||||
addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
|
||||
|
||||
leaderCore := cluster.Cores[0]
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
|
||||
// Seal the leader so we can install an address provider
|
||||
{
|
||||
testhelpers.EnsureCoreSealed(t, leaderCore)
|
||||
leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
|
||||
cluster.UnsealCore(t, leaderCore)
|
||||
vault.TestWaitActive(t, leaderCore.Core)
|
||||
}
|
||||
|
||||
// Now unseal core for join commands to work
|
||||
testhelpers.EnsureCoresUnsealed(t, cluster)
|
||||
|
||||
joinFunc := func(client *api.Client, addClientCerts bool) {
|
||||
req := &api.RaftJoinRequest{
|
||||
LeaderCACert: string(cluster.CACertPEM),
|
||||
}
|
||||
if addClientCerts {
|
||||
req.LeaderClientCert = string(cluster.CACertPEM)
|
||||
req.LeaderClientKey = string(cluster.CAKeyPEM)
|
||||
}
|
||||
resp, err := client.Sys().RaftJoin(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !resp.Joined {
|
||||
t.Fatalf("failed to join raft cluster")
|
||||
}
|
||||
}
|
||||
|
||||
joinFunc(cluster.Cores[1].Client, addClientCerts)
|
||||
joinFunc(cluster.Cores[2].Client, addClientCerts)
|
||||
|
||||
// Ensure peers are added
|
||||
leaderClient := cluster.Cores[0].Client
|
||||
verifyRaftPeers(t, leaderClient, map[string]bool{
|
||||
"core-0": true,
|
||||
"core-1": true,
|
||||
"core-2": true,
|
||||
})
|
||||
|
||||
// Test remove peers
|
||||
_, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
|
||||
"server_id": "core-1",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
|
||||
"server_id": "core-2",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Ensure peers are removed
|
||||
verifyRaftPeers(t, leaderClient, map[string]bool{
|
||||
"core-0": true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestRaft_HA_ExistingCluster(t *testing.T) {
|
||||
conf := vault.CoreConfig{
|
||||
DisablePerformanceStandby: true,
|
||||
}
|
||||
opts := vault.TestClusterOptions{
|
||||
HandlerFunc: vaulthttp.Handler,
|
||||
NumCores: vault.DefaultNumCores,
|
||||
KeepStandbysSealed: true,
|
||||
}
|
||||
logger := logging.NewVaultLogger(hclog.Debug).Named(t.Name())
|
||||
|
||||
physBundle := teststorage.MakeInmemBackend(t, logger)
|
||||
physBundle.HABackend = nil
|
||||
|
||||
storage, cleanup := teststorage.MakeReusableStorage(t, logger, physBundle)
|
||||
defer cleanup()
|
||||
|
||||
var (
|
||||
clusterBarrierKeys [][]byte
|
||||
clusterRootToken string
|
||||
)
|
||||
createCluster := func(t *testing.T) {
|
||||
t.Log("simulating cluster creation without raft as HABackend")
|
||||
|
||||
storage.Setup(&conf, &opts)
|
||||
|
||||
cluster := vault.NewTestCluster(t, &conf, &opts)
|
||||
cluster.Start()
|
||||
defer func() {
|
||||
cluster.Cleanup()
|
||||
storage.Cleanup(t, cluster)
|
||||
}()
|
||||
|
||||
clusterBarrierKeys = cluster.BarrierKeys
|
||||
clusterRootToken = cluster.RootToken
|
||||
}
|
||||
|
||||
createCluster(t)
|
||||
|
||||
haStorage, haCleanup := teststorage.MakeReusableRaftHAStorage(t, logger, opts.NumCores, physBundle)
|
||||
defer haCleanup()
|
||||
|
||||
updateCLuster := func(t *testing.T) {
|
||||
t.Log("simulating cluster update with raft as HABackend")
|
||||
|
||||
opts.SkipInit = true
|
||||
haStorage.Setup(&conf, &opts)
|
||||
|
||||
cluster := vault.NewTestCluster(t, &conf, &opts)
|
||||
cluster.Start()
|
||||
defer func() {
|
||||
cluster.Cleanup()
|
||||
haStorage.Cleanup(t, cluster)
|
||||
}()
|
||||
|
||||
// Set cluster values
|
||||
cluster.BarrierKeys = clusterBarrierKeys
|
||||
cluster.RootToken = clusterRootToken
|
||||
|
||||
addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster}
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
|
||||
// Seal the leader so we can install an address provider
|
||||
leaderCore := cluster.Cores[0]
|
||||
{
|
||||
testhelpers.EnsureCoreSealed(t, leaderCore)
|
||||
leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
|
||||
testhelpers.EnsureCoreUnsealed(t, cluster, leaderCore)
|
||||
}
|
||||
|
||||
// Call the bootstrap on the leader and then ensure that it becomes active
|
||||
leaderClient := cluster.Cores[0].Client
|
||||
leaderClient.SetToken(clusterRootToken)
|
||||
{
|
||||
_, err := leaderClient.Logical().Write("sys/storage/raft/bootstrap", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
vault.TestWaitActive(t, leaderCore.Core)
|
||||
}
|
||||
|
||||
// Set address provider
|
||||
cluster.Cores[1].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
|
||||
cluster.Cores[2].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
|
||||
|
||||
// Now unseal core for join commands to work
|
||||
testhelpers.EnsureCoresUnsealed(t, cluster)
|
||||
|
||||
joinFunc := func(client *api.Client) {
|
||||
req := &api.RaftJoinRequest{
|
||||
LeaderCACert: string(cluster.CACertPEM),
|
||||
}
|
||||
resp, err := client.Sys().RaftJoin(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !resp.Joined {
|
||||
t.Fatalf("failed to join raft cluster")
|
||||
}
|
||||
}
|
||||
|
||||
joinFunc(cluster.Cores[1].Client)
|
||||
joinFunc(cluster.Cores[2].Client)
|
||||
|
||||
// Ensure peers are added
|
||||
verifyRaftPeers(t, leaderClient, map[string]bool{
|
||||
"core-0": true,
|
||||
"core-1": true,
|
||||
"core-2": true,
|
||||
})
|
||||
}
|
||||
|
||||
updateCLuster(t)
|
||||
}
|
|
@ -47,7 +47,7 @@ func TestRaft_Retry_Join(t *testing.T) {
|
|||
|
||||
leaderCore := cluster.Cores[0]
|
||||
leaderAPI := leaderCore.Client.Address()
|
||||
atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
|
||||
{
|
||||
testhelpers.EnsureCoreSealed(t, leaderCore)
|
||||
|
@ -90,23 +90,7 @@ func TestRaft_Retry_Join(t *testing.T) {
|
|||
cluster.UnsealCore(t, core)
|
||||
}
|
||||
|
||||
checkConfigFunc := func(expected map[string]bool) {
|
||||
secret, err := cluster.Cores[0].Client.Logical().Read("sys/storage/raft/configuration")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{})
|
||||
|
||||
for _, s := range servers {
|
||||
server := s.(map[string]interface{})
|
||||
delete(expected, server["node_id"].(string))
|
||||
}
|
||||
if len(expected) != 0 {
|
||||
t.Fatalf("failed to read configuration successfully")
|
||||
}
|
||||
}
|
||||
|
||||
checkConfigFunc(map[string]bool{
|
||||
verifyRaftPeers(t, cluster.Cores[0].Client, map[string]bool{
|
||||
"core-0": true,
|
||||
"core-1": true,
|
||||
"core-2": true,
|
||||
|
@ -126,7 +110,7 @@ func TestRaft_Join(t *testing.T) {
|
|||
|
||||
leaderCore := cluster.Cores[0]
|
||||
leaderAPI := leaderCore.Client.Address()
|
||||
atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
|
||||
// Seal the leader so we can install an address provider
|
||||
{
|
||||
|
@ -187,23 +171,7 @@ func TestRaft_RemovePeer(t *testing.T) {
|
|||
|
||||
client := cluster.Cores[0].Client
|
||||
|
||||
checkConfigFunc := func(expected map[string]bool) {
|
||||
secret, err := client.Logical().Read("sys/storage/raft/configuration")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{})
|
||||
|
||||
for _, s := range servers {
|
||||
server := s.(map[string]interface{})
|
||||
delete(expected, server["node_id"].(string))
|
||||
}
|
||||
if len(expected) != 0 {
|
||||
t.Fatalf("failed to read configuration successfully")
|
||||
}
|
||||
}
|
||||
|
||||
checkConfigFunc(map[string]bool{
|
||||
verifyRaftPeers(t, client, map[string]bool{
|
||||
"core-0": true,
|
||||
"core-1": true,
|
||||
"core-2": true,
|
||||
|
@ -216,7 +184,7 @@ func TestRaft_RemovePeer(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkConfigFunc(map[string]bool{
|
||||
verifyRaftPeers(t, client, map[string]bool{
|
||||
"core-0": true,
|
||||
"core-1": true,
|
||||
})
|
||||
|
@ -228,7 +196,7 @@ func TestRaft_RemovePeer(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkConfigFunc(map[string]bool{
|
||||
verifyRaftPeers(t, client, map[string]bool{
|
||||
"core-0": true,
|
||||
})
|
||||
}
|
||||
|
@ -843,3 +811,39 @@ func BenchmarkRaft_SingleNode(b *testing.B) {
|
|||
|
||||
b.Run("256b", func(b *testing.B) { bench(b, 25) })
|
||||
}
|
||||
|
||||
func verifyRaftPeers(t *testing.T, client *api.Client, expected map[string]bool) {
|
||||
t.Helper()
|
||||
|
||||
resp, err := client.Logical().Read("sys/storage/raft/configuration")
|
||||
if err != nil {
|
||||
t.Fatalf("error reading raft config: %v", err)
|
||||
}
|
||||
|
||||
if resp == nil || resp.Data == nil {
|
||||
t.Fatal("missing response data")
|
||||
}
|
||||
|
||||
config, ok := resp.Data["config"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("missing config in response data")
|
||||
}
|
||||
|
||||
servers, ok := config["servers"].([]interface{})
|
||||
if !ok {
|
||||
t.Fatal("missing servers in response data config")
|
||||
}
|
||||
|
||||
// Iterate through the servers and remove the node found in the response
|
||||
// from the expected collection
|
||||
for _, s := range servers {
|
||||
server := s.(map[string]interface{})
|
||||
delete(expected, server["node_id"].(string))
|
||||
}
|
||||
|
||||
// If the collection is non-empty, it means that the peer was not found in
|
||||
// the response.
|
||||
if len(expected) != 0 {
|
||||
t.Fatalf("failed to read configuration successfully, expected peers no found in configuration list: %v", expected)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func testVariousBackends(t *testing.T, tf testFunc, basePort int, includeRaft bo
|
|||
logger := logger.Named("raft")
|
||||
raftBasePort := basePort + 400
|
||||
|
||||
atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1)
|
||||
atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1)
|
||||
addressProvider := testhelpers.NewHardcodedServerAddressProvider(numTestCores, raftBasePort+10)
|
||||
|
||||
storage, cleanup := teststorage.MakeReusableRaftStorage(t, logger, numTestCores, addressProvider)
|
||||
|
|
14
vault/ha.go
14
vault/ha.go
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/certutil"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
|
@ -709,8 +708,10 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct
|
|||
|
||||
// periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
|
||||
func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{}) {
|
||||
raftBackend := c.getRaftBackend()
|
||||
isRaft := raftBackend != nil
|
||||
|
||||
opCount := new(int32)
|
||||
_, isRaft := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
for {
|
||||
select {
|
||||
case <-time.After(keyRotateCheckInterval):
|
||||
|
@ -751,9 +752,18 @@ func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{
|
|||
c.logger.Error("key rotation periodic upgrade check failed", "error", err)
|
||||
}
|
||||
|
||||
if isRaft {
|
||||
hasState, err := raftBackend.HasState()
|
||||
if err != nil {
|
||||
c.logger.Error("could not check raft state", "error", err)
|
||||
}
|
||||
|
||||
if raftBackend.Initialized() && hasState {
|
||||
if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
|
||||
c.logger.Error("raft tls periodic upgrade check failed", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddInt32(lopCount, -1)
|
||||
return
|
||||
|
|
|
@ -208,30 +208,20 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
|
|||
return nil, ErrAlreadyInit
|
||||
}
|
||||
|
||||
// If we have clustered storage, set it up now
|
||||
if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
parsedClusterAddr, err := url.Parse(c.ClusterAddr())
|
||||
// Bootstrap the raft backend if that's provided as the physical or
|
||||
// HA backend.
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend != nil {
|
||||
err := c.RaftBootstrap(ctx, true)
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err)
|
||||
}
|
||||
if err := raftStorage.Bootstrap(ctx, []raft.Peer{
|
||||
{
|
||||
ID: raftStorage.NodeID(),
|
||||
Address: parsedClusterAddr.Host,
|
||||
},
|
||||
}); err != nil {
|
||||
return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
|
||||
}
|
||||
|
||||
if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
|
||||
StartAsLeader: true,
|
||||
}); err != nil {
|
||||
return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err)
|
||||
c.logger.Error("failed to bootstrap raft", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Teardown cluster after bootstrap setup
|
||||
defer func() {
|
||||
if err := raftStorage.TeardownCluster(nil); err != nil {
|
||||
c.logger.Error("failed to stop raft storage", "error", err)
|
||||
if err := raftBackend.TeardownCluster(nil); err != nil {
|
||||
c.logger.Error("failed to stop raft", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -385,10 +375,12 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
|
|||
results.RootToken = base64.StdEncoding.EncodeToString(encryptedVals[0])
|
||||
}
|
||||
|
||||
if err := c.createRaftTLSKeyring(ctx); err != nil {
|
||||
if raftBackend != nil {
|
||||
if _, err := c.raftCreateTLSKeyring(ctx); err != nil {
|
||||
c.logger.Error("failed to create raft TLS keyring", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare to re-seal
|
||||
if err := c.preSeal(); err != nil {
|
||||
|
|
|
@ -29,7 +29,6 @@ import (
|
|||
"github.com/hashicorp/vault/helper/monitor"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/helper/random"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/framework"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
||||
|
@ -166,7 +165,7 @@ func NewSystemBackend(core *Core, logger log.Logger) *SystemBackend {
|
|||
b.Backend.Paths = append(b.Backend.Paths, b.rawPaths()...)
|
||||
}
|
||||
|
||||
if _, ok := core.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
if backend := core.getRaftBackend(); backend != nil {
|
||||
b.Backend.Paths = append(b.Backend.Paths, b.raftStoragePaths()...)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,14 +7,15 @@ import (
|
|||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/framework"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
wrapping "github.com/hashicorp/go-kms-wrapping"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/framework"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
)
|
||||
|
||||
// raftStoragePaths returns paths for use when raft is the storage mechanism.
|
||||
|
@ -132,13 +133,12 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path {
|
|||
|
||||
func (b *SystemBackend) handleRaftConfigurationGet() framework.OperationFunc {
|
||||
return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
|
||||
|
||||
raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
raftBackend := b.Core.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
|
||||
}
|
||||
|
||||
config, err := raftStorage.GetConfiguration(ctx)
|
||||
config, err := raftBackend.GetConfiguration(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -158,12 +158,12 @@ func (b *SystemBackend) handleRaftRemovePeerUpdate() framework.OperationFunc {
|
|||
return logical.ErrorResponse("no server id provided"), logical.ErrInvalidRequest
|
||||
}
|
||||
|
||||
raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
raftBackend := b.Core.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
|
||||
}
|
||||
|
||||
if err := raftStorage.RemovePeer(ctx, serverID); err != nil {
|
||||
if err := raftBackend.RemovePeer(ctx, serverID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if b.Core.raftFollowerStates != nil {
|
||||
|
@ -221,8 +221,8 @@ func (b *SystemBackend) handleRaftBootstrapChallengeWrite() framework.OperationF
|
|||
|
||||
func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc {
|
||||
return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
|
||||
raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
raftBackend := b.Core.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest
|
||||
}
|
||||
|
||||
|
@ -271,9 +271,9 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
|
|||
|
||||
switch nonVoter {
|
||||
case true:
|
||||
err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr)
|
||||
err = raftBackend.AddNonVotingPeer(ctx, serverID, clusterAddr)
|
||||
default:
|
||||
err = raftStorage.AddPeer(ctx, serverID, clusterAddr)
|
||||
err = raftBackend.AddPeer(ctx, serverID, clusterAddr)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -283,7 +283,7 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
|
|||
b.Core.raftFollowerStates.update(serverID, 0)
|
||||
}
|
||||
|
||||
peers, err := raftStorage.Peers(ctx)
|
||||
peers, err := raftBackend.Peers(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
448
vault/raft.go
448
vault/raft.go
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/errwrap"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
wrapping "github.com/hashicorp/go-kms-wrapping"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/vault/api"
|
||||
|
@ -31,6 +32,9 @@ import (
|
|||
var (
|
||||
raftTLSStoragePath = "core/raft/tls"
|
||||
raftTLSRotationPeriod = 24 * time.Hour
|
||||
|
||||
// TestingUpdateClusterAddr is used in tests to override the cluster address
|
||||
TestingUpdateClusterAddr uint32
|
||||
)
|
||||
|
||||
type raftFollowerStates struct {
|
||||
|
@ -88,14 +92,11 @@ func (c *Core) GetRaftIndexes() (committed uint64, applied uint64) {
|
|||
return raftStorage.CommittedIndex(), raftStorage.AppliedIndex()
|
||||
}
|
||||
|
||||
// startRaftStorage will call SetupCluster in the raft backend which starts raft
|
||||
// startRaftBackend will call SetupCluster in the raft backend which starts raft
|
||||
// up and enables the cluster handler.
|
||||
func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if raftStorage.Initialized() {
|
||||
func (c *Core) startRaftBackend(ctx context.Context) (retErr error) {
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil || raftBackend.Initialized() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -109,6 +110,15 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
|
|||
var raftTLS *raft.TLSKeyring
|
||||
switch raftTLSEntry {
|
||||
case nil:
|
||||
// If this is HA-only and no TLS keyring is found, that means the
|
||||
// cluster has not been bootstrapped or joined. We return early here in
|
||||
// this case. If we return here, the raft object has not been instantiated,
|
||||
// and a bootstrap call should be made.
|
||||
if c.isRaftHAOnly() {
|
||||
c.logger.Trace("skipping raft backend setup during unseal, no bootstrap operation has been started yet")
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we did not find a TLS keyring we will attempt to create one here.
|
||||
// This happens after a storage migration process. This node is also
|
||||
// marked to start as leader so we can write the new TLS Key. This is an
|
||||
|
@ -133,8 +143,23 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
|
|||
}
|
||||
}
|
||||
|
||||
raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
||||
if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{
|
||||
hasState, err := raftBackend.HasState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// This can be hit on follower nodes that got their config updated to use
|
||||
// raft for HA-only before they are joined to the cluster. Since followers
|
||||
// in this case use shared storage, it doesn't return early from the TLS
|
||||
// case above, but there's not raft state yet for the backend to call
|
||||
// raft.SetupCluster.
|
||||
if !hasState {
|
||||
c.logger.Trace("skipping raft backend setup during unseal, no raft state found")
|
||||
return nil
|
||||
}
|
||||
|
||||
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
||||
if err := raftBackend.SetupCluster(ctx, raft.SetupOpts{
|
||||
TLSKeyring: raftTLS,
|
||||
ClusterListener: c.getClusterListener(),
|
||||
StartAsLeader: creating,
|
||||
|
@ -145,7 +170,7 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) {
|
|||
defer func() {
|
||||
if retErr != nil {
|
||||
c.logger.Info("stopping raft server")
|
||||
if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil {
|
||||
if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil {
|
||||
c.logger.Error("failed to stop raft server", "error", err)
|
||||
}
|
||||
}
|
||||
|
@ -180,7 +205,114 @@ func (c *Core) stopRaftActiveNode() {
|
|||
c.stopPeriodicRaftTLSRotate()
|
||||
}
|
||||
|
||||
// startPeriodicRaftTLSRotate will spawn a go routine in charge of periodically
|
||||
func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
||||
raftBackend := c.getRaftBackend()
|
||||
|
||||
// No-op if raft is not being used
|
||||
if raftBackend == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.raftTLSRotationStopCh = make(chan struct{})
|
||||
logger := c.logger.Named("raft")
|
||||
|
||||
if c.isRaftHAOnly() {
|
||||
return c.raftTLSRotateDirect(ctx, logger, c.raftTLSRotationStopCh)
|
||||
}
|
||||
|
||||
return c.raftTLSRotatePhased(ctx, logger, raftBackend, c.raftTLSRotationStopCh)
|
||||
}
|
||||
|
||||
// raftTLSRotateDirect will spawn a go routine in charge of periodically
|
||||
// rotating the TLS certs and keys used for raft traffic.
|
||||
//
|
||||
// The logic for updating the TLS keyring is through direct storage update. This
|
||||
// is called whenever raft is used for HA-only, which means that the underlying
|
||||
// storage is a shared physical object, thus requiring no additional
|
||||
// coordination.
|
||||
func (c *Core) raftTLSRotateDirect(ctx context.Context, logger hclog.Logger, stopCh chan struct{}) error {
|
||||
logger.Info("creating new raft TLS config")
|
||||
|
||||
rotateKeyring := func() (time.Time, error) {
|
||||
// Create a new key
|
||||
raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to generate new raft TLS key: {{err}}", err)
|
||||
}
|
||||
|
||||
// Read the existing keyring
|
||||
keyring, err := c.raftReadTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
|
||||
}
|
||||
|
||||
// Advance the term and store the new key, replacing the old one.
|
||||
// Unlike phased rotation, we don't need to update AppliedIndex since
|
||||
// we don't rely on it to check whether the followers got the key. A
|
||||
// shared storage means that followers will have the key as soon as it's
|
||||
// written to storage.
|
||||
keyring.Term += 1
|
||||
keyring.Keys[0] = raftTLSKey
|
||||
keyring.ActiveKeyID = raftTLSKey.ID
|
||||
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
|
||||
}
|
||||
if err := c.barrier.Put(ctx, entry); err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to write keyring: {{err}}", err)
|
||||
}
|
||||
|
||||
logger.Info("wrote new raft TLS config")
|
||||
|
||||
// Schedule the next rotation
|
||||
return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil
|
||||
}
|
||||
|
||||
// Read the keyring to calculate the time of next rotation.
|
||||
keyring, err := c.raftReadTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
activeKey := keyring.GetActive()
|
||||
if activeKey == nil {
|
||||
return errors.New("no active raft TLS key found")
|
||||
}
|
||||
|
||||
go func() {
|
||||
nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod)
|
||||
|
||||
var backoff bool
|
||||
for {
|
||||
// If we encountered and error we should try to create the key
|
||||
// again.
|
||||
if backoff {
|
||||
nextRotationTime = time.Now().Add(10 * time.Second)
|
||||
backoff = false
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(time.Until(nextRotationTime)):
|
||||
// It's time to rotate the keys
|
||||
next, err := rotateKeyring()
|
||||
if err != nil {
|
||||
logger.Error("failed to rotate TLS key", "error", err)
|
||||
backoff = true
|
||||
continue
|
||||
}
|
||||
|
||||
nextRotationTime = next
|
||||
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// raftTLSRotatePhased will spawn a go routine in charge of periodically
|
||||
// rotating the TLS certs and keys used for raft traffic.
|
||||
//
|
||||
// The logic for updating the TLS certificate uses a pseudo two phase commit
|
||||
|
@ -199,55 +331,30 @@ func (c *Core) stopRaftActiveNode() {
|
|||
// receives the update. This ensures a standby node isn't left behind and unable
|
||||
// to reconnect with the cluster. Additionally, only one outstanding key
|
||||
// is allowed for this same reason (max keyring size of 2).
|
||||
func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raftBackend *raft.RaftBackend, stopCh chan struct{}) error {
|
||||
followerStates := &raftFollowerStates{
|
||||
followers: make(map[string]uint64),
|
||||
}
|
||||
|
||||
// Pre-populate the follower list with the set of peers.
|
||||
raftConfig, err := raftStorage.GetConfiguration(ctx)
|
||||
raftConfig, err := raftBackend.GetConfiguration(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range raftConfig.Servers {
|
||||
if server.NodeID != raftStorage.NodeID() {
|
||||
if server.NodeID != raftBackend.NodeID() {
|
||||
followerStates.update(server.NodeID, 0)
|
||||
}
|
||||
}
|
||||
|
||||
logger := c.logger.Named("raft")
|
||||
c.raftTLSRotationStopCh = stopCh
|
||||
c.raftFollowerStates = followerStates
|
||||
|
||||
readKeyring := func() (*raft.TLSKeyring, error) {
|
||||
tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tlsKeyringEntry == nil {
|
||||
return nil, errors.New("no keyring found")
|
||||
}
|
||||
var keyring raft.TLSKeyring
|
||||
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &keyring, nil
|
||||
}
|
||||
|
||||
// rotateKeyring writes new key data to the keyring and adds an applied
|
||||
// index that is used to verify it has been committed. The keys written in
|
||||
// this function can be used on standbys but the active node doesn't start
|
||||
// using it yet.
|
||||
rotateKeyring := func() (time.Time, error) {
|
||||
// Read the existing keyring
|
||||
keyring, err := readKeyring()
|
||||
keyring, err := c.raftReadTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
|
||||
}
|
||||
|
@ -256,8 +363,8 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
case len(keyring.Keys) == 2 && keyring.Keys[1].AppliedIndex == 0:
|
||||
// If this case is hit then the second write to add the applied
|
||||
// index failed. Attempt to write it again.
|
||||
keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex()
|
||||
keyring.AppliedIndex = raftStorage.AppliedIndex()
|
||||
keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
|
||||
keyring.AppliedIndex = raftBackend.AppliedIndex()
|
||||
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
|
||||
|
@ -270,7 +377,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
// If there already exists a pending key update then the update
|
||||
// hasn't replicated down to all standby nodes yet. Don't allow any
|
||||
// new keys to be created until all standbys have seen this previous
|
||||
// rotation. As a backoff strategy another rotation attempt is
|
||||
// rotation. As a backoff strategy, another rotation attempt is
|
||||
// scheduled for 5 minutes from now.
|
||||
logger.Warn("skipping new raft TLS config creation, keys are pending")
|
||||
return time.Now().Add(time.Minute * 5), nil
|
||||
|
@ -296,9 +403,9 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Write the keyring again with the new applied index. This allows us to
|
||||
// track if standby nodes receive the update.
|
||||
keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex()
|
||||
keyring.AppliedIndex = raftStorage.AppliedIndex()
|
||||
// track if standby nodes received the update.
|
||||
keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
|
||||
keyring.AppliedIndex = raftBackend.AppliedIndex()
|
||||
entry, err = logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
||||
if err != nil {
|
||||
return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err)
|
||||
|
@ -316,7 +423,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
// finalizes the rotation by deleting the old keys and updating the raft
|
||||
// backend.
|
||||
checkCommitted := func() error {
|
||||
keyring, err := readKeyring()
|
||||
keyring, err := c.raftReadTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err)
|
||||
}
|
||||
|
@ -346,7 +453,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Update the TLS Key in the backend
|
||||
if err := raftStorage.SetTLSKeyring(keyring); err != nil {
|
||||
if err := raftBackend.SetTLSKeyring(keyring); err != nil {
|
||||
return errwrap.Wrapf("failed to install keyring: {{err}}", err)
|
||||
}
|
||||
|
||||
|
@ -355,7 +462,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Read the keyring to calculate the time of next rotation.
|
||||
keyring, err := readKeyring()
|
||||
keyring, err := c.raftReadTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -406,14 +513,43 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) createRaftTLSKeyring(ctx context.Context) error {
|
||||
if _, ok := c.underlyingPhysical.(*raft.RaftBackend); !ok {
|
||||
return nil
|
||||
func (c *Core) raftReadTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
|
||||
tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tlsKeyringEntry == nil {
|
||||
return nil, errors.New("no keyring found")
|
||||
}
|
||||
var keyring raft.TLSKeyring
|
||||
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &keyring, nil
|
||||
}
|
||||
|
||||
// raftCreateTLSKeyring creates the initial TLS key and the TLS Keyring for raft
|
||||
// use. If a keyring entry is already present in storage, it will return an
|
||||
// error.
|
||||
func (c *Core) raftCreateTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
|
||||
if raftBackend := c.getRaftBackend(); raftBackend == nil {
|
||||
return nil, fmt.Errorf("raft backend not in use")
|
||||
}
|
||||
|
||||
// Check if the keyring is already present
|
||||
raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if raftTLSEntry != nil {
|
||||
return nil, fmt.Errorf("TLS keyring already present")
|
||||
}
|
||||
|
||||
raftTLS, err := raft.GenerateTLSKey(c.secureRandomReader)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
keyring := &raft.TLSKeyring{
|
||||
|
@ -423,12 +559,12 @@ func (c *Core) createRaftTLSKeyring(ctx context.Context) error {
|
|||
|
||||
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if err := c.barrier.Put(ctx, entry); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
return nil
|
||||
return keyring, nil
|
||||
}
|
||||
|
||||
func (c *Core) stopPeriodicRaftTLSRotate() {
|
||||
|
@ -440,8 +576,8 @@ func (c *Core) stopPeriodicRaftTLSRotate() {
|
|||
}
|
||||
|
||||
func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -458,7 +594,7 @@ func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := raftStorage.SetTLSKeyring(&keyring); err != nil {
|
||||
if err := raftBackend.SetTLSKeyring(&keyring); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -544,16 +680,16 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co
|
|||
}
|
||||
|
||||
func (c *Core) InitiateRetryJoin(ctx context.Context) error {
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
return errors.New("raft storage not configured")
|
||||
}
|
||||
|
||||
if raftStorage.Initialized() {
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
leaderInfos, err := raftStorage.JoinConfig()
|
||||
if raftBackend.Initialized() {
|
||||
return nil
|
||||
}
|
||||
|
||||
leaderInfos, err := raftBackend.JoinConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -573,23 +709,75 @@ func (c *Core) InitiateRetryJoin(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) {
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
return false, errors.New("raft storage not configured")
|
||||
}
|
||||
|
||||
if raftStorage.Initialized() {
|
||||
return false, errors.New("raft storage is already initialized")
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return false, errors.New("raft backend not in use")
|
||||
}
|
||||
|
||||
init, err := c.Initialized(ctx)
|
||||
if err != nil {
|
||||
return false, errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
|
||||
}
|
||||
if init {
|
||||
|
||||
isRaftHAOnly := c.isRaftHAOnly()
|
||||
// Prevent join from happening if we're using raft for storage and
|
||||
// it has already been initialized.
|
||||
if init && !isRaftHAOnly {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Check on seal status and storage type before proceeding:
|
||||
// If raft is used for storage, core needs to be sealed
|
||||
if !isRaftHAOnly && !c.Sealed() {
|
||||
c.logger.Error("node must be seal before joining")
|
||||
return false, errors.New("node must be sealed before joining")
|
||||
}
|
||||
|
||||
// If raft is used for ha-only, core needs to be unsealed
|
||||
if isRaftHAOnly && c.Sealed() {
|
||||
c.logger.Error("node must be unsealed before joining")
|
||||
return false, errors.New("node must be unsealed before joining")
|
||||
}
|
||||
|
||||
// Disallow leader API address to be provided if we're using raft for HA-only
|
||||
// The leader API address is obtained directly through storage. This serves
|
||||
// as a form of verification that this node is sharing the same physical
|
||||
// storage as the leader node.
|
||||
if isRaftHAOnly {
|
||||
for _, info := range leaderInfos {
|
||||
if info.LeaderAPIAddr != "" {
|
||||
return false, errors.New("leader API address must be unset when raft is used exclusively for HA")
|
||||
}
|
||||
}
|
||||
|
||||
// Get the leader address from storage
|
||||
keys, err := c.barrier.List(ctx, coreLeaderPrefix)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(keys) == 0 || len(keys[0]) == 0 {
|
||||
return false, errors.New("unable to fetch leadership entry")
|
||||
}
|
||||
|
||||
leadershipEntry := coreLeaderPrefix + keys[0]
|
||||
entry, err := c.barrier.Get(ctx, leadershipEntry)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if entry == nil {
|
||||
return false, errors.New("unable to read leadership entry")
|
||||
}
|
||||
|
||||
var adv activeAdvertisement
|
||||
err = jsonutil.DecodeJSON(entry.Value, &adv)
|
||||
if err != nil {
|
||||
return false, errwrap.Wrapf("unable to decoded leader entry: {{err}}", err)
|
||||
}
|
||||
|
||||
leaderInfos[0].LeaderAPIAddr = adv.RedirectAddr
|
||||
}
|
||||
|
||||
join := func(retry bool) error {
|
||||
joinLeader := func(leaderInfo *raft.LeaderJoinInfo) error {
|
||||
if leaderInfo == nil {
|
||||
|
@ -603,14 +791,11 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
if err != nil {
|
||||
return errwrap.Wrapf("failed to check if core is initialized: {{err}}", err)
|
||||
}
|
||||
if init {
|
||||
|
||||
if init && !isRaftHAOnly {
|
||||
c.logger.Info("returning from raft join as the node is initialized")
|
||||
return nil
|
||||
}
|
||||
if !c.Sealed() {
|
||||
c.logger.Info("returning from raft join as the node is unsealed")
|
||||
return nil
|
||||
}
|
||||
|
||||
c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr)
|
||||
|
||||
|
@ -648,7 +833,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
|
||||
// Attempt to join the leader by requesting for the bootstrap challenge
|
||||
secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
|
||||
"server_id": raftStorage.NodeID(),
|
||||
"server_id": raftBackend.NodeID(),
|
||||
})
|
||||
if err != nil {
|
||||
return errwrap.Wrapf("error during raft bootstrap init call: {{err}}", err)
|
||||
|
@ -687,7 +872,10 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
nonVoter: nonVoter,
|
||||
}
|
||||
|
||||
if c.seal.BarrierType() == wrapping.Shamir {
|
||||
// If we're using Shamir and using raft for both physical and HA, we
|
||||
// need to block until the node is unsealed, unless retry is set to
|
||||
// false.
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
||||
c.raftInfo = raftInfo
|
||||
if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil {
|
||||
return err
|
||||
|
@ -708,7 +896,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
return errwrap.Wrapf("failed to send answer to raft leader node: {{err}}", err)
|
||||
}
|
||||
|
||||
if c.seal.BarrierType() == wrapping.Shamir {
|
||||
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
||||
// Reset the state
|
||||
c.raftInfo = nil
|
||||
|
||||
|
@ -763,20 +951,41 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// This is used in tests to override the cluster address
|
||||
var UpdateClusterAddrForTests uint32
|
||||
// getRaftBackend returns the RaftBackend from the HA or physical backend,
|
||||
// in that order of preference, or nil if not of type RaftBackend.
|
||||
func (c *Core) getRaftBackend() *raft.RaftBackend {
|
||||
var raftBackend *raft.RaftBackend
|
||||
|
||||
if raftHA, ok := c.ha.(*raft.RaftBackend); ok {
|
||||
raftBackend = raftHA
|
||||
}
|
||||
|
||||
if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
raftBackend = raftStorage
|
||||
}
|
||||
|
||||
return raftBackend
|
||||
}
|
||||
|
||||
// isRaftHAOnly returns true if c.ha is raft and physical storage is non-raft
|
||||
func (c *Core) isRaftHAOnly() bool {
|
||||
_, isRaftHA := c.ha.(*raft.RaftBackend)
|
||||
_, isRaftStorage := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
|
||||
return isRaftHA && !isRaftStorage
|
||||
}
|
||||
|
||||
func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, raftInfo *raftInformation) error {
|
||||
if raftInfo.challenge == nil {
|
||||
return errors.New("raft challenge is nil")
|
||||
}
|
||||
|
||||
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
||||
if !ok {
|
||||
return errors.New("raft storage not in use")
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return errors.New("raft backend is not in use")
|
||||
}
|
||||
|
||||
if raftStorage.Initialized() {
|
||||
if raftBackend.Initialized() {
|
||||
return errors.New("raft is already initialized")
|
||||
}
|
||||
|
||||
|
@ -790,7 +999,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
|
|||
return errwrap.Wrapf("error parsing cluster address: {{err}}", err)
|
||||
}
|
||||
clusterAddr := parsedClusterAddr.Host
|
||||
if atomic.LoadUint32(&UpdateClusterAddrForTests) == 1 && strings.HasSuffix(clusterAddr, ":0") {
|
||||
if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") {
|
||||
// We are testing and have an address provider, so just create a random
|
||||
// addr, it will be overwritten later.
|
||||
var err error
|
||||
|
@ -804,7 +1013,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
|
|||
if err := answerReq.SetJSONBody(map[string]interface{}{
|
||||
"answer": base64.StdEncoding.EncodeToString(plaintext),
|
||||
"cluster_addr": clusterAddr,
|
||||
"server_id": raftStorage.NodeID(),
|
||||
"server_id": raftBackend.NodeID(),
|
||||
"non_voter": raftInfo.nonVoter,
|
||||
}); err != nil {
|
||||
return err
|
||||
|
@ -823,15 +1032,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
|
|||
return err
|
||||
}
|
||||
|
||||
raftStorage.Bootstrap(ctx, answerResp.Data.Peers)
|
||||
if err := raftBackend.Bootstrap(answerResp.Data.Peers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.startClusterListener(ctx)
|
||||
if err != nil {
|
||||
return errwrap.Wrapf("error starting cluster: {{err}}", err)
|
||||
}
|
||||
|
||||
raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
||||
err = raftStorage.SetupCluster(ctx, raft.SetupOpts{
|
||||
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
||||
err = raftBackend.SetupCluster(ctx, raft.SetupOpts{
|
||||
TLSKeyring: answerResp.Data.TLSKeyring,
|
||||
ClusterListener: c.getClusterListener(),
|
||||
})
|
||||
|
@ -842,6 +1053,57 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access,
|
|||
return nil
|
||||
}
|
||||
|
||||
// RaftBootstrap performs bootstrapping of a raft cluster if core contains a raft
|
||||
// backend. If raft is not part for the storage or HA storage backend, this
|
||||
// call results in an error.
|
||||
func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error {
|
||||
if c.logger.IsDebug() {
|
||||
c.logger.Debug("bootstrapping raft backend")
|
||||
defer c.logger.Debug("finished bootstrapping raft backend")
|
||||
}
|
||||
|
||||
raftBackend := c.getRaftBackend()
|
||||
if raftBackend == nil {
|
||||
return errors.New("raft backend not in use")
|
||||
}
|
||||
|
||||
parsedClusterAddr, err := url.Parse(c.ClusterAddr())
|
||||
if err != nil {
|
||||
return errwrap.Wrapf("error parsing cluster address: {{err}}", err)
|
||||
}
|
||||
if err := raftBackend.Bootstrap([]raft.Peer{
|
||||
{
|
||||
ID: raftBackend.NodeID(),
|
||||
Address: parsedClusterAddr.Host,
|
||||
},
|
||||
}); err != nil {
|
||||
return errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err)
|
||||
}
|
||||
|
||||
raftOpts := raft.SetupOpts{
|
||||
StartAsLeader: true,
|
||||
}
|
||||
|
||||
if !onInit {
|
||||
// Generate the TLS Keyring info for SetupCluster to consume
|
||||
raftTLS, err := c.raftCreateTLSKeyring(ctx)
|
||||
if err != nil {
|
||||
return errwrap.Wrapf("could not generate TLS keyring during bootstrap: {{err}}", err)
|
||||
}
|
||||
|
||||
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
||||
raftOpts.ClusterListener = c.getClusterListener()
|
||||
|
||||
raftOpts.TLSKeyring = raftTLS
|
||||
}
|
||||
|
||||
if err := raftBackend.SetupCluster(ctx, raftOpts); err != nil {
|
||||
return errwrap.Wrapf("could not start clustered storage: {{err}}", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) isRaftUnseal() bool {
|
||||
return c.raftInfo != nil
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/vault/helper/forwarding"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/vault/replication"
|
||||
)
|
||||
|
@ -82,9 +81,11 @@ func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (
|
|||
ReplicationState: uint32(s.core.ReplicationState()),
|
||||
}
|
||||
|
||||
if raftStorage, ok := s.core.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
reply.RaftAppliedIndex = raftStorage.AppliedIndex()
|
||||
reply.RaftNodeID = raftStorage.NodeID()
|
||||
if raftBackend := s.core.getRaftBackend(); raftBackend != nil {
|
||||
if !s.core.isRaftHAOnly() {
|
||||
reply.RaftAppliedIndex = raftBackend.AppliedIndex()
|
||||
reply.RaftNodeID = raftBackend.NodeID()
|
||||
}
|
||||
}
|
||||
|
||||
return reply, nil
|
||||
|
@ -111,9 +112,11 @@ func (c *forwardingClient) startHeartbeat() {
|
|||
ClusterAddr: clusterAddr,
|
||||
}
|
||||
|
||||
if raftStorage, ok := c.core.underlyingPhysical.(*raft.RaftBackend); ok {
|
||||
req.RaftAppliedIndex = raftStorage.AppliedIndex()
|
||||
req.RaftNodeID = raftStorage.NodeID()
|
||||
if raftBackend := c.core.getRaftBackend(); raftBackend != nil {
|
||||
if !c.core.isRaftHAOnly() {
|
||||
req.RaftAppliedIndex = raftBackend.AppliedIndex()
|
||||
req.RaftNodeID = raftBackend.NodeID()
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
|
||||
|
|
|
@ -27,11 +27,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/mitchellh/copystructure"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"golang.org/x/net/http2"
|
||||
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
raftlib "github.com/hashicorp/raft"
|
||||
|
@ -53,6 +48,10 @@ import (
|
|||
physInmem "github.com/hashicorp/vault/sdk/physical/inmem"
|
||||
"github.com/hashicorp/vault/vault/cluster"
|
||||
"github.com/hashicorp/vault/vault/seal"
|
||||
"github.com/mitchellh/copystructure"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
// This file contains a number of methods that are useful for unit
|
||||
|
@ -888,8 +887,11 @@ func CleanupClusters(clusters []*TestCluster) {
|
|||
func (c *TestCluster) Cleanup() {
|
||||
c.Logger.Info("cleaning up vault cluster")
|
||||
for _, core := range c.Cores {
|
||||
// Upgrade logger to emit errors if not doing so already
|
||||
if !core.CoreConfig.Logger.IsError() {
|
||||
core.CoreConfig.Logger.SetLevel(log.Error)
|
||||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, core := range c.Cores {
|
||||
|
@ -964,6 +966,7 @@ type TestClusterCore struct {
|
|||
TLSConfig *tls.Config
|
||||
UnderlyingStorage physical.Backend
|
||||
UnderlyingRawStorage physical.Backend
|
||||
UnderlyingHAStorage physical.HABackend
|
||||
Barrier SecurityBarrier
|
||||
NodeID string
|
||||
}
|
||||
|
@ -1506,6 +1509,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
|
|||
Barrier: cores[i].barrier,
|
||||
NodeID: fmt.Sprintf("core-%d", i),
|
||||
UnderlyingRawStorage: coreConfigs[i].Physical,
|
||||
UnderlyingHAStorage: coreConfigs[i].HAPhysical,
|
||||
}
|
||||
tcc.ReloadFuncs = &cores[i].reloadFuncs
|
||||
tcc.ReloadFuncsLock = &cores[i].reloadFuncsLock
|
||||
|
@ -1675,9 +1679,14 @@ func (testCluster *TestCluster) newCore(
|
|||
case physBundle == nil && coreConfig.Physical == nil:
|
||||
t.Fatal("PhysicalFactory produced no physical and none in CoreConfig")
|
||||
case physBundle != nil:
|
||||
// Storage backend setup
|
||||
if physBundle.Backend != nil {
|
||||
testCluster.Logger.Info("created physical backend", "instance", idx)
|
||||
coreConfig.Physical = physBundle.Backend
|
||||
localConfig.Physical = physBundle.Backend
|
||||
}
|
||||
|
||||
// HA Backend setup
|
||||
haBackend := physBundle.HABackend
|
||||
if haBackend == nil {
|
||||
if ha, ok := physBundle.Backend.(physical.HABackend); ok {
|
||||
|
@ -1686,6 +1695,8 @@ func (testCluster *TestCluster) newCore(
|
|||
}
|
||||
coreConfig.HAPhysical = haBackend
|
||||
localConfig.HAPhysical = haBackend
|
||||
|
||||
// Cleanup setup
|
||||
if physBundle.Cleanup != nil {
|
||||
cleanupFunc = physBundle.Cleanup
|
||||
}
|
||||
|
|
|
@ -141,6 +141,7 @@ $ curl \
|
|||
|
||||
This endpoint returns a snapshot of the current state of the raft cluster. The
|
||||
snapshot is returned as binary data and should be redirected to a file.
|
||||
Unavailable if Raft is used exclusively for `ha_storage`.
|
||||
|
||||
| Method | Path |
|
||||
| :----- | :--------------------------- |
|
||||
|
@ -157,7 +158,8 @@ $ curl \
|
|||
|
||||
## Restore Raft using a snapshot
|
||||
|
||||
Installs the provided snapshot, returning the cluster to the state defined in it.
|
||||
Installs the provided snapshot, returning the cluster to the state defined in
|
||||
it. Unavailable if Raft is used exclusively for `ha_storage`.
|
||||
|
||||
| Method | Path |
|
||||
| :----- | :--------------------------- |
|
||||
|
@ -178,7 +180,7 @@ $ curl \
|
|||
Installs the provided snapshot, returning the cluster to the state defined in
|
||||
it. This is same as writing to `/sys/storage/raft/snapshot` except that this
|
||||
bypasses checks ensuring the Autounseal or shamir keys are consistent with the
|
||||
snapshot data.
|
||||
snapshot data. Unavailable if Raft is used exclusively for `ha_storage`.
|
||||
|
||||
| Method | Path |
|
||||
| :----- | :--------------------------------- |
|
||||
|
|
|
@ -76,6 +76,10 @@ defined `path`. `node_id` can optionally be set to identify this node.
|
|||
cluster hostname of this node. For more configuration options see the [raft
|
||||
storage configuration documentation](/docs/configuration/storage/raft).
|
||||
|
||||
If the original configuration uses "raft" for `ha_storage` a different
|
||||
`path` needs to be declared for the path in `storage_destination` and the new
|
||||
configuration for the node post-migration.
|
||||
|
||||
```hcl
|
||||
storage_source "consul" {
|
||||
address = "127.0.0.1:8500"
|
||||
|
@ -115,6 +119,9 @@ vault server.
|
|||
After migration the raft cluster will only have a single node. Additional peers
|
||||
should be joined to this node.
|
||||
|
||||
If the cluster was previously HA-enabled using "raft" as the `ha_storage`, the
|
||||
nodes will have to re-join to the migrated node before unsealing.
|
||||
|
||||
## Usage
|
||||
|
||||
The following flags are available for the `operator migrate` command.
|
||||
|
|
|
@ -28,8 +28,13 @@ Subcommands:
|
|||
|
||||
This command is used to join a new node as a peer to the Raft cluster. In order
|
||||
to join, there must be at least one existing member of the cluster. If Shamir
|
||||
seal is in use, then this API will request for the unseal keys to be supplied to
|
||||
join the cluster.
|
||||
seal is in use, then unseal keys are to be supplied before or after the
|
||||
join process, depending on whether it's being used exclusively for HA.
|
||||
|
||||
If raft is used for `storage`, the node must be joined before unsealing and the
|
||||
`leader-api-addr` argument must be provided. If raft is used for `ha_storage`,
|
||||
the node must be first unsealed before joining and the `leader-api-addr` must
|
||||
_not_ be provided.
|
||||
|
||||
```text
|
||||
Usage: vault operator raft join [options] <leader-api-addr>
|
||||
|
@ -72,7 +77,7 @@ Usage: vault operator raft list-peers
|
|||
|
||||
### Example Output
|
||||
|
||||
```python
|
||||
```json
|
||||
{
|
||||
...
|
||||
"data": {
|
||||
|
@ -145,6 +150,8 @@ Usage: vault operator raft snapshot save <snapshot_file>
|
|||
$ vault operator raft snapshot save raft.snap
|
||||
```
|
||||
|
||||
~> **Note:** Snapshot is not supported when Raft is used for `ha_storage`.
|
||||
|
||||
### snapshot restore
|
||||
|
||||
Restores a snapshot of Vault data taken with `vault operator raft snapshot save`.
|
||||
|
|
|
@ -43,3 +43,7 @@ In order to bring the Vault server up reliably, using any node's raft data,
|
|||
recovery mode Vault automatically resizes the cluster to size 1. This means
|
||||
that after having used recovery mode, part of the procedure for returning to
|
||||
active service must include rejoining the raft cluster.
|
||||
|
||||
If Raft is used exclusively for `ha_storage`, recovery mode will not allow for
|
||||
changes to the Raft data but instead allow for modification of the underlying
|
||||
physical data that is associated with Vault's storage backend.
|
||||
|
|
|
@ -39,9 +39,6 @@ between the nodes in the Raft cluster.
|
|||
~> **Note:** When using the Raft storage backend, a separate `ha_storage`
|
||||
backend cannot be declared.
|
||||
|
||||
~> **Note:** Raft cannot be used as the configured `ha_storage` backend at this
|
||||
time. To use Raft for HA coordination users must also use Raft for storage.
|
||||
|
||||
~> **Note:** When using the Raft storage backend, it is strongly recommended to
|
||||
set `disable_mlock` to `true`, and to disable memory swapping on the system.
|
||||
|
||||
|
|
Loading…
Reference in New Issue