b4b61efc75
* Auto-join support for IPv6 discovery The go-discover library returns IP addresses and not URLs. It just so happens net.URL parses "127.0.0.1", which isn't a valid URL. Instead, we construct the URL ourselves. Being careful to check if it's an ipv6 address and making sure it's in explicit form if so. Fixes #12323 * feedback: addrs & ipv6 test Rename addrs to clusterIPs to improve clarity and intent Tighten up our IPv6 address detection to be more correct and to ensure it's actually in implicit form
1218 lines
36 KiB
Go
1218 lines
36 KiB
Go
package vault
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
|
"github.com/hashicorp/go-discover"
|
|
discoverk8s "github.com/hashicorp/go-discover/provider/k8s"
|
|
"github.com/hashicorp/go-hclog"
|
|
wrapping "github.com/hashicorp/go-kms-wrapping"
|
|
"github.com/hashicorp/go-secure-stdlib/tlsutil"
|
|
uuid "github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/vault/api"
|
|
"github.com/hashicorp/vault/physical/raft"
|
|
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
|
"github.com/hashicorp/vault/sdk/logical"
|
|
"github.com/hashicorp/vault/vault/seal"
|
|
"github.com/mitchellh/mapstructure"
|
|
"golang.org/x/net/http2"
|
|
)
|
|
|
|
var (
|
|
raftTLSStoragePath = "core/raft/tls"
|
|
raftTLSRotationPeriod = 24 * time.Hour
|
|
|
|
raftAutopilotConfigurationStoragePath = "core/raft/autopilot/configuration"
|
|
|
|
// TestingUpdateClusterAddr is used in tests to override the cluster address
|
|
TestingUpdateClusterAddr uint32
|
|
|
|
ErrJoinWithoutAutoloading = errors.New("attempt to join a cluster using autoloaded licenses while not using autoloading ourself")
|
|
)
|
|
|
|
// GetRaftNodeID returns the raft node ID if there is one, or an empty string if there's not
|
|
func (c *Core) GetRaftNodeID() string {
|
|
rb := c.getRaftBackend()
|
|
|
|
if rb == nil {
|
|
return ""
|
|
} else {
|
|
return rb.NodeID()
|
|
}
|
|
}
|
|
|
|
func (c *Core) GetRaftIndexes() (committed uint64, applied uint64) {
|
|
c.stateLock.RLock()
|
|
defer c.stateLock.RUnlock()
|
|
|
|
raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend)
|
|
if !ok {
|
|
return 0, 0
|
|
}
|
|
|
|
return raftStorage.CommittedIndex(), raftStorage.AppliedIndex()
|
|
}
|
|
|
|
// startRaftBackend will call SetupCluster in the raft backend which starts raft
|
|
// up and enables the cluster handler.
|
|
func (c *Core) startRaftBackend(ctx context.Context) (retErr error) {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return nil
|
|
}
|
|
|
|
// Retrieve the raft TLS information
|
|
raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var creating bool
|
|
var raftTLS *raft.TLSKeyring
|
|
switch raftTLSEntry {
|
|
case nil:
|
|
// If this is HA-only and no TLS keyring is found, that means the
|
|
// cluster has not been bootstrapped or joined. We return early here in
|
|
// this case. If we return here, the raft object has not been instantiated,
|
|
// and a bootstrap call should be made.
|
|
if c.isRaftHAOnly() {
|
|
c.logger.Trace("skipping raft backend setup during unseal, no bootstrap operation has been started yet")
|
|
return nil
|
|
}
|
|
|
|
// If we did not find a TLS keyring we will attempt to create one here.
|
|
// This happens after a storage migration process. This node is also
|
|
// marked to start as leader so we can write the new TLS Key. This is an
|
|
// error condition if there are already multiple nodes in the cluster,
|
|
// and the below storage write will fail. If the cluster is somehow in
|
|
// this state the unseal will fail and a cluster recovery will need to
|
|
// be done.
|
|
creating = true
|
|
raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
raftTLS = &raft.TLSKeyring{
|
|
Keys: []*raft.TLSKey{raftTLSKey},
|
|
ActiveKeyID: raftTLSKey.ID,
|
|
}
|
|
default:
|
|
raftTLS = new(raft.TLSKeyring)
|
|
if err := raftTLSEntry.DecodeJSON(raftTLS); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
hasState, err := raftBackend.HasState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// This can be hit on follower nodes that got their config updated to use
|
|
// raft for HA-only before they are joined to the cluster. Since followers
|
|
// in this case use shared storage, it doesn't return early from the TLS
|
|
// case above, but there's not raft state yet for the backend to call
|
|
// raft.SetupCluster.
|
|
if !hasState {
|
|
c.logger.Trace("skipping raft backend setup during unseal, no raft state found")
|
|
return nil
|
|
}
|
|
|
|
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
|
|
|
if err := raftBackend.SetupCluster(ctx, raft.SetupOpts{
|
|
TLSKeyring: raftTLS,
|
|
ClusterListener: c.getClusterListener(),
|
|
StartAsLeader: creating,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if retErr != nil {
|
|
c.logger.Info("stopping raft server")
|
|
if err := raftBackend.TeardownCluster(c.getClusterListener()); err != nil {
|
|
c.logger.Error("failed to stop raft server", "error", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// If we are in need of creating the TLS keyring then we should write it out
|
|
// to storage here. If we fail it may mean we couldn't become leader and we
|
|
// should error out.
|
|
if creating {
|
|
c.logger.Info("writing raft TLS keyring to storage")
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, raftTLS)
|
|
if err != nil {
|
|
c.logger.Error("error marshaling raft TLS keyring", "error", err)
|
|
return err
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
c.logger.Error("error writing raft TLS keyring", "error", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) setupRaftActiveNode(ctx context.Context) error {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return nil
|
|
}
|
|
|
|
c.logger.Info("starting raft active node")
|
|
|
|
autopilotConfig, err := c.loadAutopilotConfiguration(ctx)
|
|
if err != nil {
|
|
c.logger.Error("failed to load autopilot config from storage when setting up cluster; continuing since autopilot falls back to default config", "error", err)
|
|
}
|
|
disableAutopilot := c.disableAutopilot
|
|
raftBackend.SetupAutopilot(c.activeContext, autopilotConfig, c.raftFollowerStates, disableAutopilot)
|
|
|
|
c.pendingRaftPeers = &sync.Map{}
|
|
|
|
// Reload the raft TLS keys to ensure we are using the latest version.
|
|
if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.startPeriodicRaftTLSRotate(ctx)
|
|
}
|
|
|
|
func (c *Core) stopRaftActiveNode() {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return
|
|
}
|
|
|
|
c.logger.Info("stopping raft active node")
|
|
|
|
if !raftBackend.AutopilotDisabled() {
|
|
raftBackend.StopAutopilot()
|
|
}
|
|
|
|
c.pendingRaftPeers = nil
|
|
c.stopPeriodicRaftTLSRotate()
|
|
}
|
|
|
|
func (c *Core) startPeriodicRaftTLSRotate(ctx context.Context) error {
|
|
raftBackend := c.getRaftBackend()
|
|
|
|
// No-op if raft is not being used
|
|
if raftBackend == nil {
|
|
return nil
|
|
}
|
|
|
|
c.raftTLSRotationStopCh = make(chan struct{})
|
|
logger := c.logger.Named("raft")
|
|
|
|
if c.isRaftHAOnly() {
|
|
return c.raftTLSRotateDirect(ctx, logger, c.raftTLSRotationStopCh)
|
|
}
|
|
|
|
return c.raftTLSRotatePhased(ctx, logger, raftBackend, c.raftTLSRotationStopCh)
|
|
}
|
|
|
|
// raftTLSRotateDirect will spawn a go routine in charge of periodically
|
|
// rotating the TLS certs and keys used for raft traffic.
|
|
//
|
|
// The logic for updating the TLS keyring is through direct storage update. This
|
|
// is called whenever raft is used for HA-only, which means that the underlying
|
|
// storage is a shared physical object, thus requiring no additional
|
|
// coordination.
|
|
func (c *Core) raftTLSRotateDirect(ctx context.Context, logger hclog.Logger, stopCh chan struct{}) error {
|
|
logger.Info("creating new raft TLS config")
|
|
|
|
rotateKeyring := func() (time.Time, error) {
|
|
// Create a new key
|
|
raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to generate new raft TLS key: %w", err)
|
|
}
|
|
|
|
// Read the existing keyring
|
|
keyring, err := c.raftReadTLSKeyring(ctx)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to read raft TLS keyring: %w", err)
|
|
}
|
|
|
|
// Advance the term and store the new key, replacing the old one.
|
|
// Unlike phased rotation, we don't need to update AppliedIndex since
|
|
// we don't rely on it to check whether the followers got the key. A
|
|
// shared storage means that followers will have the key as soon as it's
|
|
// written to storage.
|
|
keyring.Term += 1
|
|
keyring.Keys[0] = raftTLSKey
|
|
keyring.ActiveKeyID = raftTLSKey.ID
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err)
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to write keyring: %w", err)
|
|
}
|
|
|
|
logger.Info("wrote new raft TLS config")
|
|
|
|
// Schedule the next rotation
|
|
return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil
|
|
}
|
|
|
|
// Read the keyring to calculate the time of next rotation.
|
|
keyring, err := c.raftReadTLSKeyring(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
activeKey := keyring.GetActive()
|
|
if activeKey == nil {
|
|
return errors.New("no active raft TLS key found")
|
|
}
|
|
|
|
go func() {
|
|
nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod)
|
|
|
|
var backoff bool
|
|
for {
|
|
// If we encountered and error we should try to create the key
|
|
// again.
|
|
if backoff {
|
|
nextRotationTime = time.Now().Add(10 * time.Second)
|
|
backoff = false
|
|
}
|
|
|
|
select {
|
|
case <-time.After(time.Until(nextRotationTime)):
|
|
// It's time to rotate the keys
|
|
next, err := rotateKeyring()
|
|
if err != nil {
|
|
logger.Error("failed to rotate TLS key", "error", err)
|
|
backoff = true
|
|
continue
|
|
}
|
|
|
|
nextRotationTime = next
|
|
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// raftTLSRotatePhased will spawn a go routine in charge of periodically
|
|
// rotating the TLS certs and keys used for raft traffic.
|
|
//
|
|
// The logic for updating the TLS certificate uses a pseudo two phase commit
|
|
// using the known applied indexes from standby nodes. When writing a new Key
|
|
// it will be appended to the end of the keyring. Standbys can start accepting
|
|
// connections with this key as soon as they see the update. Then it will write
|
|
// the keyring a second time indicating the applied index for this key update.
|
|
//
|
|
// The active node will wait until it sees all standby nodes are at or past the
|
|
// applied index for this update. At that point it will delete the older key
|
|
// and make the new key active. The key isn't officially in use until this
|
|
// happens. The dual write ensures the standby at least gets the first update
|
|
// containing the key before the active node switches over to using it.
|
|
//
|
|
// If a standby is shut down then it cannot advance the key term until it
|
|
// receives the update. This ensures a standby node isn't left behind and unable
|
|
// to reconnect with the cluster. Additionally, only one outstanding key
|
|
// is allowed for this same reason (max keyring size of 2).
|
|
func (c *Core) raftTLSRotatePhased(ctx context.Context, logger hclog.Logger, raftBackend *raft.RaftBackend, stopCh chan struct{}) error {
|
|
followerStates := c.raftFollowerStates
|
|
followerStates.Clear()
|
|
|
|
// Pre-populate the follower list with the set of peers.
|
|
raftConfig, err := raftBackend.GetConfiguration(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, server := range raftConfig.Servers {
|
|
if server.NodeID != raftBackend.NodeID() {
|
|
followerStates.Update(server.NodeID, 0, 0, "voter")
|
|
}
|
|
}
|
|
|
|
// rotateKeyring writes new key data to the keyring and adds an applied
|
|
// index that is used to verify it has been committed. The keys written in
|
|
// this function can be used on standbys but the active node doesn't start
|
|
// using it yet.
|
|
rotateKeyring := func() (time.Time, error) {
|
|
// Read the existing keyring
|
|
keyring, err := c.raftReadTLSKeyring(ctx)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to read raft TLS keyring: %w", err)
|
|
}
|
|
|
|
switch {
|
|
case len(keyring.Keys) == 2 && keyring.Keys[1].AppliedIndex == 0:
|
|
// If this case is hit then the second write to add the applied
|
|
// index failed. Attempt to write it again.
|
|
keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
|
|
keyring.AppliedIndex = raftBackend.AppliedIndex()
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err)
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to write keyring: %w", err)
|
|
}
|
|
|
|
case len(keyring.Keys) > 1:
|
|
// If there already exists a pending key update then the update
|
|
// hasn't replicated down to all standby nodes yet. Don't allow any
|
|
// new keys to be created until all standbys have seen this previous
|
|
// rotation. As a backoff strategy, another rotation attempt is
|
|
// scheduled for 5 minutes from now.
|
|
logger.Warn("skipping new raft TLS config creation, keys are pending")
|
|
return time.Now().Add(time.Minute * 5), nil
|
|
}
|
|
|
|
logger.Info("creating new raft TLS config")
|
|
|
|
// Create a new key
|
|
raftTLSKey, err := raft.GenerateTLSKey(c.secureRandomReader)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to generate new raft TLS key: %w", err)
|
|
}
|
|
|
|
// Advance the term and store the new key
|
|
keyring.Term += 1
|
|
keyring.Keys = append(keyring.Keys, raftTLSKey)
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err)
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to write keyring: %w", err)
|
|
}
|
|
|
|
// Write the keyring again with the new applied index. This allows us to
|
|
// track if standby nodes received the update.
|
|
keyring.Keys[1].AppliedIndex = raftBackend.AppliedIndex()
|
|
keyring.AppliedIndex = raftBackend.AppliedIndex()
|
|
entry, err = logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to json encode keyring: %w", err)
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return time.Time{}, fmt.Errorf("failed to write keyring: %w", err)
|
|
}
|
|
|
|
logger.Info("wrote new raft TLS config")
|
|
// Schedule the next rotation
|
|
return raftTLSKey.CreatedTime.Add(raftTLSRotationPeriod), nil
|
|
}
|
|
|
|
// checkCommitted verifies key updates have been applied to all nodes and
|
|
// finalizes the rotation by deleting the old keys and updating the raft
|
|
// backend.
|
|
checkCommitted := func() error {
|
|
keyring, err := c.raftReadTLSKeyring(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read raft TLS keyring: %w", err)
|
|
}
|
|
|
|
switch {
|
|
case len(keyring.Keys) == 1:
|
|
// No Keys to apply
|
|
return nil
|
|
case keyring.Keys[1].AppliedIndex != keyring.AppliedIndex:
|
|
// We haven't fully committed the new key, continue here
|
|
return nil
|
|
case followerStates.MinIndex() < keyring.AppliedIndex:
|
|
// Not all the followers have applied the latest key
|
|
return nil
|
|
}
|
|
|
|
// Upgrade to the new key
|
|
keyring.Keys = keyring.Keys[1:]
|
|
keyring.ActiveKeyID = keyring.Keys[0].ID
|
|
keyring.Term += 1
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to json encode keyring: %w", err)
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return fmt.Errorf("failed to write keyring: %w", err)
|
|
}
|
|
|
|
// Update the TLS Key in the backend
|
|
if err := raftBackend.SetTLSKeyring(keyring); err != nil {
|
|
return fmt.Errorf("failed to install keyring: %w", err)
|
|
}
|
|
|
|
logger.Info("installed new raft TLS key", "term", keyring.Term)
|
|
return nil
|
|
}
|
|
|
|
// Read the keyring to calculate the time of next rotation.
|
|
keyring, err := c.raftReadTLSKeyring(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
activeKey := keyring.GetActive()
|
|
if activeKey == nil {
|
|
return errors.New("no active raft TLS key found")
|
|
}
|
|
|
|
// Start the process in a go routine
|
|
go func() {
|
|
nextRotationTime := activeKey.CreatedTime.Add(raftTLSRotationPeriod)
|
|
|
|
keyCheckInterval := time.NewTicker(1 * time.Minute)
|
|
defer keyCheckInterval.Stop()
|
|
|
|
var backoff bool
|
|
for {
|
|
// If we encountered and error we should try to create the key
|
|
// again.
|
|
if backoff {
|
|
nextRotationTime = time.Now().Add(10 * time.Second)
|
|
backoff = false
|
|
}
|
|
|
|
select {
|
|
case <-keyCheckInterval.C:
|
|
err := checkCommitted()
|
|
if err != nil {
|
|
logger.Error("failed to activate TLS key", "error", err)
|
|
}
|
|
case <-time.After(time.Until(nextRotationTime)):
|
|
// It's time to rotate the keys
|
|
next, err := rotateKeyring()
|
|
if err != nil {
|
|
logger.Error("failed to rotate TLS key", "error", err)
|
|
backoff = true
|
|
continue
|
|
}
|
|
|
|
nextRotationTime = next
|
|
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) raftReadTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
|
|
tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if tlsKeyringEntry == nil {
|
|
return nil, errors.New("no keyring found")
|
|
}
|
|
var keyring raft.TLSKeyring
|
|
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &keyring, nil
|
|
}
|
|
|
|
// raftCreateTLSKeyring creates the initial TLS key and the TLS Keyring for raft
|
|
// use. If a keyring entry is already present in storage, it will return an
|
|
// error.
|
|
func (c *Core) raftCreateTLSKeyring(ctx context.Context) (*raft.TLSKeyring, error) {
|
|
if raftBackend := c.getRaftBackend(); raftBackend == nil {
|
|
return nil, fmt.Errorf("raft backend not in use")
|
|
}
|
|
|
|
// Check if the keyring is already present
|
|
raftTLSEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if raftTLSEntry != nil {
|
|
return nil, fmt.Errorf("TLS keyring already present")
|
|
}
|
|
|
|
raftTLS, err := raft.GenerateTLSKey(c.secureRandomReader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
keyring := &raft.TLSKeyring{
|
|
Keys: []*raft.TLSKey{raftTLS},
|
|
ActiveKeyID: raftTLS.ID,
|
|
}
|
|
|
|
entry, err := logical.StorageEntryJSON(raftTLSStoragePath, keyring)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := c.barrier.Put(ctx, entry); err != nil {
|
|
return nil, err
|
|
}
|
|
return keyring, nil
|
|
}
|
|
|
|
func (c *Core) stopPeriodicRaftTLSRotate() {
|
|
if c.raftTLSRotationStopCh != nil {
|
|
close(c.raftTLSRotationStopCh)
|
|
}
|
|
c.raftTLSRotationStopCh = nil
|
|
c.raftFollowerStates.Clear()
|
|
}
|
|
|
|
func (c *Core) checkRaftTLSKeyUpgrades(ctx context.Context) error {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return nil
|
|
}
|
|
|
|
tlsKeyringEntry, err := c.barrier.Get(ctx, raftTLSStoragePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tlsKeyringEntry == nil {
|
|
return nil
|
|
}
|
|
|
|
var keyring raft.TLSKeyring
|
|
if err := tlsKeyringEntry.DecodeJSON(&keyring); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := raftBackend.SetTLSKeyring(&keyring); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleSnapshotRestore is for the raft backend to hook back into core after a
|
|
// snapshot is restored so we can clear the necessary caches and handle changing
|
|
// keyrings or master keys
|
|
func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(context.Context) error {
|
|
return func(ctx context.Context) (retErr error) {
|
|
c.logger.Info("running post snapshot restore invalidations")
|
|
|
|
if grabLock {
|
|
// Grab statelock
|
|
if stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, c.standbyStopCh.Load().(chan struct{})); stopped {
|
|
c.logger.Error("did not apply snapshot; vault is shutting down")
|
|
return errors.New("did not apply snapshot; vault is shutting down")
|
|
}
|
|
defer c.stateLock.Unlock()
|
|
}
|
|
|
|
if sealNode {
|
|
// If we failed to restore the snapshot we should seal this node as
|
|
// it's in an unknown state
|
|
defer func() {
|
|
if retErr != nil {
|
|
if err := c.sealInternalWithOptions(false, false, true); err != nil {
|
|
c.logger.Error("failed to seal node", "error", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Purge the cache so we make sure we are operating on fresh data
|
|
c.physicalCache.Purge(ctx)
|
|
|
|
// Reload the keyring in case it changed. If this fails it's likely
|
|
// we've changed master keys.
|
|
err := c.performKeyUpgrades(ctx)
|
|
if err != nil {
|
|
// The snapshot contained a master key or keyring we couldn't
|
|
// recover
|
|
switch c.seal.BarrierType() {
|
|
case wrapping.Shamir:
|
|
// If we are a shamir seal we can't do anything. Just
|
|
// seal all nodes.
|
|
|
|
// Seal ourselves
|
|
c.logger.Info("failed to perform key upgrades, sealing", "error", err)
|
|
return err
|
|
|
|
default:
|
|
// If we are using an auto-unseal we can try to use the seal to
|
|
// unseal again. If the auto-unseal mechanism has changed then
|
|
// there isn't anything we can do but seal.
|
|
c.logger.Info("failed to perform key upgrades, reloading using auto seal")
|
|
keys, err := c.seal.GetStoredKeys(ctx)
|
|
if err != nil {
|
|
c.logger.Error("raft snapshot restore failed to get stored keys", "error", err)
|
|
return err
|
|
}
|
|
if err := c.barrier.Seal(); err != nil {
|
|
c.logger.Error("raft snapshot restore failed to seal barrier", "error", err)
|
|
return err
|
|
}
|
|
if err := c.barrier.Unseal(ctx, keys[0]); err != nil {
|
|
c.logger.Error("raft snapshot restore failed to unseal barrier", "error", err)
|
|
return err
|
|
}
|
|
c.logger.Info("done reloading master key using auto seal")
|
|
}
|
|
}
|
|
|
|
// Refresh the raft TLS keys
|
|
if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
|
|
c.logger.Info("failed to perform TLS key upgrades, sealing", "error", err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (c *Core) InitiateRetryJoin(ctx context.Context) error {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return nil
|
|
}
|
|
|
|
if raftBackend.Initialized() {
|
|
return nil
|
|
}
|
|
|
|
leaderInfos, err := raftBackend.JoinConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Nothing to do if config wasn't supplied
|
|
if len(leaderInfos) == 0 {
|
|
return nil
|
|
}
|
|
|
|
c.logger.Info("raft retry join initiated")
|
|
|
|
if _, err = c.JoinRaftCluster(ctx, leaderInfos, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) JoinRaftCluster(ctx context.Context, leaderInfos []*raft.LeaderJoinInfo, nonVoter bool) (bool, error) {
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return false, errors.New("raft backend not in use")
|
|
}
|
|
|
|
if err := raftBackend.SetDesiredSuffrage(nonVoter); err != nil {
|
|
c.logger.Error("failed to set desired suffrage for this node", "error", err)
|
|
return false, nil
|
|
}
|
|
|
|
init, err := c.InitializedLocally(ctx)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check if core is initialized: %w", err)
|
|
}
|
|
|
|
isRaftHAOnly := c.isRaftHAOnly()
|
|
// Prevent join from happening if we're using raft for storage and
|
|
// it has already been initialized.
|
|
if init && !isRaftHAOnly {
|
|
return true, nil
|
|
}
|
|
|
|
// Check on seal status and storage type before proceeding:
|
|
// If raft is used for storage, core needs to be sealed
|
|
if !isRaftHAOnly && !c.Sealed() {
|
|
c.logger.Error("node must be sealed before joining")
|
|
return false, errors.New("node must be sealed before joining")
|
|
}
|
|
|
|
// If raft is used for ha-only, core needs to be unsealed
|
|
if isRaftHAOnly && c.Sealed() {
|
|
c.logger.Error("node must be unsealed before joining")
|
|
return false, errors.New("node must be unsealed before joining")
|
|
}
|
|
|
|
// Disallow leader API address to be provided if we're using raft for HA-only.
|
|
// The leader API address is obtained directly through storage. This serves
|
|
// as a form of verification that this node is sharing the same physical
|
|
// storage as the leader node.
|
|
if isRaftHAOnly {
|
|
for _, info := range leaderInfos {
|
|
if info.LeaderAPIAddr != "" || info.AutoJoin != "" {
|
|
return false, errors.New("leader API address and auto-join metadata must be unset when raft is used exclusively for HA")
|
|
}
|
|
}
|
|
|
|
// Get the leader address from storage
|
|
keys, err := c.barrier.List(ctx, coreLeaderPrefix)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if len(keys) == 0 || len(keys[0]) == 0 {
|
|
return false, errors.New("unable to fetch leadership entry")
|
|
}
|
|
|
|
leadershipEntry := coreLeaderPrefix + keys[0]
|
|
entry, err := c.barrier.Get(ctx, leadershipEntry)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if entry == nil {
|
|
return false, errors.New("unable to read leadership entry")
|
|
}
|
|
|
|
var adv activeAdvertisement
|
|
err = jsonutil.DecodeJSON(entry.Value, &adv)
|
|
if err != nil {
|
|
return false, fmt.Errorf("unable to decoded leader entry: %w", err)
|
|
}
|
|
|
|
leaderInfos[0].LeaderAPIAddr = adv.RedirectAddr
|
|
}
|
|
|
|
disco, err := newDiscover()
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to create auto-join discovery: %w", err)
|
|
}
|
|
|
|
join := func(retry bool) error {
|
|
joinLeader := func(leaderInfo *raft.LeaderJoinInfo, leaderAddr string) error {
|
|
if leaderInfo == nil {
|
|
return errors.New("raft leader information is nil")
|
|
}
|
|
if len(leaderAddr) == 0 {
|
|
return errors.New("raft leader address not provided")
|
|
}
|
|
|
|
init, err := c.InitializedLocally(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check if core is initialized: %w", err)
|
|
}
|
|
|
|
if init && !isRaftHAOnly {
|
|
c.logger.Info("returning from raft join as the node is initialized")
|
|
return nil
|
|
}
|
|
|
|
c.logger.Info("attempting to join possible raft leader node", "leader_addr", leaderAddr)
|
|
|
|
// Create an API client to interact with the leader node
|
|
transport := cleanhttp.DefaultPooledTransport()
|
|
|
|
if leaderInfo.TLSConfig == nil && (len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0) {
|
|
leaderInfo.TLSConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create TLS config: %w", err)
|
|
}
|
|
leaderInfo.TLSConfig.ServerName = leaderInfo.LeaderTLSServerName
|
|
}
|
|
if leaderInfo.TLSConfig == nil && leaderInfo.LeaderTLSServerName != "" {
|
|
leaderInfo.TLSConfig, err = tlsutil.SetupTLSConfig(map[string]string{"address": leaderInfo.LeaderTLSServerName}, "")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create TLS config: %w", err)
|
|
}
|
|
}
|
|
|
|
if leaderInfo.TLSConfig != nil {
|
|
transport.TLSClientConfig = leaderInfo.TLSConfig.Clone()
|
|
if err := http2.ConfigureTransport(transport); err != nil {
|
|
return fmt.Errorf("failed to configure TLS: %w", err)
|
|
}
|
|
}
|
|
|
|
client := &http.Client{
|
|
Transport: transport,
|
|
}
|
|
|
|
config := api.DefaultConfig()
|
|
if config.Error != nil {
|
|
return fmt.Errorf("failed to create api client: %w", config.Error)
|
|
}
|
|
|
|
config.Address = leaderAddr
|
|
config.HttpClient = client
|
|
config.MaxRetries = 0
|
|
|
|
apiClient, err := api.NewClient(config)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create api client: %w", err)
|
|
}
|
|
|
|
// Attempt to join the leader by requesting for the bootstrap challenge
|
|
secret, err := apiClient.Logical().Write("sys/storage/raft/bootstrap/challenge", map[string]interface{}{
|
|
"server_id": raftBackend.NodeID(),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error during raft bootstrap init call: %w", err)
|
|
}
|
|
if secret == nil {
|
|
return errors.New("could not retrieve raft bootstrap package")
|
|
}
|
|
|
|
var sealConfig SealConfig
|
|
err = mapstructure.Decode(secret.Data["seal_config"], &sealConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if sealConfig.Type != c.seal.BarrierType() {
|
|
return fmt.Errorf("mismatching seal types between raft leader (%s) and follower (%s)", sealConfig.Type, c.seal.BarrierType())
|
|
}
|
|
|
|
challengeB64, ok := secret.Data["challenge"]
|
|
if !ok {
|
|
return errors.New("error during raft bootstrap call, no challenge given")
|
|
}
|
|
challengeRaw, err := base64.StdEncoding.DecodeString(challengeB64.(string))
|
|
if err != nil {
|
|
return fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
|
}
|
|
|
|
eBlob := &wrapping.EncryptedBlobInfo{}
|
|
if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
|
|
return fmt.Errorf("error decoding raft bootstrap challenge: %w", err)
|
|
}
|
|
|
|
raftInfo := &raftInformation{
|
|
challenge: eBlob,
|
|
leaderClient: apiClient,
|
|
leaderBarrierConfig: &sealConfig,
|
|
nonVoter: nonVoter,
|
|
}
|
|
|
|
// If we're using Shamir and using raft for both physical and HA, we
|
|
// need to block until the node is unsealed, unless retry is set to
|
|
// false.
|
|
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
|
c.raftInfo = raftInfo
|
|
if err := c.seal.SetBarrierConfig(ctx, &sealConfig); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !retry {
|
|
return nil
|
|
}
|
|
|
|
// Wait until unseal keys are supplied
|
|
c.raftInfo.joinInProgress = true
|
|
if atomic.LoadUint32(c.postUnsealStarted) != 1 {
|
|
return errors.New("waiting for unseal keys to be supplied")
|
|
}
|
|
}
|
|
|
|
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
|
|
return fmt.Errorf("failed to send answer to raft leader node: %w", err)
|
|
}
|
|
|
|
if c.seal.BarrierType() == wrapping.Shamir && !isRaftHAOnly {
|
|
// Reset the state
|
|
c.raftInfo = nil
|
|
|
|
// In case of Shamir unsealing, inform the unseal process that raft join is completed
|
|
close(c.raftJoinDoneCh)
|
|
}
|
|
|
|
c.logger.Info("successfully joined the raft cluster", "leader_addr", leaderInfo.LeaderAPIAddr)
|
|
return nil
|
|
}
|
|
|
|
// Each join try goes through all the possible leader nodes and attempts to join
|
|
// them, until one of the attempt succeeds.
|
|
for _, leaderInfo := range leaderInfos {
|
|
switch {
|
|
case leaderInfo.LeaderAPIAddr != "" && leaderInfo.AutoJoin != "":
|
|
c.logger.Error("join attempt failed", "error", errors.New("cannot provide both leader address and auto-join metadata"))
|
|
|
|
case leaderInfo.LeaderAPIAddr != "":
|
|
if err := joinLeader(leaderInfo, leaderInfo.LeaderAPIAddr); err != nil {
|
|
c.logger.Warn("join attempt failed", "error", err)
|
|
} else {
|
|
// successfully joined leader
|
|
return nil
|
|
}
|
|
|
|
case leaderInfo.AutoJoin != "":
|
|
scheme := leaderInfo.AutoJoinScheme
|
|
if scheme == "" {
|
|
// default to HTTPS when no scheme is provided
|
|
scheme = "https"
|
|
}
|
|
port := leaderInfo.AutoJoinPort
|
|
if port == 0 {
|
|
// default to 8200 when no port is provided
|
|
port = 8200
|
|
}
|
|
// Addrs returns either IPv4 or IPv6 address sans scheme or port
|
|
clusterIPs, err := disco.Addrs(leaderInfo.AutoJoin, c.logger.StandardLogger(nil))
|
|
if err != nil {
|
|
c.logger.Error("failed to parse addresses from auto-join metadata", "error", err)
|
|
}
|
|
for _, ip := range clusterIPs {
|
|
if strings.Count(ip, ":") >= 2 && !strings.HasPrefix(ip, "["){
|
|
// An IPv6 address in implicit form, however we need it in explicit form to use in a URL.
|
|
ip = fmt.Sprintf("[%s]", ip)
|
|
}
|
|
u := fmt.Sprintf("%s://%s:%d", scheme, ip, port)
|
|
if err := joinLeader(leaderInfo, u); err != nil {
|
|
c.logger.Warn("join attempt failed", "error", err)
|
|
} else {
|
|
// successfully joined leader
|
|
return nil
|
|
}
|
|
}
|
|
|
|
default:
|
|
c.logger.Error("join attempt failed", "error", errors.New("must provide leader address or auto-join metadata"))
|
|
}
|
|
}
|
|
|
|
return errors.New("failed to join any raft leader node")
|
|
}
|
|
|
|
switch leaderInfos[0].Retry {
|
|
case true:
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
err := join(true)
|
|
if err == nil {
|
|
return
|
|
}
|
|
c.logger.Error("failed to retry join raft cluster", "retry", "2s")
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
}()
|
|
|
|
// Backgrounded so return false
|
|
return false, nil
|
|
default:
|
|
if err := join(false); err != nil {
|
|
c.logger.Error("failed to join raft cluster", "error", err)
|
|
return false, fmt.Errorf("failed to join raft cluster: %w", err)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// getRaftBackend returns the RaftBackend from the HA or physical backend,
|
|
// in that order of preference, or nil if not of type RaftBackend.
|
|
func (c *Core) getRaftBackend() *raft.RaftBackend {
|
|
var raftBackend *raft.RaftBackend
|
|
|
|
if raftHA, ok := c.ha.(*raft.RaftBackend); ok {
|
|
raftBackend = raftHA
|
|
}
|
|
|
|
if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
|
|
raftBackend = raftStorage
|
|
}
|
|
|
|
return raftBackend
|
|
}
|
|
|
|
// isRaftHAOnly returns true if c.ha is raft and physical storage is non-raft
|
|
func (c *Core) isRaftHAOnly() bool {
|
|
_, isRaftHA := c.ha.(*raft.RaftBackend)
|
|
_, isRaftStorage := c.underlyingPhysical.(*raft.RaftBackend)
|
|
|
|
return isRaftHA && !isRaftStorage
|
|
}
|
|
|
|
func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess *seal.Access, raftInfo *raftInformation) error {
|
|
if raftInfo.challenge == nil {
|
|
return errors.New("raft challenge is nil")
|
|
}
|
|
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return errors.New("raft backend is not in use")
|
|
}
|
|
|
|
if raftBackend.Initialized() {
|
|
return errors.New("raft is already initialized")
|
|
}
|
|
|
|
plaintext, err := sealAccess.Decrypt(ctx, raftInfo.challenge, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("error decrypting challenge: %w", err)
|
|
}
|
|
|
|
parsedClusterAddr, err := url.Parse(c.ClusterAddr())
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing cluster address: %w", err)
|
|
}
|
|
clusterAddr := parsedClusterAddr.Host
|
|
if atomic.LoadUint32(&TestingUpdateClusterAddr) == 1 && strings.HasSuffix(clusterAddr, ":0") {
|
|
// We are testing and have an address provider, so just create a random
|
|
// addr, it will be overwritten later.
|
|
var err error
|
|
clusterAddr, err = uuid.GenerateUUID()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
|
|
if err := answerReq.SetJSONBody(map[string]interface{}{
|
|
"answer": base64.StdEncoding.EncodeToString(plaintext),
|
|
"cluster_addr": clusterAddr,
|
|
"server_id": raftBackend.NodeID(),
|
|
"non_voter": raftInfo.nonVoter,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
answerRespJson, err := raftInfo.leaderClient.RawRequestWithContext(ctx, answerReq)
|
|
if answerRespJson != nil {
|
|
defer answerRespJson.Body.Close()
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var answerResp answerRespData
|
|
if err := jsonutil.DecodeJSONFromReader(answerRespJson.Body, &answerResp); err != nil {
|
|
return err
|
|
}
|
|
|
|
if answerResp.Data.AutoloadedLicense && !LicenseAutoloaded(c) {
|
|
return ErrJoinWithoutAutoloading
|
|
}
|
|
if err := raftBackend.Bootstrap(answerResp.Data.Peers); err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.startClusterListener(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("error starting cluster: %w", err)
|
|
}
|
|
|
|
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
|
opts := raft.SetupOpts{
|
|
TLSKeyring: answerResp.Data.TLSKeyring,
|
|
ClusterListener: c.getClusterListener(),
|
|
}
|
|
err = raftBackend.SetupCluster(ctx, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to setup raft cluster: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) loadAutopilotConfiguration(ctx context.Context) (*raft.AutopilotConfig, error) {
|
|
var autopilotConfig *raft.AutopilotConfig
|
|
entry, err := c.barrier.Get(ctx, raftAutopilotConfigurationStoragePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if entry == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
if err := jsonutil.DecodeJSON(entry.Value, &autopilotConfig); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return autopilotConfig, nil
|
|
}
|
|
|
|
// RaftBootstrap performs bootstrapping of a raft cluster if core contains a raft
|
|
// backend. If raft is not part for the storage or HA storage backend, this
|
|
// call results in an error.
|
|
func (c *Core) RaftBootstrap(ctx context.Context, onInit bool) error {
|
|
if c.logger.IsDebug() {
|
|
c.logger.Debug("bootstrapping raft backend")
|
|
defer c.logger.Debug("finished bootstrapping raft backend")
|
|
}
|
|
|
|
raftBackend := c.getRaftBackend()
|
|
if raftBackend == nil {
|
|
return errors.New("raft backend not in use")
|
|
}
|
|
|
|
parsedClusterAddr, err := url.Parse(c.ClusterAddr())
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing cluster address: %w", err)
|
|
}
|
|
if err := raftBackend.Bootstrap([]raft.Peer{
|
|
{
|
|
ID: raftBackend.NodeID(),
|
|
Address: parsedClusterAddr.Host,
|
|
},
|
|
}); err != nil {
|
|
return fmt.Errorf("could not bootstrap clustered storage: %w", err)
|
|
}
|
|
|
|
raftOpts := raft.SetupOpts{
|
|
StartAsLeader: true,
|
|
}
|
|
|
|
if !onInit {
|
|
// Generate the TLS Keyring info for SetupCluster to consume
|
|
raftTLS, err := c.raftCreateTLSKeyring(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("could not generate TLS keyring during bootstrap: %w", err)
|
|
}
|
|
|
|
raftBackend.SetRestoreCallback(c.raftSnapshotRestoreCallback(true, true))
|
|
raftOpts.ClusterListener = c.getClusterListener()
|
|
|
|
raftOpts.TLSKeyring = raftTLS
|
|
}
|
|
|
|
if err := raftBackend.SetupCluster(ctx, raftOpts); err != nil {
|
|
return fmt.Errorf("could not start clustered storage: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Core) isRaftUnseal() bool {
|
|
return c.raftInfo != nil
|
|
}
|
|
|
|
type answerRespData struct {
|
|
Data answerResp `json:"data"`
|
|
}
|
|
|
|
type answerResp struct {
|
|
Peers []raft.Peer `json:"peers"`
|
|
TLSKeyring *raft.TLSKeyring `json:"tls_keyring"`
|
|
AutoloadedLicense bool `json:"autoloaded_license"`
|
|
}
|
|
|
|
func newDiscover() (*discover.Discover, error) {
|
|
providers := make(map[string]discover.Provider)
|
|
for k, v := range discover.Providers {
|
|
providers[k] = v
|
|
}
|
|
|
|
providers["k8s"] = &discoverk8s.Provider{}
|
|
|
|
return discover.New(
|
|
discover.WithProviders(providers),
|
|
)
|
|
}
|