package raftchunking import ( "io" "github.com/golang/protobuf/proto" "github.com/hashicorp/errwrap" "github.com/hashicorp/go-raftchunking/types" "github.com/hashicorp/raft" ) var _ raft.FSM = (*ChunkingFSM)(nil) var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil) type ChunkingSuccess struct { Response interface{} } // ChunkingFSM is an FSM that implements chunking; it's the sister of // ChunkingApply. // // N.B.: If a term change happens the final apply from the client will have a // nil result and not be passed through to the underlying FSM. To detect this, // the final apply to the underlying FSM is wrapped in ChunkingSuccess. type ChunkingFSM struct { underlying raft.FSM store ChunkStorage lastTerm uint64 } type ChunkingConfigurationStore struct { *ChunkingFSM underlyingConfigurationStore raft.ConfigurationStore } func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM { ret := &ChunkingFSM{ underlying: underlying, store: store, } if store == nil { ret.store = NewInmemChunkStorage() } return ret } func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore { ret := &ChunkingConfigurationStore{ ChunkingFSM: &ChunkingFSM{ underlying: underlying, store: store, }, underlyingConfigurationStore: underlying, } if store == nil { ret.ChunkingFSM.store = NewInmemChunkStorage() } return ret } // Apply applies the log, handling chunking as needed. The return value will // either be an error or whatever is returned from the underlying Apply. func (c *ChunkingFSM) Apply(l *raft.Log) interface{} { // Not chunking or wrong type, pass through if l.Type != raft.LogCommand || l.Extensions == nil { return c.underlying.Apply(l) } if l.Term != c.lastTerm { // Term has changed. A raft library client that was applying chunks // should get an error that it's no longer the leader and bail, and // then any client of (Consul, Vault, etc.) should then retry the full // chunking operation automatically, which will be under a different // opnum. So it should be safe in this case to clear the map. if err := c.store.RestoreChunks(nil); err != nil { return err } c.lastTerm = l.Term } // Get chunk info from extensions var ci types.ChunkInfo if err := proto.Unmarshal(l.Extensions, &ci); err != nil { return errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err) } // Store the current chunk and find out if all chunks have arrived done, err := c.store.StoreChunk(&ChunkInfo{ OpNum: ci.OpNum, SequenceNum: ci.SequenceNum, NumChunks: ci.NumChunks, Term: l.Term, Data: l.Data, }) if err != nil { return err } if !done { return nil } // All chunks are here; get the full set and clear storage of the op chunks, err := c.store.FinalizeOp(ci.OpNum) if err != nil { return err } finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize) for _, chunk := range chunks { finalData = append(finalData, chunk.Data...) } // Use the latest log's values with the final data logToApply := &raft.Log{ Index: l.Index, Term: l.Term, Type: l.Type, Data: finalData, Extensions: ci.NextExtensions, } return ChunkingSuccess{Response: c.Apply(logToApply)} } func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) { return c.underlying.Snapshot() } func (c *ChunkingFSM) Restore(rc io.ReadCloser) error { return c.underlying.Restore(rc) } // Note: this is used in tests via the Raft package test helper functions, even // if it's not used in client code func (c *ChunkingFSM) Underlying() raft.FSM { return c.underlying } func (c *ChunkingFSM) CurrentState() (*State, error) { chunks, err := c.store.GetChunks() if err != nil { return nil, err } return &State{ ChunkMap: chunks, }, nil } func (c *ChunkingFSM) RestoreState(state *State) error { // If nil we'll restore to blank, so create a new state with a nil map if state == nil { state = new(State) } return c.store.RestoreChunks(state.ChunkMap) } func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) { c.underlyingConfigurationStore.StoreConfiguration(index, configuration) }