open-vault/vendor/github.com/hashicorp/raft/snapshot.go
package raft

import (
	"fmt"
	"io"
	"time"

	"github.com/armon/go-metrics"
)
// SnapshotMeta is for metadata of a snapshot.
type SnapshotMeta struct {
	// Version is the version number of the snapshot metadata. This does not
	// cover the application's data in the snapshot; that should be versioned
	// separately.
	Version SnapshotVersion

	// ID is opaque to the store, and is used for opening.
	ID string

	// Index and Term store when the snapshot was taken.
	Index uint64
	Term  uint64

	// Peers is deprecated and used to support version 0 snapshots, but will
	// be populated in version 1 snapshots as well to help with upgrades.
	Peers []byte

	// Configuration and ConfigurationIndex are present in version 1
	// snapshots and later.
	Configuration      Configuration
	ConfigurationIndex uint64

	// Size is the size of the snapshot in bytes.
	Size int64
}
// SnapshotStore interface is used to allow for flexible implementations
// of snapshot storage and retrieval. For example, a client could implement
// a shared state store such as S3, allowing new nodes to restore snapshots
// without streaming from the leader.
type SnapshotStore interface {
	// Create is used to begin a snapshot at a given index and term, and with
	// the given committed configuration. The version parameter controls
	// which snapshot version to create.
	Create(version SnapshotVersion, index, term uint64, configuration Configuration,
		configurationIndex uint64, trans Transport) (SnapshotSink, error)

	// List is used to list the available snapshots in the store.
	// It should return them in descending order, with the highest index first.
	List() ([]*SnapshotMeta, error)

	// Open takes a snapshot ID and provides a ReadCloser. Once Close is
	// called it is assumed the snapshot is no longer needed.
	Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
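
// copyLatestSnapshot is not part of the upstream library; it is a minimal
// sketch of how a caller might use the SnapshotStore contract above to stream
// the most recent snapshot into an io.Writer. The helper name is hypothetical.
func copyLatestSnapshot(store SnapshotStore, w io.Writer) (*SnapshotMeta, error) {
	// List returns snapshots in descending order, so the first entry is the
	// most recent one.
	snaps, err := store.List()
	if err != nil {
		return nil, err
	}
	if len(snaps) == 0 {
		return nil, fmt.Errorf("no snapshots available")
	}

	// Open the newest snapshot and release the reader when done.
	meta, rc, err := store.Open(snaps[0].ID)
	if err != nil {
		return nil, err
	}
	defer rc.Close()

	if _, err := io.Copy(w, rc); err != nil {
		return nil, err
	}
	return meta, nil
}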
// SnapshotSink is returned by StartSnapshot. The FSM will Write state
// to the sink and call Close on completion. On error, Cancel will be invoked.
type SnapshotSink interface {
	io.WriteCloser
	ID() string
	Cancel() error
}
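
// exampleFSMSnapshot is not part of the upstream library; it is a minimal
// sketch of an FSM snapshot whose Persist method follows the SnapshotSink
// contract documented above, assuming the FSM state was already serialized
// into a byte slice. The type name is hypothetical.
type exampleFSMSnapshot struct {
	state []byte
}

func (s *exampleFSMSnapshot) Persist(sink SnapshotSink) error {
	// Write the serialized state; on any error, cancel the sink so the
	// partially written snapshot is discarded.
	if _, err := sink.Write(s.state); err != nil {
		sink.Cancel()
		return err
	}
	// Per the comment above, the FSM calls Close on successful completion.
	return sink.Close()
}

func (s *exampleFSMSnapshot) Release() {}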
// runSnapshots is a long running goroutine used to manage taking
// new snapshots of the FSM. It runs in parallel to the FSM and
// main goroutines, so that snapshots do not block normal operation.
func (r *Raft) runSnapshots() {
	for {
		select {
		case <-randomTimeout(r.conf.SnapshotInterval):
			// Check if we should snapshot
			if !r.shouldSnapshot() {
				continue
			}

			// Trigger a snapshot
			if _, err := r.takeSnapshot(); err != nil {
				r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
			}

		case future := <-r.userSnapshotCh:
			// User-triggered, run immediately
			id, err := r.takeSnapshot()
			if err != nil {
				r.logger.Error(fmt.Sprintf("Failed to take snapshot: %v", err))
			} else {
				future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
					return r.snapshots.Open(id)
				}
			}
			future.respond(err)

		case <-r.shutdownCh:
			return
		}
	}
}
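
// takeUserSnapshot is not part of the upstream library; it is a minimal
// sketch of the user-triggered path serviced by runSnapshots above. It
// assumes the public Raft.Snapshot API defined elsewhere in this package;
// the helper name is hypothetical.
func takeUserSnapshot(r *Raft, w io.Writer) error {
	// Snapshot queues a request on r.userSnapshotCh, which the select loop
	// in runSnapshots handles by calling takeSnapshot immediately.
	future := r.Snapshot()
	if err := future.Error(); err != nil {
		return fmt.Errorf("failed to take snapshot: %v", err)
	}

	// Once the future has completed, Open streams the newly written snapshot.
	_, rc, err := future.Open()
	if err != nil {
		return err
	}
	defer rc.Close()

	_, err = io.Copy(w, rc)
	return err
}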
// shouldSnapshot checks if we meet the conditions to take
// a new snapshot.
func (r *Raft) shouldSnapshot() bool {
	// Check the last snapshot index
	lastSnap, _ := r.getLastSnapshot()

	// Check the last log index
	lastIdx, err := r.logs.LastIndex()
	if err != nil {
		r.logger.Error(fmt.Sprintf("Failed to get last log index: %v", err))
		return false
	}

	// Compare the delta to the threshold
	delta := lastIdx - lastSnap
	return delta >= r.conf.SnapshotThreshold
}
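
// For illustration (assumed numbers, not values from this file): with a
// SnapshotThreshold of 8192, a last snapshot at index 10,000 and a last log
// index of 18,500, the delta is 8,500 >= 8,192, so shouldSnapshot returns
// true and runSnapshots triggers takeSnapshot on its next interval.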
// takeSnapshot is used to take a new snapshot. This must only be called from
// the snapshot thread, never the main thread. This returns the ID of the new
// snapshot, along with an error.
func (r *Raft) takeSnapshot() (string, error) {
	defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())

	// Create a request for the FSM to perform a snapshot.
	snapReq := &reqSnapshotFuture{}
	snapReq.init()

	// Wait for dispatch or shutdown.
	select {
	case r.fsmSnapshotCh <- snapReq:
	case <-r.shutdownCh:
		return "", ErrRaftShutdown
	}

	// Wait until we get a response
	if err := snapReq.Error(); err != nil {
		if err != ErrNothingNewToSnapshot {
			err = fmt.Errorf("failed to start snapshot: %v", err)
		}
		return "", err
	}
	defer snapReq.snapshot.Release()

	// Make a request for the configurations and extract the committed info.
	// We have to use the future here to safely get this information since
	// it is owned by the main thread.
	configReq := &configurationsFuture{}
	configReq.init()
	select {
	case r.configurationsCh <- configReq:
	case <-r.shutdownCh:
		return "", ErrRaftShutdown
	}
	if err := configReq.Error(); err != nil {
		return "", err
	}
	committed := configReq.configurations.committed
	committedIndex := configReq.configurations.committedIndex

	// We don't support snapshots while there's a config change outstanding
	// since the snapshot doesn't have a means to represent this state. This
	// is a little weird because we need the FSM to apply an index that's
	// past the configuration change, even though the FSM itself doesn't see
	// the configuration changes. It should be ok in practice with normal
	// application traffic flowing through the FSM. If there's none of that
	// then it's not crucial that we snapshot, since there's not much going
	// on Raft-wise.
	if snapReq.index < committedIndex {
		return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
			committedIndex, snapReq.index)
	}

	// Create a new snapshot.
	r.logger.Info(fmt.Sprintf("Starting snapshot up to %d", snapReq.index))
	start := time.Now()
	version := getSnapshotVersion(r.protocolVersion)
	sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
	if err != nil {
		return "", fmt.Errorf("failed to create snapshot: %v", err)
	}
	metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)

	// Try to persist the snapshot.
	start = time.Now()
	if err := snapReq.snapshot.Persist(sink); err != nil {
		sink.Cancel()
		return "", fmt.Errorf("failed to persist snapshot: %v", err)
	}
	metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)

	// Close and check for error.
	if err := sink.Close(); err != nil {
		return "", fmt.Errorf("failed to close snapshot: %v", err)
	}

	// Update the last stable snapshot info.
	r.setLastSnapshot(snapReq.index, snapReq.term)

	// Compact the logs.
	if err := r.compactLogs(snapReq.index); err != nil {
		return "", err
	}

	r.logger.Info(fmt.Sprintf("Snapshot to %d complete", snapReq.index))
	return sink.ID(), nil
}
// compactLogs takes the last inclusive index of a snapshot
// and trims the logs that are no longer needed.
func (r *Raft) compactLogs(snapIdx uint64) error {
	defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())

	// Determine log ranges to compact
	minLog, err := r.logs.FirstIndex()
	if err != nil {
		return fmt.Errorf("failed to get first log index: %v", err)
	}

	// Check if we have enough logs to truncate
	lastLogIdx, _ := r.getLastLog()
	if lastLogIdx <= r.conf.TrailingLogs {
		return nil
	}

	// Truncate up to the end of the snapshot, or `TrailingLogs`
	// back from the head, whichever is further back. This ensures
	// at least `TrailingLogs` entries remain, but does not allow logs
	// after the snapshot to be removed.
	maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)

	// Log this
	r.logger.Info(fmt.Sprintf("Compacting logs from %d to %d", minLog, maxLog))

	// Compact the logs
	if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
		return fmt.Errorf("log compaction failed: %v", err)
	}
	return nil
}
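
// For illustration (assumed numbers, not values from this file): if the
// snapshot covers index 10,000, the last log index is 10,050, and
// TrailingLogs is 100, then lastLogIdx-TrailingLogs = 9,950 and
// maxLog = min(10,000, 9,950) = 9,950. DeleteRange removes entries
// [minLog, 9,950], leaving the 100 most recent entries (indexes 9,951
// through 10,050) in the log store.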