183 lines
3.2 KiB
Go
183 lines
3.2 KiB
Go
package raft
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Future is the handle for an operation whose outcome is reported at
// some later point.
type Future interface {
	// Error returns the error the operation ended with, or nil.
	// Depending on the implementation the call may wait until the
	// outcome is known.
	Error() error
}
|
|
|
|
// ApplyFuture is used for Apply() and can returns the FSM response.
|
|
type ApplyFuture interface {
|
|
Future
|
|
Response() interface{}
|
|
Index() uint64
|
|
}
|
|
|
|
// errorFuture is a future whose outcome is a fixed error stored in the
// value itself. None of its methods block.
type errorFuture struct {
	err error
}

// Error hands back the stored error unchanged.
func (f errorFuture) Error() error { return f.err }

// Response always yields nil.
func (errorFuture) Response() interface{} { return nil }

// Index always yields 0.
func (errorFuture) Index() uint64 { return 0 }
|
|
|
|
// deferError is meant to be embedded in a future so that the future's
// error can be supplied after the future has been handed out.
type deferError struct {
	err       error      // last value received from errCh
	errCh     chan error // delivery channel, created by init
	responded bool       // set once respond has delivered a value
}

// init allocates the delivery channel. It has room for one value, so
// the single send made by respond does not need a waiting receiver.
func (d *deferError) init() {
	d.errCh = make(chan error, 1)
}

// Error waits for the result delivered through respond and returns it.
// It panics if init was never called.
func (d *deferError) Error() error {
	switch {
	case d.err != nil:
		// A non-nil result was already received and cached.
		return d.err
	case d.errCh == nil:
		panic("waiting for response on nil channel")
	}

	// Blocks until respond sends. After a nil result the cache stays
	// nil, so later calls arrive here again; respond closes the channel
	// after sending, which makes those receives return nil at once.
	result := <-d.errCh
	d.err = result
	return result
}

// respond delivers err to whoever calls Error. Only the first call has
// an effect; it is a no-op when init was never called or a response
// has already been delivered.
func (d *deferError) respond(err error) {
	if d.errCh == nil || d.responded {
		return
	}
	d.errCh <- err
	close(d.errCh)
	d.responded = true
}
|
|
|
|
// logFuture is used to apply a log entry and waits until
|
|
// the log is considered committed.
|
|
type logFuture struct {
|
|
deferError
|
|
log Log
|
|
policy quorumPolicy
|
|
response interface{}
|
|
dispatch time.Time
|
|
}
|
|
|
|
func (l *logFuture) Response() interface{} {
|
|
return l.response
|
|
}
|
|
|
|
func (l *logFuture) Index() uint64 {
|
|
return l.log.Index
|
|
}
|
|
|
|
type peerFuture struct {
|
|
deferError
|
|
peers []string
|
|
}
|
|
|
|
type shutdownFuture struct {
|
|
raft *Raft
|
|
}
|
|
|
|
func (s *shutdownFuture) Error() error {
|
|
for s.raft.getRoutines() > 0 {
|
|
time.Sleep(5 * time.Millisecond)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// snapshotFuture is used for waiting on a snapshot to complete.
|
|
type snapshotFuture struct {
|
|
deferError
|
|
}
|
|
|
|
// reqSnapshotFuture is used for requesting a snapshot start.
|
|
// It is only used internally.
|
|
type reqSnapshotFuture struct {
|
|
deferError
|
|
|
|
// snapshot details provided by the FSM runner before responding
|
|
index uint64
|
|
term uint64
|
|
peers []string
|
|
snapshot FSMSnapshot
|
|
}
|
|
|
|
// restoreFuture is used for requesting an FSM to perform a
|
|
// snapshot restore. Used internally only.
|
|
type restoreFuture struct {
|
|
deferError
|
|
ID string
|
|
}
|
|
|
|
// verifyFuture is used to verify the current node is still
|
|
// the leader. This is to prevent a stale read.
|
|
type verifyFuture struct {
|
|
deferError
|
|
notifyCh chan *verifyFuture
|
|
quorumSize int
|
|
votes int
|
|
voteLock sync.Mutex
|
|
}
|
|
|
|
// vote is used to respond to a verifyFuture.
|
|
// This may block when responding on the notifyCh.
|
|
func (v *verifyFuture) vote(leader bool) {
|
|
v.voteLock.Lock()
|
|
defer v.voteLock.Unlock()
|
|
|
|
// Guard against having notified already
|
|
if v.notifyCh == nil {
|
|
return
|
|
}
|
|
|
|
if leader {
|
|
v.votes++
|
|
if v.votes >= v.quorumSize {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
} else {
|
|
v.notifyCh <- v
|
|
v.notifyCh = nil
|
|
}
|
|
}
|
|
|
|
// appendFuture is used for waiting on a pipelined append
|
|
// entries RPC.
|
|
type appendFuture struct {
|
|
deferError
|
|
start time.Time
|
|
args *AppendEntriesRequest
|
|
resp *AppendEntriesResponse
|
|
}
|
|
|
|
func (a *appendFuture) Start() time.Time {
|
|
return a.start
|
|
}
|
|
|
|
func (a *appendFuture) Request() *AppendEntriesRequest {
|
|
return a.args
|
|
}
|
|
|
|
func (a *appendFuture) Response() *AppendEntriesResponse {
|
|
return a.resp
|
|
}
|