bc29610124
* Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
290 lines
6.7 KiB
Go
290 lines
6.7 KiB
Go
package raft
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Future is used to represent an action that may occur in the future.
|
|
type Future interface {
|
|
// Error blocks until the future arrives and then
|
|
// returns the error status of the future.
|
|
// This may be called any number of times - all
|
|
// calls will return the same value.
|
|
// Note that it is not OK to call this method
|
|
// twice concurrently on the same Future instance.
|
|
Error() error
|
|
}
|
|
|
|
// IndexFuture is used for future actions that can result in a raft log entry
|
|
// being created.
|
|
type IndexFuture interface {
|
|
Future
|
|
|
|
// Index holds the index of the newly applied log entry.
|
|
// This must not be called until after the Error method has returned.
|
|
Index() uint64
|
|
}
|
|
|
|
// ApplyFuture is used for Apply and can return the FSM response.
|
|
type ApplyFuture interface {
|
|
IndexFuture
|
|
|
|
// Response returns the FSM response as returned
|
|
// by the FSM.Apply method. This must not be called
|
|
// until after the Error method has returned.
|
|
Response() interface{}
|
|
}
|
|
|
|
// ConfigurationFuture is used for GetConfiguration and can return the
|
|
// latest configuration in use by Raft.
|
|
type ConfigurationFuture interface {
|
|
IndexFuture
|
|
|
|
// Configuration contains the latest configuration. This must
|
|
// not be called until after the Error method has returned.
|
|
Configuration() Configuration
|
|
}
|
|
|
|
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
|
|
type SnapshotFuture interface {
|
|
Future
|
|
|
|
// Open is a function you can call to access the underlying snapshot and
|
|
// its metadata. This must not be called until after the Error method
|
|
// has returned.
|
|
Open() (*SnapshotMeta, io.ReadCloser, error)
|
|
}
|
|
|
|
// errorFuture is used to return a static error.
|
|
type errorFuture struct {
|
|
err error
|
|
}
|
|
|
|
func (e errorFuture) Error() error {
|
|
return e.err
|
|
}
|
|
|
|
func (e errorFuture) Response() interface{} {
|
|
return nil
|
|
}
|
|
|
|
func (e errorFuture) Index() uint64 {
|
|
return 0
|
|
}
|
|
|
|
// deferError can be embedded to allow a future
|
|
// to provide an error in the future.
|
|
type deferError struct {
|
|
err error
|
|
errCh chan error
|
|
responded bool
|
|
}
|
|
|
|
func (d *deferError) init() {
|
|
d.errCh = make(chan error, 1)
|
|
}
|
|
|
|
func (d *deferError) Error() error {
|
|
if d.err != nil {
|
|
// Note that when we've received a nil error, this
|
|
// won't trigger, but the channel is closed after
|
|
// send so we'll still return nil below.
|
|
return d.err
|
|
}
|
|
if d.errCh == nil {
|
|
panic("waiting for response on nil channel")
|
|
}
|
|
d.err = <-d.errCh
|
|
return d.err
|
|
}
|
|
|
|
func (d *deferError) respond(err error) {
|
|
if d.errCh == nil {
|
|
return
|
|
}
|
|
if d.responded {
|
|
return
|
|
}
|
|
d.errCh <- err
|
|
close(d.errCh)
|
|
d.responded = true
|
|
}
|
|
|
|
// There are several types of requests that cause a configuration entry to
|
|
// be appended to the log. These are encoded here for leaderLoop() to process.
|
|
// This is internal to a single server.
|
|
type configurationChangeFuture struct {
|
|
logFuture
|
|
req configurationChangeRequest
|
|
}
|
|
|
|
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
|
|
// Raft object's BootstrapCluster member function for more details.
|
|
type bootstrapFuture struct {
|
|
deferError
|
|
|
|
// configuration is the proposed bootstrap configuration to apply.
|
|
configuration Configuration
|
|
}
|
|
|
|
// logFuture is used to apply a log entry and waits until
|
|
// the log is considered committed.
|
|
type logFuture struct {
|
|
deferError
|
|
log Log
|
|
response interface{}
|
|
dispatch time.Time
|
|
}
|
|
|
|
func (l *logFuture) Response() interface{} {
|
|
return l.response
|
|
}
|
|
|
|
func (l *logFuture) Index() uint64 {
|
|
return l.log.Index
|
|
}
|
|
|
|
type shutdownFuture struct {
|
|
raft *Raft
|
|
}
|
|
|
|
func (s *shutdownFuture) Error() error {
|
|
if s.raft == nil {
|
|
return nil
|
|
}
|
|
s.raft.waitShutdown()
|
|
if closeable, ok := s.raft.trans.(WithClose); ok {
|
|
closeable.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
|
|
// complete.
|
|
type userSnapshotFuture struct {
|
|
deferError
|
|
|
|
// opener is a function used to open the snapshot. This is filled in
|
|
// once the future returns with no error.
|
|
opener func() (*SnapshotMeta, io.ReadCloser, error)
|
|
}
|
|
|
|
// Open is a function you can call to access the underlying snapshot and its
|
|
// metadata.
|
|
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
|
|
if u.opener == nil {
|
|
return nil, nil, fmt.Errorf("no snapshot available")
|
|
} else {
|
|
// Invalidate the opener so it can't get called multiple times,
|
|
// which isn't generally safe.
|
|
defer func() {
|
|
u.opener = nil
|
|
}()
|
|
return u.opener()
|
|
}
|
|
}
|
|
|
|
// userRestoreFuture is used for waiting on a user-triggered restore of an
|
|
// external snapshot to complete.
|
|
type userRestoreFuture struct {
|
|
deferError
|
|
|
|
// meta is the metadata that belongs with the snapshot.
|
|
meta *SnapshotMeta
|
|
|
|
// reader is the interface to read the snapshot contents from.
|
|
reader io.Reader
|
|
}
|
|
|
|
// reqSnapshotFuture is used for requesting a snapshot start.
|
|
// It is only used internally.
|
|
type reqSnapshotFuture struct {
|
|
deferError
|
|
|
|
// snapshot details provided by the FSM runner before responding
|
|
index uint64
|
|
term uint64
|
|
snapshot FSMSnapshot
|
|
}
|
|
|
|
// restoreFuture is used for requesting an FSM to perform a
|
|
// snapshot restore. Used internally only.
|
|
type restoreFuture struct {
|
|
deferError
|
|
ID string
|
|
}
|
|
|
|
// verifyFuture is used to verify the current node is still
|
|
// the leader. This is to prevent a stale read.
|
|
type verifyFuture struct {
|
|
deferError
|
|
notifyCh chan *verifyFuture
|
|
quorumSize int
|
|
votes int
|
|
voteLock sync.Mutex
|
|
}
|
|
|
|
// configurationsFuture is used to retrieve the current configurations. This is
|
|
// used to allow safe access to this information outside of the main thread.
|
|
type configurationsFuture struct {
|
|
deferError
|
|
configurations configurations
|
|
}
|
|
|
|
// Configuration returns the latest configuration in use by Raft.
|
|
func (c *configurationsFuture) Configuration() Configuration {
|
|
return c.configurations.latest
|
|
}
|
|
|
|
// Index returns the index of the latest configuration in use by Raft.
|
|
func (c *configurationsFuture) Index() uint64 {
|
|
return c.configurations.latestIndex
|
|
}
|
|
|
|
// vote is used to respond to a verifyFuture.
|
|
// This may block when responding on the notifyCh.
|
|
func (v *verifyFuture) vote(leader bool) {
|
|
v.voteLock.Lock()
|
|
defer v.voteLock.Unlock()
|
|
|
|
// Guard against having notified already
|
|
if v.notifyCh == nil {
|
|
return
|
|
}
|
|
|
|
if leader {
|
|
v.votes++
|
|
if v.votes >= v.quorumSize {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
} else {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
}
|
|
|
|
// appendFuture is used for waiting on a pipelined append
|
|
// entries RPC.
|
|
type appendFuture struct {
|
|
deferError
|
|
start time.Time
|
|
args *AppendEntriesRequest
|
|
resp *AppendEntriesResponse
|
|
}
|
|
|
|
func (a *appendFuture) Start() time.Time {
|
|
return a.start
|
|
}
|
|
|
|
func (a *appendFuture) Request() *AppendEntriesRequest {
|
|
return a.args
|
|
}
|
|
|
|
func (a *appendFuture) Response() *AppendEntriesResponse {
|
|
return a.resp
|
|
}
|