2019-06-20 19:14:58 +00:00
package raft
import (
"context"
2020-01-14 01:02:16 +00:00
"crypto/tls"
2019-06-20 19:14:58 +00:00
"errors"
"fmt"
"io"
"io/ioutil"
2022-04-29 12:32:16 +00:00
"math/rand"
2019-06-20 19:14:58 +00:00
"os"
"path/filepath"
"strconv"
"sync"
"time"
2019-10-28 16:43:12 +00:00
"github.com/armon/go-metrics"
2019-10-15 04:55:31 +00:00
"github.com/golang/protobuf/proto"
2019-06-20 19:14:58 +00:00
log "github.com/hashicorp/go-hclog"
2022-08-23 19:37:16 +00:00
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
2019-07-22 16:02:48 +00:00
"github.com/hashicorp/go-raftchunking"
2021-07-16 00:17:31 +00:00
"github.com/hashicorp/go-secure-stdlib/tlsutil"
2019-10-15 04:55:31 +00:00
"github.com/hashicorp/go-uuid"
2022-05-25 16:09:45 +00:00
goversion "github.com/hashicorp/go-version"
2019-06-20 19:14:58 +00:00
"github.com/hashicorp/raft"
2021-03-03 18:59:50 +00:00
autopilot "github.com/hashicorp/raft-autopilot"
2021-06-11 17:25:02 +00:00
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
2019-06-20 19:14:58 +00:00
snapshot "github.com/hashicorp/raft-snapshot"
2021-04-26 23:01:26 +00:00
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical"
2019-06-20 19:14:58 +00:00
"github.com/hashicorp/vault/vault/cluster"
"github.com/hashicorp/vault/vault/seal"
2022-12-07 18:29:51 +00:00
"github.com/hashicorp/vault/version"
2021-04-26 23:01:26 +00:00
bolt "go.etcd.io/bbolt"
2019-06-20 19:14:58 +00:00
)
2022-10-06 18:24:16 +00:00
const (
// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment.
EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID"
2019-10-28 16:43:12 +00:00
2022-10-06 18:24:16 +00:00
// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment.
EnvVaultRaftPath = "VAULT_RAFT_PATH"
2022-11-18 17:58:16 +00:00
// EnvVaultRaftNonVoter is used to override the non_voter config option, telling Vault to join as a non-voter (i.e. read replica).
EnvVaultRaftNonVoter = "VAULT_RAFT_RETRY_JOIN_AS_NON_VOTER"
raftNonVoterConfigKey = "retry_join_as_non_voter"
2022-10-06 18:24:16 +00:00
)
2019-10-28 16:43:12 +00:00
2022-01-11 13:16:53 +00:00
var getMmapFlags = func ( string ) int { return 0 }
2019-06-20 19:14:58 +00:00
// Verify RaftBackend satisfies the correct interfaces
2021-04-08 16:43:39 +00:00
var (
_ physical . Backend = ( * RaftBackend ) ( nil )
_ physical . Transactional = ( * RaftBackend ) ( nil )
_ physical . HABackend = ( * RaftBackend ) ( nil )
_ physical . Lock = ( * RaftLock ) ( nil )
)
2019-06-20 19:14:58 +00:00
var (
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
2020-06-23 18:08:30 +00:00
raftState = "raft/"
peersFileName = "peers.json"
2019-06-20 19:14:58 +00:00
2019-07-03 20:56:30 +00:00
restoreOpDelayDuration = 5 * time . Second
2020-06-01 14:17:24 +00:00
defaultMaxEntrySize = uint64 ( 2 * raftchunking . ChunkSize )
2019-06-20 19:14:58 +00:00
)
// RaftBackend implements the backend interfaces and uses the raft protocol to
// persist writes to the FSM.
type RaftBackend struct {
logger log . Logger
conf map [ string ] string
l sync . RWMutex
// fsm is the state store for vault's data
fsm * FSM
// raft is the instance of raft we will operate on.
raft * raft . Raft
2020-06-23 19:04:13 +00:00
// raftInitCh is used to block during HA lock acquisition if raft
// has not been initialized yet, which can occur if raft is being
// used for HA-only.
raftInitCh chan struct { }
2019-06-20 19:14:58 +00:00
// raftNotifyCh is used to receive updates about leadership changes
// regarding this node.
raftNotifyCh chan bool
// streamLayer is the network layer used to connect the nodes in the raft
// cluster.
streamLayer * raftLayer
// raftTransport is the transport layer that the raft library uses for RPC
// communication.
raftTransport raft . Transport
// snapStore is our snapshot mechanism.
snapStore raft . SnapshotStore
// logStore is used by the raft library to store the raft logs in durable
// storage.
logStore raft . LogStore
// stableStore is used by the raft library to store additional metadata in
// durable storage.
stableStore raft . StableStore
// bootstrapConfig is only set when this node needs to be bootstrapped upon
// startup.
bootstrapConfig * raft . Configuration
// dataDir is the location on the local filesystem that raft and FSM data
// will be stored.
dataDir string
// localID is the ID for this node. This can either be configured in the
// config file, via a file on disk, or is otherwise randomly generated.
localID string
// serverAddressProvider is used to map server IDs to addresses.
serverAddressProvider raft . ServerAddressProvider
2019-10-14 15:25:07 +00:00
// permitPool is used to limit the number of concurrent storage calls.
permitPool * physical . PermitPool
2020-06-01 14:17:24 +00:00
// maxEntrySize imposes a size limit (in bytes) on a raft entry (put or transaction).
// It is suggested to use a value of 2x the Raft chunking size for optimal
// performance.
maxEntrySize uint64
2021-03-03 18:59:50 +00:00
// autopilot is the instance of raft-autopilot library implementation of the
// autopilot features. This will be instantiated in both leader and followers.
// However, only active node will have a "running" autopilot.
autopilot * autopilot . Autopilot
// autopilotConfig represents the configuration required to instantiate autopilot.
autopilotConfig * AutopilotConfig
// followerStates represents the information about all the peers of the raft
// leader. This is used to track some state of the peers and as well as used
// to see if the peers are "alive" using the heartbeat received from them.
followerStates * FollowerStates
// followerHeartbeatTicker is used to compute dead servers using follower
// state heartbeats.
followerHeartbeatTicker * time . Ticker
// disableAutopilot if set will not put autopilot implementation to use. The
// fallback will be to interact with the raft instance directly. This can only
// be set during startup via the environment variable
// VAULT_RAFT_AUTOPILOT_DISABLE during startup and can't be updated once the
// node is up and running.
disableAutopilot bool
2021-03-17 21:23:13 +00:00
2022-05-20 20:49:11 +00:00
// autopilotReconcileInterval is how long between rounds of performing promotions, demotions
// and leadership transfers.
2021-03-17 21:23:13 +00:00
autopilotReconcileInterval time . Duration
2022-05-20 20:49:11 +00:00
// autopilotUpdateInterval is the time between the periodic state updates. These periodic
// state updates take in known servers from the delegate, request Raft stats be
// fetched and pull in other inputs such as the Raft configuration to create
// an updated view of the Autopilot State.
autopilotUpdateInterval time . Duration
// upgradeVersion is used to override the Vault SDK version when performing an autopilot automated upgrade.
upgradeVersion string
// redundancyZone specifies a redundancy zone for autopilot.
redundancyZone string
2022-10-06 18:24:16 +00:00
2022-11-18 17:58:16 +00:00
// nonVoter specifies whether the node should join the cluster as a non-voter. Non-voters get
// replicated to and can serve reads, but do not take part in leader elections.
nonVoter bool
2022-10-06 18:24:16 +00:00
effectiveSDKVersion string
2019-06-20 19:14:58 +00:00
}
2020-01-14 01:02:16 +00:00
// LeaderJoinInfo contains information required by a node to join itself as a
// follower to an existing raft cluster
type LeaderJoinInfo struct {
2020-10-13 20:26:39 +00:00
// AutoJoin defines any cloud auto-join metadata. If supplied, Vault will
// attempt to automatically discover peers in addition to what can be provided
// via 'leader_api_addr'.
AutoJoin string ` json:"auto_join" `
2020-10-23 20:13:09 +00:00
// AutoJoinScheme defines the optional URI protocol scheme for addresses
// discovered via auto-join.
AutoJoinScheme string ` json:"auto_join_scheme" `
// AutoJoinPort defines the optional port used for addressed discovered via
// auto-join.
AutoJoinPort uint ` json:"auto_join_port" `
2020-01-14 01:02:16 +00:00
// LeaderAPIAddr is the address of the leader node to connect to
LeaderAPIAddr string ` json:"leader_api_addr" `
// LeaderCACert is the CA cert of the leader node
LeaderCACert string ` json:"leader_ca_cert" `
2020-05-07 01:26:08 +00:00
// LeaderClientCert is the client certificate for the follower node to
// establish client authentication during TLS
2020-01-14 01:02:16 +00:00
LeaderClientCert string ` json:"leader_client_cert" `
2020-05-07 01:26:08 +00:00
// LeaderClientKey is the client key for the follower node to establish
// client authentication during TLS.
2020-01-14 01:02:16 +00:00
LeaderClientKey string ` json:"leader_client_key" `
2020-05-07 01:26:08 +00:00
// LeaderCACertFile is the path on disk to the the CA cert file of the
// leader node. This should only be provided via Vault's configuration file.
LeaderCACertFile string ` json:"leader_ca_cert_file" `
// LeaderClientCertFile is the path on disk to the client certificate file
// for the follower node to establish client authentication during TLS. This
// should only be provided via Vault's configuration file.
LeaderClientCertFile string ` json:"leader_client_cert_file" `
// LeaderClientKeyFile is the path on disk to the client key file for the
// follower node to establish client authentication during TLS. This should
// only be provided via Vault's configuration file.
LeaderClientKeyFile string ` json:"leader_client_key_file" `
2021-01-19 22:54:28 +00:00
// LeaderTLSServerName is the optional ServerName to expect in the leader's
// certificate, instead of the host/IP we're actually connecting to.
LeaderTLSServerName string ` json:"leader_tls_servername" `
2020-01-14 01:02:16 +00:00
// Retry indicates if the join process should automatically be retried
Retry bool ` json:"-" `
// TLSConfig for the API client to use when communicating with the leader node
TLSConfig * tls . Config ` json:"-" `
}
// JoinConfig returns a list of information about possible leader nodes that
// this node can join as a follower
func ( b * RaftBackend ) JoinConfig ( ) ( [ ] * LeaderJoinInfo , error ) {
config := b . conf [ "retry_join" ]
if config == "" {
return nil , nil
}
var leaderInfos [ ] * LeaderJoinInfo
err := jsonutil . DecodeJSON ( [ ] byte ( config ) , & leaderInfos )
if err != nil {
2021-05-31 16:54:05 +00:00
return nil , fmt . Errorf ( "failed to decode retry_join config: %w" , err )
2020-01-14 01:02:16 +00:00
}
if len ( leaderInfos ) == 0 {
return nil , errors . New ( "invalid retry_join config" )
}
2020-10-13 20:26:39 +00:00
for i , info := range leaderInfos {
if len ( info . AutoJoin ) != 0 && len ( info . LeaderAPIAddr ) != 0 {
return nil , errors . New ( "cannot provide both a leader_api_addr and auto_join" )
}
2020-10-23 20:13:09 +00:00
if info . AutoJoinScheme != "" && ( info . AutoJoinScheme != "http" && info . AutoJoinScheme != "https" ) {
2022-08-03 18:32:45 +00:00
return nil , fmt . Errorf ( "invalid scheme %q; must either be http or https" , info . AutoJoinScheme )
2020-10-23 20:13:09 +00:00
}
2020-01-14 01:02:16 +00:00
info . Retry = true
2020-05-07 01:26:08 +00:00
info . TLSConfig , err = parseTLSInfo ( info )
if err != nil {
2021-05-31 16:54:05 +00:00
return nil , fmt . Errorf ( "failed to create tls config to communicate with leader node (retry_join index: %d): %w" , i , err )
2020-01-14 01:02:16 +00:00
}
}
return leaderInfos , nil
}
2020-05-07 01:26:08 +00:00
// parseTLSInfo is a helper for parses the TLS information, preferring file
// paths over raw certificate content.
func parseTLSInfo ( leaderInfo * LeaderJoinInfo ) ( * tls . Config , error ) {
var tlsConfig * tls . Config
var err error
if len ( leaderInfo . LeaderCACertFile ) != 0 || len ( leaderInfo . LeaderClientCertFile ) != 0 || len ( leaderInfo . LeaderClientKeyFile ) != 0 {
tlsConfig , err = tlsutil . LoadClientTLSConfig ( leaderInfo . LeaderCACertFile , leaderInfo . LeaderClientCertFile , leaderInfo . LeaderClientKeyFile )
if err != nil {
return nil , err
}
} else if len ( leaderInfo . LeaderCACert ) != 0 || len ( leaderInfo . LeaderClientCert ) != 0 || len ( leaderInfo . LeaderClientKey ) != 0 {
tlsConfig , err = tlsutil . ClientTLSConfig ( [ ] byte ( leaderInfo . LeaderCACert ) , [ ] byte ( leaderInfo . LeaderClientCert ) , [ ] byte ( leaderInfo . LeaderClientKey ) )
if err != nil {
return nil , err
}
}
2021-01-28 01:21:45 +00:00
if tlsConfig != nil {
tlsConfig . ServerName = leaderInfo . LeaderTLSServerName
}
2020-05-07 01:26:08 +00:00
return tlsConfig , nil
}
2019-06-20 19:14:58 +00:00
// EnsurePath is used to make sure a path exists
func EnsurePath ( path string , dir bool ) error {
if ! dir {
path = filepath . Dir ( path )
}
2022-04-01 16:57:38 +00:00
return os . MkdirAll ( path , 0 o700 )
2019-06-20 19:14:58 +00:00
}
// NewRaftBackend constructs a RaftBackend using the given directory
func NewRaftBackend ( conf map [ string ] string , logger log . Logger ) ( physical . Backend , error ) {
2019-10-28 16:43:12 +00:00
path := os . Getenv ( EnvVaultRaftPath )
if path == "" {
pathFromConfig , ok := conf [ "path" ]
if ! ok {
return nil , fmt . Errorf ( "'path' must be set" )
}
path = pathFromConfig
2019-06-20 19:14:58 +00:00
}
2021-03-03 18:59:50 +00:00
var localID string
{
// Determine the local node ID from the environment.
if raftNodeID := os . Getenv ( EnvVaultRaftNodeID ) ; raftNodeID != "" {
localID = raftNodeID
}
// If not set in the environment check the configuration file.
if len ( localID ) == 0 {
localID = conf [ "node_id" ]
}
// If not set in the config check the "node-id" file.
if len ( localID ) == 0 {
localIDRaw , err := ioutil . ReadFile ( filepath . Join ( path , "node-id" ) )
switch {
case err == nil :
if len ( localIDRaw ) > 0 {
localID = string ( localIDRaw )
}
case os . IsNotExist ( err ) :
default :
return nil , err
}
}
// If all of the above fails generate a UUID and persist it to the
// "node-id" file.
if len ( localID ) == 0 {
id , err := uuid . GenerateUUID ( )
if err != nil {
return nil , err
}
2021-04-08 16:43:39 +00:00
if err := ioutil . WriteFile ( filepath . Join ( path , "node-id" ) , [ ] byte ( id ) , 0 o600 ) ; err != nil {
2021-03-03 18:59:50 +00:00
return nil , err
}
localID = id
}
}
2020-07-21 16:48:24 +00:00
// Create the FSM.
2021-03-03 18:59:50 +00:00
fsm , err := NewFSM ( path , localID , logger . Named ( "fsm" ) )
2020-07-21 16:48:24 +00:00
if err != nil {
return nil , fmt . Errorf ( "failed to create fsm: %v" , err )
}
2021-02-24 11:58:10 +00:00
if delayRaw , ok := conf [ "apply_delay" ] ; ok {
delay , err := time . ParseDuration ( delayRaw )
if err != nil {
return nil , fmt . Errorf ( "apply_delay does not parse as a duration: %w" , err )
}
fsm . applyCallback = func ( ) {
time . Sleep ( delay )
}
}
2019-06-20 19:14:58 +00:00
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
var log raft . LogStore
var stable raft . StableStore
var snap raft . SnapshotStore
2020-06-01 14:17:24 +00:00
2019-06-20 19:14:58 +00:00
var devMode bool
if devMode {
store := raft . NewInmemStore ( )
stable = store
log = store
snap = raft . NewInmemSnapshotStore ( )
} else {
// Create the base raft path.
path := filepath . Join ( path , raftState )
if err := EnsurePath ( path , true ) ; err != nil {
return nil , err
}
// Create the backend raft store for logs and stable storage.
2022-01-11 13:16:53 +00:00
dbPath := filepath . Join ( path , "raft.db" )
opts := boltOptions ( dbPath )
2021-06-21 18:35:40 +00:00
raftOptions := raftboltdb . Options {
2022-01-11 13:16:53 +00:00
Path : dbPath ,
2021-11-23 04:16:57 +00:00
BoltOptions : opts ,
2021-06-21 18:35:40 +00:00
}
store , err := raftboltdb . New ( raftOptions )
2019-06-20 19:14:58 +00:00
if err != nil {
return nil , err
}
stable = store
// Wrap the store in a LogCache to improve performance.
cacheStore , err := raft . NewLogCache ( raftLogCacheSize , store )
if err != nil {
return nil , err
}
log = cacheStore
// Create the snapshot store.
2020-06-23 18:08:30 +00:00
snapshots , err := NewBoltSnapshotStore ( path , logger . Named ( "snapshot" ) , fsm )
2019-06-20 19:14:58 +00:00
if err != nil {
return nil , err
}
snap = snapshots
}
2020-10-16 18:57:11 +00:00
if delayRaw , ok := conf [ "snapshot_delay" ] ; ok {
delay , err := time . ParseDuration ( delayRaw )
if err != nil {
return nil , fmt . Errorf ( "snapshot_delay does not parse as a duration: %w" , err )
}
2021-03-17 21:23:13 +00:00
snap = newSnapshotStoreDelay ( snap , delay , logger )
2020-10-16 18:57:11 +00:00
}
2020-06-01 14:17:24 +00:00
maxEntrySize := defaultMaxEntrySize
if maxEntrySizeCfg := conf [ "max_entry_size" ] ; len ( maxEntrySizeCfg ) != 0 {
i , err := strconv . Atoi ( maxEntrySizeCfg )
if err != nil {
return nil , fmt . Errorf ( "failed to parse 'max_entry_size': %w" , err )
}
maxEntrySize = uint64 ( i )
}
2021-03-17 21:23:13 +00:00
var reconcileInterval time . Duration
if interval := conf [ "autopilot_reconcile_interval" ] ; interval != "" {
interval , err := time . ParseDuration ( interval )
if err != nil {
return nil , fmt . Errorf ( "autopilot_reconcile_interval does not parse as a duration: %w" , err )
}
reconcileInterval = interval
}
2022-05-20 20:49:11 +00:00
var updateInterval time . Duration
if interval := conf [ "autopilot_update_interval" ] ; interval != "" {
interval , err := time . ParseDuration ( interval )
if err != nil {
return nil , fmt . Errorf ( "autopilot_update_interval does not parse as a duration: %w" , err )
}
updateInterval = interval
}
effectiveReconcileInterval := autopilot . DefaultReconcileInterval
effectiveUpdateInterval := autopilot . DefaultUpdateInterval
if reconcileInterval != 0 {
effectiveReconcileInterval = reconcileInterval
}
if updateInterval != 0 {
effectiveUpdateInterval = updateInterval
}
if effectiveReconcileInterval < effectiveUpdateInterval {
return nil , fmt . Errorf ( "autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)" , effectiveReconcileInterval , effectiveUpdateInterval )
}
2022-05-25 16:09:45 +00:00
var upgradeVersion string
if uv , ok := conf [ "autopilot_upgrade_version" ] ; ok && uv != "" {
upgradeVersion = uv
_ , err := goversion . NewVersion ( upgradeVersion )
if err != nil {
return nil , fmt . Errorf ( "autopilot_upgrade_version does not parse as a semantic version: %w" , err )
}
}
2022-11-18 17:58:16 +00:00
var nonVoter bool
if v := os . Getenv ( EnvVaultRaftNonVoter ) ; v != "" {
// Consistent with handling of other raft boolean env vars
// VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC
nonVoter = true
} else if v , ok := conf [ raftNonVoterConfigKey ] ; ok {
nonVoter , err = strconv . ParseBool ( v )
if err != nil {
return nil , fmt . Errorf ( "failed to parse %s config value %q as a boolean: %w" , raftNonVoterConfigKey , v , err )
}
}
if nonVoter && conf [ "retry_join" ] == "" {
return nil , fmt . Errorf ( "setting %s to true is only valid if at least one retry_join stanza is specified" , raftNonVoterConfigKey )
}
2019-06-20 19:14:58 +00:00
return & RaftBackend {
2021-03-17 21:23:13 +00:00
logger : logger ,
fsm : fsm ,
raftInitCh : make ( chan struct { } ) ,
conf : conf ,
logStore : log ,
stableStore : stable ,
snapStore : snap ,
dataDir : path ,
localID : localID ,
permitPool : physical . NewPermitPool ( physical . DefaultParallelOperations ) ,
maxEntrySize : maxEntrySize ,
followerHeartbeatTicker : time . NewTicker ( time . Second ) ,
autopilotReconcileInterval : reconcileInterval ,
2022-05-20 20:49:11 +00:00
autopilotUpdateInterval : updateInterval ,
redundancyZone : conf [ "autopilot_redundancy_zone" ] ,
2022-11-18 17:58:16 +00:00
nonVoter : nonVoter ,
2022-05-25 16:09:45 +00:00
upgradeVersion : upgradeVersion ,
2019-06-20 19:14:58 +00:00
} , nil
}
2020-10-16 18:57:11 +00:00
type snapshotStoreDelay struct {
2021-03-17 21:23:13 +00:00
logger log . Logger
2020-10-16 18:57:11 +00:00
wrapped raft . SnapshotStore
delay time . Duration
}
func ( s snapshotStoreDelay ) Create ( version raft . SnapshotVersion , index , term uint64 , configuration raft . Configuration , configurationIndex uint64 , trans raft . Transport ) ( raft . SnapshotSink , error ) {
2021-03-17 21:23:13 +00:00
s . logger . Trace ( "delaying before creating snapshot" , "delay" , s . delay )
2020-10-16 18:57:11 +00:00
time . Sleep ( s . delay )
return s . wrapped . Create ( version , index , term , configuration , configurationIndex , trans )
}
func ( s snapshotStoreDelay ) List ( ) ( [ ] * raft . SnapshotMeta , error ) {
return s . wrapped . List ( )
}
func ( s snapshotStoreDelay ) Open ( id string ) ( * raft . SnapshotMeta , io . ReadCloser , error ) {
return s . wrapped . Open ( id )
}
var _ raft . SnapshotStore = & snapshotStoreDelay { }
2021-03-17 21:23:13 +00:00
func newSnapshotStoreDelay ( snap raft . SnapshotStore , delay time . Duration , logger log . Logger ) * snapshotStoreDelay {
2020-10-16 18:57:11 +00:00
return & snapshotStoreDelay {
2021-03-17 21:23:13 +00:00
logger : logger ,
2020-10-16 18:57:11 +00:00
wrapped : snap ,
delay : delay ,
}
}
2020-05-14 12:31:02 +00:00
// Close is used to gracefully close all file resources. N.B. This method
// should only be called if you are sure the RaftBackend will never be used
// again.
func ( b * RaftBackend ) Close ( ) error {
b . l . Lock ( )
defer b . l . Unlock ( )
2022-12-15 15:04:27 +00:00
if err := b . fsm . Close ( ) ; err != nil {
2020-05-14 12:31:02 +00:00
return err
}
if err := b . stableStore . ( * raftboltdb . BoltStore ) . Close ( ) ; err != nil {
return err
}
return nil
}
2022-10-06 18:24:16 +00:00
func ( b * RaftBackend ) SetEffectiveSDKVersion ( sdkVersion string ) {
b . l . Lock ( )
b . effectiveSDKVersion = sdkVersion
b . l . Unlock ( )
}
2022-05-20 20:49:11 +00:00
func ( b * RaftBackend ) RedundancyZone ( ) string {
b . l . RLock ( )
defer b . l . RUnlock ( )
return b . redundancyZone
}
2022-11-18 17:58:16 +00:00
func ( b * RaftBackend ) NonVoter ( ) bool {
b . l . RLock ( )
defer b . l . RUnlock ( )
return b . nonVoter
}
2022-05-20 20:49:11 +00:00
func ( b * RaftBackend ) EffectiveVersion ( ) string {
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . upgradeVersion != "" {
return b . upgradeVersion
}
return version . GetVersion ( ) . Version
}
// DisableUpgradeMigration returns the state of the DisableUpgradeMigration config flag and whether it was set or not
func ( b * RaftBackend ) DisableUpgradeMigration ( ) ( bool , bool ) {
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . autopilotConfig == nil {
return false , false
}
return b . autopilotConfig . DisableUpgradeMigration , true
}
2021-04-26 23:01:26 +00:00
func ( b * RaftBackend ) CollectMetrics ( sink * metricsutil . ClusterMetricSink ) {
b . l . RLock ( )
logstoreStats := b . stableStore . ( * raftboltdb . BoltStore ) . Stats ( )
2022-12-15 15:04:27 +00:00
fsmStats := b . fsm . Stats ( )
2022-10-07 16:09:08 +00:00
stats := b . raft . Stats ( )
2021-04-26 23:01:26 +00:00
b . l . RUnlock ( )
b . collectMetricsWithStats ( logstoreStats , sink , "logstore" )
b . collectMetricsWithStats ( fsmStats , sink , "fsm" )
2022-10-07 16:09:08 +00:00
labels := [ ] metrics . Label {
{
Name : "peer_id" ,
Value : b . localID ,
} ,
}
for _ , key := range [ ] string { "term" , "commit_index" , "applied_index" , "fsm_pending" } {
n , err := strconv . ParseUint ( stats [ key ] , 10 , 64 )
if err == nil {
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "stats" , key } , float32 ( n ) , labels )
}
}
2021-04-26 23:01:26 +00:00
}
func ( b * RaftBackend ) collectMetricsWithStats ( stats bolt . Stats , sink * metricsutil . ClusterMetricSink , database string ) {
txstats := stats . TxStats
labels := [ ] metricsutil . Label { { "database" , database } }
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "freelist" , "free_pages" } , float32 ( stats . FreePageN ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "freelist" , "pending_pages" } , float32 ( stats . PendingPageN ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "freelist" , "allocated_bytes" } , float32 ( stats . FreeAlloc ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "freelist" , "used_bytes" } , float32 ( stats . FreelistInuse ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "transaction" , "started_read_transactions" } , float32 ( stats . TxN ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "transaction" , "currently_open_read_transactions" } , float32 ( stats . OpenTxN ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "page" , "count" } , float32 ( txstats . PageCount ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "page" , "bytes_allocated" } , float32 ( txstats . PageAlloc ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "cursor" , "count" } , float32 ( txstats . CursorCount ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "node" , "count" } , float32 ( txstats . NodeCount ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "node" , "dereferences" } , float32 ( txstats . NodeDeref ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "rebalance" , "count" } , float32 ( txstats . Rebalance ) , labels )
2022-01-24 15:51:35 +00:00
sink . AddSampleWithLabels ( [ ] string { "raft_storage" , "bolt" , "rebalance" , "time" } , float32 ( txstats . RebalanceTime . Milliseconds ( ) ) , labels )
2021-04-26 23:01:26 +00:00
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "split" , "count" } , float32 ( txstats . Split ) , labels )
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "spill" , "count" } , float32 ( txstats . Spill ) , labels )
2022-01-24 15:51:35 +00:00
sink . AddSampleWithLabels ( [ ] string { "raft_storage" , "bolt" , "spill" , "time" } , float32 ( txstats . SpillTime . Milliseconds ( ) ) , labels )
2021-04-26 23:01:26 +00:00
sink . SetGaugeWithLabels ( [ ] string { "raft_storage" , "bolt" , "write" , "count" } , float32 ( txstats . Write ) , labels )
2022-01-24 15:51:35 +00:00
sink . AddSampleWithLabels ( [ ] string { "raft_storage" , "bolt" , "write" , "time" } , float32 ( txstats . WriteTime . Milliseconds ( ) ) , labels )
2021-04-26 23:01:26 +00:00
}
2019-06-20 19:14:58 +00:00
// RaftServer has information about a server in the Raft configuration
type RaftServer struct {
// NodeID is the name of the server
NodeID string ` json:"node_id" `
// Address is the IP:port of the server, used for Raft communications
Address string ` json:"address" `
// Leader is true if this server is the current cluster leader
Leader bool ` json:"leader" `
// Protocol version is the raft protocol version used by the server
ProtocolVersion string ` json:"protocol_version" `
// Voter is true if this server has a vote in the cluster. This might
// be false if the server is staging and still coming online.
Voter bool ` json:"voter" `
}
// RaftConfigurationResponse is returned when querying for the current Raft
// configuration.
type RaftConfigurationResponse struct {
// Servers has the list of servers in the Raft configuration.
Servers [ ] * RaftServer ` json:"servers" `
// Index has the Raft index of this configuration.
Index uint64 ` json:"index" `
}
2019-07-19 01:10:15 +00:00
// Peer defines the ID and Address for a given member of the raft cluster.
2019-06-20 19:14:58 +00:00
type Peer struct {
2021-03-19 18:53:50 +00:00
ID string ` json:"id" `
Address string ` json:"address" `
Suffrage int ` json:"suffrage" `
2019-06-20 19:14:58 +00:00
}
// NodeID returns the identifier of the node
func ( b * RaftBackend ) NodeID ( ) string {
return b . localID
}
// Initialized tells if raft is running or not
func ( b * RaftBackend ) Initialized ( ) bool {
b . l . RLock ( )
init := b . raft != nil
b . l . RUnlock ( )
return init
}
// SetTLSKeyring is used to install a new keyring. If the active key has changed
// it will also close any network connections or streams forcing a reconnect
// with the new key.
2019-07-29 20:05:43 +00:00
func ( b * RaftBackend ) SetTLSKeyring ( keyring * TLSKeyring ) error {
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
err := b . streamLayer . setTLSKeyring ( keyring )
b . l . RUnlock ( )
return err
}
// SetServerAddressProvider sets a the address provider for determining the raft
// node addresses. This is currently only used in tests.
func ( b * RaftBackend ) SetServerAddressProvider ( provider raft . ServerAddressProvider ) {
b . l . Lock ( )
b . serverAddressProvider = provider
b . l . Unlock ( )
}
// Bootstrap prepares the given peers to be part of the raft cluster
2020-06-23 19:04:13 +00:00
func ( b * RaftBackend ) Bootstrap ( peers [ ] Peer ) error {
2019-06-20 19:14:58 +00:00
b . l . Lock ( )
defer b . l . Unlock ( )
hasState , err := raft . HasExistingState ( b . logStore , b . stableStore , b . snapStore )
if err != nil {
return err
}
if hasState {
return errors . New ( "error bootstrapping cluster: cluster already has state" )
}
raftConfig := & raft . Configuration {
Servers : make ( [ ] raft . Server , len ( peers ) ) ,
}
for i , p := range peers {
raftConfig . Servers [ i ] = raft . Server {
2021-03-19 18:53:50 +00:00
ID : raft . ServerID ( p . ID ) ,
Address : raft . ServerAddress ( p . Address ) ,
Suffrage : raft . ServerSuffrage ( p . Suffrage ) ,
2019-06-20 19:14:58 +00:00
}
}
// Store the config for later use
b . bootstrapConfig = raftConfig
return nil
}
// SetRestoreCallback sets the callback to be used when a restoreCallbackOp is
// processed through the FSM.
func ( b * RaftBackend ) SetRestoreCallback ( restoreCb restoreCallback ) {
b . fsm . l . Lock ( )
b . fsm . restoreCb = restoreCb
b . fsm . l . Unlock ( )
}
func ( b * RaftBackend ) applyConfigSettings ( config * raft . Config ) error {
config . Logger = b . logger
multiplierRaw , ok := b . conf [ "performance_multiplier" ]
multiplier := 5
if ok {
var err error
multiplier , err = strconv . Atoi ( multiplierRaw )
if err != nil {
return err
}
}
2022-05-20 20:49:11 +00:00
config . ElectionTimeout *= time . Duration ( multiplier )
config . HeartbeatTimeout *= time . Duration ( multiplier )
config . LeaderLeaseTimeout *= time . Duration ( multiplier )
2019-06-20 19:14:58 +00:00
snapThresholdRaw , ok := b . conf [ "snapshot_threshold" ]
if ok {
var err error
snapThreshold , err := strconv . Atoi ( snapThresholdRaw )
if err != nil {
return err
}
config . SnapshotThreshold = uint64 ( snapThreshold )
}
trailingLogsRaw , ok := b . conf [ "trailing_logs" ]
if ok {
var err error
trailingLogs , err := strconv . Atoi ( trailingLogsRaw )
if err != nil {
return err
}
config . TrailingLogs = uint64 ( trailingLogs )
}
2021-03-19 19:41:42 +00:00
snapshotIntervalRaw , ok := b . conf [ "snapshot_interval" ]
if ok {
var err error
snapshotInterval , err := time . ParseDuration ( snapshotIntervalRaw )
if err != nil {
return err
}
config . SnapshotInterval = snapshotInterval
}
2019-06-20 19:14:58 +00:00
2019-10-14 15:25:07 +00:00
config . NoSnapshotRestoreOnStart = true
config . MaxAppendEntries = 64
2021-06-21 19:00:41 +00:00
// Setting BatchApplyCh allows the raft library to enqueue up to
// MaxAppendEntries into each raft apply rather than relying on the
// scheduler.
config . BatchApplyCh = true
2022-04-29 12:32:16 +00:00
b . logger . Trace ( "applying raft config" , "inputs" , b . conf )
2019-06-20 19:14:58 +00:00
return nil
}
2019-07-29 20:05:43 +00:00
// SetupOpts are used to pass options to the raft setup function.
type SetupOpts struct {
// TLSKeyring is the keyring to use for the cluster traffic.
TLSKeyring * TLSKeyring
// ClusterListener is the cluster hook used to register the raft handler and
// client with core's cluster listeners.
ClusterListener cluster . ClusterHook
// StartAsLeader is used to specify this node should start as leader and
// bypass the leader election. This should be used with caution.
StartAsLeader bool
2019-10-15 04:55:31 +00:00
// RecoveryModeConfig is the configuration for the raft cluster in recovery
// mode.
RecoveryModeConfig * raft . Configuration
}
func ( b * RaftBackend ) StartRecoveryCluster ( ctx context . Context , peer Peer ) error {
recoveryModeConfig := & raft . Configuration {
Servers : [ ] raft . Server {
{
ID : raft . ServerID ( peer . ID ) ,
Address : raft . ServerAddress ( peer . Address ) ,
} ,
} ,
}
return b . SetupCluster ( context . Background ( ) , SetupOpts {
StartAsLeader : true ,
RecoveryModeConfig : recoveryModeConfig ,
} )
2019-07-29 20:05:43 +00:00
}
2020-06-23 19:04:13 +00:00
func ( b * RaftBackend ) HasState ( ) ( bool , error ) {
b . l . RLock ( )
defer b . l . RUnlock ( )
return raft . HasExistingState ( b . logStore , b . stableStore , b . snapStore )
}
2019-06-20 19:14:58 +00:00
// SetupCluster starts the raft cluster and enables the networking needed for
// the raft nodes to communicate.
2019-07-29 20:05:43 +00:00
func ( b * RaftBackend ) SetupCluster ( ctx context . Context , opts SetupOpts ) error {
2019-06-20 19:14:58 +00:00
b . logger . Trace ( "setting up raft cluster" )
b . l . Lock ( )
defer b . l . Unlock ( )
// We are already unsealed
if b . raft != nil {
b . logger . Debug ( "raft already started, not setting up cluster" )
return nil
}
if len ( b . localID ) == 0 {
return errors . New ( "no local node id configured" )
}
// Setup the raft config
raftConfig := raft . DefaultConfig ( )
if err := b . applyConfigSettings ( raftConfig ) ; err != nil {
return err
}
2020-04-21 20:45:07 +00:00
listenerIsNil := func ( cl cluster . ClusterHook ) bool {
switch {
case opts . ClusterListener == nil :
return true
default :
// Concrete type checks
switch cl . ( type ) {
case * cluster . Listener :
return cl . ( * cluster . Listener ) == nil
}
}
return false
}
2022-04-29 12:32:16 +00:00
var initialTimeoutMultiplier time . Duration
2019-06-20 19:14:58 +00:00
switch {
2020-04-21 20:45:07 +00:00
case opts . TLSKeyring == nil && listenerIsNil ( opts . ClusterListener ) :
2019-06-20 19:14:58 +00:00
// If we don't have a provided network we use an in-memory one.
// This allows us to bootstrap a node without bringing up a cluster
2019-06-27 17:00:03 +00:00
// network. This will be true during bootstrap, tests and dev modes.
_ , b . raftTransport = raft . NewInmemTransportWithTimeout ( raft . ServerAddress ( b . localID ) , time . Second )
2019-07-29 20:05:43 +00:00
case opts . TLSKeyring == nil :
2019-06-20 19:14:58 +00:00
return errors . New ( "no keyring provided" )
2020-04-21 20:45:07 +00:00
case listenerIsNil ( opts . ClusterListener ) :
2019-06-20 19:14:58 +00:00
return errors . New ( "no cluster listener provided" )
default :
2022-04-29 12:32:16 +00:00
initialTimeoutMultiplier = 3
if ! opts . StartAsLeader {
electionTimeout , heartbeatTimeout := raftConfig . ElectionTimeout , raftConfig . HeartbeatTimeout
// Use bigger values for first election
raftConfig . ElectionTimeout *= initialTimeoutMultiplier
raftConfig . HeartbeatTimeout *= initialTimeoutMultiplier
b . logger . Trace ( "using larger timeouts for raft at startup" ,
"initial_election_timeout" , raftConfig . ElectionTimeout ,
"initial_heartbeat_timeout" , raftConfig . HeartbeatTimeout ,
"normal_election_timeout" , electionTimeout ,
"normal_heartbeat_timeout" , heartbeatTimeout )
}
2019-06-20 19:14:58 +00:00
// Set the local address and localID in the streaming layer and the raft config.
2020-01-17 07:03:02 +00:00
streamLayer , err := NewRaftLayer ( b . logger . Named ( "stream" ) , opts . TLSKeyring , opts . ClusterListener )
2019-06-20 19:14:58 +00:00
if err != nil {
return err
}
transConfig := & raft . NetworkTransportConfig {
Stream : streamLayer ,
MaxPool : 3 ,
Timeout : 10 * time . Second ,
ServerAddressProvider : b . serverAddressProvider ,
2020-09-23 17:40:00 +00:00
Logger : b . logger . Named ( "raft-net" ) ,
2019-06-20 19:14:58 +00:00
}
transport := raft . NewNetworkTransportWithConfig ( transConfig )
b . streamLayer = streamLayer
b . raftTransport = transport
}
raftConfig . LocalID = raft . ServerID ( b . localID )
// Set up a channel for reliable leader notifications.
2020-04-01 00:42:48 +00:00
raftNotifyCh := make ( chan bool , 10 )
2019-06-20 19:14:58 +00:00
raftConfig . NotifyCh = raftNotifyCh
// If we have a bootstrapConfig set we should bootstrap now.
if b . bootstrapConfig != nil {
bootstrapConfig := b . bootstrapConfig
// Unset the bootstrap config
b . bootstrapConfig = nil
// Bootstrap raft with our known cluster members.
if err := raft . BootstrapCluster ( raftConfig , b . logStore , b . stableStore , b . snapStore , b . raftTransport , * bootstrapConfig ) ; err != nil {
return err
}
}
// Setup the Raft store.
b . fsm . SetNoopRestore ( true )
raftPath := filepath . Join ( b . dataDir , raftState )
peersFile := filepath . Join ( raftPath , peersFileName )
_ , err := os . Stat ( peersFile )
if err == nil {
b . logger . Info ( "raft recovery initiated" , "recovery_file" , peersFileName )
recoveryConfig , err := raft . ReadConfigJSON ( peersFile )
if err != nil {
2021-05-31 16:54:05 +00:00
return fmt . Errorf ( "raft recovery failed to parse peers.json: %w" , err )
2019-06-20 19:14:58 +00:00
}
2020-04-03 23:13:51 +00:00
// Non-voting servers are only allowed in enterprise. If Suffrage is disabled,
// error out to indicate that it isn't allowed.
for idx := range recoveryConfig . Servers {
2021-02-10 21:41:58 +00:00
if ! nonVotersAllowed && recoveryConfig . Servers [ idx ] . Suffrage == raft . Nonvoter {
2020-04-03 23:13:51 +00:00
return fmt . Errorf ( "raft recovery failed to parse configuration for node %q: setting `non_voter` is only supported in enterprise" , recoveryConfig . Servers [ idx ] . ID )
}
}
b . logger . Info ( "raft recovery found new config" , "config" , recoveryConfig )
2019-06-20 19:14:58 +00:00
err = raft . RecoverCluster ( raftConfig , b . fsm , b . logStore , b . stableStore , b . snapStore , b . raftTransport , recoveryConfig )
if err != nil {
2021-05-31 16:54:05 +00:00
return fmt . Errorf ( "raft recovery failed: %w" , err )
2019-06-20 19:14:58 +00:00
}
err = os . Remove ( peersFile )
if err != nil {
2021-05-31 16:54:05 +00:00
return fmt . Errorf ( "raft recovery failed to delete peers.json; please delete manually: %w" , err )
2019-06-20 19:14:58 +00:00
}
b . logger . Info ( "raft recovery deleted peers.json" )
}
2019-10-15 04:55:31 +00:00
if opts . RecoveryModeConfig != nil {
err = raft . RecoverCluster ( raftConfig , b . fsm , b . logStore , b . stableStore , b . snapStore , b . raftTransport , * opts . RecoveryModeConfig )
if err != nil {
2021-05-31 16:54:05 +00:00
return fmt . Errorf ( "recovering raft cluster failed: %w" , err )
2019-10-15 04:55:31 +00:00
}
}
2021-03-17 21:23:13 +00:00
b . logger . Info ( "creating Raft" , "config" , fmt . Sprintf ( "%#v" , raftConfig ) )
2019-07-25 00:44:13 +00:00
raftObj , err := raft . NewRaft ( raftConfig , b . fsm . chunker , b . logStore , b . stableStore , b . snapStore , b . raftTransport )
2019-06-20 19:14:58 +00:00
b . fsm . SetNoopRestore ( false )
if err != nil {
return err
}
2020-06-08 23:34:20 +00:00
// If we are expecting to start as leader wait until we win the election.
// This should happen quickly since there is only one node in the cluster.
// StartAsLeader is only set during init, recovery mode, storage migration,
// and tests.
if opts . StartAsLeader {
2022-04-01 14:17:11 +00:00
// ticker is used to prevent memory leak of using time.After in
// for - select pattern.
ticker := time . NewTicker ( 10 * time . Millisecond )
defer ticker . Stop ( )
2020-06-08 23:34:20 +00:00
for {
if raftObj . State ( ) == raft . Leader {
break
}
2022-04-01 14:17:11 +00:00
ticker . Reset ( 10 * time . Millisecond )
2020-06-08 23:34:20 +00:00
select {
case <- ctx . Done ( ) :
future := raftObj . Shutdown ( )
if future . Error ( ) != nil {
2021-05-31 16:54:05 +00:00
return fmt . Errorf ( "shutdown while waiting for leadership: %w" , future . Error ( ) )
2020-06-08 23:34:20 +00:00
}
return errors . New ( "shutdown while waiting for leadership" )
2022-04-01 14:17:11 +00:00
case <- ticker . C :
2020-06-08 23:34:20 +00:00
}
}
}
2019-06-20 19:14:58 +00:00
b . raft = raftObj
b . raftNotifyCh = raftNotifyCh
2021-03-03 18:59:50 +00:00
if err := b . fsm . upgradeLocalNodeConfig ( ) ; err != nil {
b . logger . Error ( "failed to upgrade local node configuration" )
return err
}
2019-06-20 19:14:58 +00:00
if b . streamLayer != nil {
// Add Handler to the cluster.
2019-07-29 20:05:43 +00:00
opts . ClusterListener . AddHandler ( consts . RaftStorageALPN , b . streamLayer )
2019-06-20 19:14:58 +00:00
// Add Client to the cluster.
2019-07-29 20:05:43 +00:00
opts . ClusterListener . AddClient ( consts . RaftStorageALPN , b . streamLayer )
2019-06-20 19:14:58 +00:00
}
2020-06-23 19:04:13 +00:00
// Close the init channel to signal setup has been completed
close ( b . raftInitCh )
2022-04-29 12:32:16 +00:00
reloadConfig := func ( ) {
newCfg := raft . ReloadableConfig {
TrailingLogs : raftConfig . TrailingLogs ,
SnapshotInterval : raftConfig . SnapshotInterval ,
SnapshotThreshold : raftConfig . SnapshotThreshold ,
HeartbeatTimeout : raftConfig . HeartbeatTimeout / initialTimeoutMultiplier ,
ElectionTimeout : raftConfig . ElectionTimeout / initialTimeoutMultiplier ,
}
err := raftObj . ReloadConfig ( newCfg )
if err != nil {
b . logger . Error ( "failed to reload raft config to set lower timeouts" , "error" , err )
} else {
b . logger . Trace ( "reloaded raft config to set lower timeouts" , "config" , fmt . Sprintf ( "%#v" , newCfg ) )
}
}
confFuture := raftObj . GetConfiguration ( )
numServers := 0
if err := confFuture . Error ( ) ; err != nil {
// This should probably never happen, but just in case we'll log the error.
// We'll default in this case to the multi-node behaviour.
b . logger . Error ( "failed to read raft configuration" , "error" , err )
} else {
clusterConf := confFuture . Configuration ( )
numServers = len ( clusterConf . Servers )
}
if initialTimeoutMultiplier != 0 {
if numServers == 1 {
reloadConfig ( )
} else {
go func ( ) {
ticker := time . NewTicker ( 50 * time . Millisecond )
// Emulate the random timeout used in Raft lib, to ensure that
// if all nodes are brought up simultaneously, they don't all
// call for an election at once.
extra := time . Duration ( rand . Int63 ( ) ) % raftConfig . HeartbeatTimeout
timeout := time . NewTimer ( raftConfig . HeartbeatTimeout + extra )
for {
select {
case <- ticker . C :
switch raftObj . State ( ) {
case raft . Candidate , raft . Leader :
b . logger . Trace ( "triggering raft config reload due to being candidate or leader" )
reloadConfig ( )
return
case raft . Shutdown :
return
}
case <- timeout . C :
b . logger . Trace ( "triggering raft config reload due to initial timeout" )
reloadConfig ( )
return
}
}
} ( )
}
}
2020-06-23 19:04:13 +00:00
b . logger . Trace ( "finished setting up raft cluster" )
2019-06-20 19:14:58 +00:00
return nil
}
// TeardownCluster shuts down the raft cluster
func ( b * RaftBackend ) TeardownCluster ( clusterListener cluster . ClusterHook ) error {
if clusterListener != nil {
clusterListener . StopHandler ( consts . RaftStorageALPN )
clusterListener . RemoveClient ( consts . RaftStorageALPN )
}
b . l . Lock ( )
2020-06-26 00:51:13 +00:00
// Perform shutdown only if the raft object is non-nil. The object could be nil
// if the node is unsealed but has not joined the peer set.
var future raft . Future
if b . raft != nil {
future = b . raft . Shutdown ( )
}
2019-06-20 19:14:58 +00:00
b . raft = nil
2020-06-23 19:04:13 +00:00
// If we're tearing down, then we need to recreate the raftInitCh
b . raftInitCh = make ( chan struct { } )
2019-06-20 19:14:58 +00:00
b . l . Unlock ( )
2020-06-26 00:51:13 +00:00
if future != nil {
return future . Error ( )
}
return nil
2019-06-20 19:14:58 +00:00
}
2020-05-18 23:07:27 +00:00
// CommittedIndex returns the latest index committed to stable storage
func ( b * RaftBackend ) CommittedIndex ( ) uint64 {
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft == nil {
return 0
}
return b . raft . LastIndex ( )
}
2019-06-20 19:14:58 +00:00
// AppliedIndex returns the latest index applied to the FSM
func ( b * RaftBackend ) AppliedIndex ( ) uint64 {
b . l . RLock ( )
defer b . l . RUnlock ( )
2020-09-22 21:47:13 +00:00
if b . fsm == nil {
2019-06-20 19:14:58 +00:00
return 0
}
2020-09-22 21:47:13 +00:00
// We use the latest index that the FSM has seen here, which may be behind
// raft.AppliedIndex() due to the async nature of the raft library.
indexState , _ := b . fsm . LatestState ( )
return indexState . Index
2019-06-20 19:14:58 +00:00
}
2021-03-03 18:59:50 +00:00
// Term returns the raft term of this node.
func ( b * RaftBackend ) Term ( ) uint64 {
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . fsm == nil {
return 0
}
// We use the latest index that the FSM has seen here, which may be behind
// raft.AppliedIndex() due to the async nature of the raft library.
indexState , _ := b . fsm . LatestState ( )
return indexState . Term
}
2019-06-20 19:14:58 +00:00
// RemovePeer removes the given peer ID from the raft cluster. If the node is
// ourselves we will give up leadership.
func ( b * RaftBackend ) RemovePeer ( ctx context . Context , peerID string ) error {
b . l . RLock ( )
defer b . l . RUnlock ( )
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2021-03-03 18:59:50 +00:00
if b . disableAutopilot {
if b . raft == nil {
return errors . New ( "raft storage is not initialized" )
}
b . logger . Trace ( "removing server from raft" , "id" , peerID )
future := b . raft . RemoveServer ( raft . ServerID ( peerID ) , 0 , 0 )
return future . Error ( )
2019-06-20 19:14:58 +00:00
}
2021-03-03 18:59:50 +00:00
if b . autopilot == nil {
return errors . New ( "raft storage autopilot is not initialized" )
}
2019-06-20 19:14:58 +00:00
2021-03-03 18:59:50 +00:00
b . logger . Trace ( "removing server from raft via autopilot" , "id" , peerID )
return b . autopilot . RemoveServer ( raft . ServerID ( peerID ) )
2019-06-20 19:14:58 +00:00
}
2021-06-11 17:25:02 +00:00
// GetConfigurationOffline is used to read the stale, last known raft
// configuration to this node. It accesses the last state written into the
// FSM. When a server is online use GetConfiguration instead.
func ( b * RaftBackend ) GetConfigurationOffline ( ) ( * RaftConfigurationResponse , error ) {
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft != nil {
return nil , errors . New ( "raft storage is initialized, used GetConfiguration instead" )
}
if b . fsm == nil {
return nil , nil
}
state , configuration := b . fsm . LatestState ( )
config := & RaftConfigurationResponse {
Index : state . Index ,
}
2021-06-24 16:56:38 +00:00
if configuration == nil || configuration . Servers == nil {
return config , nil
}
2021-06-11 17:25:02 +00:00
for _ , server := range configuration . Servers {
entry := & RaftServer {
NodeID : server . Id ,
Address : server . Address ,
// Since we are offline no node is the leader.
Leader : false ,
Voter : raft . ServerSuffrage ( server . Suffrage ) == raft . Voter ,
}
config . Servers = append ( config . Servers , entry )
}
return config , nil
}
2019-06-20 19:14:58 +00:00
func ( b * RaftBackend ) GetConfiguration ( ctx context . Context ) ( * RaftConfigurationResponse , error ) {
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft == nil {
return nil , errors . New ( "raft storage is not initialized" )
}
future := b . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
return nil , err
}
config := & RaftConfigurationResponse {
Index : future . Index ( ) ,
}
for _ , server := range future . Configuration ( ) . Servers {
entry := & RaftServer {
2019-07-25 12:41:14 +00:00
NodeID : string ( server . ID ) ,
Address : string ( server . Address ) ,
// Since we only service this request on the active node our node ID
// denotes the raft leader.
Leader : string ( server . ID ) == b . NodeID ( ) ,
2019-06-20 19:14:58 +00:00
Voter : server . Suffrage == raft . Voter ,
2019-06-27 21:39:52 +00:00
ProtocolVersion : strconv . Itoa ( raft . ProtocolVersionMax ) ,
2019-06-20 19:14:58 +00:00
}
2022-05-20 20:49:11 +00:00
2019-06-20 19:14:58 +00:00
config . Servers = append ( config . Servers , entry )
}
return config , nil
}
// AddPeer adds a new server to the raft cluster
func ( b * RaftBackend ) AddPeer ( ctx context . Context , peerID , clusterAddr string ) error {
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
2021-03-03 18:59:50 +00:00
if b . disableAutopilot {
if b . raft == nil {
return errors . New ( "raft storage is not initialized" )
}
b . logger . Trace ( "adding server to raft" , "id" , peerID )
future := b . raft . AddVoter ( raft . ServerID ( peerID ) , raft . ServerAddress ( clusterAddr ) , 0 , 0 )
return future . Error ( )
2019-06-20 19:14:58 +00:00
}
2021-03-03 18:59:50 +00:00
if b . autopilot == nil {
return errors . New ( "raft storage autopilot is not initialized" )
}
2019-06-21 01:32:00 +00:00
2021-03-03 18:59:50 +00:00
b . logger . Trace ( "adding server to raft via autopilot" , "id" , peerID )
return b . autopilot . AddServer ( & autopilot . Server {
ID : raft . ServerID ( peerID ) ,
Name : peerID ,
Address : raft . ServerAddress ( clusterAddr ) ,
RaftVersion : raft . ProtocolVersionMax ,
NodeType : autopilot . NodeVoter ,
} )
2019-06-20 19:14:58 +00:00
}
// Peers returns all the servers present in the raft cluster
func ( b * RaftBackend ) Peers ( ctx context . Context ) ( [ ] Peer , error ) {
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft == nil {
2021-03-03 18:59:50 +00:00
return nil , errors . New ( "raft storage is not initialized" )
2019-06-20 19:14:58 +00:00
}
future := b . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
return nil , err
}
ret := make ( [ ] Peer , len ( future . Configuration ( ) . Servers ) )
for i , s := range future . Configuration ( ) . Servers {
ret [ i ] = Peer {
2021-03-19 18:53:50 +00:00
ID : string ( s . ID ) ,
Address : string ( s . Address ) ,
Suffrage : int ( s . Suffrage ) ,
2019-06-20 19:14:58 +00:00
}
}
return ret , nil
}
2020-10-16 18:57:11 +00:00
// SnapshotHTTP is a wrapper for Snapshot that sends the snapshot as an HTTP
// response.
func ( b * RaftBackend ) SnapshotHTTP ( out * logical . HTTPResponseWriter , access * seal . Access ) error {
out . Header ( ) . Add ( "Content-Disposition" , "attachment" )
out . Header ( ) . Add ( "Content-Type" , "application/gzip" )
return b . Snapshot ( out , access )
}
2019-06-20 19:14:58 +00:00
// Snapshot takes a raft snapshot, packages it into a archive file and writes it
// to the provided writer. Seal access is used to encrypt the SHASUM file so we
2021-12-07 01:12:20 +00:00
// can validate the snapshot was taken using the same root keys or not.
2020-10-16 18:57:11 +00:00
func ( b * RaftBackend ) Snapshot ( out io . Writer , access * seal . Access ) error {
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft == nil {
2021-03-03 18:59:50 +00:00
return errors . New ( "raft storage is sealed" )
2019-06-20 19:14:58 +00:00
}
// If we have access to the seal create a sealer object
var s snapshot . Sealer
if access != nil {
s = & sealer {
access : access ,
}
}
2020-10-16 18:57:11 +00:00
return snapshot . Write ( b . logger . Named ( "snapshot" ) , b . raft , s , out )
2019-06-20 19:14:58 +00:00
}
// WriteSnapshotToTemp reads a snapshot archive off the provided reader,
// extracts the data and writes the snapshot to a temporary file. The seal
// access is used to decrypt the SHASUM file in the archive to ensure this
2021-12-07 01:12:20 +00:00
// snapshot has the same root key as the running instance. If the provided
2019-06-20 19:14:58 +00:00
// access is nil then it will skip that validation.
2020-01-11 01:39:52 +00:00
func ( b * RaftBackend ) WriteSnapshotToTemp ( in io . ReadCloser , access * seal . Access ) ( * os . File , func ( ) , raft . SnapshotMeta , error ) {
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
var metadata raft . SnapshotMeta
if b . raft == nil {
2021-03-03 18:59:50 +00:00
return nil , nil , metadata , errors . New ( "raft storage is sealed" )
2019-06-20 19:14:58 +00:00
}
// If we have access to the seal create a sealer object
var s snapshot . Sealer
if access != nil {
s = & sealer {
access : access ,
}
}
snap , cleanup , err := snapshot . WriteToTempFileWithSealer ( b . logger . Named ( "snapshot" ) , in , & metadata , s )
return snap , cleanup , metadata , err
}
// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
// raft.
func ( b * RaftBackend ) RestoreSnapshot ( ctx context . Context , metadata raft . SnapshotMeta , snap io . Reader ) error {
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
defer b . l . RUnlock ( )
if b . raft == nil {
return errors . New ( "raft storage is not initialized" )
}
if err := b . raft . Restore ( & metadata , snap , 0 ) ; err != nil {
b . logger . Named ( "snapshot" ) . Error ( "failed to restore snapshot" , "error" , err )
return err
}
// Apply a log that tells the follower nodes to run the restore callback
// function. This is done after the restore call so we can be sure the
// snapshot applied to a quorum of nodes.
command := & LogData {
Operations : [ ] * LogOperation {
2021-04-08 16:43:39 +00:00
{
2019-06-20 19:14:58 +00:00
OpType : restoreCallbackOp ,
} ,
} ,
}
err := b . applyLog ( ctx , command )
2019-07-03 20:56:30 +00:00
// Do a best-effort attempt to let the standbys apply the restoreCallbackOp
// before we continue.
time . Sleep ( restoreOpDelayDuration )
2019-06-20 19:14:58 +00:00
return err
}
// Delete inserts an entry in the log to delete the given path
func ( b * RaftBackend ) Delete ( ctx context . Context , path string ) error {
2019-10-14 15:25:07 +00:00
defer metrics . MeasureSince ( [ ] string { "raft-storage" , "delete" } , time . Now ( ) )
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2019-06-20 19:14:58 +00:00
command := & LogData {
Operations : [ ] * LogOperation {
2021-04-08 16:43:39 +00:00
{
2019-06-20 19:14:58 +00:00
OpType : deleteOp ,
Key : path ,
} ,
} ,
}
2019-10-14 15:25:07 +00:00
b . permitPool . Acquire ( )
defer b . permitPool . Release ( )
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
err := b . applyLog ( ctx , command )
b . l . RUnlock ( )
return err
}
// Get returns the value corresponding to the given path from the fsm
func ( b * RaftBackend ) Get ( ctx context . Context , path string ) ( * physical . Entry , error ) {
2019-10-14 15:25:07 +00:00
defer metrics . MeasureSince ( [ ] string { "raft-storage" , "get" } , time . Now ( ) )
2019-06-20 19:14:58 +00:00
if b . fsm == nil {
return nil , errors . New ( "raft: fsm not configured" )
}
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2019-10-14 15:25:07 +00:00
b . permitPool . Acquire ( )
defer b . permitPool . Release ( )
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2020-06-01 14:17:24 +00:00
entry , err := b . fsm . Get ( ctx , path )
if entry != nil {
valueLen := len ( entry . Value )
if uint64 ( valueLen ) > b . maxEntrySize {
2021-06-23 19:43:44 +00:00
b . logger . Warn ( "retrieved entry value is too large, has raft's max_entry_size been reduced?" ,
"size" , valueLen , "max_entry_size" , b . maxEntrySize )
2020-06-01 14:17:24 +00:00
}
}
return entry , err
2019-06-20 19:14:58 +00:00
}
2020-06-01 14:17:24 +00:00
// Put inserts an entry in the log for the put operation. It will return an
// error if the resulting entry encoding exceeds the configured max_entry_size
// or if the call to applyLog fails.
2019-06-20 19:14:58 +00:00
func ( b * RaftBackend ) Put ( ctx context . Context , entry * physical . Entry ) error {
2019-10-14 15:25:07 +00:00
defer metrics . MeasureSince ( [ ] string { "raft-storage" , "put" } , time . Now ( ) )
2021-11-25 19:07:03 +00:00
if len ( entry . Key ) > bolt . MaxKeySize {
return fmt . Errorf ( "%s, max key size for integrated storage is %d" , physical . ErrKeyTooLarge , bolt . MaxKeySize )
}
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2019-06-20 19:14:58 +00:00
command := & LogData {
Operations : [ ] * LogOperation {
2021-04-08 16:43:39 +00:00
{
2019-06-20 19:14:58 +00:00
OpType : putOp ,
Key : entry . Key ,
Value : entry . Value ,
} ,
} ,
}
2019-10-14 15:25:07 +00:00
b . permitPool . Acquire ( )
defer b . permitPool . Release ( )
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
err := b . applyLog ( ctx , command )
b . l . RUnlock ( )
return err
}
// List enumerates all the items under the prefix from the fsm
func ( b * RaftBackend ) List ( ctx context . Context , prefix string ) ( [ ] string , error ) {
2019-10-14 15:25:07 +00:00
defer metrics . MeasureSince ( [ ] string { "raft-storage" , "list" } , time . Now ( ) )
2019-06-20 19:14:58 +00:00
if b . fsm == nil {
return nil , errors . New ( "raft: fsm not configured" )
}
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2019-10-14 15:25:07 +00:00
b . permitPool . Acquire ( )
defer b . permitPool . Release ( )
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return nil , err
}
2019-06-20 19:14:58 +00:00
return b . fsm . List ( ctx , prefix )
}
// Transaction applies all the given operations into a single log and
// applies it.
func ( b * RaftBackend ) Transaction ( ctx context . Context , txns [ ] * physical . TxnEntry ) error {
2019-10-14 15:25:07 +00:00
defer metrics . MeasureSince ( [ ] string { "raft-storage" , "transaction" } , time . Now ( ) )
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2022-09-16 16:35:48 +00:00
txnMap := make ( map [ string ] * physical . TxnEntry )
2022-09-13 17:03:19 +00:00
2019-06-20 19:14:58 +00:00
command := & LogData {
2022-09-16 16:35:48 +00:00
Operations : make ( [ ] * LogOperation , len ( txns ) ) ,
2019-06-20 19:14:58 +00:00
}
2022-09-16 16:35:48 +00:00
for i , txn := range txns {
2019-06-20 19:14:58 +00:00
op := & LogOperation { }
switch txn . Operation {
case physical . PutOperation :
2021-11-26 13:38:39 +00:00
if len ( txn . Entry . Key ) > bolt . MaxKeySize {
return fmt . Errorf ( "%s, max key size for integrated storage is %d" , physical . ErrKeyTooLarge , bolt . MaxKeySize )
}
2019-06-20 19:14:58 +00:00
op . OpType = putOp
op . Key = txn . Entry . Key
op . Value = txn . Entry . Value
case physical . DeleteOperation :
op . OpType = deleteOp
op . Key = txn . Entry . Key
2022-09-16 16:35:48 +00:00
case physical . GetOperation :
op . OpType = getOp
op . Key = txn . Entry . Key
txnMap [ op . Key ] = txn
2019-06-20 19:14:58 +00:00
default :
return fmt . Errorf ( "%q is not a supported transaction operation" , txn . Operation )
}
command . Operations [ i ] = op
}
2019-10-14 15:25:07 +00:00
b . permitPool . Acquire ( )
defer b . permitPool . Release ( )
2019-06-20 19:14:58 +00:00
b . l . RLock ( )
err := b . applyLog ( ctx , command )
b . l . RUnlock ( )
2022-09-13 17:03:19 +00:00
2022-09-16 16:35:48 +00:00
// loop over results and update pointers to get operations
for _ , logOp := range command . Operations {
if logOp . OpType == getOp {
if txn , found := txnMap [ logOp . Key ] ; found {
txn . Entry . Value = logOp . Value
}
}
}
2019-06-20 19:14:58 +00:00
return err
}
// applyLog will take a given log command and apply it to the raft log. applyLog
// doesn't return until the log has been applied to a quorum of servers and is
// persisted to the local FSM. Caller should hold the backend's read lock.
func ( b * RaftBackend ) applyLog ( ctx context . Context , command * LogData ) error {
if b . raft == nil {
2021-03-03 18:59:50 +00:00
return errors . New ( "raft storage is not initialized" )
2019-06-20 19:14:58 +00:00
}
2021-08-19 16:03:56 +00:00
if err := ctx . Err ( ) ; err != nil {
return err
}
2019-06-20 19:14:58 +00:00
commandBytes , err := proto . Marshal ( command )
if err != nil {
return err
}
2020-06-01 14:17:24 +00:00
cmdSize := len ( commandBytes )
if uint64 ( cmdSize ) > b . maxEntrySize {
return fmt . Errorf ( "%s; got %d bytes, max: %d bytes" , physical . ErrValueTooLarge , cmdSize , b . maxEntrySize )
}
defer metrics . AddSample ( [ ] string { "raft-storage" , "entry_size" } , float32 ( cmdSize ) )
2019-07-25 00:44:13 +00:00
var chunked bool
2019-07-22 16:02:48 +00:00
var applyFuture raft . ApplyFuture
switch {
2019-07-29 21:11:46 +00:00
case len ( commandBytes ) <= raftchunking . ChunkSize :
2019-07-22 16:02:48 +00:00
applyFuture = b . raft . Apply ( commandBytes , 0 )
default :
2019-07-25 00:44:13 +00:00
chunked = true
2019-07-22 16:02:48 +00:00
applyFuture = raftchunking . ChunkingApply ( commandBytes , nil , 0 , b . raft . ApplyLog )
2019-06-20 19:14:58 +00:00
}
2019-07-25 00:44:13 +00:00
if err := applyFuture . Error ( ) ; err != nil {
2019-06-20 19:14:58 +00:00
return err
}
2019-07-25 00:44:13 +00:00
resp := applyFuture . Response ( )
if chunked {
// In this case we didn't apply all chunks successfully, possibly due
// to a term change
if resp == nil {
// This returns the error in the interface because the raft library
// returns errors from the FSM via the future, not via err from the
// apply function. Downstream client code expects to see any error
// from the FSM (as opposed to the apply itself) and decide whether
// it can retry in the future's response.
return errors . New ( "applying chunking failed, please retry" )
}
// We expect that this conversion should always work
chunkedSuccess , ok := resp . ( raftchunking . ChunkingSuccess )
if ! ok {
return errors . New ( "unknown type of response back from chunking FSM" )
}
// Replace the reply with the inner wrapped version
resp = chunkedSuccess . Response
}
2022-09-16 16:35:48 +00:00
fsmar , ok := resp . ( * FSMApplyResponse )
if ! ok || ! fsmar . Success {
2019-06-20 19:14:58 +00:00
return errors . New ( "could not apply data" )
}
2022-09-16 16:35:48 +00:00
// populate command with our results
if fsmar . EntrySlice == nil {
return errors . New ( "entries on FSM response were empty" )
}
for i , logOp := range command . Operations {
if logOp . OpType == getOp {
fsmEntry := fsmar . EntrySlice [ i ]
// this should always be true because the entries in the slice were created in the same order as
// the command operations.
if logOp . Key == fsmEntry . Key {
if len ( fsmEntry . Value ) > 0 {
logOp . Value = fsmEntry . Value
}
} else {
// this shouldn't happen
return errors . New ( "entries in FSM response were out of order" )
}
}
}
2019-06-20 19:14:58 +00:00
return nil
}
2020-06-23 19:04:13 +00:00
// HAEnabled is the implementation of the HABackend interface
2019-06-20 19:14:58 +00:00
func ( b * RaftBackend ) HAEnabled ( ) bool { return true }
2020-06-23 19:04:13 +00:00
// HAEnabled is the implementation of the HABackend interface
2019-06-20 19:14:58 +00:00
func ( b * RaftBackend ) LockWith ( key , value string ) ( physical . Lock , error ) {
return & RaftLock {
key : key ,
value : [ ] byte ( value ) ,
b : b ,
} , nil
}
2021-03-03 18:59:50 +00:00
// SetDesiredSuffrage sets a field in the fsm indicating the suffrage intent for
// this node.
func ( b * RaftBackend ) SetDesiredSuffrage ( nonVoter bool ) error {
b . l . Lock ( )
defer b . l . Unlock ( )
var desiredSuffrage string
switch nonVoter {
case true :
desiredSuffrage = "non-voter"
default :
desiredSuffrage = "voter"
}
err := b . fsm . recordSuffrage ( desiredSuffrage )
if err != nil {
return err
}
return nil
}
func ( b * RaftBackend ) DesiredSuffrage ( ) string {
2021-03-11 18:14:11 +00:00
return b . fsm . DesiredSuffrage ( )
2021-03-03 18:59:50 +00:00
}
2019-06-20 19:14:58 +00:00
// RaftLock implements the physical Lock interface and enables HA for this
// backend. The Lock uses the raftNotifyCh for receiving leadership edge
// triggers. Vault's active duty matches raft's leadership.
type RaftLock struct {
key string
value [ ] byte
b * RaftBackend
}
// monitorLeadership waits until we receive an update on the raftNotifyCh and
// closes the leaderLost channel.
2019-06-27 17:00:03 +00:00
func ( l * RaftLock ) monitorLeadership ( stopCh <- chan struct { } , leaderNotifyCh <- chan bool ) <- chan struct { } {
2019-06-20 19:14:58 +00:00
leaderLost := make ( chan struct { } )
go func ( ) {
2020-03-13 22:03:58 +00:00
for {
select {
case isLeader := <- leaderNotifyCh :
// leaderNotifyCh may deliver a true value initially if this
// server is already the leader prior to RaftLock.Lock call
// (the true message was already queued). The next message is
// always going to be false. The for loop should loop at most
// twice.
if ! isLeader {
close ( leaderLost )
return
}
case <- stopCh :
return
2019-07-29 20:05:43 +00:00
}
2019-06-20 19:14:58 +00:00
}
} ( )
return leaderLost
}
// Lock blocks until we become leader or are shutdown. It returns a channel that
// is closed when we detect a loss of leadership.
func ( l * RaftLock ) Lock ( stopCh <- chan struct { } ) ( <- chan struct { } , error ) {
2020-06-23 19:04:13 +00:00
// If not initialized, block until it is
if ! l . b . Initialized ( ) {
select {
case <- l . b . raftInitCh :
case <- stopCh :
return nil , nil
}
}
2019-06-27 17:00:03 +00:00
2020-06-23 19:04:13 +00:00
l . b . l . RLock ( )
// Ensure that we still have a raft instance after grabbing the read lock
2020-02-14 20:25:53 +00:00
if l . b . raft == nil {
2020-06-23 19:04:13 +00:00
l . b . l . RUnlock ( )
2020-02-14 20:25:53 +00:00
return nil , errors . New ( "attempted to grab a lock on a nil raft backend" )
}
2020-06-23 19:04:13 +00:00
// Cache the notifyCh locally
leaderNotifyCh := l . b . raftNotifyCh
2019-06-27 17:00:03 +00:00
// Check to see if we are already leader.
2019-06-21 00:55:10 +00:00
if l . b . raft . State ( ) == raft . Leader {
err := l . b . applyLog ( context . Background ( ) , & LogData {
Operations : [ ] * LogOperation {
2021-04-08 16:43:39 +00:00
{
2019-06-21 00:55:10 +00:00
OpType : putOp ,
Key : l . key ,
Value : l . value ,
} ,
} ,
} )
l . b . l . RUnlock ( )
if err != nil {
return nil , err
}
2019-06-27 17:00:03 +00:00
return l . monitorLeadership ( stopCh , leaderNotifyCh ) , nil
2019-06-21 00:55:10 +00:00
}
l . b . l . RUnlock ( )
2019-06-20 19:14:58 +00:00
for {
select {
2019-06-27 17:00:03 +00:00
case isLeader := <- leaderNotifyCh :
2019-06-20 19:14:58 +00:00
if isLeader {
// We are leader, set the key
l . b . l . RLock ( )
err := l . b . applyLog ( context . Background ( ) , & LogData {
Operations : [ ] * LogOperation {
2021-04-08 16:43:39 +00:00
{
2019-06-20 19:14:58 +00:00
OpType : putOp ,
Key : l . key ,
Value : l . value ,
} ,
} ,
} )
l . b . l . RUnlock ( )
if err != nil {
return nil , err
}
2019-06-27 17:00:03 +00:00
return l . monitorLeadership ( stopCh , leaderNotifyCh ) , nil
2019-06-20 19:14:58 +00:00
}
case <- stopCh :
return nil , nil
}
}
}
// Unlock gives up leadership.
func ( l * RaftLock ) Unlock ( ) error {
2020-06-23 19:04:13 +00:00
if l . b . raft == nil {
return nil
}
2019-06-20 19:14:58 +00:00
return l . b . raft . LeadershipTransfer ( ) . Error ( )
}
// Value reads the value of the lock. This informs us who is currently leader.
func ( l * RaftLock ) Value ( ) ( bool , string , error ) {
e , err := l . b . Get ( context . Background ( ) , l . key )
if err != nil {
return false , "" , err
}
if e == nil {
return false , "" , nil
}
value := string ( e . Value )
// TODO: how to tell if held?
return true , value , nil
}
// sealer implements the snapshot.Sealer interface and is used in the snapshot
// process for encrypting/decrypting the SHASUM file in snapshot archives.
type sealer struct {
2020-01-11 01:39:52 +00:00
access * seal . Access
2019-06-20 19:14:58 +00:00
}
// Seal encrypts the data with using the seal access object.
func ( s sealer ) Seal ( ctx context . Context , pt [ ] byte ) ( [ ] byte , error ) {
if s . access == nil {
return nil , errors . New ( "no seal access available" )
}
2020-01-11 01:39:52 +00:00
eblob , err := s . access . Encrypt ( ctx , pt , nil )
2019-06-20 19:14:58 +00:00
if err != nil {
return nil , err
}
return proto . Marshal ( eblob )
}
// Open decrypts the data using the seal access object.
func ( s sealer ) Open ( ctx context . Context , ct [ ] byte ) ( [ ] byte , error ) {
if s . access == nil {
return nil , errors . New ( "no seal access available" )
}
2022-08-23 19:37:16 +00:00
var eblob wrapping . BlobInfo
2019-06-20 19:14:58 +00:00
err := proto . Unmarshal ( ct , & eblob )
if err != nil {
return nil , err
}
2020-01-11 01:39:52 +00:00
return s . access . Decrypt ( ctx , & eblob , nil )
2019-06-20 19:14:58 +00:00
}
2021-06-21 18:35:40 +00:00
2021-11-23 04:16:57 +00:00
// boltOptions returns a bolt.Options struct, suitable for passing to
// bolt.Open(), pre-configured with all of our preferred defaults.
2022-01-11 13:16:53 +00:00
func boltOptions ( path string ) * bolt . Options {
2021-11-23 04:16:57 +00:00
o := & bolt . Options {
Timeout : 1 * time . Second ,
FreelistType : bolt . FreelistMapType ,
NoFreelistSync : true ,
2022-01-11 13:16:53 +00:00
MmapFlags : getMmapFlags ( path ) ,
2021-11-23 04:16:57 +00:00
}
2021-06-21 18:35:40 +00:00
if os . Getenv ( "VAULT_RAFT_FREELIST_TYPE" ) == "array" {
2021-11-23 04:16:57 +00:00
o . FreelistType = bolt . FreelistArrayType
2021-06-21 18:35:40 +00:00
}
if os . Getenv ( "VAULT_RAFT_FREELIST_SYNC" ) != "" {
2021-11-23 04:16:57 +00:00
o . NoFreelistSync = false
}
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
// so if users are wanting to turn this off, they can also set it to 0. Setting it
// to a negative value is the same as not setting it at all.
if os . Getenv ( "VAULT_RAFT_INITIAL_MMAP_SIZE" ) == "" {
o . InitialMmapSize = initialMmapSize
} else {
imms , err := strconv . Atoi ( os . Getenv ( "VAULT_RAFT_INITIAL_MMAP_SIZE" ) )
// If there's an error here, it means they passed something that's not convertible to
// a number. Rather than fail startup, just ignore it.
if err == nil && imms > 0 {
o . InitialMmapSize = imms
}
2021-06-21 18:35:40 +00:00
}
2021-11-23 04:16:57 +00:00
return o
2021-06-21 18:35:40 +00:00
}