package raft

import (
	"fmt"
	"io"
	"time"

	"github.com/armon/go-metrics"
)

// FSM provides an interface that can be implemented by
// clients to make use of the replicated log.
//
// An FSM may additionally implement ConfigurationStore; when it does,
// runFSM hands committed configuration entries to its StoreConfiguration
// method. FSMs that do not implement it simply never see those entries.
type FSM interface {
	// Apply is invoked once a log entry is committed.
	// It returns a value which will be made available in the
	// ApplyFuture returned by the Raft.Apply method if that
	// method was called on the same Raft node as the FSM.
	Apply(*Log) interface{}

	// Snapshot is used to support log compaction. This call should
	// return an FSMSnapshot which can be used to save a point-in-time
	// snapshot of the FSM. Apply and Snapshot are never invoked from more
	// than one goroutine at a time, but Apply will be called concurrently
	// with Persist. This means the FSM should be implemented in a fashion
	// that allows for concurrent updates while a snapshot is happening.
	Snapshot() (FSMSnapshot, error)

	// Restore is used to restore an FSM from a snapshot. It is not called
	// concurrently with any other command. The FSM must discard all previous
	// state.
	//
	// When invoked from runFSM, the reader is closed by runFSM once Restore
	// returns, whether or not Restore succeeded.
	Restore(io.ReadCloser) error
}

// FSMSnapshot is returned by an FSM in response to a Snapshot call.
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
	// Persist should dump all necessary state to the WriteCloser 'sink',
	// and call sink.Close() when finished or call sink.Cancel() on error.
	Persist(sink SnapshotSink) error

	// Release is invoked when we are finished with the snapshot.
	Release()
}

// runFSM is a long running goroutine responsible for applying logs
// to the FSM. This is done async of other logs since we don't want
// the FSM to block our internal operations.
//
// It serves r.fsmMutateCh (committed log entries and restore requests) and
// r.fsmSnapshotCh (snapshot requests) until r.shutdownCh fires.
func (r *Raft) runFSM() { var lastIndex, lastTerm uint64 commit := func(req *commitTuple) { // Apply the log if a command or config change var resp interface{} // Make sure we send a response defer func() { // Invoke the future if given if req.future != nil { req.future.response = resp req.future.respond(nil) } }() switch req.log.Type { case LogCommand: start := time.Now() resp = r.fsm.Apply(req.log) metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) case LogConfiguration: configStore, ok := r.fsm.(ConfigurationStore) if !ok { // Return early to avoid incrementing the index and term for // an unimplemented operation. return } start := time.Now() configStore.StoreConfiguration(req.log.Index, decodeConfiguration(req.log.Data)) metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start) } // Update the indexes lastIndex = req.log.Index lastTerm = req.log.Term } restore := func(req *restoreFuture) { // Open the snapshot meta, source, err := r.snapshots.Open(req.ID) if err != nil { req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err)) return } // Attempt to restore start := time.Now() if err := r.fsm.Restore(source); err != nil { req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err)) source.Close() return } source.Close() metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start) // Update the last index and term lastIndex = meta.Index lastTerm = meta.Term req.respond(nil) } snapshot := func(req *reqSnapshotFuture) { // Is there something to snapshot? 
if lastIndex == 0 { req.respond(ErrNothingNewToSnapshot) return } // Start a snapshot start := time.Now() snap, err := r.fsm.Snapshot() metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start) // Respond to the request req.index = lastIndex req.term = lastTerm req.snapshot = snap req.respond(err) } for { select { case ptr := <-r.fsmMutateCh: switch req := ptr.(type) { case *commitTuple: commit(req) case *restoreFuture: restore(req) default: panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr)) } case req := <-r.fsmSnapshotCh: snapshot(req) case <-r.shutdownCh: return } } }