// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package vault import ( "context" "encoding/base64" "errors" "fmt" "net/http" "net/url" "strings" "sync" "sync/atomic" "time" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-discover" discoverk8s "github.com/hashicorp/go-discover/provider/k8s" "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" "github.com/hashicorp/go-secure-stdlib/tlsutil" "github.com/hashicorp/go-uuid" goversion "github.com/hashicorp/go-version" "github.com/hashicorp/vault/api" "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/jsonutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/vault/seal" "github.com/mitchellh/mapstructure" "golang.org/x/net/http2" ) const ( // undoLogMonitorInterval is how often the leader checks to see // if all the cluster members it knows about are new enough to support // undo logs. undoLogMonitorInterval = time.Second // undoLogSafeVersion is the minimum version Vault must be at in order // for undo logs to be turned on. undoLogSafeVersion = "1.12.0" ) var ( raftTLSStoragePath = "core/raft/tls" raftTLSRotationPeriod = 24 * time.Hour raftAutopilotConfigurationStoragePath = "core/raft/autopilot/configuration" // TestingUpdateClusterAddr is used in tests to override the cluster address TestingUpdateClusterAddr uint32 ErrJoinWithoutAutoloading = errors.New("attempt to join a cluster using autoloaded licenses while not using autoloading ourself") ) // GetRaftNodeID returns the raft node ID if there is one, or an empty string if there's not func (c *Core) GetRaftNodeID() string { rb := c.getRaftBackend() if rb == nil { return "" } else { return rb.NodeID() } } func (c *Core) GetRaftIndexes() (committed uint64, applied uint64) { c.stateLock.RLock() defer c.stateLock.RUnlock() return c.GetRaftIndexesLocked() } func (c *Core) GetRaftIndexesLocked() (committed uint64, applied uint64) { raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend) if !ok { return 0, 0 } return raftStorage.CommittedIndex(), raftStorage.AppliedIndex() } // startRaftBackend will call SetupCluster in the raft backend which starts raft // up and enables the cluster handler. func (c *Core) startRaftBackend(ctx context.Context) (retErr error) { raftBackend := c.getRaftBackend() if raftBackend == nil { return nil } var creating bool var raftTLS *raft.TLSKeyring if !raftBackend.Initialized() { // Retrieve the raft TLS information raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) if err != nil { return err } switch raftTLSEntry { case nil: // If this is HA-only and no TLS keyring is found, that means the // cluster has not been bootstrapped or joined. We return early here in // this case. If we return here, the raft object has not been instantiated, // and a bootstrap call should be made. if c.isRaftHAOnly() { c.logger.Trace("skipping raft backend setup during unseal, no bootstrap operation has been started yet") return nil } // If we did not find a TLS keyring we will attempt to create one here. // This happens after a storage migration process. This node is also // marked to start as leader so we can write the new TLS Key. This is an // error condition if there are already multiple nodes in the cluster, // and the below storage write will fail. If the cluster is somehow in // this state the unseal will fail and a cluster recovery will need to // be done. creating = true raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader) if err != nil { return err } raftTLS = &raft.TLSKeyring{ Keys: []*raft.TLSKey{raftTLSKey}, ActiveKeyID: raftTLSKey.ID, } default: raftTLS = new(raft.TLSKeyring) if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil { return err } } hasState, err := raftBackend.HasState() if err != nil { return err } // This can be hit on follower nodes that got their config updated to use // raft for HA-only before they are joined to the cluster. Since followers // in this case use shared storage, it doesn't return early from the TLS // case above, but there's not raft state yet for the backend to call // raft.SetupCluster. if !hasState { c.logger.Trace("skipping raft backend setup during unseal, no raft state found") return nil } raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) if err := raftBackend.SetupCluster(ctx, raft.SetupOpts{ TLSKeyring: raftTLS, ClusterListener: c.getClusterListener(), StartAsLeader: creating, }); err != nil { return err } } defer func() { if retErr != nil { c.logger.Info("stopping raft server") if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil { c.logger.Error("failed to stop raft server", "error", err) } } }() // If we are in need of creating the TLS keyring then we should write it out // to storage here. If we fail it may mean we couldn't become leader and we // should error out. if creating { c.logger.Info("writing raft TLS keyring to storage") entry, err := logical.StorageEntryJSON(raftTLSStoragePath, raftTLS) if err != nil { c.logger.Error("error marshaling raft TLS keyring", "error", err) return err } if err := c.barrier.Put(ctx, entry); err != nil { c.logger.Error("error writing raft TLS keyring", "error", err) return err } } return nil } func (c *Core) monitorUndoLogs() error { logger := c.logger.Named("undo-log-watcher") logger.Debug("starting undo log watcher") ctx := c.activeContext raftBackend := c.getRaftBackend() // First check storage and bail early if we already know undo logs are safe persisted, err := c.UndoLogsPersisted() if err != nil { return fmt.Errorf("error checking for undo logs persistence: %w", err) } if persisted { c.EnableUndoLogs() logger.Debug("undo logs are safe, no need to check any more") return nil } // If undo logs have been explicitly enabled, likely via VAULT_REPLICATION_USE_UNDO_LOGS, then exit, as presumably // we don't want to be checking for safety if we already know it's safe. if c.UndoLogsEnabled() { logger.Debug("undo logs have been explicitly enabled. exiting monitor.") return nil } minimumVersion, err := goversion.NewSemver(undoLogSafeVersion) if err != nil { return fmt.Errorf("minimum undo log version (%q) won't parse: %w", undoLogSafeVersion, err) } go func() { ticker := time.NewTicker(undoLogMonitorInterval) defer ticker.Stop() logger.Debug("undo logs have not been enabled yet, possibly due to a recent upgrade. starting a periodic check") for { select { case <-ticker.C: case <-ctx.Done(): return } // Check the raft configuration for expected servers config, err := raftBackend.GetConfiguration(ctx) if err != nil { logger.Error("couldn't read raft config", "error", err) continue } // This tracks which servers we expect to find in the cluster, from the raft config. expectedServers := make(map[string]struct{}) for _, server := range config.Servers { expectedServers[server.Address] = struct{}{} } // Retrieve all the nodes in our cluster nodes, err := c.getHAMembers() if err != nil { logger.Error("error getting HA members", "error", err) continue } // Check the versions of all of the cluster members. If they're all >= 1.12, undo logs are safe to enable. // If any are < 1.12, undo logs should remain off, regardless of how it was initially configured. enable := true for _, node := range nodes { nodeVersion, err := goversion.NewSemver(node.Version) if err != nil { logger.Error("error parsing node version", "node version", node.Version, "node", node.ClusterAddress, "error", err) break } if nodeVersion.LessThan(minimumVersion) { logger.Debug("node version is less than the minimum, disabling undo logs", "node", node.ClusterAddress, "version", node.Version) enable = false break } else { // Raft nodes have their address listed without a scheme, e.g. 127.0.0.1:8201. HA nodes that we get from // getHAMembers() have their address listed with a scheme, e.g. https://127.0.0.1:8201. So we need to // parse the HA node address and reconstruct it ourselves to get a matching hash key. clusterAddr, err := url.Parse(node.ClusterAddress) if err != nil { logger.Error("error parsing node cluster address", "node", node.ClusterAddress, "error", err) break } // Deleting from expectedServers means the node in question is running a Vault version greater than // or equal to the minimum. delete(expectedServers, clusterAddr.Host) } } // If expectedServers still has nodes in it, that means either we broke from the above loop because some // node's version was too low or because some member of the cluster hasn't sent an echo yet. Either way, // it means we can't enable undo logs. if len(expectedServers) != 0 { enable = false } if enable { logger.Debug("undo logs can be safely enabled now") err := c.PersistUndoLogs() if err != nil { logger.Error("error persisting undo logs safety", "error", err) continue } logger.Debug("undo logs have been enabled and this has been persisted to storage. shutting down the checker.") return } } }() return nil } func (c *Core) setupRaftActiveNode(ctx context.Context) error { raftBackend := c.getRaftBackend() if raftBackend == nil { return nil } c.logger.Info("starting raft active node") raftBackend.SetEffectiveSDKVersion(c.effectiveSDKVersion) autopilotConfig, err := c.loadAutopilotConfiguration(ctx) if err != nil { c.logger.Error("failed to load autopilot config from storage when setting up cluster; continuing since autopilot falls back to default config", "error", err) } disableAutopilot := c.disableAutopilot raftBackend.SetupAutopilot(c.activeContext, autopilotConfig, c.raftFollowerStates, disableAutopilot) c.pendingRaftPeers = &sync.Map{} // Reload the raft TLS keys to ensure we are using the latest version. if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil { return err } // We always want to start this watcher - if undo logs are safe to be enabled, it will exit quickly. If not, it // will monitor for safety until they are enabled. if err := c.monitorUndoLogs(); err != nil { return err } return c.startPeriodicRaftTLSRotate(ctx) } func (c *Core) stopRaftActiveNode() { raftBackend := c.getRaftBackend() if raftBackend == nil { return } c.logger.Info("stopping raft active node") if !raftBackend.AutopilotDisabled() { raftBackend.StopAutopilot() } c.pendingRaftPeers = nil c.stopPeriodicRaftTLSRotate() } func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error { raftBackend := c.getRaftBackend() // No-op if raft is not being used if raftBackend == nil { return nil } c.raftTLSRotationStopCh = make(chan struct{}) logger := c.logger.Named("raft") c.AddLogger(logger) if c.isRaftHAOnly() { return c.raftTLSRotateDirect(ctx, logger, c.raftTLSRotationStopCh) } return c.raftTLSRotatePhased(ctx, logger, raftBackend, c.raftTLSRotationStopCh) } // raftTLSRotateDirect will spawn a go routine in charge of periodically // rotating the TLS certs and keys used for raft traffic. // // The logic for updating the TLS keyring is through direct storage update. This // is called whenever raft is used for HA-only, which means that the underlying // storage is a shared physical object, thus requiring no additional // coordination. func (c *Core) raftTLSRotateDirect(ctx context.Context, logger hclog.Logger, stopCh chan struct{}) error { logger.Info("creating new raft TLS config") rotateKeyring := func() (time.Time, error) { // Create a new key raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader) if err != nil { return time.Time{}, fmt.Errorf("failed to generate new raft TLS key: %w", err) } // Read the existing keyring keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return time.Time{}, fmt.Errorf("failed to read raft TLS keyring: %w", err) } // Advance the term and store the new key, replacing the old one. // Unlike phased rotation, we don't need to update AppliedIndex since // we don't rely on it to check whether the followers got the key. A // shared storage means that followers will have the key as soon as it's // written to storage. keyring.Term += 1 keyring.Keys[0] = raftTLSKey keyring.ActiveKeyID = raftTLSKey.ID entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err) } if err := c.barrier.Put(ctx, entry); err != nil { return time.Time{}, fmt.Errorf("failed to write keyring: %w", err) } logger.Info("wrote new raft TLS config") // Schedule the next rotation return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil } // Read the keyring to calculate the time of next rotation. keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return err } activeKey := keyring.GetActive() if activeKey == nil { return errors.New("no active raft TLS key found") } go func() { nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod) var backoff bool for { // If we encountered and error we should try to create the key // again. if backoff { nextRotationTime = time.Now().Add(10 * time.Second) backoff = false } timer := time.NewTimer(time.Until(nextRotationTime)) select { case <-timer.C: // It's time to rotate the keys next, err := rotateKeyring() if err != nil { logger.Error("failed to rotate TLS key", "error", err) backoff = true continue } nextRotationTime = next case <-stopCh: timer.Stop() return } } }() return nil } // raftTLSRotatePhased will spawn a go routine in charge of periodically // rotating the TLS certs and keys used for raft traffic. // // The logic for updating the TLS certificate uses a pseudo two phase commit // using the known applied indexes from standby nodes. When writing a new Key // it will be appended to the end of the keyring. Standbys can start accepting // connections with this key as soon as they see the update. Then it will write // the keyring a second time indicating the applied index for this key update. // // The active node will wait until it sees all standby nodes are at or past the // applied index for this update. At that point it will delete the older key // and make the new key active. The key isn't officially in use until this // happens. The dual write ensures the standby at least gets the first update // containing the key before the active node switches over to using it. // // If a standby is shut down then it cannot advance the key term until it // receives the update. This ensures a standby node isn't left behind and unable // to reconnect with the cluster. Additionally, only one outstanding key // is allowed for this same reason (max keyring size of 2). func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raftBackend *raft.RaftBackend, stopCh chan struct{}) error { followerStates := c.raftFollowerStates followerStates.Clear() // Pre-populate the follower list with the set of peers. raftConfig, err := raftBackend.GetConfiguration(ctx) if err != nil { return err } for _, server := range raftConfig.Servers { if server.NodeID != raftBackend.NodeID() { followerStates.Update(&raft.EchoRequestUpdate{ NodeID: server.NodeID, AppliedIndex: 0, Term: 0, DesiredSuffrage: "voter", }) } } // rotateKeyring writes new key data to the keyring and adds an applied // index that is used to verify it has been committed. The keys written in // this function can be used on standbys but the active node doesn't start // using it yet. rotateKeyring := func() (time.Time, error) { // Read the existing keyring keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return time.Time{}, fmt.Errorf("failed to read raft TLS keyring: %w", err) } switch { case len(keyring.Keys) == 2 && keyring.Keys[1].AppliedIndex == 0: // If this case is hit then the second write to add the applied // index failed. Attempt to write it again. keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex() keyring.AppliedIndex = raftBackend.AppliedIndex() entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err) } if err := c.barrier.Put(ctx, entry); err != nil { return time.Time{}, fmt.Errorf("failed to write keyring: %w", err) } case len(keyring.Keys) > 1: // If there already exists a pending key update then the update // hasn't replicated down to all standby nodes yet. Don't allow any // new keys to be created until all standbys have seen this previous // rotation. As a backoff strategy, another rotation attempt is // scheduled for 5 minutes from now. logger.Warn("skipping new raft TLS config creation, keys are pending") return time.Now().Add(time.Minute * 5), nil } logger.Info("creating new raft TLS config") // Create a new key raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader) if err != nil { return time.Time{}, fmt.Errorf("failed to generate new raft TLS key: %w", err) } // Advance the term and store the new key keyring.Term += 1 keyring.Keys = append(keyring.Keys, raftTLSKey) entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err) } if err := c.barrier.Put(ctx, entry); err != nil { return time.Time{}, fmt.Errorf("failed to write keyring: %w", err) } // Write the keyring again with the new applied index. This allows us to // track if standby nodes received the update. keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex() keyring.AppliedIndex = raftBackend.AppliedIndex() entry, err = logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err) } if err := c.barrier.Put(ctx, entry); err != nil { return time.Time{}, fmt.Errorf("failed to write keyring: %w", err) } logger.Info("wrote new raft TLS config") // Schedule the next rotation return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil } // checkCommitted verifies key updates have been applied to all nodes and // finalizes the rotation by deleting the old keys and updating the raft // backend. checkCommitted := func() error { keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return fmt.Errorf("failed to read raft TLS keyring: %w", err) } switch { case len(keyring.Keys) == 1: // No Keys to apply return nil case keyring.Keys[1].AppliedIndex != keyring.AppliedIndex: // We haven't fully committed the new key, continue here return nil case followerStates.MinIndex() < keyring.AppliedIndex: // Not all the followers have applied the latest key return nil } // Upgrade to the new key keyring.Keys = keyring.Keys[1:] keyring.ActiveKeyID = keyring.Keys[0].ID keyring.Term += 1 entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return fmt.Errorf("failed to json encode keyring: %w", err) } if err := c.barrier.Put(ctx, entry); err != nil { return fmt.Errorf("failed to write keyring: %w", err) } // Update the TLS Key in the backend if err := raftBackend.SetTLSKeyring(keyring); err != nil { return fmt.Errorf("failed to install keyring: %w", err) } logger.Info("installed new raft TLS key", "term", keyring.Term) return nil } // Read the keyring to calculate the time of next rotation. keyring, err := c.raftReadTLSKeyring(ctx) if err != nil { return err } activeKey := keyring.GetActive() if activeKey == nil { return errors.New("no active raft TLS key found") } getNextRotationTime := func(next time.Time) time.Time { now := time.Now() // active key's CreatedTime + raftTLSRotationPeriod might be in // the past (meaning it is ready to be rotated) which will cause // NewTicker to panic when used with time.Until, prevent this by // pushing out rotation time into very near future if next.Before(now) { return now.Add(1 * time.Minute) } // push out to ensure proposed time does not elapse return next.Add(10 * time.Second) } // Start the process in a go routine go func() { nextRotationTime := getNextRotationTime(activeKey.CreatedTime.Add(raftTLSRotationPeriod)) keyCheckInterval := time.NewTicker(1 * time.Minute) defer keyCheckInterval.Stop() // ticker is used to prevent memory leak of using time.After in // for - select pattern. ticker := time.NewTicker(time.Until(nextRotationTime)) defer ticker.Stop() for { select { case <-keyCheckInterval.C: err := checkCommitted() if err != nil { logger.Error("failed to activate TLS key", "error", err) } case <-ticker.C: // It's time to rotate the keys next, err := rotateKeyring() if err != nil { logger.Error("failed to rotate TLS key", "error", err) nextRotationTime = time.Now().Add(10 * time.Second) } else { nextRotationTime = getNextRotationTime(next) } ticker.Reset(time.Until(nextRotationTime)) case <-stopCh: return } } }() return nil } func (c *Core) raftReadTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) { tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) if err != nil { return nil, err } if tlsKeyringEntry == nil { return nil, errors.New("no keyring found") } var keyring raft.TLSKeyring if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { return nil, err } return &keyring, nil } // raftCreateTLSKeyring creates the initial TLS key and the TLS Keyring for raft // use. If a keyring entry is already present in storage, it will return an // error. func (c *Core) raftCreateTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) { if raftBackend := c.getRaftBackend(); raftBackend == nil { return nil, fmt.Errorf("raft backend not in use") } // Check if the keyring is already present raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) if err != nil { return nil, err } if raftTLSEntry != nil { return nil, fmt.Errorf("TLS keyring already present") } raftTLS, err := raft.GenerateTLSKey(c.secureRandomReader) if err != nil { return nil, err } keyring := &raft.TLSKeyring{ Keys: []*raft.TLSKey{raftTLS}, ActiveKeyID: raftTLS.ID, } entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring) if err != nil { return nil, err } if err := c.barrier.Put(ctx, entry); err != nil { return nil, err } return keyring, nil } func (c *Core) stopPeriodicRaftTLSRotate() { if c.raftTLSRotationStopCh != nil { close(c.raftTLSRotationStopCh) } c.raftTLSRotationStopCh = nil c.raftFollowerStates.Clear() } func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error { raftBackend := c.getRaftBackend() if raftBackend == nil { return nil } tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath) if err != nil { return err } if tlsKeyringEntry == nil { return nil } var keyring raft.TLSKeyring if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil { return err } if err := raftBackend.SetTLSKeyring(&keyring); err != nil { return err } return nil } // handleSnapshotRestore is for the raft backend to hook back into core after a // snapshot is restored so we can clear the necessary caches and handle changing // keyrings or root keys func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(context.Context) error { return func(ctx context.Context) (retErr error) { c.logger.Info("running post snapshot restore invalidations") if grabLock { // Grab statelock l := newLockGrabber(c.stateLock.Lock, c.stateLock.Unlock, c.standbyStopCh.Load().(chan struct{})) go l.grab() if stopped := l.lockOrStop(); stopped { c.logger.Error("did not apply snapshot; vault is shutting down") return errors.New("did not apply snapshot; vault is shutting down") } defer c.stateLock.Unlock() } if sealNode { // If we failed to restore the snapshot we should seal this node as // it's in an unknown state defer func() { if retErr != nil { if err := c.sealInternalWithOptions(false, false, true); err != nil { c.logger.Error("failed to seal node", "error", err) } } }() } // Purge the cache so we make sure we are operating on fresh data c.physicalCache.Purge(ctx) // Reload the keyring in case it changed. If this fails it's likely // we've changed root keys. err := c.performKeyUpgrades(ctx) if err != nil { // The snapshot contained a root key or keyring we couldn't // recover switch c.seal.BarrierType() { case wrapping.WrapperTypeShamir: // If we are a shamir seal we can't do anything. Just // seal all nodes. // Seal ourselves c.logger.Info("failed to perform key upgrades, sealing", "error", err) return err default: // If we are using an auto-unseal we can try to use the seal to // unseal again. If the auto-unseal mechanism has changed then // there isn't anything we can do but seal. c.logger.Info("failed to perform key upgrades, reloading using auto seal") keys, err := c.seal.GetStoredKeys(ctx) if err != nil { c.logger.Error("raft snapshot restore failed to get stored keys", "error", err) return err } if err := c.barrier.Seal(); err != nil { c.logger.Error("raft snapshot restore failed to seal barrier", "error", err) return err } if err := c.barrier.Unseal(ctx, keys[0]); err != nil { c.logger.Error("raft snapshot restore failed to unseal barrier", "error", err) return err } c.logger.Info("done reloading root key using auto seal") } } // Refresh the raft TLS keys if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil { c.logger.Info("failed to perform TLS key upgrades, sealing", "error", err) return err } return nil } } func (c *Core) InitiateRetryJoin(ctx context.Context) error { raftBackend := c.getRaftBackend() if raftBackend == nil { return nil } if raftBackend.Initialized() { return nil } leaderInfos, err := raftBackend.JoinConfig() if err != nil { return err } // Nothing to do if config wasn't supplied if len(leaderInfos) == 0 { return nil } c.logger.Info("raft retry join initiated") if _, err = c.JoinRaftCluster(ctx, leaderInfos, raftBackend.NonVoter()); err != nil { return err } return nil } // getRaftChallenge is a helper function used by the raft join process for adding a // node to a cluster: it contacts the given node and initiates the bootstrap // challenge, returning the result or an error. func (c *Core) getRaftChallenge(leaderInfo *raft.LeaderJoinInfo) (*raftInformation, error) { if leaderInfo == nil { return nil, errors.New("raft leader information is nil") } if len(leaderInfo.LeaderAPIAddr) == 0 { return nil, errors.New("raft leader address not provided") } c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderInfo.LeaderAPIAddr) // Create an API client to interact with the leader node transport := cleanhttp.DefaultPooledTransport() var err error if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) { leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey)) if err != nil { return nil, fmt.Errorf("failed to create TLS config: %w", err) } leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName } if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" { leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "") if err != nil { return nil, fmt.Errorf("failed to create TLS config: %w", err) } } if leaderInfo.TLSConfig != nil { transport.TLSClientConfig = leaderInfo.TLSConfig.Clone() if err := http2.ConfigureTransport(transport); err != nil { return nil, fmt.Errorf("failed to configure TLS: %w", err) } } client := &http.Client{ Transport: transport, } config := api.DefaultConfig() if config.Error != nil { return nil, fmt.Errorf("failed to create api client: %w", config.Error) } config.Address = leaderInfo.LeaderAPIAddr config.HttpClient = client config.MaxRetries = 0 apiClient, err := api.NewClient(config) if err != nil { return nil, fmt.Errorf("failed to create api client: %w", err) } // Clearing namespace, as this client should only ever be using the root namespace apiClient.ClearNamespace() // Attempt to join the leader by requesting for the bootstrap challenge secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{ "server_id": c.getRaftBackend().NodeID(), }) if err != nil { return nil, fmt.Errorf("error during raft bootstrap init call: %w", err) } if secret == nil { return nil, errors.New("could not retrieve raft bootstrap package") } var sealConfig SealConfig err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig) if err != nil { return nil, err } if sealConfig.Type != c.seal.BarrierType().String() { return nil, fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType()) } challengeB64, ok := secret.Data["challenge"] if !ok { return nil, errors.New("error during raft bootstrap call, no challenge given") } challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string)) if err != nil { return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err) } eBlob := &wrapping.BlobInfo{} if err := proto.Unmarshal(challengeRaw, eBlob); err != nil { return nil, fmt.Errorf("error decoding raft bootstrap challenge: %w", err) } return &raftInformation{ challenge: eBlob, leaderClient: apiClient, leaderBarrierConfig: &sealConfig, }, nil } func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) { raftBackend := c.getRaftBackend() if raftBackend == nil { return false, errors.New("raft backend not in use") } init, err := c.InitializedLocally(ctx) if err != nil { return false, fmt.Errorf("failed to check if core is initialized: %w", err) } isRaftHAOnly := c.isRaftHAOnly() // Prevent join from happening if we're using raft for storage and // it has already been initialized. if init && !isRaftHAOnly { return true, nil } // Check on seal status and storage type before proceeding: // If raft is used for storage, core needs to be sealed if !isRaftHAOnly && !c.Sealed() { c.logger.Error("node must be sealed before joining") return false, errors.New("node must be sealed before joining") } // If raft is used for ha-only, core needs to be unsealed if isRaftHAOnly && c.Sealed() { c.logger.Error("node must be unsealed before joining") return false, errors.New("node must be unsealed before joining") } // Disallow leader API address to be provided if we're using raft for HA-only. // The leader API address is obtained directly through storage. This serves // as a form of verification that this node is sharing the same physical // storage as the leader node. if isRaftHAOnly { for _, info := range leaderInfos { if info.LeaderAPIAddr != "" || info.AutoJoin != "" { return false, errors.New("leader API address and auto-join metadata must be unset when raft is used exclusively for HA") } } // Get the leader address from storage keys, err := c.barrier.List(ctx, coreLeaderPrefix) if err != nil { return false, err } if len(keys) == 0 || len(keys[0]) == 0 { return false, errors.New("unable to fetch leadership entry") } leadershipEntry := coreLeaderPrefix + keys[0] entry, err := c.barrier.Get(ctx, leadershipEntry) if err != nil { return false, err } if entry == nil { return false, errors.New("unable to read leadership entry") } var adv activeAdvertisement err = jsonutil.DecodeJSON(entry.Value, &adv) if err != nil { return false, fmt.Errorf("unable to decoded leader entry: %w", err) } leaderInfos[0].LeaderAPIAddr = adv.RedirectAddr } disco, err := newDiscover() if err != nil { return false, fmt.Errorf("failed to create auto-join discovery: %w", err) } retryFailures := leaderInfos[0].Retry // answerChallenge performs the second part of a raft join: after we've issued // the sys/storage/raft/bootstrap/challenge call to initiate the join, this // func uses the seal to compute an answer to the challenge and sends it // back to the server that provided the challenge. answerChallenge := func(ctx context.Context, raftInfo *raftInformation) error { // If we're using Shamir and using raft for both physical and HA, we // need to block until the node is unsealed, unless retry is set to // false. if c.seal.BarrierType() == wrapping.WrapperTypeShamir && !c.isRaftHAOnly() { c.raftInfo.Store(raftInfo) if err := c.seal.SetBarrierConfig(ctx, raftInfo.leaderBarrierConfig); err != nil { return err } if !retryFailures { return nil } // Wait until unseal keys are supplied raftInfo.joinInProgress = true c.raftInfo.Store(raftInfo) if atomic.LoadUint32(c.postUnsealStarted) != 1 { return errors.New("waiting for unseal keys to be supplied") } } raftInfo.nonVoter = nonVoter if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil { return fmt.Errorf("failed to send answer to raft leader node: %w", err) } if c.seal.BarrierType() == wrapping.WrapperTypeShamir && !isRaftHAOnly { // Reset the state c.raftInfo.Store((*raftInformation)(nil)) // In case of Shamir unsealing, inform the unseal process that raft join is completed close(c.raftJoinDoneCh) } c.logger.Info("successfully joined the raft cluster", "leader_addr", raftInfo.leaderClient.Address()) return nil } // join attempts to join to any of the leaders defined in leaderInfos, // using the first one that returns a challenge to our request. If shamir // seal is in use, we must wait to get enough unseal keys to solve the // challenge. If we're unable to get a challenge from any leader, or if // we fail to answer the challenge successfully, or if ctx times out, // an error is returned. join := func() error { init, err := c.InitializedLocally(ctx) if err != nil { return fmt.Errorf("failed to check if core is initialized: %w", err) } // InitializedLocally will return non-nil before HA backends are // initialized. c.Initialized(ctx) checks InitializedLocally first, so // we can't use that generically for both cases. Instead check // raftBackend.Initialized() directly for the HA-Only case. if (!isRaftHAOnly && init) || (isRaftHAOnly && raftBackend.Initialized()) { c.logger.Info("returning from raft join as the node is initialized") return nil } if err := raftBackend.SetDesiredSuffrage(nonVoter); err != nil { c.logger.Error("failed to set desired suffrage for this node", "error", err) return nil } challengeCh := make(chan *raftInformation) var expandedJoinInfos []*raft.LeaderJoinInfo for _, leaderInfo := range leaderInfos { joinInfos, err := c.raftLeaderInfo(leaderInfo, disco) if err != nil { c.logger.Error("error in retry_join stanza, will not use it for raft join", "error", err, "leader_api_addr", leaderInfo.LeaderAPIAddr, "auto_join", leaderInfo.AutoJoin != "") continue } expandedJoinInfos = append(expandedJoinInfos, joinInfos...) } if err != nil { return err } var wg sync.WaitGroup for i := range expandedJoinInfos { wg.Add(1) go func(joinInfo *raft.LeaderJoinInfo) { defer wg.Done() raftInfo, err := c.getRaftChallenge(joinInfo) if err != nil { c.Logger().Error("failed to get raft challenge", "leader_addr", joinInfo.LeaderAPIAddr, "error", err) return } challengeCh <- raftInfo }(expandedJoinInfos[i]) } go func() { wg.Wait() close(challengeCh) }() select { case <-ctx.Done(): err = ctx.Err() case raftInfo := <-challengeCh: if raftInfo != nil { err = answerChallenge(ctx, raftInfo) if err == nil { return nil } } else { // Return an error so we can retry_join err = fmt.Errorf("failed to get raft challenge") } } return err } switch retryFailures { case true: go func() { for { select { case <-ctx.Done(): return default: } err := join() if err == nil { return } c.logger.Error("failed to retry join raft cluster", "retry", "2s", "err", err) time.Sleep(2 * time.Second) } }() // Backgrounded so return false return false, nil default: if err := join(); err != nil { c.logger.Error("failed to join raft cluster", "error", err) return false, fmt.Errorf("failed to join raft cluster: %w", err) } } return true, nil } // raftLeaderInfo uses go-discover to expand leaderInfo to include any auto-join results func (c *Core) raftLeaderInfo(leaderInfo *raft.LeaderJoinInfo, disco *discover.Discover) ([]*raft.LeaderJoinInfo, error) { var ret []*raft.LeaderJoinInfo switch { case leaderInfo.LeaderAPIAddr != "" && leaderInfo.AutoJoin != "": return nil, errors.New("cannot provide both leader address and auto-join metadata") case leaderInfo.LeaderAPIAddr != "": ret = append(ret, leaderInfo) case leaderInfo.AutoJoin != "": scheme := leaderInfo.AutoJoinScheme if scheme == "" { // default to HTTPS when no scheme is provided scheme = "https" } port := leaderInfo.AutoJoinPort if port == 0 { // default to 8200 when no port is provided port = 8200 } // Addrs returns either IPv4 or IPv6 address, without scheme or port clusterIPs, err := disco.Addrs(leaderInfo.AutoJoin, c.logger.StandardLogger(nil)) if err != nil { return nil, fmt.Errorf("failed to parse addresses from auto-join metadata: %w", err) } for _, ip := range clusterIPs { if strings.Count(ip, ":") >= 2 && !strings.HasPrefix(ip, "[") { // An IPv6 address in implicit form, however we need it in explicit form to use in a URL. ip = fmt.Sprintf("[%s]", ip) } u := fmt.Sprintf("%s://%s:%d", scheme, ip, port) info := *leaderInfo info.LeaderAPIAddr = u ret = append(ret, &info) } default: return nil, errors.New("must provide leader address or auto-join metadata") } return ret, nil } // NewDelegateForCore creates a raft.Delegate for the specified core using its backend. func NewDelegateForCore(c *Core) *raft.Delegate { return raft.NewDelegate(c.getRaftBackend()) } // getRaftBackend returns the RaftBackend from the HA or physical backend, // in that order of preference, or nil if not of type RaftBackend. func (c *Core) getRaftBackend() *raft.RaftBackend { var raftBackend *raft.RaftBackend if raftHA, ok := c.ha.(*raft.RaftBackend); ok { raftBackend = raftHA } if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { raftBackend = raftStorage } return raftBackend } // isRaftHAOnly returns true if c.ha is raft and physical storage is non-raft func (c *Core) isRaftHAOnly() bool { _, isRaftHA := c.ha.(*raft.RaftBackend) _, isRaftStorage := c.underlyingPhysical.(*raft.RaftBackend) return isRaftHA && !isRaftStorage } func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, raftInfo *raftInformation) error { if raftInfo.challenge == nil { return errors.New("raft challenge is nil") } raftBackend := c.getRaftBackend() if raftBackend == nil { return errors.New("raft backend is not in use") } if raftBackend.Initialized() { return errors.New("raft is already initialized") } plaintext, err := sealAccess.Decrypt(ctx, raftInfo.challenge, nil) if err != nil { return fmt.Errorf("error decrypting challenge: %w", err) } parsedClusterAddr, err := url.Parse(c.ClusterAddr()) if err != nil { return fmt.Errorf("error parsing cluster address: %w", err) } clusterAddr := parsedClusterAddr.Host if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") { // We are testing and have an address provider, so just create a random // addr, it will be overwritten later. var err error clusterAddr, err = uuid.GenerateUUID() if err != nil { return err } } answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer") if err := answerReq.SetJSONBody(map[string]interface{}{ "answer": base64.StdEncoding.EncodeToString(plaintext), "cluster_addr": clusterAddr, "server_id": raftBackend.NodeID(), "non_voter": raftInfo.nonVoter, }); err != nil { return err } answerRespJson, err := raftInfo.leaderClient.RawRequestWithContext(ctx, answerReq) if answerRespJson != nil { defer answerRespJson.Body.Close() } if err != nil { return err } var answerResp answerRespData if err := jsonutil.DecodeJSONFromReader(answerRespJson.Body, &answerResp); err != nil { return err } if answerResp.Data.AutoloadedLicense && !LicenseAutoloaded(c) { return ErrJoinWithoutAutoloading } if err := raftBackend.Bootstrap(answerResp.Data.Peers); err != nil { return err } err = c.startClusterListener(ctx) if err != nil { return fmt.Errorf("error starting cluster: %w", err) } raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) opts := raft.SetupOpts{ TLSKeyring: answerResp.Data.TLSKeyring, ClusterListener: c.getClusterListener(), } err = raftBackend.SetupCluster(ctx, opts) if err != nil { return fmt.Errorf("failed to setup raft cluster: %w", err) } return nil } func (c *Core) loadAutopilotConfiguration(ctx context.Context) (*raft.AutopilotConfig, error) { var autopilotConfig *raft.AutopilotConfig entry, err := c.barrier.Get(ctx, raftAutopilotConfigurationStoragePath) if err != nil { return nil, err } if entry == nil { return nil, nil } if err := jsonutil.DecodeJSON(entry.Value, &autopilotConfig); err != nil { return nil, err } return autopilotConfig, nil } // RaftBootstrap performs bootstrapping of a raft cluster if core contains a raft // backend. If raft is not part for the storage or HA storage backend, this // call results in an error. func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error { if c.logger.IsDebug() { c.logger.Debug("bootstrapping raft backend") defer c.logger.Debug("finished bootstrapping raft backend") } raftBackend := c.getRaftBackend() if raftBackend == nil { return errors.New("raft backend not in use") } parsedClusterAddr, err := url.Parse(c.ClusterAddr()) if err != nil { return fmt.Errorf("error parsing cluster address: %w", err) } if err := raftBackend.Bootstrap([]raft.Peer{ { ID: raftBackend.NodeID(), Address: parsedClusterAddr.Host, }, }); err != nil { return fmt.Errorf("could not bootstrap clustered storage: %w", err) } raftOpts := raft.SetupOpts{ StartAsLeader: true, } if !onInit { // Generate the TLS Keyring info for SetupCluster to consume raftTLS, err := c.raftCreateTLSKeyring(ctx) if err != nil { return fmt.Errorf("could not generate TLS keyring during bootstrap: %w", err) } raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true)) raftOpts.ClusterListener = c.getClusterListener() raftOpts.TLSKeyring = raftTLS } if err := raftBackend.SetupCluster(ctx, raftOpts); err != nil { return fmt.Errorf("could not start clustered storage: %w", err) } return nil } func (c *Core) isRaftUnseal() bool { return c.raftInfo.Load().(*raftInformation) != nil } type answerRespData struct { Data answerResp `json:"data"` } type answerResp struct { Peers []raft.Peer `json:"peers"` TLSKeyring *raft.TLSKeyring `json:"tls_keyring"` AutoloadedLicense bool `json:"autoloaded_license"` } func newDiscover() (*discover.Discover, error) { providers := make(map[string]discover.Provider) for k, v := range discover.Providers { providers[k] = v } providers["k8s"] = &discoverk8s.Provider{} return discover.New( discover.WithProviders(providers), ) }