package raft

import (
	"bytes"
	"context"
	"fmt"
	"hash/crc64"
	"io"
	"io/ioutil"
	"os"
	"testing"
	"time"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/vault/sdk/physical"
)

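// idAddr is a minimal net.Addr implementation that lets a node's ID double as
// its address on the in-memory transport.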
type idAddr struct {
	id string
}

func (a *idAddr) Network() string { return "inmem" }
func (a *idAddr) String() string  { return a.id }

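// addPeer adds follower to leader's cluster, bootstraps the follower with the
// resulting peer set, starts it, and connects both in-memory transports so the
// two nodes can replicate to each other.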
func addPeer(t *testing.T, leader, follower *RaftBackend) {
	t.Helper()

	if err := leader.AddPeer(context.Background(), follower.NodeID(), follower.NodeID()); err != nil {
		t.Fatal(err)
	}

	peers, err := leader.Peers(context.Background())
	if err != nil {
		t.Fatal(err)
	}

	err = follower.Bootstrap(context.Background(), peers)
	if err != nil {
		t.Fatal(err)
	}

	err = follower.SetupCluster(context.Background(), nil, nil)
	if err != nil {
		t.Fatal(err)
	}

	leader.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(follower.NodeID()), follower.raftTransport)
	follower.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(leader.NodeID()), leader.raftTransport)
}

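// TestRaft_Snapshot_Loading writes a batch of data, streams the FSM state out
// through writeTo, and verifies that a raft snapshot of the same data matches
// it in both size and CRC64 hash.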
func TestRaft_Snapshot_Loading(t *testing.T) {
	raft, dir := getRaft(t, true, false)
	defer os.RemoveAll(dir)

	// Write some data
	for i := 0; i < 1000; i++ {
		err := raft.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	readCloser, writeCloser := io.Pipe()
	metaReadCloser, metaWriteCloser := io.Pipe()

	go func() {
		raft.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser)
	}()

	// Create a CRC64 hash
	stateHash := crc64.New(crc64.MakeTable(crc64.ECMA))

	// Compute the hash
	size1, err := io.Copy(stateHash, metaReadCloser)
	if err != nil {
		t.Fatal(err)
	}

	computed1 := stateHash.Sum(nil)

	// Create a CRC64 hash
	stateHash = crc64.New(crc64.MakeTable(crc64.ECMA))

	// Compute the hash
	size2, err := io.Copy(stateHash, readCloser)
	if err != nil {
		t.Fatal(err)
	}

	computed2 := stateHash.Sum(nil)

	if size1 != size2 {
		t.Fatal("sizes did not match")
	}

	if !bytes.Equal(computed1, computed2) {
		t.Fatal("hashes did not match")
	}

	snapFuture := raft.raft.Snapshot()
	if err := snapFuture.Error(); err != nil {
		t.Fatal(err)
	}

	meta, reader, err := snapFuture.Open()
	if err != nil {
		t.Fatal(err)
	}
	if meta.Size != size1 {
		t.Fatal("meta size did not match expected")
	}

	// Create a CRC64 hash
	stateHash = crc64.New(crc64.MakeTable(crc64.ECMA))

	// Compute the hash
	size3, err := io.Copy(stateHash, reader)
	if err != nil {
		t.Fatal(err)
	}

	computed3 := stateHash.Sum(nil)
	if size1 != size3 {
		t.Fatal("sizes did not match")
	}

	if !bytes.Equal(computed1, computed3) {
		t.Fatal("hashes did not match")
	}
}

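// TestRaft_Snapshot_Index verifies that the FSM's latest term and index track
// the applied log entries, and that snapshot metadata reflects the state that
// is current when the snapshot is read.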
func TestRaft_Snapshot_Index(t *testing.T) {
	raft, dir := getRaft(t, true, false)
	defer os.RemoveAll(dir)

	err := raft.Put(context.Background(), &physical.Entry{
		Key:   "key",
		Value: []byte("value"),
	})
	if err != nil {
		t.Fatal(err)
	}

	// Get index
	index, _ := raft.fsm.LatestState()
	if index.Term != 1 {
		t.Fatalf("unexpected term, got %d expected 1", index.Term)
	}
	if index.Index != 3 {
		t.Fatalf("unexpected index, got %d expected 3", index.Index)
	}

	// Write some data
	for i := 0; i < 100; i++ {
		err := raft.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Get index
	index, _ = raft.fsm.LatestState()
	if index.Term != 1 {
		t.Fatalf("unexpected term, got %d expected 1", index.Term)
	}
	if index.Index != 103 {
		t.Fatalf("unexpected index, got %d expected 103", index.Index)
	}

	// Take a snapshot
	snapFuture := raft.raft.Snapshot()
	if err := snapFuture.Error(); err != nil {
		t.Fatal(err)
	}

	meta, reader, err := snapFuture.Open()
	if err != nil {
		t.Fatal(err)
	}
	io.Copy(ioutil.Discard, reader)

	if meta.Index != index.Index {
		t.Fatalf("indexes did not match, got %d expected %d", meta.Index, index.Index)
	}
	if meta.Term != index.Term {
		t.Fatalf("term did not match, got %d expected %d", meta.Term, index.Term)
	}

	// Write some more data
	for i := 0; i < 100; i++ {
		err := raft.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Open the same snapshot again
	meta, reader, err = raft.snapStore.Open(meta.ID)
	if err != nil {
		t.Fatal(err)
	}
	io.Copy(ioutil.Discard, reader)

	// Make sure the meta data has updated to the new values
	if meta.Index != 203 {
		t.Fatalf("unexpected snapshot index %d", meta.Index)
	}
	if meta.Term != 1 {
		t.Fatalf("unexpected snapshot term %d", meta.Term)
	}
}

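// TestRaft_Snapshot_Peers exercises snapshot installation on new followers: a
// snapshot is forced on the leader before each peer joins, and the follower's
// store must match the leader's once it has caught up.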
func TestRaft_Snapshot_Peers(t *testing.T) {
	raft1, dir := getRaft(t, true, false)
	raft2, dir2 := getRaft(t, false, false)
	raft3, dir3 := getRaft(t, false, false)
	defer os.RemoveAll(dir)
	defer os.RemoveAll(dir2)
	defer os.RemoveAll(dir3)

	// Write some data
	for i := 0; i < 1000; i++ {
		err := raft1.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Force a snapshot
	snapFuture := raft1.raft.Snapshot()
	if err := snapFuture.Error(); err != nil {
		t.Fatal(err)
	}

	// Add raft2 to the cluster
	addPeer(t, raft1, raft2)

	// TODO: remove sleeps from these tests
	time.Sleep(10 * time.Second)

	// Make sure the snapshot was applied correctly on the follower
	compareDBs(t, raft1.fsm.db, raft2.fsm.db)

	// Write some more data
	for i := 1000; i < 2000; i++ {
		err := raft1.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	snapFuture = raft1.raft.Snapshot()
	if err := snapFuture.Error(); err != nil {
		t.Fatal(err)
	}

	// Add raft3 to the cluster
	addPeer(t, raft1, raft3)

	// TODO: remove sleeps from these tests
	time.Sleep(10 * time.Second)

	// Make sure all stores are the same
	compareFSMs(t, raft1.fsm, raft2.fsm)
	compareFSMs(t, raft1.fsm, raft3.fsm)
}

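// TestRaft_Snapshot_Restart checks that a node which has taken a snapshot can
// be torn down and started again while keeping its peer set and FSM contents.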
func TestRaft_Snapshot_Restart(t *testing.T) {
	raft1, dir := getRaft(t, true, false)
	defer os.RemoveAll(dir)
	raft2, dir2 := getRaft(t, false, false)
	defer os.RemoveAll(dir2)

	// Write some data
	for i := 0; i < 100; i++ {
		err := raft1.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Take a snapshot
	snapFuture := raft1.raft.Snapshot()
	if err := snapFuture.Error(); err != nil {
		t.Fatal(err)
	}
	// Advance FSM's index past configuration change
	raft1.Put(context.Background(), &physical.Entry{
		Key:   "key",
		Value: []byte("value"),
	})

	// Add raft2 to the cluster
	addPeer(t, raft1, raft2)

	time.Sleep(2 * time.Second)

	peers, err := raft2.Peers(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if len(peers) != 2 {
		t.Fatal(peers)
	}

	// Shutdown raft1
	if err := raft1.TeardownCluster(nil); err != nil {
		t.Fatal(err)
	}

	// Start Raft
	err = raft1.SetupCluster(context.Background(), nil, nil)
	if err != nil {
		t.Fatal(err)
	}

	peers, err = raft1.Peers(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if len(peers) != 2 {
		t.Fatal(peers)
	}

	compareFSMs(t, raft1.fsm, raft2.fsm)
}

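// TestRaft_Snapshot_Take_Restore takes a snapshot through the backend's
// Snapshot API, writes additional data, restores the snapshot, and confirms
// the later writes are gone from both nodes.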
func TestRaft_Snapshot_Take_Restore(t *testing.T) {
	raft1, dir := getRaft(t, true, false)
	defer os.RemoveAll(dir)
	raft2, dir2 := getRaft(t, false, false)
	defer os.RemoveAll(dir2)

	addPeer(t, raft1, raft2)

	// Write some data
	for i := 0; i < 100; i++ {
		err := raft1.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	snap := &bytes.Buffer{}

	err := raft1.Snapshot(snap, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Write some more data
	for i := 100; i < 200; i++ {
		err := raft1.Put(context.Background(), &physical.Entry{
			Key:   fmt.Sprintf("key-%d", i),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		})
		if err != nil {
			t.Fatal(err)
		}
	}

	snapFile, cleanup, metadata, err := raft1.WriteSnapshotToTemp(ioutil.NopCloser(snap), nil)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	err = raft1.RestoreSnapshot(context.Background(), metadata, snapFile)
	if err != nil {
		t.Fatal(err)
	}

	// make sure we don't have the second batch of writes
	for i := 100; i < 200; i++ {
		{
			value, err := raft1.Get(context.Background(), fmt.Sprintf("key-%d", i))
			if err != nil {
				t.Fatal(err)
			}
			if value != nil {
				t.Fatal("didn't remove data")
			}
		}
		{
			value, err := raft2.Get(context.Background(), fmt.Sprintf("key-%d", i))
			if err != nil {
				t.Fatal(err)
			}
			if value != nil {
				t.Fatal("didn't remove data")
			}
		}
	}

	time.Sleep(10 * time.Second)

	compareFSMs(t, raft1.fsm, raft2.fsm)
}