290 lines
6.7 KiB
Go
290 lines
6.7 KiB
Go
package raft
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Future is used to represent an action that may occur in the future.
|
|
type Future interface {
|
|
// Error blocks until the future arrives and then
|
|
// returns the error status of the future.
|
|
// This may be called any number of times - all
|
|
// calls will return the same value.
|
|
// Note that it is not OK to call this method
|
|
// twice concurrently on the same Future instance.
|
|
Error() error
|
|
}
|
|
|
|
// IndexFuture is used for future actions that can result in a raft log entry
|
|
// being created.
|
|
type IndexFuture interface {
|
|
Future
|
|
|
|
// Index holds the index of the newly applied log entry.
|
|
// This must not be called until after the Error method has returned.
|
|
Index() uint64
|
|
}
|
|
|
|
// ApplyFuture is used for Apply and can return the FSM response.
|
|
type ApplyFuture interface {
|
|
IndexFuture
|
|
|
|
// Response returns the FSM response as returned
|
|
// by the FSM.Apply method. This must not be called
|
|
// until after the Error method has returned.
|
|
Response() interface{}
|
|
}
|
|
|
|
// ConfigurationFuture is used for GetConfiguration and can return the
|
|
// latest configuration in use by Raft.
|
|
type ConfigurationFuture interface {
|
|
IndexFuture
|
|
|
|
// Configuration contains the latest configuration. This must
|
|
// not be called until after the Error method has returned.
|
|
Configuration() Configuration
|
|
}
|
|
|
|
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
|
|
type SnapshotFuture interface {
|
|
Future
|
|
|
|
// Open is a function you can call to access the underlying snapshot and
|
|
// its metadata. This must not be called until after the Error method
|
|
// has returned.
|
|
Open() (*SnapshotMeta, io.ReadCloser, error)
|
|
}
|
|
|
|
// errorFuture is used to return a static error.
|
|
type errorFuture struct {
|
|
err error
|
|
}
|
|
|
|
func (e errorFuture) Error() error {
|
|
return e.err
|
|
}
|
|
|
|
func (e errorFuture) Response() interface{} {
|
|
return nil
|
|
}
|
|
|
|
func (e errorFuture) Index() uint64 {
|
|
return 0
|
|
}
|
|
|
|
// deferError can be embedded to allow a future
|
|
// to provide an error in the future.
|
|
type deferError struct {
|
|
err error
|
|
errCh chan error
|
|
responded bool
|
|
}
|
|
|
|
func (d *deferError) init() {
|
|
d.errCh = make(chan error, 1)
|
|
}
|
|
|
|
func (d *deferError) Error() error {
|
|
if d.err != nil {
|
|
// Note that when we've received a nil error, this
|
|
// won't trigger, but the channel is closed after
|
|
// send so we'll still return nil below.
|
|
return d.err
|
|
}
|
|
if d.errCh == nil {
|
|
panic("waiting for response on nil channel")
|
|
}
|
|
d.err = <-d.errCh
|
|
return d.err
|
|
}
|
|
|
|
func (d *deferError) respond(err error) {
|
|
if d.errCh == nil {
|
|
return
|
|
}
|
|
if d.responded {
|
|
return
|
|
}
|
|
d.errCh <- err
|
|
close(d.errCh)
|
|
d.responded = true
|
|
}
|
|
|
|
// There are several types of requests that cause a configuration entry to
|
|
// be appended to the log. These are encoded here for leaderLoop() to process.
|
|
// This is internal to a single server.
|
|
type configurationChangeFuture struct {
|
|
logFuture
|
|
req configurationChangeRequest
|
|
}
|
|
|
|
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
|
|
// Raft object's BootstrapCluster member function for more details.
|
|
type bootstrapFuture struct {
|
|
deferError
|
|
|
|
// configuration is the proposed bootstrap configuration to apply.
|
|
configuration Configuration
|
|
}
|
|
|
|
// logFuture is used to apply a log entry and waits until
|
|
// the log is considered committed.
|
|
type logFuture struct {
|
|
deferError
|
|
log Log
|
|
response interface{}
|
|
dispatch time.Time
|
|
}
|
|
|
|
func (l *logFuture) Response() interface{} {
|
|
return l.response
|
|
}
|
|
|
|
func (l *logFuture) Index() uint64 {
|
|
return l.log.Index
|
|
}
|
|
|
|
type shutdownFuture struct {
|
|
raft *Raft
|
|
}
|
|
|
|
func (s *shutdownFuture) Error() error {
|
|
if s.raft == nil {
|
|
return nil
|
|
}
|
|
s.raft.waitShutdown()
|
|
if closeable, ok := s.raft.trans.(WithClose); ok {
|
|
closeable.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
|
|
// complete.
|
|
type userSnapshotFuture struct {
|
|
deferError
|
|
|
|
// opener is a function used to open the snapshot. This is filled in
|
|
// once the future returns with no error.
|
|
opener func() (*SnapshotMeta, io.ReadCloser, error)
|
|
}
|
|
|
|
// Open is a function you can call to access the underlying snapshot and its
|
|
// metadata.
|
|
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
|
|
if u.opener == nil {
|
|
return nil, nil, fmt.Errorf("no snapshot available")
|
|
} else {
|
|
// Invalidate the opener so it can't get called multiple times,
|
|
// which isn't generally safe.
|
|
defer func() {
|
|
u.opener = nil
|
|
}()
|
|
return u.opener()
|
|
}
|
|
}
|
|
|
|
// userRestoreFuture is used for waiting on a user-triggered restore of an
|
|
// external snapshot to complete.
|
|
type userRestoreFuture struct {
|
|
deferError
|
|
|
|
// meta is the metadata that belongs with the snapshot.
|
|
meta *SnapshotMeta
|
|
|
|
// reader is the interface to read the snapshot contents from.
|
|
reader io.Reader
|
|
}
|
|
|
|
// reqSnapshotFuture is used for requesting a snapshot start.
|
|
// It is only used internally.
|
|
type reqSnapshotFuture struct {
|
|
deferError
|
|
|
|
// snapshot details provided by the FSM runner before responding
|
|
index uint64
|
|
term uint64
|
|
snapshot FSMSnapshot
|
|
}
|
|
|
|
// restoreFuture is used for requesting an FSM to perform a
|
|
// snapshot restore. Used internally only.
|
|
type restoreFuture struct {
|
|
deferError
|
|
ID string
|
|
}
|
|
|
|
// verifyFuture is used to verify the current node is still
|
|
// the leader. This is to prevent a stale read.
|
|
type verifyFuture struct {
|
|
deferError
|
|
notifyCh chan *verifyFuture
|
|
quorumSize int
|
|
votes int
|
|
voteLock sync.Mutex
|
|
}
|
|
|
|
// configurationsFuture is used to retrieve the current configurations. This is
|
|
// used to allow safe access to this information outside of the main thread.
|
|
type configurationsFuture struct {
|
|
deferError
|
|
configurations configurations
|
|
}
|
|
|
|
// Configuration returns the latest configuration in use by Raft.
|
|
func (c *configurationsFuture) Configuration() Configuration {
|
|
return c.configurations.latest
|
|
}
|
|
|
|
// Index returns the index of the latest configuration in use by Raft.
|
|
func (c *configurationsFuture) Index() uint64 {
|
|
return c.configurations.latestIndex
|
|
}
|
|
|
|
// vote is used to respond to a verifyFuture.
|
|
// This may block when responding on the notifyCh.
|
|
func (v *verifyFuture) vote(leader bool) {
|
|
v.voteLock.Lock()
|
|
defer v.voteLock.Unlock()
|
|
|
|
// Guard against having notified already
|
|
if v.notifyCh == nil {
|
|
return
|
|
}
|
|
|
|
if leader {
|
|
v.votes++
|
|
if v.votes >= v.quorumSize {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
} else {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
}
|
|
|
|
// appendFuture is used for waiting on a pipelined append
|
|
// entries RPC.
|
|
type appendFuture struct {
|
|
deferError
|
|
start time.Time
|
|
args *AppendEntriesRequest
|
|
resp *AppendEntriesResponse
|
|
}
|
|
|
|
func (a *appendFuture) Start() time.Time {
|
|
return a.start
|
|
}
|
|
|
|
func (a *appendFuture) Request() *AppendEntriesRequest {
|
|
return a.args
|
|
}
|
|
|
|
func (a *appendFuture) Response() *AppendEntriesResponse {
|
|
return a.resp
|
|
}
|