package raft import ( "fmt" "io" "sync" "time" ) // Future is used to represent an action that may occur in the future. type Future interface { // Error blocks until the future arrives and then // returns the error status of the future. // This may be called any number of times - all // calls will return the same value. // Note that it is not OK to call this method // twice concurrently on the same Future instance. Error() error } // IndexFuture is used for future actions that can result in a raft log entry // being created. type IndexFuture interface { Future // Index holds the index of the newly applied log entry. // This must not be called until after the Error method has returned. Index() uint64 } // ApplyFuture is used for Apply and can return the FSM response. type ApplyFuture interface { IndexFuture // Response returns the FSM response as returned // by the FSM.Apply method. This must not be called // until after the Error method has returned. Response() interface{} } // ConfigurationFuture is used for GetConfiguration and can return the // latest configuration in use by Raft. type ConfigurationFuture interface { IndexFuture // Configuration contains the latest configuration. This must // not be called until after the Error method has returned. Configuration() Configuration } // SnapshotFuture is used for waiting on a user-triggered snapshot to complete. type SnapshotFuture interface { Future // Open is a function you can call to access the underlying snapshot and // its metadata. This must not be called until after the Error method // has returned. Open() (*SnapshotMeta, io.ReadCloser, error) } // LeadershipTransferFuture is used for waiting on a user-triggered leadership // transfer to complete. type LeadershipTransferFuture interface { Future } // errorFuture is used to return a static error. type errorFuture struct { err error } func (e errorFuture) Error() error { return e.err } func (e errorFuture) Response() interface{} { return nil } func (e errorFuture) Index() uint64 { return 0 } // deferError can be embedded to allow a future // to provide an error in the future. type deferError struct { err error errCh chan error responded bool } func (d *deferError) init() { d.errCh = make(chan error, 1) } func (d *deferError) Error() error { if d.err != nil { // Note that when we've received a nil error, this // won't trigger, but the channel is closed after // send so we'll still return nil below. return d.err } if d.errCh == nil { panic("waiting for response on nil channel") } d.err = <-d.errCh return d.err } func (d *deferError) respond(err error) { if d.errCh == nil { return } if d.responded { return } d.errCh <- err close(d.errCh) d.responded = true } // There are several types of requests that cause a configuration entry to // be appended to the log. These are encoded here for leaderLoop() to process. // This is internal to a single server. type configurationChangeFuture struct { logFuture req configurationChangeRequest } // bootstrapFuture is used to attempt a live bootstrap of the cluster. See the // Raft object's BootstrapCluster member function for more details. type bootstrapFuture struct { deferError // configuration is the proposed bootstrap configuration to apply. configuration Configuration } // logFuture is used to apply a log entry and waits until // the log is considered committed. type logFuture struct { deferError log Log response interface{} dispatch time.Time } func (l *logFuture) Response() interface{} { return l.response } func (l *logFuture) Index() uint64 { return l.log.Index } type shutdownFuture struct { raft *Raft } func (s *shutdownFuture) Error() error { if s.raft == nil { return nil } s.raft.waitShutdown() if closeable, ok := s.raft.trans.(WithClose); ok { closeable.Close() } return nil } // userSnapshotFuture is used for waiting on a user-triggered snapshot to // complete. type userSnapshotFuture struct { deferError // opener is a function used to open the snapshot. This is filled in // once the future returns with no error. opener func() (*SnapshotMeta, io.ReadCloser, error) } // Open is a function you can call to access the underlying snapshot and its // metadata. func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) { if u.opener == nil { return nil, nil, fmt.Errorf("no snapshot available") } else { // Invalidate the opener so it can't get called multiple times, // which isn't generally safe. defer func() { u.opener = nil }() return u.opener() } } // userRestoreFuture is used for waiting on a user-triggered restore of an // external snapshot to complete. type userRestoreFuture struct { deferError // meta is the metadata that belongs with the snapshot. meta *SnapshotMeta // reader is the interface to read the snapshot contents from. reader io.Reader } // reqSnapshotFuture is used for requesting a snapshot start. // It is only used internally. type reqSnapshotFuture struct { deferError // snapshot details provided by the FSM runner before responding index uint64 term uint64 snapshot FSMSnapshot } // restoreFuture is used for requesting an FSM to perform a // snapshot restore. Used internally only. type restoreFuture struct { deferError ID string } // verifyFuture is used to verify the current node is still // the leader. This is to prevent a stale read. type verifyFuture struct { deferError notifyCh chan *verifyFuture quorumSize int votes int voteLock sync.Mutex } // leadershipTransferFuture is used to track the progress of a leadership // transfer internally. type leadershipTransferFuture struct { deferError ID *ServerID Address *ServerAddress } // configurationsFuture is used to retrieve the current configurations. This is // used to allow safe access to this information outside of the main thread. type configurationsFuture struct { deferError configurations configurations } // Configuration returns the latest configuration in use by Raft. func (c *configurationsFuture) Configuration() Configuration { return c.configurations.latest } // Index returns the index of the latest configuration in use by Raft. func (c *configurationsFuture) Index() uint64 { return c.configurations.latestIndex } // vote is used to respond to a verifyFuture. // This may block when responding on the notifyCh. func (v *verifyFuture) vote(leader bool) { v.voteLock.Lock() defer v.voteLock.Unlock() // Guard against having notified already if v.notifyCh == nil { return } if leader { v.votes++ if v.votes >= v.quorumSize { v.notifyCh <- v v.notifyCh = nil } } else { v.notifyCh <- v v.notifyCh = nil } } // appendFuture is used for waiting on a pipelined append // entries RPC. type appendFuture struct { deferError start time.Time args *AppendEntriesRequest resp *AppendEntriesResponse } func (a *appendFuture) Start() time.Time { return a.start } func (a *appendFuture) Request() *AppendEntriesRequest { return a.args } func (a *appendFuture) Response() *AppendEntriesResponse { return a.resp }