diff --git a/command/operator_migrate.go b/command/operator_migrate.go index 6fd8f8015..9ece5914e 100644 --- a/command/operator_migrate.go +++ b/command/operator_migrate.go @@ -153,7 +153,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { if c.flagReset { if err := SetStorageMigration(from, false); err != nil { - return errwrap.Wrapf("error reseting migration lock: {{err}}", err) + return errwrap.Wrapf("error resetting migration lock: {{err}}", err) } return nil } @@ -169,7 +169,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error { } if migrationStatus != nil { - return fmt.Errorf("Storage migration in progress (started: %s).", migrationStatus.Start.Format(time.RFC3339)) + return fmt.Errorf("storage migration in progress (started: %s)", migrationStatus.Start.Format(time.RFC3339)) } switch config.StorageSource.Type { @@ -259,7 +259,7 @@ func (c *OperatorMigrateCommand) createDestinationBackend(kind string, conf map[ if err != nil { return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err) } - if err := raftStorage.Bootstrap(context.Background(), []raft.Peer{ + if err := raftStorage.Bootstrap([]raft.Peer{ { ID: raftStorage.NodeID(), Address: parsedClusterAddr.Host, diff --git a/command/operator_raft_join.go b/command/operator_raft_join.go index 6ea54ce85..f3fab7ccd 100644 --- a/command/operator_raft_join.go +++ b/command/operator_raft_join.go @@ -110,15 +110,12 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int { args = f.Args() switch len(args) { + case 0: + // No-op: This is acceptable if we're using raft for HA-only case 1: leaderAPIAddr = strings.TrimSpace(args[0]) default: - c.UI.Error(fmt.Sprintf("Incorrect arguments (expected 1, got %d)", len(args))) - return 1 - } - - if len(leaderAPIAddr) == 0 { - c.UI.Error("leader api address is required") + c.UI.Error(fmt.Sprintf("Too many arguments (expected 0-1, got %d)", len(args))) return 1 } diff --git a/command/server.go b/command/server.go index 9d2a038af..b974b729b 100644 --- a/command/server.go +++ b/command/server.go @@ -64,7 +64,14 @@ var enableFourClusterDev = func(c *ServerCommand, base *vault.CoreConfig, info m return 1 } -const storageMigrationLock = "core/migration" +const ( + storageMigrationLock = "core/migration" + + // Even though there are more types than the ones below, the following consts + // are declared internally for value comparison and reusability. + storageTypeRaft = "raft" + storageTypeConsul = "consul" +) type ServerCommand struct { *BaseCommand @@ -453,7 +460,7 @@ func (c *ServerCommand) runRecoveryMode() int { c.UI.Error(fmt.Sprintf("Unknown storage type %s", config.Storage.Type)) return 1 } - if config.Storage.Type == "raft" { + if config.Storage.Type == storageTypeRaft || (config.HAStorage != nil && config.HAStorage.Type == storageTypeRaft) { if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" { config.ClusterAddr = envCA } @@ -963,7 +970,7 @@ func (c *ServerCommand) Run(args []string) int { // Do any custom configuration needed per backend switch config.Storage.Type { - case "consul": + case storageTypeConsul: if config.ServiceRegistration == nil { // If Consul is configured for storage and service registration is unconfigured, // use Consul for service registration without requiring additional configuration. 
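Note: the server.go hunks above (and several below) gate behavior on the new storageTypeRaft / storageTypeConsul constants, checking both config.Storage.Type and config.HAStorage.Type so that raft-for-HA-only is recognized alongside raft-for-storage. A minimal sketch of that detection logic; the helper and the trimmed-down config struct are illustrative and not part of the diff:

```go
package main

import "fmt"

const (
	storageTypeRaft   = "raft"
	storageTypeConsul = "consul"
)

// serverConfig is an illustrative stand-in for the parsed server configuration.
type serverConfig struct {
	StorageType   string
	HAStorageType string // empty when no separate ha_storage stanza is set
}

// usesRaft reports whether raft is in use at all and whether it is used for
// HA only (i.e. a separate physical backend such as Consul holds the data).
func usesRaft(c serverConfig) (inUse, haOnly bool) {
	isRaftStorage := c.StorageType == storageTypeRaft
	isRaftHA := c.HAStorageType == storageTypeRaft
	return isRaftStorage || isRaftHA, isRaftHA && !isRaftStorage
}

func main() {
	inUse, haOnly := usesRaft(serverConfig{
		StorageType:   storageTypeConsul,
		HAStorageType: storageTypeRaft,
	})
	fmt.Println(inUse, haOnly) // true true
}
```

This is also why `vault operator raft join` now accepts zero arguments: in the HA-only case the leader API address is resolved through the shared physical storage rather than passed on the command line.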
@@ -973,7 +980,7 @@ func (c *ServerCommand) Run(args []string) int { Config: config.Storage.Config, } } - case "raft": + case storageTypeRaft: if envCA := os.Getenv("VAULT_CLUSTER_ADDR"); envCA != "" { config.ClusterAddr = envCA } @@ -1145,6 +1152,7 @@ func (c *ServerCommand) Run(args []string) int { SecureRandomReader: secureRandomReader, } if c.flagDev { + coreConfig.EnableRaw = true coreConfig.DevToken = c.flagDevRootTokenID if c.flagDevLeasedKV { coreConfig.LogicalBackends["kv"] = vault.LeasedPassthroughBackendFactory @@ -1175,24 +1183,26 @@ func (c *ServerCommand) Run(args []string) int { // Initialize the separate HA storage backend, if it exists var ok bool if config.HAStorage != nil { - if config.Storage.Type == "raft" { + if config.Storage.Type == storageTypeRaft && config.HAStorage.Type == storageTypeRaft { + c.UI.Error("Raft cannot be set both as 'storage' and 'ha_storage'. Setting 'storage' to 'raft' will automatically set it up for HA operations as well") + return 1 + } + + if config.Storage.Type == storageTypeRaft { c.UI.Error("HA storage cannot be declared when Raft is the storage type") return 1 } - // TODO: Remove when Raft can server as the ha_storage backend. - // See https://github.com/hashicorp/vault/issues/8206 - if config.HAStorage.Type == "raft" { - c.UI.Error("Raft cannot be used as separate HA storage at this time") - return 1 - } factory, exists := c.PhysicalBackends[config.HAStorage.Type] if !exists { c.UI.Error(fmt.Sprintf("Unknown HA storage type %s", config.HAStorage.Type)) return 1 } - habackend, err := factory(config.HAStorage.Config, c.logger) + + namedHALogger := c.logger.Named("ha." + config.HAStorage.Type) + allLoggers = append(allLoggers, namedHALogger) + habackend, err := factory(config.HAStorage.Config, namedHALogger) if err != nil { c.UI.Error(fmt.Sprintf( "Error initializing HA storage of type %s: %s", config.HAStorage.Type, err)) @@ -1211,10 +1221,13 @@ func (c *ServerCommand) Run(args []string) int { } coreConfig.RedirectAddr = config.HAStorage.RedirectAddr - - // TODO: Check for raft and disableClustering case when Raft on HA - // Storage support is added. disableClustering = config.HAStorage.DisableClustering + + if config.HAStorage.Type == storageTypeRaft && disableClustering { + c.UI.Error("Disable clustering cannot be set to true when Raft is the HA storage type") + return 1 + } + if !disableClustering { coreConfig.ClusterAddr = config.HAStorage.ClusterAddr } @@ -1223,7 +1236,7 @@ func (c *ServerCommand) Run(args []string) int { coreConfig.RedirectAddr = config.Storage.RedirectAddr disableClustering = config.Storage.DisableClustering - if config.Storage.Type == "raft" && disableClustering { + if (config.Storage.Type == storageTypeRaft) && disableClustering { c.UI.Error("Disable clustering cannot be set to true when Raft is the storage type") return 1 } @@ -1559,7 +1572,8 @@ CLUSTER_SYNTHESIS_COMPLETE: // When the underlying storage is raft, kick off retry join if it was specified // in the configuration - if config.Storage.Type == "raft" { + // TODO: Should we also support retry_join for ha_storage? 
+ if config.Storage.Type == storageTypeRaft { if err := core.InitiateRetryJoin(context.Background()); err != nil { c.UI.Error(fmt.Sprintf("Failed to initiate raft retry join, %q", err.Error())) return 1 diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go index 6bae6e687..db8cfe957 100644 --- a/helper/testhelpers/testhelpers.go +++ b/helper/testhelpers/testhelpers.go @@ -415,7 +415,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) { addressProvider := &TestRaftServerAddressProvider{Cluster: cluster} - atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1) + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) leader := cluster.Cores[0] diff --git a/helper/testhelpers/teststorage/teststorage.go b/helper/testhelpers/teststorage/teststorage.go index 10dd47259..100d350e2 100644 --- a/helper/testhelpers/teststorage/teststorage.go +++ b/helper/testhelpers/teststorage/teststorage.go @@ -130,9 +130,56 @@ func MakeRaftBackend(t testing.T, coreIdx int, logger hclog.Logger) *vault.Physi } } -type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) +// RaftHAFactory returns a PhysicalBackendBundle with raft set as the HABackend +// and the physical.Backend provided in PhysicalBackendBundler as the storage +// backend. +func RaftHAFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + // Call the factory func to create the storage backend + physFactory := SharedPhysicalFactory(f) + bundle := physFactory(t, coreIdx, logger) -func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + // This can happen if a shared physical backend is called on a non-0th core. 
+ if bundle == nil { + bundle = new(vault.PhysicalBackendBundle) + } + + raftDir := makeRaftDir(t) + cleanupFunc := func() { + os.RemoveAll(raftDir) + } + + nodeID := fmt.Sprintf("core-%d", coreIdx) + conf := map[string]string{ + "path": raftDir, + "node_id": nodeID, + "performance_multiplier": "8", + } + + // Create and set the HA Backend + raftBackend, err := raft.NewRaftBackend(conf, logger) + if err != nil { + bundle.Cleanup() + t.Fatal(err) + } + bundle.HABackend = raftBackend.(physical.HABackend) + + // Re-wrap the cleanup func + bundleCleanup := bundle.Cleanup + bundle.Cleanup = func() { + if bundleCleanup != nil { + bundleCleanup() + } + cleanupFunc() + } + + return bundle + } +} + +type PhysicalBackendBundler func(t testing.T, logger hclog.Logger) *vault.PhysicalBackendBundle + +func SharedPhysicalFactory(f PhysicalBackendBundler) func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { return func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { if coreIdx == 0 { return f(t, logger) @@ -141,6 +188,8 @@ func SharedPhysicalFactory(f func(t testing.T, logger hclog.Logger) *vault.Physi } } +type ClusterSetupMutator func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) + func InmemBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { opts.PhysicalFactory = SharedPhysicalFactory(MakeInmemBackend) } @@ -166,6 +215,11 @@ func RaftBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { } } +func RaftHASetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, bundler PhysicalBackendBundler) { + opts.KeepStandbysSealed = true + opts.PhysicalFactory = RaftHAFactory(bundler) +} + func ClusterSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, setup ClusterSetupMutator) (*vault.CoreConfig, *vault.TestClusterOptions) { var localConf vault.CoreConfig if conf != nil { diff --git a/helper/testhelpers/teststorage/teststorage_reusable.go b/helper/testhelpers/teststorage/teststorage_reusable.go index 3546fb7e5..6579a36af 100644 --- a/helper/testhelpers/teststorage/teststorage_reusable.go +++ b/helper/testhelpers/teststorage/teststorage_reusable.go @@ -5,12 +5,12 @@ import ( "io/ioutil" "os" - "github.com/mitchellh/go-testing-interface" - hclog "github.com/hashicorp/go-hclog" raftlib "github.com/hashicorp/raft" "github.com/hashicorp/vault/physical/raft" + "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/vault" + "github.com/mitchellh/go-testing-interface" ) // ReusableStorage is a physical backend that can be re-used across @@ -59,8 +59,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica }, // No-op - Cleanup: func(t testing.T, cluster *vault.TestCluster) { - }, + Cleanup: func(t testing.T, cluster *vault.TestCluster) {}, } cleanup := func() { @@ -74,10 +73,7 @@ func MakeReusableStorage(t testing.T, logger hclog.Logger, bundle *vault.Physica // MakeReusableRaftStorage makes a physical raft backend that can be re-used // across multiple test clusters in sequence. 
-func MakeReusableRaftStorage( - t testing.T, logger hclog.Logger, numCores int, - addressProvider raftlib.ServerAddressProvider, -) (ReusableStorage, StorageCleanup) { +func MakeReusableRaftStorage(t testing.T, logger hclog.Logger, numCores int, addressProvider raftlib.ServerAddressProvider) (ReusableStorage, StorageCleanup) { raftDirs := make([]string, numCores) for i := 0; i < numCores; i++ { @@ -91,7 +87,7 @@ func MakeReusableRaftStorage( conf.DisablePerformanceStandby = true opts.KeepStandbysSealed = true opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { - return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider) + return makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], addressProvider, false) } }, @@ -120,6 +116,49 @@ func CloseRaftStorage(t testing.T, cluster *vault.TestCluster, idx int) { } } +func MakeReusableRaftHAStorage(t testing.T, logger hclog.Logger, numCores int, bundle *vault.PhysicalBackendBundle) (ReusableStorage, StorageCleanup) { + raftDirs := make([]string, numCores) + for i := 0; i < numCores; i++ { + raftDirs[i] = makeRaftDir(t) + } + + storage := ReusableStorage{ + Setup: func(conf *vault.CoreConfig, opts *vault.TestClusterOptions) { + opts.KeepStandbysSealed = true + opts.PhysicalFactory = func(t testing.T, coreIdx int, logger hclog.Logger) *vault.PhysicalBackendBundle { + haBundle := makeReusableRaftBackend(t, coreIdx, logger, raftDirs[coreIdx], nil, true) + + return &vault.PhysicalBackendBundle{ + Backend: bundle.Backend, + HABackend: haBundle.HABackend, + } + } + }, + + // Close open files being used by raft. + Cleanup: func(t testing.T, cluster *vault.TestCluster) { + for _, core := range cluster.Cores { + raftStorage := core.UnderlyingHAStorage.(*raft.RaftBackend) + if err := raftStorage.Close(); err != nil { + t.Fatal(err) + } + } + }, + } + + cleanup := func() { + if bundle.Cleanup != nil { + bundle.Cleanup() + } + + for _, rd := range raftDirs { + os.RemoveAll(rd) + } + } + + return storage, cleanup +} + func makeRaftDir(t testing.T) string { raftDir, err := ioutil.TempDir("", "vault-raft-") if err != nil { @@ -129,10 +168,7 @@ func makeRaftDir(t testing.T) string { return raftDir } -func makeReusableRaftBackend( - t testing.T, coreIdx int, logger hclog.Logger, raftDir string, - addressProvider raftlib.ServerAddressProvider, -) *vault.PhysicalBackendBundle { +func makeReusableRaftBackend(t testing.T, coreIdx int, logger hclog.Logger, raftDir string, addressProvider raftlib.ServerAddressProvider, ha bool) *vault.PhysicalBackendBundle { nodeID := fmt.Sprintf("core-%d", coreIdx) conf := map[string]string{ @@ -146,9 +182,16 @@ func makeReusableRaftBackend( t.Fatal(err) } - backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) - - return &vault.PhysicalBackendBundle{ - Backend: backend, + if addressProvider != nil { + backend.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) } + + bundle := new(vault.PhysicalBackendBundle) + + if ha { + bundle.HABackend = backend.(physical.HABackend) + } else { + bundle.Backend = backend + } + return bundle } diff --git a/http/handler.go b/http/handler.go index 0549d21d8..31047958a 100644 --- a/http/handler.go +++ b/http/handler.go @@ -144,6 +144,7 @@ func Handler(props *vault.HandlerProperties) http.Handler { mux.Handle("/v1/sys/rekey-recovery-key/init", handleRequestForwarding(core, handleSysRekeyInit(core, true))) mux.Handle("/v1/sys/rekey-recovery-key/update", handleRequestForwarding(core, 
handleSysRekeyUpdate(core, true))) mux.Handle("/v1/sys/rekey-recovery-key/verify", handleRequestForwarding(core, handleSysRekeyVerify(core, true))) + mux.Handle("/v1/sys/storage/raft/bootstrap", handleSysRaftBootstrap(core)) mux.Handle("/v1/sys/storage/raft/join", handleSysRaftJoin(core)) for _, path := range injectDataIntoTopRoutes { mux.Handle(path, handleRequestForwarding(core, handleLogicalWithInjector(core))) diff --git a/http/sys_raft.go b/http/sys_raft.go index c443a11ad..e139e195a 100644 --- a/http/sys_raft.go +++ b/http/sys_raft.go @@ -12,6 +12,25 @@ import ( "github.com/hashicorp/vault/vault" ) +func handleSysRaftBootstrap(core *vault.Core) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST", "PUT": + if core.Sealed() { + respondError(w, http.StatusBadRequest, errors.New("node must be unsealed to bootstrap")) + } + + if err := core.RaftBootstrap(context.Background(), false); err != nil { + respondError(w, http.StatusInternalServerError, err) + return + } + + default: + respondError(w, http.StatusBadRequest, nil) + } + }) +} + func handleSysRaftJoin(core *vault.Core) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -53,6 +72,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ Retry: req.Retry, }, } + joined, err := core.JoinRaftCluster(context.Background(), leaderInfos, req.NonVoter) if err != nil { respondError(w, http.StatusInternalServerError, err) diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 867c2e4fa..5ada5b957 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -41,6 +41,8 @@ const EnvVaultRaftPath = "VAULT_RAFT_PATH" // Verify RaftBackend satisfies the correct interfaces var _ physical.Backend = (*RaftBackend)(nil) var _ physical.Transactional = (*RaftBackend)(nil) +var _ physical.HABackend = (*RaftBackend)(nil) +var _ physical.Lock = (*RaftLock)(nil) var ( // raftLogCacheSize is the maximum number of logs to cache in-memory. @@ -68,6 +70,11 @@ type RaftBackend struct { // raft is the instance of raft we will operate on. raft *raft.Raft + // raftInitCh is used to block during HA lock acquisition if raft + // has not been initialized yet, which can occur if raft is being + // used for HA-only. + raftInitCh chan struct{} + // raftNotifyCh is used to receive updates about leadership changes // regarding this node. raftNotifyCh chan bool @@ -323,6 +330,7 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend return &RaftBackend{ logger: logger, fsm: fsm, + raftInitCh: make(chan struct{}), conf: conf, logStore: log, stableStore: stable, @@ -420,7 +428,7 @@ func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvid } // Bootstrap prepares the given peers to be part of the raft cluster -func (b *RaftBackend) Bootstrap(ctx context.Context, peers []Peer) error { +func (b *RaftBackend) Bootstrap(peers []Peer) error { b.l.Lock() defer b.l.Unlock() @@ -532,6 +540,13 @@ func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error }) } +func (b *RaftBackend) HasState() (bool, error) { + b.l.RLock() + defer b.l.RUnlock() + + return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore) +} + // SetupCluster starts the raft cluster and enables the networking needed for // the raft nodes to communicate. 
func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error { @@ -699,6 +714,10 @@ func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error { opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer) } + // Close the init channel to signal setup has been completed + close(b.raftInitCh) + + b.logger.Trace("finished setting up raft cluster") return nil } @@ -712,6 +731,9 @@ func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error b.l.Lock() future := b.raft.Shutdown() b.raft = nil + + // If we're tearing down, then we need to recreate the raftInitCh + b.raftInitCh = make(chan struct{}) b.l.Unlock() return future.Error() @@ -1111,10 +1133,10 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error { return nil } -// HAEnabled is the implemention of the HABackend interface +// HAEnabled is the implementation of the HABackend interface func (b *RaftBackend) HAEnabled() bool { return true } -// HAEnabled is the implemention of the HABackend interface +// HAEnabled is the implementation of the HABackend interface func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) { return &RaftLock{ key: key, @@ -1161,18 +1183,27 @@ func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-ch // Lock blocks until we become leader or are shutdown. It returns a channel that // is closed when we detect a loss of leadership. func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + // If not initialized, block until it is + if !l.b.Initialized() { + select { + case <-l.b.raftInitCh: + case <-stopCh: + return nil, nil + } + + } + l.b.l.RLock() + // Ensure that we still have a raft instance after grabbing the read lock + if l.b.raft == nil { + l.b.l.RUnlock() + return nil, errors.New("attempted to grab a lock on a nil raft backend") + } + // Cache the notifyCh locally leaderNotifyCh := l.b.raftNotifyCh - // TODO: Remove when Raft can server as the ha_storage backend. The internal - // raft pointer should not be nil here, but the nil check is a guard against - // https://github.com/hashicorp/vault/issues/8206 - if l.b.raft == nil { - return nil, errors.New("attempted to grab a lock on a nil raft backend") - } - // Check to see if we are already leader. if l.b.raft.State() == raft.Leader { err := l.b.applyLog(context.Background(), &LogData{ @@ -1225,6 +1256,10 @@ func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { // Unlock gives up leadership. 
func (l *RaftLock) Unlock() error { + if l.b.raft == nil { + return nil + } + return l.b.raft.LeadershipTransfer().Error() } diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index e21d7cdec..72c9dee40 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -36,8 +36,13 @@ func getRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, str } func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir string) (*RaftBackend, string) { + id, err := uuid.GenerateUUID() + if err != nil { + t.Fatal(err) + } + logger := hclog.New(&hclog.LoggerOptions{ - Name: "raft", + Name: fmt.Sprintf("raft-%s", id), Level: hclog.Trace, }) logger.Info("raft dir", "dir", raftDir) @@ -45,6 +50,7 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str conf := map[string]string{ "path": raftDir, "trailing_logs": "100", + "node_id": id, } if noStoreState { @@ -58,7 +64,12 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str backend := backendRaw.(*RaftBackend) if bootstrap { - err = backend.Bootstrap(context.Background(), []Peer{Peer{ID: backend.NodeID(), Address: backend.NodeID()}}) + err = backend.Bootstrap([]Peer{ + { + ID: backend.NodeID(), + Address: backend.NodeID(), + }, + }) if err != nil { t.Fatal(err) } diff --git a/physical/raft/snapshot_test.go b/physical/raft/snapshot_test.go index df4cd967a..2a0164b4c 100644 --- a/physical/raft/snapshot_test.go +++ b/physical/raft/snapshot_test.go @@ -40,7 +40,7 @@ func addPeer(t *testing.T, leader, follower *RaftBackend) { t.Fatal(err) } - err = follower.Bootstrap(context.Background(), peers) + err = follower.Bootstrap(peers) if err != nil { t.Fatal(err) } diff --git a/physical/raft/streamlayer.go b/physical/raft/streamlayer.go index 71ce991eb..349606b52 100644 --- a/physical/raft/streamlayer.go +++ b/physical/raft/streamlayer.go @@ -316,7 +316,7 @@ func (l *raftLayer) CALookup(context.Context) ([]*x509.Certificate, error) { return ret, nil } -// Stop shutsdown the raft layer. +// Stop shuts down the raft layer. 
func (l *raftLayer) Stop() error { l.Close() return nil diff --git a/plugins/database/mongodb/connection_producer_test.go b/plugins/database/mongodb/connection_producer_test.go index b96486da7..0ee104c14 100644 --- a/plugins/database/mongodb/connection_producer_test.go +++ b/plugins/database/mongodb/connection_producer_test.go @@ -314,4 +314,3 @@ func writeFile(t *testing.T, filename string, data []byte, perms os.FileMode) { t.Fatalf("Unable to write to file [%s]: %s", filename, err) } } - diff --git a/vault/core.go b/vault/core.go index 2a24d86b4..826308759 100644 --- a/vault/core.go +++ b/vault/core.go @@ -31,7 +31,6 @@ import ( "github.com/hashicorp/vault/helper/metricsutil" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/internalshared/reloadutil" - "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/certutil" "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/jsonutil" @@ -1455,7 +1454,7 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro return false, err } - if err := c.startRaftStorage(ctx); err != nil { + if err := c.startRaftBackend(ctx); err != nil { return false, err } @@ -1714,7 +1713,7 @@ func (c *Core) sealInternal() error { return c.sealInternalWithOptions(true, false, true) } -func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft bool) error { +func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, performCleanup bool) error { // Mark sealed, and if already marked return if swapped := atomic.CompareAndSwapUint32(c.sealed, 0, 1); !swapped { return nil @@ -1796,10 +1795,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b c.teardownReplicationResolverHandler() - // If the storage backend needs to be sealed - if shutdownRaft { - if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { - if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil { + // Perform additional cleanup upon sealing. 
+ if performCleanup { + if raftBackend := c.getRaftBackend(); raftBackend != nil { + if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil { c.logger.Error("error stopping storage cluster", "error", err) return err } diff --git a/vault/external_tests/raft/raft_ha_test.go b/vault/external_tests/raft/raft_ha_test.go new file mode 100644 index 000000000..d8a0ea5f4 --- /dev/null +++ b/vault/external_tests/raft/raft_ha_test.go @@ -0,0 +1,242 @@ +package rafttests + +import ( + "sync/atomic" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/api" + "github.com/hashicorp/vault/helper/testhelpers" + "github.com/hashicorp/vault/helper/testhelpers/teststorage" + vaulthttp "github.com/hashicorp/vault/http" + "github.com/hashicorp/vault/physical/raft" + "github.com/hashicorp/vault/sdk/helper/logging" + "github.com/hashicorp/vault/vault" +) + +func TestRaft_HA_NewCluster(t *testing.T) { + t.Run("file", func(t *testing.T) { + t.Parallel() + + t.Run("no_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeFileBackend, false) + }) + + t.Run("with_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeFileBackend, true) + }) + }) + + t.Run("inmem", func(t *testing.T) { + t.Parallel() + + t.Run("no_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeInmemBackend, false) + }) + + t.Run("with_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeInmemBackend, true) + }) + }) + + t.Run("consul", func(t *testing.T) { + t.Parallel() + + t.Run("no_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeConsulBackend, false) + }) + + t.Run("with_client_certs", func(t *testing.T) { + testRaftHANewCluster(t, teststorage.MakeConsulBackend, true) + }) + }) +} + +func testRaftHANewCluster(t *testing.T, bundler teststorage.PhysicalBackendBundler, addClientCerts bool) { + var conf vault.CoreConfig + var opts = vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler} + + teststorage.RaftHASetup(&conf, &opts, bundler) + cluster := vault.NewTestCluster(t, &conf, &opts) + cluster.Start() + defer cluster.Cleanup() + + addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster} + + leaderCore := cluster.Cores[0] + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) + + // Seal the leader so we can install an address provider + { + testhelpers.EnsureCoreSealed(t, leaderCore) + leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) + cluster.UnsealCore(t, leaderCore) + vault.TestWaitActive(t, leaderCore.Core) + } + + // Now unseal core for join commands to work + testhelpers.EnsureCoresUnsealed(t, cluster) + + joinFunc := func(client *api.Client, addClientCerts bool) { + req := &api.RaftJoinRequest{ + LeaderCACert: string(cluster.CACertPEM), + } + if addClientCerts { + req.LeaderClientCert = string(cluster.CACertPEM) + req.LeaderClientKey = string(cluster.CAKeyPEM) + } + resp, err := client.Sys().RaftJoin(req) + if err != nil { + t.Fatal(err) + } + if !resp.Joined { + t.Fatalf("failed to join raft cluster") + } + } + + joinFunc(cluster.Cores[1].Client, addClientCerts) + joinFunc(cluster.Cores[2].Client, addClientCerts) + + // Ensure peers are added + leaderClient := cluster.Cores[0].Client + verifyRaftPeers(t, leaderClient, map[string]bool{ + "core-0": true, + "core-1": true, + "core-2": true, + }) + + // Test remove peers + _, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", 
map[string]interface{}{ + "server_id": "core-1", + }) + if err != nil { + t.Fatal(err) + } + + _, err = leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{ + "server_id": "core-2", + }) + if err != nil { + t.Fatal(err) + } + + // Ensure peers are removed + verifyRaftPeers(t, leaderClient, map[string]bool{ + "core-0": true, + }) +} + +func TestRaft_HA_ExistingCluster(t *testing.T) { + conf := vault.CoreConfig{ + DisablePerformanceStandby: true, + } + opts := vault.TestClusterOptions{ + HandlerFunc: vaulthttp.Handler, + NumCores: vault.DefaultNumCores, + KeepStandbysSealed: true, + } + logger := logging.NewVaultLogger(hclog.Debug).Named(t.Name()) + + physBundle := teststorage.MakeInmemBackend(t, logger) + physBundle.HABackend = nil + + storage, cleanup := teststorage.MakeReusableStorage(t, logger, physBundle) + defer cleanup() + + var ( + clusterBarrierKeys [][]byte + clusterRootToken string + ) + createCluster := func(t *testing.T) { + t.Log("simulating cluster creation without raft as HABackend") + + storage.Setup(&conf, &opts) + + cluster := vault.NewTestCluster(t, &conf, &opts) + cluster.Start() + defer func() { + cluster.Cleanup() + storage.Cleanup(t, cluster) + }() + + clusterBarrierKeys = cluster.BarrierKeys + clusterRootToken = cluster.RootToken + } + + createCluster(t) + + haStorage, haCleanup := teststorage.MakeReusableRaftHAStorage(t, logger, opts.NumCores, physBundle) + defer haCleanup() + + updateCLuster := func(t *testing.T) { + t.Log("simulating cluster update with raft as HABackend") + + opts.SkipInit = true + haStorage.Setup(&conf, &opts) + + cluster := vault.NewTestCluster(t, &conf, &opts) + cluster.Start() + defer func() { + cluster.Cleanup() + haStorage.Cleanup(t, cluster) + }() + + // Set cluster values + cluster.BarrierKeys = clusterBarrierKeys + cluster.RootToken = clusterRootToken + + addressProvider := &testhelpers.TestRaftServerAddressProvider{Cluster: cluster} + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) + + // Seal the leader so we can install an address provider + leaderCore := cluster.Cores[0] + { + testhelpers.EnsureCoreSealed(t, leaderCore) + leaderCore.UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) + testhelpers.EnsureCoreUnsealed(t, cluster, leaderCore) + } + + // Call the bootstrap on the leader and then ensure that it becomes active + leaderClient := cluster.Cores[0].Client + leaderClient.SetToken(clusterRootToken) + { + _, err := leaderClient.Logical().Write("sys/storage/raft/bootstrap", nil) + if err != nil { + t.Fatal(err) + } + vault.TestWaitActive(t, leaderCore.Core) + } + + // Set address provider + cluster.Cores[1].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) + cluster.Cores[2].UnderlyingHAStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider) + + // Now unseal core for join commands to work + testhelpers.EnsureCoresUnsealed(t, cluster) + + joinFunc := func(client *api.Client) { + req := &api.RaftJoinRequest{ + LeaderCACert: string(cluster.CACertPEM), + } + resp, err := client.Sys().RaftJoin(req) + if err != nil { + t.Fatal(err) + } + if !resp.Joined { + t.Fatalf("failed to join raft cluster") + } + } + + joinFunc(cluster.Cores[1].Client) + joinFunc(cluster.Cores[2].Client) + + // Ensure peers are added + verifyRaftPeers(t, leaderClient, map[string]bool{ + "core-0": true, + "core-1": true, + "core-2": true, + }) + } + + updateCLuster(t) +} diff --git a/vault/external_tests/raft/raft_test.go 
b/vault/external_tests/raft/raft_test.go index cd0ecc7d4..d3f46f7a0 100644 --- a/vault/external_tests/raft/raft_test.go +++ b/vault/external_tests/raft/raft_test.go @@ -47,7 +47,7 @@ func TestRaft_Retry_Join(t *testing.T) { leaderCore := cluster.Cores[0] leaderAPI := leaderCore.Client.Address() - atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1) + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) { testhelpers.EnsureCoreSealed(t, leaderCore) @@ -90,23 +90,7 @@ func TestRaft_Retry_Join(t *testing.T) { cluster.UnsealCore(t, core) } - checkConfigFunc := func(expected map[string]bool) { - secret, err := cluster.Cores[0].Client.Logical().Read("sys/storage/raft/configuration") - if err != nil { - t.Fatal(err) - } - servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{}) - - for _, s := range servers { - server := s.(map[string]interface{}) - delete(expected, server["node_id"].(string)) - } - if len(expected) != 0 { - t.Fatalf("failed to read configuration successfully") - } - } - - checkConfigFunc(map[string]bool{ + verifyRaftPeers(t, cluster.Cores[0].Client, map[string]bool{ "core-0": true, "core-1": true, "core-2": true, @@ -126,7 +110,7 @@ func TestRaft_Join(t *testing.T) { leaderCore := cluster.Cores[0] leaderAPI := leaderCore.Client.Address() - atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1) + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) // Seal the leader so we can install an address provider { @@ -187,23 +171,7 @@ func TestRaft_RemovePeer(t *testing.T) { client := cluster.Cores[0].Client - checkConfigFunc := func(expected map[string]bool) { - secret, err := client.Logical().Read("sys/storage/raft/configuration") - if err != nil { - t.Fatal(err) - } - servers := secret.Data["config"].(map[string]interface{})["servers"].([]interface{}) - - for _, s := range servers { - server := s.(map[string]interface{}) - delete(expected, server["node_id"].(string)) - } - if len(expected) != 0 { - t.Fatalf("failed to read configuration successfully") - } - } - - checkConfigFunc(map[string]bool{ + verifyRaftPeers(t, client, map[string]bool{ "core-0": true, "core-1": true, "core-2": true, @@ -216,7 +184,7 @@ func TestRaft_RemovePeer(t *testing.T) { t.Fatal(err) } - checkConfigFunc(map[string]bool{ + verifyRaftPeers(t, client, map[string]bool{ "core-0": true, "core-1": true, }) @@ -228,7 +196,7 @@ func TestRaft_RemovePeer(t *testing.T) { t.Fatal(err) } - checkConfigFunc(map[string]bool{ + verifyRaftPeers(t, client, map[string]bool{ "core-0": true, }) } @@ -843,3 +811,39 @@ func BenchmarkRaft_SingleNode(b *testing.B) { b.Run("256b", func(b *testing.B) { bench(b, 25) }) } + +func verifyRaftPeers(t *testing.T, client *api.Client, expected map[string]bool) { + t.Helper() + + resp, err := client.Logical().Read("sys/storage/raft/configuration") + if err != nil { + t.Fatalf("error reading raft config: %v", err) + } + + if resp == nil || resp.Data == nil { + t.Fatal("missing response data") + } + + config, ok := resp.Data["config"].(map[string]interface{}) + if !ok { + t.Fatal("missing config in response data") + } + + servers, ok := config["servers"].([]interface{}) + if !ok { + t.Fatal("missing servers in response data config") + } + + // Iterate through the servers and remove the node found in the response + // from the expected collection + for _, s := range servers { + server := s.(map[string]interface{}) + delete(expected, server["node_id"].(string)) + } + + // If the collection is non-empty, it means that the peer was not found in + // the response. 
+ if len(expected) != 0 { + t.Fatalf("failed to read configuration successfully, expected peers no found in configuration list: %v", expected) + } +} diff --git a/vault/external_tests/sealmigration/seal_migration_test.go b/vault/external_tests/sealmigration/seal_migration_test.go index 68fa55e66..c90c2a3ec 100644 --- a/vault/external_tests/sealmigration/seal_migration_test.go +++ b/vault/external_tests/sealmigration/seal_migration_test.go @@ -77,7 +77,7 @@ func testVariousBackends(t *testing.T, tf testFunc, basePort int, includeRaft bo logger := logger.Named("raft") raftBasePort := basePort + 400 - atomic.StoreUint32(&vault.UpdateClusterAddrForTests, 1) + atomic.StoreUint32(&vault.TestingUpdateClusterAddr, 1) addressProvider := testhelpers.NewHardcodedServerAddressProvider(numTestCores, raftBasePort+10) storage, cleanup := teststorage.MakeReusableRaftStorage(t, logger, numTestCores, addressProvider) diff --git a/vault/ha.go b/vault/ha.go index 63ea7e84d..f14084f6a 100644 --- a/vault/ha.go +++ b/vault/ha.go @@ -16,7 +16,6 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" - "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/certutil" "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/jsonutil" @@ -709,8 +708,10 @@ func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct // periodicCheckKeyUpgrade is used to watch for key rotation events as a standby func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{}) { + raftBackend := c.getRaftBackend() + isRaft := raftBackend != nil + opCount := new(int32) - _, isRaft := c.underlyingPhysical.(*raft.RaftBackend) for { select { case <-time.After(keyRotateCheckInterval): @@ -751,8 +752,17 @@ func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{ c.logger.Error("key rotation periodic upgrade check failed", "error", err) } - if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil { - c.logger.Error("raft tls periodic upgrade check failed", "error", err) + if isRaft { + hasState, err := raftBackend.HasState() + if err != nil { + c.logger.Error("could not check raft state", "error", err) + } + + if raftBackend.Initialized() && hasState { + if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil { + c.logger.Error("raft tls periodic upgrade check failed", "error", err) + } + } } atomic.AddInt32(lopCount, -1) diff --git a/vault/init.go b/vault/init.go index 2fd8a2548..17aac352a 100644 --- a/vault/init.go +++ b/vault/init.go @@ -208,30 +208,20 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes return nil, ErrAlreadyInit } - // If we have clustered storage, set it up now - if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { - parsedClusterAddr, err := url.Parse(c.ClusterAddr()) + // Bootstrap the raft backend if that's provided as the physical or + // HA backend. 
+ raftBackend := c.getRaftBackend() + if raftBackend != nil { + err := c.RaftBootstrap(ctx, true) if err != nil { - return nil, errwrap.Wrapf("error parsing cluster address: {{err}}", err) - } - if err := raftStorage.Bootstrap(ctx, []raft.Peer{ - { - ID: raftStorage.NodeID(), - Address: parsedClusterAddr.Host, - }, - }); err != nil { - return nil, errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err) - } - - if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{ - StartAsLeader: true, - }); err != nil { - return nil, errwrap.Wrapf("could not start clustered storage: {{err}}", err) + c.logger.Error("failed to bootstrap raft", "error", err) + return nil, err } + // Teardown cluster after bootstrap setup defer func() { - if err := raftStorage.TeardownCluster(nil); err != nil { - c.logger.Error("failed to stop raft storage", "error", err) + if err := raftBackend.TeardownCluster(nil); err != nil { + c.logger.Error("failed to stop raft", "error", err) } }() } @@ -385,9 +375,11 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes results.RootToken = base64.StdEncoding.EncodeToString(encryptedVals[0]) } - if err := c.createRaftTLSKeyring(ctx); err != nil { - c.logger.Error("failed to create raft TLS keyring", "error", err) - return nil, err + if raftBackend != nil { + if _, err := c.raftCreateTLSKeyring(ctx); err != nil { + c.logger.Error("failed to create raft TLS keyring", "error", err) + return nil, err + } } // Prepare to re-seal diff --git a/vault/logical_system.go b/vault/logical_system.go index d5fd43d57..81e209885 100644 --- a/vault/logical_system.go +++ b/vault/logical_system.go @@ -29,7 +29,6 @@ import ( "github.com/hashicorp/vault/helper/monitor" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/random" - "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/framework" "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/jsonutil" @@ -166,7 +165,7 @@ func NewSystemBackend(core *Core, logger log.Logger) *SystemBackend { b.Backend.Paths = append(b.Backend.Paths, b.rawPaths()...) } - if _, ok := core.underlyingPhysical.(*raft.RaftBackend); ok { + if backend := core.getRaftBackend(); backend != nil { b.Backend.Paths = append(b.Backend.Paths, b.raftStoragePaths()...) } diff --git a/vault/logical_system_raft.go b/vault/logical_system_raft.go index ea4ae023b..cd04d0efa 100644 --- a/vault/logical_system_raft.go +++ b/vault/logical_system_raft.go @@ -7,14 +7,15 @@ import ( "errors" "strings" + "github.com/hashicorp/vault/sdk/framework" + "github.com/hashicorp/vault/sdk/logical" + "github.com/hashicorp/vault/sdk/physical" + proto "github.com/golang/protobuf/proto" wrapping "github.com/hashicorp/go-kms-wrapping" uuid "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/physical/raft" - "github.com/hashicorp/vault/sdk/framework" - "github.com/hashicorp/vault/sdk/logical" - "github.com/hashicorp/vault/sdk/physical" ) // raftStoragePaths returns paths for use when raft is the storage mechanism. 
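Note: the handler changes that follow, like the core.go, ha.go, and init.go hunks above, replace direct type assertions on c.underlyingPhysical with the new Core.getRaftBackend helper plus a nil check, so the same code paths work whether raft is the storage backend or only the ha_storage backend. A self-contained sketch of that lookup pattern; the stand-in types are illustrative, not Vault's real interfaces:

```go
package main

import "fmt"

// Illustrative stand-ins; in Vault these would be physical.Backend,
// physical.HABackend, and *raft.RaftBackend.
type raftBackend struct{}
type inmemBackend struct{}

// getRaft mirrors the shape of Core.getRaftBackend introduced by this change:
// type-assert both the HA backend and the physical backend and return the
// raft one if either matches, or nil when raft is not in use at all.
func getRaft(ha, physical interface{}) *raftBackend {
	var rb *raftBackend
	if r, ok := ha.(*raftBackend); ok {
		rb = r
	}
	if r, ok := physical.(*raftBackend); ok {
		rb = r
	}
	return rb
}

func main() {
	// Raft used for HA only, with a shared non-raft physical backend.
	fmt.Println(getRaft(&raftBackend{}, &inmemBackend{}) != nil) // true
	// Raft not in use anywhere: callers return "raft storage is not in use".
	fmt.Println(getRaft(nil, &inmemBackend{}) == nil) // true
}
```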
@@ -132,13 +133,12 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path { func (b *SystemBackend) handleRaftConfigurationGet() framework.OperationFunc { return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) { - - raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend) - if !ok { + raftBackend := b.Core.getRaftBackend() + if raftBackend == nil { return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest } - config, err := raftStorage.GetConfiguration(ctx) + config, err := raftBackend.GetConfiguration(ctx) if err != nil { return nil, err } @@ -158,12 +158,12 @@ func (b *SystemBackend) handleRaftRemovePeerUpdate() framework.OperationFunc { return logical.ErrorResponse("no server id provided"), logical.ErrInvalidRequest } - raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend) - if !ok { + raftBackend := b.Core.getRaftBackend() + if raftBackend == nil { return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest } - if err := raftStorage.RemovePeer(ctx, serverID); err != nil { + if err := raftBackend.RemovePeer(ctx, serverID); err != nil { return nil, err } if b.Core.raftFollowerStates != nil { @@ -221,8 +221,8 @@ func (b *SystemBackend) handleRaftBootstrapChallengeWrite() framework.OperationF func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc { return func(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) { - raftStorage, ok := b.Core.underlyingPhysical.(*raft.RaftBackend) - if !ok { + raftBackend := b.Core.getRaftBackend() + if raftBackend == nil { return logical.ErrorResponse("raft storage is not in use"), logical.ErrInvalidRequest } @@ -271,9 +271,9 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc switch nonVoter { case true: - err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr) + err = raftBackend.AddNonVotingPeer(ctx, serverID, clusterAddr) default: - err = raftStorage.AddPeer(ctx, serverID, clusterAddr) + err = raftBackend.AddPeer(ctx, serverID, clusterAddr) } if err != nil { return nil, err @@ -283,7 +283,7 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc b.Core.raftFollowerStates.update(serverID, 0) } - peers, err := raftStorage.Peers(ctx) + peers, err := raftBackend.Peers(ctx) if err != nil { return nil, err } diff --git a/vault/raft.go b/vault/raft.go index 8c9124516..f1cdd1807 100644 --- a/vault/raft.go +++ b/vault/raft.go @@ -16,6 +16,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hashicorp/errwrap" cleanhttp "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping" uuid "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/api" @@ -31,6 +32,9 @@ import ( var ( raftTLSStoragePath = "core/raft/tls" raftTLSRotationPeriod = 24 * time.Hour + + // TestingUpdateClusterAddr is used in tests to override the cluster address + TestingUpdateClusterAddr uint32 ) type raftFollowerStates struct { @@ -88,14 +92,11 @@ func (c *Core) GetRaftIndexes() (committed uint64, applied uint64) { return raftStorage.CommittedIndex(), raftStorage.AppliedIndex() } -// startRaftStorage will call SetupCluster in the raft backend which starts raft +// startRaftBackend will call SetupCluster in the raft backend which starts raft // up and enables the cluster handler. 
-func (c *Core) startRaftStorage(ctx context.Context) (retErr error) { - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { - return nil - } - if raftStorage.Initialized() { +func (c *Core) startRaftBackend(ctx context.Context) (retErr error) { + raftBackend := c.getRaftBackend() + if raftBackend == nil || raftBackend.Initialized() { return nil } @@ -109,6 +110,15 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) { var raftTLS *raft.TLSKeyring switch raftTLSEntry { case nil: + // If this is HA-only and no TLS keyring is found, that means the + // cluster has not been bootstrapped or joined. We return early here in + // this case. If we return here, the raft object has not been instantiated, + // and a bootstrap call should be made. + if c.isRaftHAOnly() { + c.logger.Trace("skipping raft backend setup during unseal, no bootstrap operation has been started yet") + return nil + } + // If we did not find a TLS keyring we will attempt to create one here. // This happens after a storage migration process. This node is also // marked to start as leader so we can write the new TLS Key. This is an @@ -133,8 +143,23 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) { } } - raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) - if err := raftStorage.SetupCluster(ctx, raft.SetupOpts{ + hasState, err := raftBackend.HasState() + if err != nil { + return err + } + + // This can be hit on follower nodes that got their config updated to use + // raft for HA-only before they are joined to the cluster. Since followers + // in this case use shared storage, it doesn't return early from the TLS + // case above, but there's not raft state yet for the backend to call + // raft.SetupCluster. + if !hasState { + c.logger.Trace("skipping raft backend setup during unseal, no raft state found") + return nil + } + + raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) + if err := raftBackend.SetupCluster(ctx, raft.SetupOpts{ TLSKeyring: raftTLS, ClusterListener: c.getClusterListener(), StartAsLeader: creating, @@ -145,7 +170,7 @@ func (c *Core) startRaftStorage(ctx context.Context) (retErr error) { defer func() { if retErr != nil { c.logger.Info("stopping raft server") - if err := raftStorage.TeardownCluster(c.getClusterListener()); err != nil { + if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil { c.logger.Error("failed to stop raft server", "error", err) } } @@ -180,7 +205,114 @@ func (c *Core) stopRaftActiveNode() { c.stopPeriodicRaftTLSRotate() } -// startPeriodicRaftTLSRotate will spawn a go routine in charge of periodically +func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { + raftBackend := c.getRaftBackend() + + // No-op if raft is not being used + if raftBackend == nil { + return nil + } + + c.raftTLSRotationStopCh = make(chan struct{}) + logger := c.logger.Named("raft") + + if c.isRaftHAOnly() { + return c.raftTLSRotateDirect(ctx, logger, c.raftTLSRotationStopCh) + } + + return c.raftTLSRotatePhased(ctx, logger, raftBackend, c.raftTLSRotationStopCh) +} + +// raftTLSRotateDirect will spawn a go routine in charge of periodically +// rotating the TLS certs and keys used for raft traffic. +// +// The logic for updating the TLS keyring is through direct storage update. This +// is called whenever raft is used for HA-only, which means that the underlying +// storage is a shared physical object, thus requiring no additional +// coordination. 
+func (c *Core) raftTLSRotateDirect(ctx context.Context, logger hclog.Logger, stopCh chan struct{}) error { + logger.Info("creating new raft TLS config") + + rotateKeyring := func() (time.Time, error) { + // Create a new key + raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader) + if err != nil { + return time.Time{}, errwrap.Wrapf("failed to generate new raft TLS key: {{err}}", err) + } + + // Read the existing keyring + keyring, err := c.raftReadTLSKeyring(ctx) + if err != nil { + return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err) + } + + // Advance the term and store the new key, replacing the old one. + // Unlike phased rotation, we don't need to update AppliedIndex since + // we don't rely on it to check whether the followers got the key. A + // shared storage means that followers will have the key as soon as it's + // written to storage. + keyring.Term += 1 + keyring.Keys[0] = raftTLSKey + keyring.ActiveKeyID = raftTLSKey.ID + entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) + if err != nil { + return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err) + } + if err := c.barrier.Put(ctx, entry); err != nil { + return time.Time{}, errwrap.Wrapf("failed to write keyring: {{err}}", err) + } + + logger.Info("wrote new raft TLS config") + + // Schedule the next rotation + return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil + } + + // Read the keyring to calculate the time of next rotation. + keyring, err := c.raftReadTLSKeyring(ctx) + if err != nil { + return err + } + + activeKey := keyring.GetActive() + if activeKey == nil { + return errors.New("no active raft TLS key found") + } + + go func() { + nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod) + + var backoff bool + for { + // If we encountered and error we should try to create the key + // again. + if backoff { + nextRotationTime = time.Now().Add(10 * time.Second) + backoff = false + } + + select { + case <-time.After(time.Until(nextRotationTime)): + // It's time to rotate the keys + next, err := rotateKeyring() + if err != nil { + logger.Error("failed to rotate TLS key", "error", err) + backoff = true + continue + } + + nextRotationTime = next + + case <-stopCh: + return + } + } + }() + + return nil +} + +// raftTLSRotatePhased will spawn a go routine in charge of periodically // rotating the TLS certs and keys used for raft traffic. // // The logic for updating the TLS certificate uses a pseudo two phase commit @@ -199,55 +331,30 @@ func (c *Core) stopRaftActiveNode() { // receives the update. This ensures a standby node isn't left behind and unable // to reconnect with the cluster. Additionally, only one outstanding key // is allowed for this same reason (max keyring size of 2). -func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { - return nil - } - - stopCh := make(chan struct{}) +func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raftBackend *raft.RaftBackend, stopCh chan struct{}) error { followerStates := &raftFollowerStates{ followers: make(map[string]uint64), } // Pre-populate the follower list with the set of peers. 
- raftConfig, err := raftStorage.GetConfiguration(ctx) + raftConfig, err := raftBackend.GetConfiguration(ctx) if err != nil { return err } for _, server := range raftConfig.Servers { - if server.NodeID != raftStorage.NodeID() { + if server.NodeID != raftBackend.NodeID() { followerStates.update(server.NodeID, 0) } } - - logger := c.logger.Named("raft") - c.raftTLSRotationStopCh = stopCh c.raftFollowerStates = followerStates - readKeyring := func() (*raft.TLSKeyring, error) { - tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) - if err != nil { - return nil, err - } - if tlsKeyringEntry == nil { - return nil, errors.New("no keyring found") - } - var keyring raft.TLSKeyring - if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { - return nil, err - } - - return &keyring, nil - } - // rotateKeyring writes new key data to the keyring and adds an applied // index that is used to verify it has been committed. The keys written in // this function can be used on standbys but the active node doesn't start // using it yet. rotateKeyring := func() (time.Time, error) { // Read the existing keyring - keyring, err := readKeyring() + keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return time.Time{}, errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err) } @@ -256,8 +363,8 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { case len(keyring.Keys) == 2 && keyring.Keys[1].AppliedIndex == 0: // If this case is hit then the second write to add the applied // index failed. Attempt to write it again. - keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex() - keyring.AppliedIndex = raftStorage.AppliedIndex() + keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex() + keyring.AppliedIndex = raftBackend.AppliedIndex() entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err) @@ -270,7 +377,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { // If there already exists a pending key update then the update // hasn't replicated down to all standby nodes yet. Don't allow any // new keys to be created until all standbys have seen this previous - // rotation. As a backoff strategy another rotation attempt is + // rotation. As a backoff strategy, another rotation attempt is // scheduled for 5 minutes from now. logger.Warn("skipping new raft TLS config creation, keys are pending") return time.Now().Add(time.Minute * 5), nil @@ -296,9 +403,9 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { } // Write the keyring again with the new applied index. This allows us to - // track if standby nodes receive the update. - keyring.Keys[1].AppliedIndex = raftStorage.AppliedIndex() - keyring.AppliedIndex = raftStorage.AppliedIndex() + // track if standby nodes received the update. + keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex() + keyring.AppliedIndex = raftBackend.AppliedIndex() entry, err = logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, errwrap.Wrapf("failed to json encode keyring: {{err}}", err) @@ -316,7 +423,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { // finalizes the rotation by deleting the old keys and updating the raft // backend. 
checkCommitted := func() error { - keyring, err := readKeyring() + keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return errwrap.Wrapf("failed to read raft TLS keyring: {{err}}", err) } @@ -346,7 +453,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { } // Update the TLS Key in the backend - if err := raftStorage.SetTLSKeyring(keyring); err != nil { + if err := raftBackend.SetTLSKeyring(keyring); err != nil { return errwrap.Wrapf("failed to install keyring: {{err}}", err) } @@ -355,7 +462,7 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { } // Read the keyring to calculate the time of next rotation. - keyring, err := readKeyring() + keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return err } @@ -406,14 +513,43 @@ func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { return nil } -func (c *Core) createRaftTLSKeyring(ctx context.Context) error { - if _, ok := c.underlyingPhysical.(*raft.RaftBackend); !ok { - return nil +func (c *Core) raftReadTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) { + tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) + if err != nil { + return nil, err + } + if tlsKeyringEntry == nil { + return nil, errors.New("no keyring found") + } + var keyring raft.TLSKeyring + if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { + return nil, err + } + + return &keyring, nil +} + +// raftCreateTLSKeyring creates the initial TLS key and the TLS Keyring for raft +// use. If a keyring entry is already present in storage, it will return an +// error. +func (c *Core) raftCreateTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) { + if raftBackend := c.getRaftBackend(); raftBackend == nil { + return nil, fmt.Errorf("raft backend not in use") + } + + // Check if the keyring is already present + raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) + if err != nil { + return nil, err + } + + if raftTLSEntry != nil { + return nil, fmt.Errorf("TLS keyring already present") } raftTLS, err := raft.GenerateTLSKey(c.secureRandomReader) if err != nil { - return err + return nil, err } keyring := &raft.TLSKeyring{ @@ -423,12 +559,12 @@ func (c *Core) createRaftTLSKeyring(ctx context.Context) error { entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { - return err + return nil, err } if err := c.barrier.Put(ctx, entry); err != nil { - return err + return nil, err } - return nil + return keyring, nil } func (c *Core) stopPeriodicRaftTLSRotate() { @@ -440,8 +576,8 @@ func (c *Core) stopPeriodicRaftTLSRotate() { } func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error { - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { + raftBackend := c.getRaftBackend() + if raftBackend == nil { return nil } @@ -458,7 +594,7 @@ func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error { return err } - if err := raftStorage.SetTLSKeyring(&keyring); err != nil { + if err := raftBackend.SetTLSKeyring(&keyring); err != nil { return err } @@ -544,16 +680,16 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co } func (c *Core) InitiateRetryJoin(ctx context.Context) error { - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { - return errors.New("raft storage not configured") - } - - if raftStorage.Initialized() { + raftBackend := c.getRaftBackend() + if raftBackend == nil { return nil } - leaderInfos, err := raftStorage.JoinConfig() + if raftBackend.Initialized() { + 
return nil + } + + leaderInfos, err := raftBackend.JoinConfig() if err != nil { return err } @@ -573,23 +709,75 @@ func (c *Core) InitiateRetryJoin(ctx context.Context) error { } func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) { - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { - return false, errors.New("raft storage not configured") - } - - if raftStorage.Initialized() { - return false, errors.New("raft storage is already initialized") + raftBackend := c.getRaftBackend() + if raftBackend == nil { + return false, errors.New("raft backend not in use") } init, err := c.Initialized(ctx) if err != nil { return false, errwrap.Wrapf("failed to check if core is initialized: {{err}}", err) } - if init { + + isRaftHAOnly := c.isRaftHAOnly() + // Prevent join from happening if we're using raft for storage and + // it has already been initialized. + if init && !isRaftHAOnly { return true, nil } + // Check on seal status and storage type before proceeding: + // If raft is used for storage, core needs to be sealed + if !isRaftHAOnly && !c.Sealed() { + c.logger.Error("node must be sealed before joining") + return false, errors.New("node must be sealed before joining") + } + + // If raft is used for ha-only, core needs to be unsealed + if isRaftHAOnly && c.Sealed() { + c.logger.Error("node must be unsealed before joining") + return false, errors.New("node must be unsealed before joining") + } + + // Disallow leader API address to be provided if we're using raft for HA-only + // The leader API address is obtained directly through storage. This serves + // as a form of verification that this node is sharing the same physical + // storage as the leader node. + if isRaftHAOnly { + for _, info := range leaderInfos { + if info.LeaderAPIAddr != "" { + return false, errors.New("leader API address must be unset when raft is used exclusively for HA") + } + } + + // Get the leader address from storage + keys, err := c.barrier.List(ctx, coreLeaderPrefix) + if err != nil { + return false, err + } + + if len(keys) == 0 || len(keys[0]) == 0 { + return false, errors.New("unable to fetch leadership entry") + } + + leadershipEntry := coreLeaderPrefix + keys[0] + entry, err := c.barrier.Get(ctx, leadershipEntry) + if err != nil { + return false, err + } + if entry == nil { + return false, errors.New("unable to read leadership entry") + } + + var adv activeAdvertisement + err = jsonutil.DecodeJSON(entry.Value, &adv) + if err != nil { + return false, errwrap.Wrapf("unable to decode leader entry: {{err}}", err) + } + + leaderInfos[0].LeaderAPIAddr = adv.RedirectAddr + } + join := func(retry bool) error { joinLeader := func(leaderInfo *raft.LeaderJoinInfo) error { if leaderInfo == nil { @@ -603,14 +791,11 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo if err != nil { return errwrap.Wrapf("failed to check if core is initialized: {{err}}", err) } - if init { + + if init && !isRaftHAOnly { c.logger.Info("returning from raft join as the node is initialized") return nil } - if !c.Sealed() { - c.logger.Info("returning from raft join as the node is unsealed") - return nil - } c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr) @@ -648,7 +833,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo // Attempt to join the leader by requesting the bootstrap challenge secret, err :=
apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{ - "server_id": raftStorage.NodeID(), + "server_id": raftBackend.NodeID(), }) if err != nil { return errwrap.Wrapf("error during raft bootstrap init call: {{err}}", err) } @@ -687,7 +872,10 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo nonVoter: nonVoter, } - if c.seal.BarrierType() == wrapping.Shamir { + // If we're using Shamir and using raft for both physical and HA, we + // need to block until the node is unsealed, unless retry is set to + // false. + if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly { c.raftInfo = raftInfo if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil { return err @@ -708,7 +896,7 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo return errwrap.Wrapf("failed to send answer to raft leader node: {{err}}", err) } - if c.seal.BarrierType() == wrapping.Shamir { + if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly { // Reset the state c.raftInfo = nil @@ -763,20 +951,41 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJo return true, nil } -// This is used in tests to override the cluster address -var UpdateClusterAddrForTests uint32 +// getRaftBackend returns the RaftBackend from the physical or HA backend, +// in that order of preference, or nil if neither is of type RaftBackend. +func (c *Core) getRaftBackend() *raft.RaftBackend { + var raftBackend *raft.RaftBackend + + if raftHA, ok := c.ha.(*raft.RaftBackend); ok { + raftBackend = raftHA + } + + if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { + raftBackend = raftStorage + } + + return raftBackend +} + +// isRaftHAOnly returns true if c.ha is raft and physical storage is non-raft +func (c *Core) isRaftHAOnly() bool { + _, isRaftHA := c.ha.(*raft.RaftBackend) + _, isRaftStorage := c.underlyingPhysical.(*raft.RaftBackend) + + return isRaftHA && !isRaftStorage +} func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, raftInfo *raftInformation) error { if raftInfo.challenge == nil { return errors.New("raft challenge is nil") } - raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) - if !ok { - return errors.New("raft storage not in use") + raftBackend := c.getRaftBackend() + if raftBackend == nil { + return errors.New("raft backend is not in use") } - if raftStorage.Initialized() { + if raftBackend.Initialized() { return errors.New("raft is already initialized") } @@ -790,7 +999,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, return errwrap.Wrapf("error parsing cluster address: {{err}}", err) } clusterAddr := parsedClusterAddr.Host - if atomic.LoadUint32(&UpdateClusterAddrForTests) == 1 && strings.HasSuffix(clusterAddr, ":0") { + if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") { // We are testing and have an address provider, so just create a random // addr, it will be overwritten later.
var err error @@ -804,7 +1013,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, if err := answerReq.SetJSONBody(map[string]interface{}{ "answer": base64.StdEncoding.EncodeToString(plaintext), "cluster_addr": clusterAddr, - "server_id": raftStorage.NodeID(), + "server_id": raftBackend.NodeID(), "non_voter": raftInfo.nonVoter, }); err != nil { return err @@ -823,15 +1032,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, return err } - raftStorage.Bootstrap(ctx, answerResp.Data.Peers) + if err := raftBackend.Bootstrap(answerResp.Data.Peers); err != nil { + return err + } err = c.startClusterListener(ctx) if err != nil { return errwrap.Wrapf("error starting cluster: {{err}}", err) } - raftStorage.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) - err = raftStorage.SetupCluster(ctx, raft.SetupOpts{ + raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) + err = raftBackend.SetupCluster(ctx, raft.SetupOpts{ TLSKeyring: answerResp.Data.TLSKeyring, ClusterListener: c.getClusterListener(), }) @@ -842,6 +1053,57 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, return nil } +// RaftBootstrap performs bootstrapping of a raft cluster if core contains a raft +// backend. If raft is not part of the storage or HA storage backend, this +// call results in an error. +func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error { + if c.logger.IsDebug() { + c.logger.Debug("bootstrapping raft backend") + defer c.logger.Debug("finished bootstrapping raft backend") + } + + raftBackend := c.getRaftBackend() + if raftBackend == nil { + return errors.New("raft backend not in use") + } + + parsedClusterAddr, err := url.Parse(c.ClusterAddr()) + if err != nil { + return errwrap.Wrapf("error parsing cluster address: {{err}}", err) + } + if err := raftBackend.Bootstrap([]raft.Peer{ + { + ID: raftBackend.NodeID(), + Address: parsedClusterAddr.Host, + }, + }); err != nil { + return errwrap.Wrapf("could not bootstrap clustered storage: {{err}}", err) + } + + raftOpts := raft.SetupOpts{ + StartAsLeader: true, + } + + if !onInit { + // Generate the TLS Keyring info for SetupCluster to consume + raftTLS, err := c.raftCreateTLSKeyring(ctx) + if err != nil { + return errwrap.Wrapf("could not generate TLS keyring during bootstrap: {{err}}", err) + } + + raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) + raftOpts.ClusterListener = c.getClusterListener() + + raftOpts.TLSKeyring = raftTLS + } + + if err := raftBackend.SetupCluster(ctx, raftOpts); err != nil { + return errwrap.Wrapf("could not start clustered storage: {{err}}", err) + } + + return nil +} + func (c *Core) isRaftUnseal() bool { return c.raftInfo != nil } diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go index 4fa8cfde6..fc6b65ffb 100644 --- a/vault/request_forwarding_rpc.go +++ b/vault/request_forwarding_rpc.go @@ -8,7 +8,6 @@ import ( "time" "github.com/hashicorp/vault/helper/forwarding" - "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/vault/replication" ) @@ -82,9 +81,11 @@ func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) ( ReplicationState: uint32(s.core.ReplicationState()), } - if raftStorage, ok := s.core.underlyingPhysical.(*raft.RaftBackend); ok { - reply.RaftAppliedIndex = raftStorage.AppliedIndex() - reply.RaftNodeID = raftStorage.NodeID() + if raftBackend :=
s.core.getRaftBackend(); raftBackend != nil { + if !s.core.isRaftHAOnly() { + reply.RaftAppliedIndex = raftBackend.AppliedIndex() + reply.RaftNodeID = raftBackend.NodeID() + } } return reply, nil @@ -111,9 +112,11 @@ func (c *forwardingClient) startHeartbeat() { ClusterAddr: clusterAddr, } - if raftStorage, ok := c.core.underlyingPhysical.(*raft.RaftBackend); ok { - req.RaftAppliedIndex = raftStorage.AppliedIndex() - req.RaftNodeID = raftStorage.NodeID() + if raftBackend := c.core.getRaftBackend(); raftBackend != nil { + if !c.core.isRaftHAOnly() { + req.RaftAppliedIndex = raftBackend.AppliedIndex() + req.RaftNodeID = raftBackend.NodeID() + } } ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second) diff --git a/vault/testing.go b/vault/testing.go index 6f11fceea..d8ca6aaf0 100644 --- a/vault/testing.go +++ b/vault/testing.go @@ -27,11 +27,6 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/mitchellh/copystructure" - testing "github.com/mitchellh/go-testing-interface" - "golang.org/x/crypto/ed25519" - "golang.org/x/net/http2" - cleanhttp "github.com/hashicorp/go-cleanhttp" log "github.com/hashicorp/go-hclog" raftlib "github.com/hashicorp/raft" @@ -53,6 +48,10 @@ import ( physInmem "github.com/hashicorp/vault/sdk/physical/inmem" "github.com/hashicorp/vault/vault/cluster" "github.com/hashicorp/vault/vault/seal" + "github.com/mitchellh/copystructure" + testing "github.com/mitchellh/go-testing-interface" + "golang.org/x/crypto/ed25519" + "golang.org/x/net/http2" ) // This file contains a number of methods that are useful for unit @@ -888,7 +887,10 @@ func CleanupClusters(clusters []*TestCluster) { func (c *TestCluster) Cleanup() { c.Logger.Info("cleaning up vault cluster") for _, core := range c.Cores { - core.CoreConfig.Logger.SetLevel(log.Error) + // Upgrade logger to emit errors if not doing so already + if !core.CoreConfig.Logger.IsError() { + core.CoreConfig.Logger.SetLevel(log.Error) + } } wg := &sync.WaitGroup{} @@ -964,6 +966,7 @@ type TestClusterCore struct { TLSConfig *tls.Config UnderlyingStorage physical.Backend UnderlyingRawStorage physical.Backend + UnderlyingHAStorage physical.HABackend Barrier SecurityBarrier NodeID string } @@ -1506,6 +1509,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te Barrier: cores[i].barrier, NodeID: fmt.Sprintf("core-%d", i), UnderlyingRawStorage: coreConfigs[i].Physical, + UnderlyingHAStorage: coreConfigs[i].HAPhysical, } tcc.ReloadFuncs = &cores[i].reloadFuncs tcc.ReloadFuncsLock = &cores[i].reloadFuncsLock @@ -1675,9 +1679,14 @@ func (testCluster *TestCluster) newCore( case physBundle == nil && coreConfig.Physical == nil: t.Fatal("PhysicalFactory produced no physical and none in CoreConfig") case physBundle != nil: - testCluster.Logger.Info("created physical backend", "instance", idx) - coreConfig.Physical = physBundle.Backend - localConfig.Physical = physBundle.Backend + // Storage backend setup + if physBundle.Backend != nil { + testCluster.Logger.Info("created physical backend", "instance", idx) + coreConfig.Physical = physBundle.Backend + localConfig.Physical = physBundle.Backend + } + + // HA Backend setup haBackend := physBundle.HABackend if haBackend == nil { if ha, ok := physBundle.Backend.(physical.HABackend); ok { @@ -1686,6 +1695,8 @@ func (testCluster *TestCluster) newCore( } coreConfig.HAPhysical = haBackend localConfig.HAPhysical = haBackend + + // Cleanup setup if physBundle.Cleanup != nil { cleanupFunc = physBundle.Cleanup } diff --git 
a/website/pages/api-docs/system/storage/raft.mdx b/website/pages/api-docs/system/storage/raft.mdx index 52b06fa7f..00c5ec6f2 100644 --- a/website/pages/api-docs/system/storage/raft.mdx +++ b/website/pages/api-docs/system/storage/raft.mdx @@ -141,6 +141,7 @@ $ curl \ This endpoint returns a snapshot of the current state of the raft cluster. The snapshot is returned as binary data and should be redirected to a file. +Unavailable if Raft is used exclusively for `ha_storage`. | Method | Path | | :----- | :--------------------------- | @@ -157,7 +158,8 @@ $ curl \ ## Restore Raft using a snapshot -Installs the provided snapshot, returning the cluster to the state defined in it. +Installs the provided snapshot, returning the cluster to the state defined in +it. Unavailable if Raft is used exclusively for `ha_storage`. | Method | Path | | :----- | :--------------------------- | @@ -178,7 +180,7 @@ $ curl \ Installs the provided snapshot, returning the cluster to the state defined in it. This is same as writing to `/sys/storage/raft/snapshot` except that this bypasses checks ensuring the Autounseal or shamir keys are consistent with the -snapshot data. +snapshot data. Unavailable if Raft is used exclusively for `ha_storage`. | Method | Path | | :----- | :--------------------------------- | diff --git a/website/pages/docs/commands/operator/migrate.mdx b/website/pages/docs/commands/operator/migrate.mdx index 30a34b996..690729f4f 100644 --- a/website/pages/docs/commands/operator/migrate.mdx +++ b/website/pages/docs/commands/operator/migrate.mdx @@ -76,6 +76,10 @@ defined `path`. `node_id` can optionally be set to identify this node. cluster hostname of this node. For more configuration options see the [raft storage configuration documentation](/docs/configuration/storage/raft). +If the original configuration uses "raft" for `ha_storage`, a different +`path` must be declared in `storage_destination` and in the node's new +configuration post-migration. + ```hcl storage_source "consul" { address = "127.0.0.1:8500" @@ -115,6 +119,9 @@ vault server. After migration the raft cluster will only have a single node. Additional peers should be joined to this node. +If the cluster was previously HA-enabled using "raft" as the `ha_storage`, the +nodes will have to re-join the migrated node before unsealing. + ## Usage The following flags are available for the `operator migrate` command. diff --git a/website/pages/docs/commands/operator/raft.mdx b/website/pages/docs/commands/operator/raft.mdx index 4685f18bb..462123353 100644 --- a/website/pages/docs/commands/operator/raft.mdx +++ b/website/pages/docs/commands/operator/raft.mdx @@ -28,8 +28,13 @@ Subcommands: This command is used to join a new node as a peer to the Raft cluster. In order to join, there must be at least one existing member of the cluster. If Shamir -seal is in use, then this API will request for the unseal keys to be supplied to -join the cluster. +seal is in use, then unseal keys must be supplied before or after the +join process, depending on whether raft is being used exclusively for HA. + +If raft is used for `storage`, the node must be joined before unsealing and the +`leader-api-addr` argument must be provided. If raft is used for `ha_storage`, +the node must first be unsealed before joining and the `leader-api-addr` must +_not_ be provided. ```text Usage: vault operator raft join [options] @@ -72,7 +77,7 @@ Usage: vault operator raft list-peers ### Example Output -```python +```json { ...
"data": { @@ -145,6 +150,8 @@ Usage: vault operator raft snapshot save $ vault operator raft snapshot save raft.snap ``` +~> **Note:** Snapshots are not supported when Raft is used exclusively for `ha_storage`. + ### snapshot restore Restores a snapshot of Vault data taken with `vault operator raft snapshot save`. diff --git a/website/pages/docs/concepts/recovery-mode.mdx b/website/pages/docs/concepts/recovery-mode.mdx index a270c8e86..cef18d247 100644 --- a/website/pages/docs/concepts/recovery-mode.mdx +++ b/website/pages/docs/concepts/recovery-mode.mdx @@ -43,3 +43,7 @@ In order to bring the Vault server up reliably, using any node's raft data, recovery mode Vault automatically resizes the cluster to size 1. This means that after having used recovery mode, part of the procedure for returning to active service must include rejoining the raft cluster. + +If Raft is used exclusively for `ha_storage`, recovery mode does not allow +changes to the Raft data; instead, it allows modification of the underlying +physical data associated with Vault's storage backend. diff --git a/website/pages/docs/configuration/storage/raft.mdx b/website/pages/docs/configuration/storage/raft.mdx index 26149e1f4..5b974289e 100644 --- a/website/pages/docs/configuration/storage/raft.mdx +++ b/website/pages/docs/configuration/storage/raft.mdx @@ -39,9 +39,6 @@ between the nodes in the Raft cluster. ~> **Note:** When using the Raft storage backend, a separate `ha_storage` backend cannot be declared. -~> **Note:** Raft cannot be used as the configured `ha_storage` backend at this -time. To use Raft for HA coordination users must also use Raft for storage. - ~> **Note:** When using the Raft storage backend, it is strongly recommended to set `disable_mlock` to `true`, and to disable memory swapping on the system.