3359f987ac
Co-authored-by: Josh Black <raskchanky@gmail.com>
1971 lines
58 KiB
Go
1971 lines
58 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package raft
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/golang/protobuf/proto"
|
|
log "github.com/hashicorp/go-hclog"
|
|
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
|
|
"github.com/hashicorp/go-raftchunking"
|
|
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
|
"github.com/hashicorp/go-secure-stdlib/tlsutil"
|
|
"github.com/hashicorp/go-uuid"
|
|
goversion "github.com/hashicorp/go-version"
|
|
"github.com/hashicorp/raft"
|
|
autopilot "github.com/hashicorp/raft-autopilot"
|
|
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
|
snapshot "github.com/hashicorp/raft-snapshot"
|
|
"github.com/hashicorp/vault/helper/metricsutil"
|
|
"github.com/hashicorp/vault/sdk/helper/consts"
|
|
"github.com/hashicorp/vault/sdk/helper/jsonutil"
|
|
"github.com/hashicorp/vault/sdk/logical"
|
|
"github.com/hashicorp/vault/sdk/physical"
|
|
"github.com/hashicorp/vault/vault/cluster"
|
|
"github.com/hashicorp/vault/vault/seal"
|
|
"github.com/hashicorp/vault/version"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
const (
|
|
// EnvVaultRaftNodeID is used to fetch the Raft node ID from the environment.
|
|
EnvVaultRaftNodeID = "VAULT_RAFT_NODE_ID"
|
|
|
|
// EnvVaultRaftPath is used to fetch the path where Raft data is stored from the environment.
|
|
EnvVaultRaftPath = "VAULT_RAFT_PATH"
|
|
|
|
// EnvVaultRaftNonVoter is used to override the non_voter config option, telling Vault to join as a non-voter (i.e. read replica).
|
|
EnvVaultRaftNonVoter = "VAULT_RAFT_RETRY_JOIN_AS_NON_VOTER"
|
|
raftNonVoterConfigKey = "retry_join_as_non_voter"
|
|
)
|
|
|
|
var getMmapFlags = func(string) int { return 0 }
|
|
|
|
// Verify RaftBackend satisfies the correct interfaces
|
|
var (
|
|
_ physical.Backend = (*RaftBackend)(nil)
|
|
_ physical.Transactional = (*RaftBackend)(nil)
|
|
_ physical.HABackend = (*RaftBackend)(nil)
|
|
_ physical.Lock = (*RaftLock)(nil)
|
|
)
|
|
|
|
var (
|
|
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
|
// This is used to reduce disk I/O for the recently committed entries.
|
|
raftLogCacheSize = 512
|
|
|
|
raftState = "raft/"
|
|
peersFileName = "peers.json"
|
|
restoreOpDelayDuration = 5 * time.Second
|
|
defaultMaxEntrySize = uint64(2 * raftchunking.ChunkSize)
|
|
|
|
GetInTxnDisabledError = errors.New("get operations inside transactions are disabled in raft backend")
|
|
)
|
|
|
|
// RaftBackend implements the backend interfaces and uses the raft protocol to
|
|
// persist writes to the FSM.
|
|
type RaftBackend struct {
|
|
logger log.Logger
|
|
conf map[string]string
|
|
l sync.RWMutex
|
|
|
|
// fsm is the state store for vault's data
|
|
fsm *FSM
|
|
|
|
// raft is the instance of raft we will operate on.
|
|
raft *raft.Raft
|
|
|
|
// raftInitCh is used to block during HA lock acquisition if raft
|
|
// has not been initialized yet, which can occur if raft is being
|
|
// used for HA-only.
|
|
raftInitCh chan struct{}
|
|
|
|
// raftNotifyCh is used to receive updates about leadership changes
|
|
// regarding this node.
|
|
raftNotifyCh chan bool
|
|
|
|
// streamLayer is the network layer used to connect the nodes in the raft
|
|
// cluster.
|
|
streamLayer *raftLayer
|
|
|
|
// raftTransport is the transport layer that the raft library uses for RPC
|
|
// communication.
|
|
raftTransport raft.Transport
|
|
|
|
// snapStore is our snapshot mechanism.
|
|
snapStore raft.SnapshotStore
|
|
|
|
// logStore is used by the raft library to store the raft logs in durable
|
|
// storage.
|
|
logStore raft.LogStore
|
|
|
|
// stableStore is used by the raft library to store additional metadata in
|
|
// durable storage.
|
|
stableStore raft.StableStore
|
|
|
|
// bootstrapConfig is only set when this node needs to be bootstrapped upon
|
|
// startup.
|
|
bootstrapConfig *raft.Configuration
|
|
|
|
// dataDir is the location on the local filesystem that raft and FSM data
|
|
// will be stored.
|
|
dataDir string
|
|
|
|
// localID is the ID for this node. This can either be configured in the
|
|
// config file, via a file on disk, or is otherwise randomly generated.
|
|
localID string
|
|
|
|
// serverAddressProvider is used to map server IDs to addresses.
|
|
serverAddressProvider raft.ServerAddressProvider
|
|
|
|
// permitPool is used to limit the number of concurrent storage calls.
|
|
permitPool *physical.PermitPool
|
|
|
|
// maxEntrySize imposes a size limit (in bytes) on a raft entry (put or transaction).
|
|
// It is suggested to use a value of 2x the Raft chunking size for optimal
|
|
// performance.
|
|
maxEntrySize uint64
|
|
|
|
// autopilot is the instance of raft-autopilot library implementation of the
|
|
// autopilot features. This will be instantiated in both leader and followers.
|
|
// However, only active node will have a "running" autopilot.
|
|
autopilot *autopilot.Autopilot
|
|
|
|
// autopilotConfig represents the configuration required to instantiate autopilot.
|
|
autopilotConfig *AutopilotConfig
|
|
|
|
// followerStates represents the information about all the peers of the raft
|
|
// leader. This is used to track some state of the peers and as well as used
|
|
// to see if the peers are "alive" using the heartbeat received from them.
|
|
followerStates *FollowerStates
|
|
|
|
// followerHeartbeatTicker is used to compute dead servers using follower
|
|
// state heartbeats.
|
|
followerHeartbeatTicker *time.Ticker
|
|
|
|
// disableAutopilot if set will not put autopilot implementation to use. The
|
|
// fallback will be to interact with the raft instance directly. This can only
|
|
// be set during startup via the environment variable
|
|
// VAULT_RAFT_AUTOPILOT_DISABLE during startup and can't be updated once the
|
|
// node is up and running.
|
|
disableAutopilot bool
|
|
|
|
// autopilotReconcileInterval is how long between rounds of performing promotions, demotions
|
|
// and leadership transfers.
|
|
autopilotReconcileInterval time.Duration
|
|
|
|
// autopilotUpdateInterval is the time between the periodic state updates. These periodic
|
|
// state updates take in known servers from the delegate, request Raft stats be
|
|
// fetched and pull in other inputs such as the Raft configuration to create
|
|
// an updated view of the Autopilot State.
|
|
autopilotUpdateInterval time.Duration
|
|
|
|
// upgradeVersion is used to override the Vault SDK version when performing an autopilot automated upgrade.
|
|
upgradeVersion string
|
|
|
|
// redundancyZone specifies a redundancy zone for autopilot.
|
|
redundancyZone string
|
|
|
|
// nonVoter specifies whether the node should join the cluster as a non-voter. Non-voters get
|
|
// replicated to and can serve reads, but do not take part in leader elections.
|
|
nonVoter bool
|
|
|
|
effectiveSDKVersion string
|
|
failGetInTxn *uint32
|
|
}
|
|
|
|
// LeaderJoinInfo contains information required by a node to join itself as a
|
|
// follower to an existing raft cluster
|
|
type LeaderJoinInfo struct {
|
|
// AutoJoin defines any cloud auto-join metadata. If supplied, Vault will
|
|
// attempt to automatically discover peers in addition to what can be provided
|
|
// via 'leader_api_addr'.
|
|
AutoJoin string `json:"auto_join"`
|
|
|
|
// AutoJoinScheme defines the optional URI protocol scheme for addresses
|
|
// discovered via auto-join.
|
|
AutoJoinScheme string `json:"auto_join_scheme"`
|
|
|
|
// AutoJoinPort defines the optional port used for addressed discovered via
|
|
// auto-join.
|
|
AutoJoinPort uint `json:"auto_join_port"`
|
|
|
|
// LeaderAPIAddr is the address of the leader node to connect to
|
|
LeaderAPIAddr string `json:"leader_api_addr"`
|
|
|
|
// LeaderCACert is the CA cert of the leader node
|
|
LeaderCACert string `json:"leader_ca_cert"`
|
|
|
|
// LeaderClientCert is the client certificate for the follower node to
|
|
// establish client authentication during TLS
|
|
LeaderClientCert string `json:"leader_client_cert"`
|
|
|
|
// LeaderClientKey is the client key for the follower node to establish
|
|
// client authentication during TLS.
|
|
LeaderClientKey string `json:"leader_client_key"`
|
|
|
|
// LeaderCACertFile is the path on disk to the the CA cert file of the
|
|
// leader node. This should only be provided via Vault's configuration file.
|
|
LeaderCACertFile string `json:"leader_ca_cert_file"`
|
|
|
|
// LeaderClientCertFile is the path on disk to the client certificate file
|
|
// for the follower node to establish client authentication during TLS. This
|
|
// should only be provided via Vault's configuration file.
|
|
LeaderClientCertFile string `json:"leader_client_cert_file"`
|
|
|
|
// LeaderClientKeyFile is the path on disk to the client key file for the
|
|
// follower node to establish client authentication during TLS. This should
|
|
// only be provided via Vault's configuration file.
|
|
LeaderClientKeyFile string `json:"leader_client_key_file"`
|
|
|
|
// LeaderTLSServerName is the optional ServerName to expect in the leader's
|
|
// certificate, instead of the host/IP we're actually connecting to.
|
|
LeaderTLSServerName string `json:"leader_tls_servername"`
|
|
|
|
// Retry indicates if the join process should automatically be retried
|
|
Retry bool `json:"-"`
|
|
|
|
// TLSConfig for the API client to use when communicating with the leader node
|
|
TLSConfig *tls.Config `json:"-"`
|
|
}
|
|
|
|
// JoinConfig returns a list of information about possible leader nodes that
|
|
// this node can join as a follower
|
|
func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) {
|
|
config := b.conf["retry_join"]
|
|
if config == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
var leaderInfos []*LeaderJoinInfo
|
|
err := jsonutil.DecodeJSON([]byte(config), &leaderInfos)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode retry_join config: %w", err)
|
|
}
|
|
|
|
if len(leaderInfos) == 0 {
|
|
return nil, errors.New("invalid retry_join config")
|
|
}
|
|
|
|
for i, info := range leaderInfos {
|
|
if len(info.AutoJoin) != 0 && len(info.LeaderAPIAddr) != 0 {
|
|
return nil, errors.New("cannot provide both a leader_api_addr and auto_join")
|
|
}
|
|
|
|
if info.AutoJoinScheme != "" && (info.AutoJoinScheme != "http" && info.AutoJoinScheme != "https") {
|
|
return nil, fmt.Errorf("invalid scheme %q; must either be http or https", info.AutoJoinScheme)
|
|
}
|
|
|
|
info.Retry = true
|
|
info.TLSConfig, err = parseTLSInfo(info)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create tls config to communicate with leader node (retry_join index: %d): %w", i, err)
|
|
}
|
|
}
|
|
|
|
return leaderInfos, nil
|
|
}
|
|
|
|
// parseTLSInfo is a helper for parses the TLS information, preferring file
|
|
// paths over raw certificate content.
|
|
func parseTLSInfo(leaderInfo *LeaderJoinInfo) (*tls.Config, error) {
|
|
var tlsConfig *tls.Config
|
|
var err error
|
|
if len(leaderInfo.LeaderCACertFile) != 0 || len(leaderInfo.LeaderClientCertFile) != 0 || len(leaderInfo.LeaderClientKeyFile) != 0 {
|
|
tlsConfig, err = tlsutil.LoadClientTLSConfig(leaderInfo.LeaderCACertFile, leaderInfo.LeaderClientCertFile, leaderInfo.LeaderClientKeyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else if len(leaderInfo.LeaderCACert) != 0 || len(leaderInfo.LeaderClientCert) != 0 || len(leaderInfo.LeaderClientKey) != 0 {
|
|
tlsConfig, err = tlsutil.ClientTLSConfig([]byte(leaderInfo.LeaderCACert), []byte(leaderInfo.LeaderClientCert), []byte(leaderInfo.LeaderClientKey))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if tlsConfig != nil {
|
|
tlsConfig.ServerName = leaderInfo.LeaderTLSServerName
|
|
}
|
|
|
|
return tlsConfig, nil
|
|
}
|
|
|
|
// EnsurePath is used to make sure a path exists
|
|
func EnsurePath(path string, dir bool) error {
|
|
if !dir {
|
|
path = filepath.Dir(path)
|
|
}
|
|
return os.MkdirAll(path, 0o700)
|
|
}
|
|
|
|
// NewRaftBackend constructs a RaftBackend using the given directory
|
|
func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
|
path := os.Getenv(EnvVaultRaftPath)
|
|
if path == "" {
|
|
pathFromConfig, ok := conf["path"]
|
|
if !ok {
|
|
return nil, fmt.Errorf("'path' must be set")
|
|
}
|
|
path = pathFromConfig
|
|
}
|
|
|
|
var localID string
|
|
{
|
|
// Determine the local node ID from the environment.
|
|
if raftNodeID := os.Getenv(EnvVaultRaftNodeID); raftNodeID != "" {
|
|
localID = raftNodeID
|
|
}
|
|
|
|
// If not set in the environment check the configuration file.
|
|
if len(localID) == 0 {
|
|
localID = conf["node_id"]
|
|
}
|
|
|
|
// If not set in the config check the "node-id" file.
|
|
if len(localID) == 0 {
|
|
localIDRaw, err := ioutil.ReadFile(filepath.Join(path, "node-id"))
|
|
switch {
|
|
case err == nil:
|
|
if len(localIDRaw) > 0 {
|
|
localID = string(localIDRaw)
|
|
}
|
|
case os.IsNotExist(err):
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// If all of the above fails generate a UUID and persist it to the
|
|
// "node-id" file.
|
|
if len(localID) == 0 {
|
|
id, err := uuid.GenerateUUID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ioutil.WriteFile(filepath.Join(path, "node-id"), []byte(id), 0o600); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
localID = id
|
|
}
|
|
}
|
|
|
|
// Create the FSM.
|
|
fsm, err := NewFSM(path, localID, logger.Named("fsm"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create fsm: %v", err)
|
|
}
|
|
|
|
if delayRaw, ok := conf["apply_delay"]; ok {
|
|
delay, err := parseutil.ParseDurationSecond(delayRaw)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err)
|
|
}
|
|
fsm.applyCallback = func() {
|
|
time.Sleep(delay)
|
|
}
|
|
}
|
|
|
|
// Build an all in-memory setup for dev mode, otherwise prepare a full
|
|
// disk-based setup.
|
|
var log raft.LogStore
|
|
var stable raft.StableStore
|
|
var snap raft.SnapshotStore
|
|
|
|
var devMode bool
|
|
if devMode {
|
|
store := raft.NewInmemStore()
|
|
stable = store
|
|
log = store
|
|
snap = raft.NewInmemSnapshotStore()
|
|
} else {
|
|
// Create the base raft path.
|
|
path := filepath.Join(path, raftState)
|
|
if err := EnsurePath(path, true); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the backend raft store for logs and stable storage.
|
|
dbPath := filepath.Join(path, "raft.db")
|
|
opts := boltOptions(dbPath)
|
|
raftOptions := raftboltdb.Options{
|
|
Path: dbPath,
|
|
BoltOptions: opts,
|
|
}
|
|
store, err := raftboltdb.New(raftOptions)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stable = store
|
|
|
|
// Wrap the store in a LogCache to improve performance.
|
|
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log = cacheStore
|
|
|
|
// Create the snapshot store.
|
|
snapshots, err := NewBoltSnapshotStore(path, logger.Named("snapshot"), fsm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
snap = snapshots
|
|
}
|
|
|
|
if delayRaw, ok := conf["snapshot_delay"]; ok {
|
|
delay, err := parseutil.ParseDurationSecond(delayRaw)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err)
|
|
}
|
|
snap = newSnapshotStoreDelay(snap, delay, logger)
|
|
}
|
|
|
|
maxEntrySize := defaultMaxEntrySize
|
|
if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 {
|
|
i, err := strconv.Atoi(maxEntrySizeCfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err)
|
|
}
|
|
|
|
maxEntrySize = uint64(i)
|
|
}
|
|
|
|
var reconcileInterval time.Duration
|
|
if interval := conf["autopilot_reconcile_interval"]; interval != "" {
|
|
interval, err := parseutil.ParseDurationSecond(interval)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err)
|
|
}
|
|
reconcileInterval = interval
|
|
}
|
|
|
|
var updateInterval time.Duration
|
|
if interval := conf["autopilot_update_interval"]; interval != "" {
|
|
interval, err := parseutil.ParseDurationSecond(interval)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("autopilot_update_interval does not parse as a duration: %w", err)
|
|
}
|
|
updateInterval = interval
|
|
}
|
|
|
|
effectiveReconcileInterval := autopilot.DefaultReconcileInterval
|
|
effectiveUpdateInterval := autopilot.DefaultUpdateInterval
|
|
|
|
if reconcileInterval != 0 {
|
|
effectiveReconcileInterval = reconcileInterval
|
|
}
|
|
if updateInterval != 0 {
|
|
effectiveUpdateInterval = updateInterval
|
|
}
|
|
|
|
if effectiveReconcileInterval < effectiveUpdateInterval {
|
|
return nil, fmt.Errorf("autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)", effectiveReconcileInterval, effectiveUpdateInterval)
|
|
}
|
|
|
|
var upgradeVersion string
|
|
if uv, ok := conf["autopilot_upgrade_version"]; ok && uv != "" {
|
|
upgradeVersion = uv
|
|
_, err := goversion.NewVersion(upgradeVersion)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("autopilot_upgrade_version does not parse as a semantic version: %w", err)
|
|
}
|
|
}
|
|
|
|
var nonVoter bool
|
|
if v := os.Getenv(EnvVaultRaftNonVoter); v != "" {
|
|
// Consistent with handling of other raft boolean env vars
|
|
// VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC
|
|
nonVoter = true
|
|
} else if v, ok := conf[raftNonVoterConfigKey]; ok {
|
|
nonVoter, err = strconv.ParseBool(v)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse %s config value %q as a boolean: %w", raftNonVoterConfigKey, v, err)
|
|
}
|
|
}
|
|
|
|
if nonVoter && conf["retry_join"] == "" {
|
|
return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey)
|
|
}
|
|
|
|
return &RaftBackend{
|
|
logger: logger,
|
|
fsm: fsm,
|
|
raftInitCh: make(chan struct{}),
|
|
conf: conf,
|
|
logStore: log,
|
|
stableStore: stable,
|
|
snapStore: snap,
|
|
dataDir: path,
|
|
localID: localID,
|
|
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
|
|
maxEntrySize: maxEntrySize,
|
|
followerHeartbeatTicker: time.NewTicker(time.Second),
|
|
autopilotReconcileInterval: reconcileInterval,
|
|
autopilotUpdateInterval: updateInterval,
|
|
redundancyZone: conf["autopilot_redundancy_zone"],
|
|
nonVoter: nonVoter,
|
|
upgradeVersion: upgradeVersion,
|
|
failGetInTxn: new(uint32),
|
|
}, nil
|
|
}
|
|
|
|
type snapshotStoreDelay struct {
|
|
logger log.Logger
|
|
wrapped raft.SnapshotStore
|
|
delay time.Duration
|
|
}
|
|
|
|
func (s snapshotStoreDelay) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
|
|
s.logger.Trace("delaying before creating snapshot", "delay", s.delay)
|
|
time.Sleep(s.delay)
|
|
return s.wrapped.Create(version, index, term, configuration, configurationIndex, trans)
|
|
}
|
|
|
|
func (s snapshotStoreDelay) List() ([]*raft.SnapshotMeta, error) {
|
|
return s.wrapped.List()
|
|
}
|
|
|
|
func (s snapshotStoreDelay) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
|
|
return s.wrapped.Open(id)
|
|
}
|
|
|
|
var _ raft.SnapshotStore = &snapshotStoreDelay{}
|
|
|
|
func newSnapshotStoreDelay(snap raft.SnapshotStore, delay time.Duration, logger log.Logger) *snapshotStoreDelay {
|
|
return &snapshotStoreDelay{
|
|
logger: logger,
|
|
wrapped: snap,
|
|
delay: delay,
|
|
}
|
|
}
|
|
|
|
// Close is used to gracefully close all file resources. N.B. This method
|
|
// should only be called if you are sure the RaftBackend will never be used
|
|
// again.
|
|
func (b *RaftBackend) Close() error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
if err := b.fsm.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := b.stableStore.(*raftboltdb.BoltStore).Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *RaftBackend) FailGetInTxn(fail bool) {
|
|
var val uint32
|
|
if fail {
|
|
val = 1
|
|
}
|
|
atomic.StoreUint32(b.failGetInTxn, val)
|
|
}
|
|
|
|
func (b *RaftBackend) SetEffectiveSDKVersion(sdkVersion string) {
|
|
b.l.Lock()
|
|
b.effectiveSDKVersion = sdkVersion
|
|
b.l.Unlock()
|
|
}
|
|
|
|
func (b *RaftBackend) RedundancyZone() string {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
return b.redundancyZone
|
|
}
|
|
|
|
func (b *RaftBackend) NonVoter() bool {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
return b.nonVoter
|
|
}
|
|
|
|
func (b *RaftBackend) EffectiveVersion() string {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.upgradeVersion != "" {
|
|
return b.upgradeVersion
|
|
}
|
|
|
|
return version.GetVersion().Version
|
|
}
|
|
|
|
// DisableUpgradeMigration returns the state of the DisableUpgradeMigration config flag and whether it was set or not
|
|
func (b *RaftBackend) DisableUpgradeMigration() (bool, bool) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.autopilotConfig == nil {
|
|
return false, false
|
|
}
|
|
|
|
return b.autopilotConfig.DisableUpgradeMigration, true
|
|
}
|
|
|
|
func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
|
|
var stats map[string]string
|
|
b.l.RLock()
|
|
logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
|
|
fsmStats := b.fsm.Stats()
|
|
if b.raft != nil {
|
|
stats = b.raft.Stats()
|
|
}
|
|
b.l.RUnlock()
|
|
b.collectMetricsWithStats(logstoreStats, sink, "logstore")
|
|
b.collectMetricsWithStats(fsmStats, sink, "fsm")
|
|
labels := []metrics.Label{
|
|
{
|
|
Name: "peer_id",
|
|
Value: b.localID,
|
|
},
|
|
}
|
|
if stats != nil {
|
|
for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} {
|
|
n, err := strconv.ParseUint(stats[key], 10, 64)
|
|
if err == nil {
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {
|
|
txstats := stats.TxStats
|
|
labels := []metricsutil.Label{{"database", database}}
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "free_pages"}, float32(stats.FreePageN), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "pending_pages"}, float32(stats.PendingPageN), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "allocated_bytes"}, float32(stats.FreeAlloc), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "freelist", "used_bytes"}, float32(stats.FreelistInuse), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "started_read_transactions"}, float32(stats.TxN), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "transaction", "currently_open_read_transactions"}, float32(stats.OpenTxN), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "count"}, float32(txstats.GetPageCount()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "page", "bytes_allocated"}, float32(txstats.GetPageAlloc()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "cursor", "count"}, float32(txstats.GetCursorCount()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "count"}, float32(txstats.GetNodeCount()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "node", "dereferences"}, float32(txstats.GetNodeDeref()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "rebalance", "count"}, float32(txstats.GetRebalance()), labels)
|
|
sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "rebalance", "time"}, float32(txstats.GetRebalanceTime().Milliseconds()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "split", "count"}, float32(txstats.GetSplit()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "spill", "count"}, float32(txstats.GetSpill()), labels)
|
|
sink.AddSampleWithLabels([]string{"raft_storage", "bolt", "spill", "time"}, float32(txstats.GetSpillTime().Milliseconds()), labels)
|
|
sink.SetGaugeWithLabels([]string{"raft_storage", "bolt", "write", "count"}, float32(txstats.GetWrite()), labels)
|
|
sink.IncrCounterWithLabels([]string{"raft_storage", "bolt", "write", "time"}, float32(txstats.GetWriteTime().Milliseconds()), labels)
|
|
}
|
|
|
|
// RaftServer has information about a server in the Raft configuration
|
|
type RaftServer struct {
|
|
// NodeID is the name of the server
|
|
NodeID string `json:"node_id"`
|
|
|
|
// Address is the IP:port of the server, used for Raft communications
|
|
Address string `json:"address"`
|
|
|
|
// Leader is true if this server is the current cluster leader
|
|
Leader bool `json:"leader"`
|
|
|
|
// Protocol version is the raft protocol version used by the server
|
|
ProtocolVersion string `json:"protocol_version"`
|
|
|
|
// Voter is true if this server has a vote in the cluster. This might
|
|
// be false if the server is staging and still coming online.
|
|
Voter bool `json:"voter"`
|
|
}
|
|
|
|
// RaftConfigurationResponse is returned when querying for the current Raft
|
|
// configuration.
|
|
type RaftConfigurationResponse struct {
|
|
// Servers has the list of servers in the Raft configuration.
|
|
Servers []*RaftServer `json:"servers"`
|
|
|
|
// Index has the Raft index of this configuration.
|
|
Index uint64 `json:"index"`
|
|
}
|
|
|
|
// Peer defines the ID and Address for a given member of the raft cluster.
|
|
type Peer struct {
|
|
ID string `json:"id"`
|
|
Address string `json:"address"`
|
|
Suffrage int `json:"suffrage"`
|
|
}
|
|
|
|
// NodeID returns the identifier of the node
|
|
func (b *RaftBackend) NodeID() string {
|
|
return b.localID
|
|
}
|
|
|
|
// Initialized tells if raft is running or not
|
|
func (b *RaftBackend) Initialized() bool {
|
|
b.l.RLock()
|
|
init := b.raft != nil
|
|
b.l.RUnlock()
|
|
return init
|
|
}
|
|
|
|
// SetTLSKeyring is used to install a new keyring. If the active key has changed
|
|
// it will also close any network connections or streams forcing a reconnect
|
|
// with the new key.
|
|
func (b *RaftBackend) SetTLSKeyring(keyring *TLSKeyring) error {
|
|
b.l.RLock()
|
|
err := b.streamLayer.setTLSKeyring(keyring)
|
|
b.l.RUnlock()
|
|
|
|
return err
|
|
}
|
|
|
|
// SetServerAddressProvider sets a the address provider for determining the raft
|
|
// node addresses. This is currently only used in tests.
|
|
func (b *RaftBackend) SetServerAddressProvider(provider raft.ServerAddressProvider) {
|
|
b.l.Lock()
|
|
b.serverAddressProvider = provider
|
|
b.l.Unlock()
|
|
}
|
|
|
|
// Bootstrap prepares the given peers to be part of the raft cluster
|
|
func (b *RaftBackend) Bootstrap(peers []Peer) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
hasState, err := raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if hasState {
|
|
return errors.New("error bootstrapping cluster: cluster already has state")
|
|
}
|
|
|
|
raftConfig := &raft.Configuration{
|
|
Servers: make([]raft.Server, len(peers)),
|
|
}
|
|
|
|
for i, p := range peers {
|
|
raftConfig.Servers[i] = raft.Server{
|
|
ID: raft.ServerID(p.ID),
|
|
Address: raft.ServerAddress(p.Address),
|
|
Suffrage: raft.ServerSuffrage(p.Suffrage),
|
|
}
|
|
}
|
|
|
|
// Store the config for later use
|
|
b.bootstrapConfig = raftConfig
|
|
return nil
|
|
}
|
|
|
|
// SetRestoreCallback sets the callback to be used when a restoreCallbackOp is
|
|
// processed through the FSM.
|
|
func (b *RaftBackend) SetRestoreCallback(restoreCb restoreCallback) {
|
|
b.fsm.l.Lock()
|
|
b.fsm.restoreCb = restoreCb
|
|
b.fsm.l.Unlock()
|
|
}
|
|
|
|
func (b *RaftBackend) applyConfigSettings(config *raft.Config) error {
|
|
config.Logger = b.logger
|
|
multiplierRaw, ok := b.conf["performance_multiplier"]
|
|
multiplier := 5
|
|
if ok {
|
|
var err error
|
|
multiplier, err = strconv.Atoi(multiplierRaw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
config.ElectionTimeout *= time.Duration(multiplier)
|
|
config.HeartbeatTimeout *= time.Duration(multiplier)
|
|
config.LeaderLeaseTimeout *= time.Duration(multiplier)
|
|
|
|
snapThresholdRaw, ok := b.conf["snapshot_threshold"]
|
|
if ok {
|
|
var err error
|
|
snapThreshold, err := strconv.Atoi(snapThresholdRaw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
config.SnapshotThreshold = uint64(snapThreshold)
|
|
}
|
|
|
|
trailingLogsRaw, ok := b.conf["trailing_logs"]
|
|
if ok {
|
|
var err error
|
|
trailingLogs, err := strconv.Atoi(trailingLogsRaw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
config.TrailingLogs = uint64(trailingLogs)
|
|
}
|
|
snapshotIntervalRaw, ok := b.conf["snapshot_interval"]
|
|
if ok {
|
|
var err error
|
|
snapshotInterval, err := parseutil.ParseDurationSecond(snapshotIntervalRaw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
config.SnapshotInterval = snapshotInterval
|
|
}
|
|
|
|
config.NoSnapshotRestoreOnStart = true
|
|
config.MaxAppendEntries = 64
|
|
|
|
// Setting BatchApplyCh allows the raft library to enqueue up to
|
|
// MaxAppendEntries into each raft apply rather than relying on the
|
|
// scheduler.
|
|
config.BatchApplyCh = true
|
|
|
|
b.logger.Trace("applying raft config", "inputs", b.conf)
|
|
return nil
|
|
}
|
|
|
|
// SetupOpts are used to pass options to the raft setup function.
|
|
type SetupOpts struct {
|
|
// TLSKeyring is the keyring to use for the cluster traffic.
|
|
TLSKeyring *TLSKeyring
|
|
|
|
// ClusterListener is the cluster hook used to register the raft handler and
|
|
// client with core's cluster listeners.
|
|
ClusterListener cluster.ClusterHook
|
|
|
|
// StartAsLeader is used to specify this node should start as leader and
|
|
// bypass the leader election. This should be used with caution.
|
|
StartAsLeader bool
|
|
|
|
// RecoveryModeConfig is the configuration for the raft cluster in recovery
|
|
// mode.
|
|
RecoveryModeConfig *raft.Configuration
|
|
}
|
|
|
|
func (b *RaftBackend) StartRecoveryCluster(ctx context.Context, peer Peer) error {
|
|
recoveryModeConfig := &raft.Configuration{
|
|
Servers: []raft.Server{
|
|
{
|
|
ID: raft.ServerID(peer.ID),
|
|
Address: raft.ServerAddress(peer.Address),
|
|
},
|
|
},
|
|
}
|
|
|
|
return b.SetupCluster(context.Background(), SetupOpts{
|
|
StartAsLeader: true,
|
|
RecoveryModeConfig: recoveryModeConfig,
|
|
})
|
|
}
|
|
|
|
func (b *RaftBackend) HasState() (bool, error) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
return raft.HasExistingState(b.logStore, b.stableStore, b.snapStore)
|
|
}
|
|
|
|
// SetupCluster starts the raft cluster and enables the networking needed for
|
|
// the raft nodes to communicate.
|
|
func (b *RaftBackend) SetupCluster(ctx context.Context, opts SetupOpts) error {
|
|
b.logger.Trace("setting up raft cluster")
|
|
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
// We are already unsealed
|
|
if b.raft != nil {
|
|
b.logger.Debug("raft already started, not setting up cluster")
|
|
return nil
|
|
}
|
|
|
|
if len(b.localID) == 0 {
|
|
return errors.New("no local node id configured")
|
|
}
|
|
|
|
// Setup the raft config
|
|
raftConfig := raft.DefaultConfig()
|
|
if err := b.applyConfigSettings(raftConfig); err != nil {
|
|
return err
|
|
}
|
|
|
|
listenerIsNil := func(cl cluster.ClusterHook) bool {
|
|
switch {
|
|
case opts.ClusterListener == nil:
|
|
return true
|
|
default:
|
|
// Concrete type checks
|
|
switch cl.(type) {
|
|
case *cluster.Listener:
|
|
return cl.(*cluster.Listener) == nil
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
var initialTimeoutMultiplier time.Duration
|
|
switch {
|
|
case opts.TLSKeyring == nil && listenerIsNil(opts.ClusterListener):
|
|
// If we don't have a provided network we use an in-memory one.
|
|
// This allows us to bootstrap a node without bringing up a cluster
|
|
// network. This will be true during bootstrap, tests and dev modes.
|
|
_, b.raftTransport = raft.NewInmemTransportWithTimeout(raft.ServerAddress(b.localID), time.Second)
|
|
case opts.TLSKeyring == nil:
|
|
return errors.New("no keyring provided")
|
|
case listenerIsNil(opts.ClusterListener):
|
|
return errors.New("no cluster listener provided")
|
|
default:
|
|
initialTimeoutMultiplier = 3
|
|
if !opts.StartAsLeader {
|
|
electionTimeout, heartbeatTimeout := raftConfig.ElectionTimeout, raftConfig.HeartbeatTimeout
|
|
// Use bigger values for first election
|
|
raftConfig.ElectionTimeout *= initialTimeoutMultiplier
|
|
raftConfig.HeartbeatTimeout *= initialTimeoutMultiplier
|
|
b.logger.Trace("using larger timeouts for raft at startup",
|
|
"initial_election_timeout", raftConfig.ElectionTimeout,
|
|
"initial_heartbeat_timeout", raftConfig.HeartbeatTimeout,
|
|
"normal_election_timeout", electionTimeout,
|
|
"normal_heartbeat_timeout", heartbeatTimeout)
|
|
}
|
|
|
|
// Set the local address and localID in the streaming layer and the raft config.
|
|
streamLayer, err := NewRaftLayer(b.logger.Named("stream"), opts.TLSKeyring, opts.ClusterListener)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
transConfig := &raft.NetworkTransportConfig{
|
|
Stream: streamLayer,
|
|
MaxPool: 3,
|
|
Timeout: 10 * time.Second,
|
|
ServerAddressProvider: b.serverAddressProvider,
|
|
Logger: b.logger.Named("raft-net"),
|
|
}
|
|
transport := raft.NewNetworkTransportWithConfig(transConfig)
|
|
|
|
b.streamLayer = streamLayer
|
|
b.raftTransport = transport
|
|
}
|
|
|
|
raftConfig.LocalID = raft.ServerID(b.localID)
|
|
|
|
// Set up a channel for reliable leader notifications.
|
|
raftNotifyCh := make(chan bool, 10)
|
|
raftConfig.NotifyCh = raftNotifyCh
|
|
|
|
// If we have a bootstrapConfig set we should bootstrap now.
|
|
if b.bootstrapConfig != nil {
|
|
bootstrapConfig := b.bootstrapConfig
|
|
// Unset the bootstrap config
|
|
b.bootstrapConfig = nil
|
|
|
|
// Bootstrap raft with our known cluster members.
|
|
if err := raft.BootstrapCluster(raftConfig, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *bootstrapConfig); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Setup the Raft store.
|
|
b.fsm.SetNoopRestore(true)
|
|
|
|
raftPath := filepath.Join(b.dataDir, raftState)
|
|
peersFile := filepath.Join(raftPath, peersFileName)
|
|
_, err := os.Stat(peersFile)
|
|
if err == nil {
|
|
b.logger.Info("raft recovery initiated", "recovery_file", peersFileName)
|
|
|
|
recoveryConfig, err := raft.ReadConfigJSON(peersFile)
|
|
if err != nil {
|
|
return fmt.Errorf("raft recovery failed to parse peers.json: %w", err)
|
|
}
|
|
|
|
// Non-voting servers are only allowed in enterprise. If Suffrage is disabled,
|
|
// error out to indicate that it isn't allowed.
|
|
for idx := range recoveryConfig.Servers {
|
|
if !nonVotersAllowed && recoveryConfig.Servers[idx].Suffrage == raft.Nonvoter {
|
|
return fmt.Errorf("raft recovery failed to parse configuration for node %q: setting `non_voter` is only supported in enterprise", recoveryConfig.Servers[idx].ID)
|
|
}
|
|
}
|
|
|
|
b.logger.Info("raft recovery found new config", "config", recoveryConfig)
|
|
|
|
err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, recoveryConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("raft recovery failed: %w", err)
|
|
}
|
|
|
|
err = os.Remove(peersFile)
|
|
if err != nil {
|
|
return fmt.Errorf("raft recovery failed to delete peers.json; please delete manually: %w", err)
|
|
}
|
|
b.logger.Info("raft recovery deleted peers.json")
|
|
}
|
|
|
|
if opts.RecoveryModeConfig != nil {
|
|
err = raft.RecoverCluster(raftConfig, b.fsm, b.logStore, b.stableStore, b.snapStore, b.raftTransport, *opts.RecoveryModeConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("recovering raft cluster failed: %w", err)
|
|
}
|
|
}
|
|
|
|
b.logger.Info("creating Raft", "config", fmt.Sprintf("%#v", raftConfig))
|
|
raftObj, err := raft.NewRaft(raftConfig, b.fsm.chunker, b.logStore, b.stableStore, b.snapStore, b.raftTransport)
|
|
b.fsm.SetNoopRestore(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If we are expecting to start as leader wait until we win the election.
|
|
// This should happen quickly since there is only one node in the cluster.
|
|
// StartAsLeader is only set during init, recovery mode, storage migration,
|
|
// and tests.
|
|
if opts.StartAsLeader {
|
|
// ticker is used to prevent memory leak of using time.After in
|
|
// for - select pattern.
|
|
ticker := time.NewTicker(10 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for {
|
|
if raftObj.State() == raft.Leader {
|
|
break
|
|
}
|
|
|
|
ticker.Reset(10 * time.Millisecond)
|
|
select {
|
|
case <-ctx.Done():
|
|
future := raftObj.Shutdown()
|
|
if future.Error() != nil {
|
|
return fmt.Errorf("shutdown while waiting for leadership: %w", future.Error())
|
|
}
|
|
|
|
return errors.New("shutdown while waiting for leadership")
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
b.raft = raftObj
|
|
b.raftNotifyCh = raftNotifyCh
|
|
|
|
if err := b.fsm.upgradeLocalNodeConfig(); err != nil {
|
|
b.logger.Error("failed to upgrade local node configuration")
|
|
return err
|
|
}
|
|
|
|
if b.streamLayer != nil {
|
|
// Add Handler to the cluster.
|
|
opts.ClusterListener.AddHandler(consts.RaftStorageALPN, b.streamLayer)
|
|
|
|
// Add Client to the cluster.
|
|
opts.ClusterListener.AddClient(consts.RaftStorageALPN, b.streamLayer)
|
|
}
|
|
|
|
// Close the init channel to signal setup has been completed
|
|
close(b.raftInitCh)
|
|
|
|
reloadConfig := func() {
|
|
newCfg := raft.ReloadableConfig{
|
|
TrailingLogs: raftConfig.TrailingLogs,
|
|
SnapshotInterval: raftConfig.SnapshotInterval,
|
|
SnapshotThreshold: raftConfig.SnapshotThreshold,
|
|
HeartbeatTimeout: raftConfig.HeartbeatTimeout / initialTimeoutMultiplier,
|
|
ElectionTimeout: raftConfig.ElectionTimeout / initialTimeoutMultiplier,
|
|
}
|
|
err := raftObj.ReloadConfig(newCfg)
|
|
if err != nil {
|
|
b.logger.Error("failed to reload raft config to set lower timeouts", "error", err)
|
|
} else {
|
|
b.logger.Trace("reloaded raft config to set lower timeouts", "config", fmt.Sprintf("%#v", newCfg))
|
|
}
|
|
}
|
|
confFuture := raftObj.GetConfiguration()
|
|
numServers := 0
|
|
if err := confFuture.Error(); err != nil {
|
|
// This should probably never happen, but just in case we'll log the error.
|
|
// We'll default in this case to the multi-node behaviour.
|
|
b.logger.Error("failed to read raft configuration", "error", err)
|
|
} else {
|
|
clusterConf := confFuture.Configuration()
|
|
numServers = len(clusterConf.Servers)
|
|
}
|
|
if initialTimeoutMultiplier != 0 {
|
|
if numServers == 1 {
|
|
reloadConfig()
|
|
} else {
|
|
go func() {
|
|
ticker := time.NewTicker(50 * time.Millisecond)
|
|
// Emulate the random timeout used in Raft lib, to ensure that
|
|
// if all nodes are brought up simultaneously, they don't all
|
|
// call for an election at once.
|
|
extra := time.Duration(rand.Int63()) % raftConfig.HeartbeatTimeout
|
|
timeout := time.NewTimer(raftConfig.HeartbeatTimeout + extra)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
switch raftObj.State() {
|
|
case raft.Candidate, raft.Leader:
|
|
b.logger.Trace("triggering raft config reload due to being candidate or leader")
|
|
reloadConfig()
|
|
return
|
|
case raft.Shutdown:
|
|
return
|
|
}
|
|
case <-timeout.C:
|
|
b.logger.Trace("triggering raft config reload due to initial timeout")
|
|
reloadConfig()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
b.logger.Trace("finished setting up raft cluster")
|
|
return nil
|
|
}
|
|
|
|
// TeardownCluster shuts down the raft cluster
|
|
func (b *RaftBackend) TeardownCluster(clusterListener cluster.ClusterHook) error {
|
|
if clusterListener != nil {
|
|
clusterListener.StopHandler(consts.RaftStorageALPN)
|
|
clusterListener.RemoveClient(consts.RaftStorageALPN)
|
|
}
|
|
|
|
b.l.Lock()
|
|
|
|
// Perform shutdown only if the raft object is non-nil. The object could be nil
|
|
// if the node is unsealed but has not joined the peer set.
|
|
var future raft.Future
|
|
if b.raft != nil {
|
|
future = b.raft.Shutdown()
|
|
}
|
|
|
|
b.raft = nil
|
|
|
|
// If we're tearing down, then we need to recreate the raftInitCh
|
|
b.raftInitCh = make(chan struct{})
|
|
b.l.Unlock()
|
|
|
|
if future != nil {
|
|
return future.Error()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommittedIndex returns the latest index committed to stable storage
|
|
func (b *RaftBackend) CommittedIndex() uint64 {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft == nil {
|
|
return 0
|
|
}
|
|
|
|
return b.raft.LastIndex()
|
|
}
|
|
|
|
// AppliedIndex returns the latest index applied to the FSM
|
|
func (b *RaftBackend) AppliedIndex() uint64 {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.fsm == nil {
|
|
return 0
|
|
}
|
|
|
|
// We use the latest index that the FSM has seen here, which may be behind
|
|
// raft.AppliedIndex() due to the async nature of the raft library.
|
|
indexState, _ := b.fsm.LatestState()
|
|
return indexState.Index
|
|
}
|
|
|
|
// Term returns the raft term of this node.
|
|
func (b *RaftBackend) Term() uint64 {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.fsm == nil {
|
|
return 0
|
|
}
|
|
|
|
// We use the latest index that the FSM has seen here, which may be behind
|
|
// raft.AppliedIndex() due to the async nature of the raft library.
|
|
indexState, _ := b.fsm.LatestState()
|
|
return indexState.Term
|
|
}
|
|
|
|
// RemovePeer removes the given peer ID from the raft cluster. If the node is
|
|
// ourselves we will give up leadership.
|
|
func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if b.disableAutopilot {
|
|
if b.raft == nil {
|
|
return errors.New("raft storage is not initialized")
|
|
}
|
|
b.logger.Trace("removing server from raft", "id", peerID)
|
|
future := b.raft.RemoveServer(raft.ServerID(peerID), 0, 0)
|
|
return future.Error()
|
|
}
|
|
|
|
if b.autopilot == nil {
|
|
return errors.New("raft storage autopilot is not initialized")
|
|
}
|
|
|
|
b.logger.Trace("removing server from raft via autopilot", "id", peerID)
|
|
return b.autopilot.RemoveServer(raft.ServerID(peerID))
|
|
}
|
|
|
|
// GetConfigurationOffline is used to read the stale, last known raft
|
|
// configuration to this node. It accesses the last state written into the
|
|
// FSM. When a server is online use GetConfiguration instead.
|
|
func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, error) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft != nil {
|
|
return nil, errors.New("raft storage is initialized, used GetConfiguration instead")
|
|
}
|
|
|
|
if b.fsm == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
state, configuration := b.fsm.LatestState()
|
|
config := &RaftConfigurationResponse{
|
|
Index: state.Index,
|
|
}
|
|
|
|
if configuration == nil || configuration.Servers == nil {
|
|
return config, nil
|
|
}
|
|
|
|
for _, server := range configuration.Servers {
|
|
entry := &RaftServer{
|
|
NodeID: server.Id,
|
|
Address: server.Address,
|
|
// Since we are offline no node is the leader.
|
|
Leader: false,
|
|
Voter: raft.ServerSuffrage(server.Suffrage) == raft.Voter,
|
|
}
|
|
config.Servers = append(config.Servers, entry)
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) {
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft == nil {
|
|
return nil, errors.New("raft storage is not initialized")
|
|
}
|
|
|
|
future := b.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config := &RaftConfigurationResponse{
|
|
Index: future.Index(),
|
|
}
|
|
|
|
for _, server := range future.Configuration().Servers {
|
|
entry := &RaftServer{
|
|
NodeID: string(server.ID),
|
|
Address: string(server.Address),
|
|
// Since we only service this request on the active node our node ID
|
|
// denotes the raft leader.
|
|
Leader: string(server.ID) == b.NodeID(),
|
|
Voter: server.Suffrage == raft.Voter,
|
|
ProtocolVersion: strconv.Itoa(raft.ProtocolVersionMax),
|
|
}
|
|
|
|
config.Servers = append(config.Servers, entry)
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// AddPeer adds a new server to the raft cluster
|
|
func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.disableAutopilot {
|
|
if b.raft == nil {
|
|
return errors.New("raft storage is not initialized")
|
|
}
|
|
b.logger.Trace("adding server to raft", "id", peerID)
|
|
future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0)
|
|
return future.Error()
|
|
}
|
|
|
|
if b.autopilot == nil {
|
|
return errors.New("raft storage autopilot is not initialized")
|
|
}
|
|
|
|
b.logger.Trace("adding server to raft via autopilot", "id", peerID)
|
|
return b.autopilot.AddServer(&autopilot.Server{
|
|
ID: raft.ServerID(peerID),
|
|
Name: peerID,
|
|
Address: raft.ServerAddress(clusterAddr),
|
|
RaftVersion: raft.ProtocolVersionMax,
|
|
NodeType: autopilot.NodeVoter,
|
|
})
|
|
}
|
|
|
|
// Peers returns all the servers present in the raft cluster
|
|
func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) {
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft == nil {
|
|
return nil, errors.New("raft storage is not initialized")
|
|
}
|
|
|
|
future := b.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ret := make([]Peer, len(future.Configuration().Servers))
|
|
for i, s := range future.Configuration().Servers {
|
|
ret[i] = Peer{
|
|
ID: string(s.ID),
|
|
Address: string(s.Address),
|
|
Suffrage: int(s.Suffrage),
|
|
}
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
// SnapshotHTTP is a wrapper for Snapshot that sends the snapshot as an HTTP
|
|
// response.
|
|
func (b *RaftBackend) SnapshotHTTP(out *logical.HTTPResponseWriter, access seal.Access) error {
|
|
out.Header().Add("Content-Disposition", "attachment")
|
|
out.Header().Add("Content-Type", "application/gzip")
|
|
|
|
return b.Snapshot(out, access)
|
|
}
|
|
|
|
// Snapshot takes a raft snapshot, packages it into a archive file and writes it
|
|
// to the provided writer. Seal access is used to encrypt the SHASUM file so we
|
|
// can validate the snapshot was taken using the same root keys or not.
|
|
func (b *RaftBackend) Snapshot(out io.Writer, access seal.Access) error {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft == nil {
|
|
return errors.New("raft storage is sealed")
|
|
}
|
|
|
|
// If we have access to the seal create a sealer object
|
|
var s snapshot.Sealer
|
|
if access != nil {
|
|
s = &sealer{
|
|
access: access,
|
|
}
|
|
}
|
|
|
|
return snapshot.Write(b.logger.Named("snapshot"), b.raft, s, out)
|
|
}
|
|
|
|
// WriteSnapshotToTemp reads a snapshot archive off the provided reader,
|
|
// extracts the data and writes the snapshot to a temporary file. The seal
|
|
// access is used to decrypt the SHASUM file in the archive to ensure this
|
|
// snapshot has the same root key as the running instance. If the provided
|
|
// access is nil then it will skip that validation.
|
|
func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access seal.Access) (*os.File, func(), raft.SnapshotMeta, error) {
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
var metadata raft.SnapshotMeta
|
|
if b.raft == nil {
|
|
return nil, nil, metadata, errors.New("raft storage is sealed")
|
|
}
|
|
|
|
// If we have access to the seal create a sealer object
|
|
var s snapshot.Sealer
|
|
if access != nil {
|
|
s = &sealer{
|
|
access: access,
|
|
}
|
|
}
|
|
|
|
snap, cleanup, err := snapshot.WriteToTempFileWithSealer(b.logger.Named("snapshot"), in, &metadata, s)
|
|
return snap, cleanup, metadata, err
|
|
}
|
|
|
|
// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
|
|
// raft.
|
|
func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.l.RLock()
|
|
defer b.l.RUnlock()
|
|
|
|
if b.raft == nil {
|
|
return errors.New("raft storage is not initialized")
|
|
}
|
|
|
|
if err := b.raft.Restore(&metadata, snap, 0); err != nil {
|
|
b.logger.Named("snapshot").Error("failed to restore snapshot", "error", err)
|
|
return err
|
|
}
|
|
|
|
// Apply a log that tells the follower nodes to run the restore callback
|
|
// function. This is done after the restore call so we can be sure the
|
|
// snapshot applied to a quorum of nodes.
|
|
command := &LogData{
|
|
Operations: []*LogOperation{
|
|
{
|
|
OpType: restoreCallbackOp,
|
|
},
|
|
},
|
|
}
|
|
|
|
err := b.applyLog(ctx, command)
|
|
|
|
// Do a best-effort attempt to let the standbys apply the restoreCallbackOp
|
|
// before we continue.
|
|
time.Sleep(restoreOpDelayDuration)
|
|
return err
|
|
}
|
|
|
|
// Delete inserts an entry in the log to delete the given path
|
|
func (b *RaftBackend) Delete(ctx context.Context, path string) error {
|
|
defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
command := &LogData{
|
|
Operations: []*LogOperation{
|
|
{
|
|
OpType: deleteOp,
|
|
Key: path,
|
|
},
|
|
},
|
|
}
|
|
b.permitPool.Acquire()
|
|
defer b.permitPool.Release()
|
|
|
|
b.l.RLock()
|
|
err := b.applyLog(ctx, command)
|
|
b.l.RUnlock()
|
|
return err
|
|
}
|
|
|
|
// Get returns the value corresponding to the given path from the fsm
|
|
func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, error) {
|
|
defer metrics.MeasureSince([]string{"raft-storage", "get"}, time.Now())
|
|
if b.fsm == nil {
|
|
return nil, errors.New("raft: fsm not configured")
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b.permitPool.Acquire()
|
|
defer b.permitPool.Release()
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
entry, err := b.fsm.Get(ctx, path)
|
|
if entry != nil {
|
|
valueLen := len(entry.Value)
|
|
if uint64(valueLen) > b.maxEntrySize {
|
|
b.logger.Warn("retrieved entry value is too large, has raft's max_entry_size been reduced?",
|
|
"size", valueLen, "max_entry_size", b.maxEntrySize)
|
|
}
|
|
}
|
|
|
|
return entry, err
|
|
}
|
|
|
|
// Put inserts an entry in the log for the put operation. It will return an
|
|
// error if the resulting entry encoding exceeds the configured max_entry_size
|
|
// or if the call to applyLog fails.
|
|
func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
|
defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())
|
|
if len(entry.Key) > bolt.MaxKeySize {
|
|
return fmt.Errorf("%s, max key size for integrated storage is %d", physical.ErrKeyTooLarge, bolt.MaxKeySize)
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
command := &LogData{
|
|
Operations: []*LogOperation{
|
|
{
|
|
OpType: putOp,
|
|
Key: entry.Key,
|
|
Value: entry.Value,
|
|
},
|
|
},
|
|
}
|
|
|
|
b.permitPool.Acquire()
|
|
defer b.permitPool.Release()
|
|
|
|
b.l.RLock()
|
|
err := b.applyLog(ctx, command)
|
|
b.l.RUnlock()
|
|
return err
|
|
}
|
|
|
|
// List enumerates all the items under the prefix from the fsm
|
|
func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
|
defer metrics.MeasureSince([]string{"raft-storage", "list"}, time.Now())
|
|
if b.fsm == nil {
|
|
return nil, errors.New("raft: fsm not configured")
|
|
}
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
b.permitPool.Acquire()
|
|
defer b.permitPool.Release()
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return b.fsm.List(ctx, prefix)
|
|
}
|
|
|
|
// Transaction applies all the given operations into a single log and
|
|
// applies it.
|
|
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
|
|
defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
failGetInTxn := atomic.LoadUint32(b.failGetInTxn)
|
|
for _, t := range txns {
|
|
if t.Operation == physical.GetOperation && failGetInTxn != 0 {
|
|
return GetInTxnDisabledError
|
|
}
|
|
}
|
|
|
|
txnMap := make(map[string]*physical.TxnEntry)
|
|
|
|
command := &LogData{
|
|
Operations: make([]*LogOperation, len(txns)),
|
|
}
|
|
for i, txn := range txns {
|
|
op := &LogOperation{}
|
|
switch txn.Operation {
|
|
case physical.PutOperation:
|
|
if len(txn.Entry.Key) > bolt.MaxKeySize {
|
|
return fmt.Errorf("%s, max key size for integrated storage is %d", physical.ErrKeyTooLarge, bolt.MaxKeySize)
|
|
}
|
|
op.OpType = putOp
|
|
op.Key = txn.Entry.Key
|
|
op.Value = txn.Entry.Value
|
|
case physical.DeleteOperation:
|
|
op.OpType = deleteOp
|
|
op.Key = txn.Entry.Key
|
|
case physical.GetOperation:
|
|
op.OpType = getOp
|
|
op.Key = txn.Entry.Key
|
|
txnMap[op.Key] = txn
|
|
default:
|
|
return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
|
|
}
|
|
|
|
command.Operations[i] = op
|
|
}
|
|
|
|
b.permitPool.Acquire()
|
|
defer b.permitPool.Release()
|
|
|
|
b.l.RLock()
|
|
err := b.applyLog(ctx, command)
|
|
b.l.RUnlock()
|
|
|
|
// loop over results and update pointers to get operations
|
|
for _, logOp := range command.Operations {
|
|
if logOp.OpType == getOp {
|
|
if txn, found := txnMap[logOp.Key]; found {
|
|
txn.Entry.Value = logOp.Value
|
|
}
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// applyLog will take a given log command and apply it to the raft log. applyLog
|
|
// doesn't return until the log has been applied to a quorum of servers and is
|
|
// persisted to the local FSM. Caller should hold the backend's read lock.
|
|
func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
|
|
if b.raft == nil {
|
|
return errors.New("raft storage is not initialized")
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
commandBytes, err := proto.Marshal(command)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cmdSize := len(commandBytes)
|
|
if uint64(cmdSize) > b.maxEntrySize {
|
|
return fmt.Errorf("%s; got %d bytes, max: %d bytes", physical.ErrValueTooLarge, cmdSize, b.maxEntrySize)
|
|
}
|
|
|
|
defer metrics.AddSample([]string{"raft-storage", "entry_size"}, float32(cmdSize))
|
|
|
|
var chunked bool
|
|
var applyFuture raft.ApplyFuture
|
|
switch {
|
|
case len(commandBytes) <= raftchunking.ChunkSize:
|
|
applyFuture = b.raft.Apply(commandBytes, 0)
|
|
default:
|
|
chunked = true
|
|
applyFuture = raftchunking.ChunkingApply(commandBytes, nil, 0, b.raft.ApplyLog)
|
|
}
|
|
|
|
if err := applyFuture.Error(); err != nil {
|
|
return err
|
|
}
|
|
|
|
resp := applyFuture.Response()
|
|
|
|
if chunked {
|
|
// In this case we didn't apply all chunks successfully, possibly due
|
|
// to a term change
|
|
if resp == nil {
|
|
// This returns the error in the interface because the raft library
|
|
// returns errors from the FSM via the future, not via err from the
|
|
// apply function. Downstream client code expects to see any error
|
|
// from the FSM (as opposed to the apply itself) and decide whether
|
|
// it can retry in the future's response.
|
|
return errors.New("applying chunking failed, please retry")
|
|
}
|
|
|
|
// We expect that this conversion should always work
|
|
chunkedSuccess, ok := resp.(raftchunking.ChunkingSuccess)
|
|
if !ok {
|
|
return errors.New("unknown type of response back from chunking FSM")
|
|
}
|
|
|
|
// Replace the reply with the inner wrapped version
|
|
resp = chunkedSuccess.Response
|
|
}
|
|
|
|
fsmar, ok := resp.(*FSMApplyResponse)
|
|
if !ok || !fsmar.Success {
|
|
return errors.New("could not apply data")
|
|
}
|
|
|
|
// populate command with our results
|
|
if fsmar.EntrySlice == nil {
|
|
return errors.New("entries on FSM response were empty")
|
|
}
|
|
|
|
for i, logOp := range command.Operations {
|
|
if logOp.OpType == getOp {
|
|
fsmEntry := fsmar.EntrySlice[i]
|
|
|
|
// this should always be true because the entries in the slice were created in the same order as
|
|
// the command operations.
|
|
if logOp.Key == fsmEntry.Key {
|
|
if len(fsmEntry.Value) > 0 {
|
|
logOp.Value = fsmEntry.Value
|
|
}
|
|
} else {
|
|
// this shouldn't happen
|
|
return errors.New("entries in FSM response were out of order")
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// HAEnabled is the implementation of the HABackend interface
|
|
func (b *RaftBackend) HAEnabled() bool { return true }
|
|
|
|
// HAEnabled is the implementation of the HABackend interface
|
|
func (b *RaftBackend) LockWith(key, value string) (physical.Lock, error) {
|
|
return &RaftLock{
|
|
key: key,
|
|
value: []byte(value),
|
|
b: b,
|
|
}, nil
|
|
}
|
|
|
|
// SetDesiredSuffrage sets a field in the fsm indicating the suffrage intent for
|
|
// this node.
|
|
func (b *RaftBackend) SetDesiredSuffrage(nonVoter bool) error {
|
|
b.l.Lock()
|
|
defer b.l.Unlock()
|
|
|
|
var desiredSuffrage string
|
|
switch nonVoter {
|
|
case true:
|
|
desiredSuffrage = "non-voter"
|
|
default:
|
|
desiredSuffrage = "voter"
|
|
}
|
|
|
|
err := b.fsm.recordSuffrage(desiredSuffrage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *RaftBackend) DesiredSuffrage() string {
|
|
return b.fsm.DesiredSuffrage()
|
|
}
|
|
|
|
// RaftLock implements the physical Lock interface and enables HA for this
|
|
// backend. The Lock uses the raftNotifyCh for receiving leadership edge
|
|
// triggers. Vault's active duty matches raft's leadership.
|
|
type RaftLock struct {
|
|
key string
|
|
value []byte
|
|
|
|
b *RaftBackend
|
|
}
|
|
|
|
// monitorLeadership waits until we receive an update on the raftNotifyCh and
|
|
// closes the leaderLost channel.
|
|
func (l *RaftLock) monitorLeadership(stopCh <-chan struct{}, leaderNotifyCh <-chan bool) <-chan struct{} {
|
|
leaderLost := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case isLeader := <-leaderNotifyCh:
|
|
// leaderNotifyCh may deliver a true value initially if this
|
|
// server is already the leader prior to RaftLock.Lock call
|
|
// (the true message was already queued). The next message is
|
|
// always going to be false. The for loop should loop at most
|
|
// twice.
|
|
if !isLeader {
|
|
close(leaderLost)
|
|
return
|
|
}
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return leaderLost
|
|
}
|
|
|
|
// Lock blocks until we become leader or are shutdown. It returns a channel that
|
|
// is closed when we detect a loss of leadership.
|
|
func (l *RaftLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
|
// If not initialized, block until it is
|
|
if !l.b.Initialized() {
|
|
select {
|
|
case <-l.b.raftInitCh:
|
|
case <-stopCh:
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
l.b.l.RLock()
|
|
|
|
// Ensure that we still have a raft instance after grabbing the read lock
|
|
if l.b.raft == nil {
|
|
l.b.l.RUnlock()
|
|
return nil, errors.New("attempted to grab a lock on a nil raft backend")
|
|
}
|
|
|
|
// Cache the notifyCh locally
|
|
leaderNotifyCh := l.b.raftNotifyCh
|
|
|
|
// Check to see if we are already leader.
|
|
if l.b.raft.State() == raft.Leader {
|
|
err := l.b.applyLog(context.Background(), &LogData{
|
|
Operations: []*LogOperation{
|
|
{
|
|
OpType: putOp,
|
|
Key: l.key,
|
|
Value: l.value,
|
|
},
|
|
},
|
|
})
|
|
l.b.l.RUnlock()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return l.monitorLeadership(stopCh, leaderNotifyCh), nil
|
|
}
|
|
l.b.l.RUnlock()
|
|
|
|
for {
|
|
select {
|
|
case isLeader := <-leaderNotifyCh:
|
|
if isLeader {
|
|
// We are leader, set the key
|
|
l.b.l.RLock()
|
|
err := l.b.applyLog(context.Background(), &LogData{
|
|
Operations: []*LogOperation{
|
|
{
|
|
OpType: putOp,
|
|
Key: l.key,
|
|
Value: l.value,
|
|
},
|
|
},
|
|
})
|
|
l.b.l.RUnlock()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return l.monitorLeadership(stopCh, leaderNotifyCh), nil
|
|
}
|
|
case <-stopCh:
|
|
return nil, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Unlock gives up leadership.
|
|
func (l *RaftLock) Unlock() error {
|
|
if l.b.raft == nil {
|
|
return nil
|
|
}
|
|
|
|
return l.b.raft.LeadershipTransfer().Error()
|
|
}
|
|
|
|
// Value reads the value of the lock. This informs us who is currently leader.
|
|
func (l *RaftLock) Value() (bool, string, error) {
|
|
e, err := l.b.Get(context.Background(), l.key)
|
|
if err != nil {
|
|
return false, "", err
|
|
}
|
|
if e == nil {
|
|
return false, "", nil
|
|
}
|
|
|
|
value := string(e.Value)
|
|
// TODO: how to tell if held?
|
|
return true, value, nil
|
|
}
|
|
|
|
// sealer implements the snapshot.Sealer interface and is used in the snapshot
|
|
// process for encrypting/decrypting the SHASUM file in snapshot archives.
|
|
type sealer struct {
|
|
access seal.Access
|
|
}
|
|
|
|
// Seal encrypts the data with using the seal access object.
|
|
func (s sealer) Seal(ctx context.Context, pt []byte) ([]byte, error) {
|
|
if s.access == nil {
|
|
return nil, errors.New("no seal access available")
|
|
}
|
|
eblob, err := s.access.Encrypt(ctx, pt, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return proto.Marshal(eblob)
|
|
}
|
|
|
|
// Open decrypts the data using the seal access object.
|
|
func (s sealer) Open(ctx context.Context, ct []byte) ([]byte, error) {
|
|
if s.access == nil {
|
|
return nil, errors.New("no seal access available")
|
|
}
|
|
|
|
var eblob wrapping.BlobInfo
|
|
err := proto.Unmarshal(ct, &eblob)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return s.access.Decrypt(ctx, &eblob, nil)
|
|
}
|
|
|
|
// boltOptions returns a bolt.Options struct, suitable for passing to
|
|
// bolt.Open(), pre-configured with all of our preferred defaults.
|
|
func boltOptions(path string) *bolt.Options {
|
|
o := &bolt.Options{
|
|
Timeout: 1 * time.Second,
|
|
FreelistType: bolt.FreelistMapType,
|
|
NoFreelistSync: true,
|
|
MmapFlags: getMmapFlags(path),
|
|
}
|
|
|
|
if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
|
|
o.FreelistType = bolt.FreelistArrayType
|
|
}
|
|
|
|
if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
|
|
o.NoFreelistSync = false
|
|
}
|
|
|
|
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
|
|
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
|
|
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
|
|
// so if users are wanting to turn this off, they can also set it to 0. Setting it
|
|
// to a negative value is the same as not setting it at all.
|
|
if os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE") == "" {
|
|
o.InitialMmapSize = initialMmapSize
|
|
} else {
|
|
imms, err := strconv.Atoi(os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE"))
|
|
|
|
// If there's an error here, it means they passed something that's not convertible to
|
|
// a number. Rather than fail startup, just ignore it.
|
|
if err == nil && imms > 0 {
|
|
o.InitialMmapSize = imms
|
|
}
|
|
}
|
|
|
|
return o
|
|
}
|