394 lines
9.7 KiB
Go
394 lines
9.7 KiB
Go
package consul
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/consul/structs"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
"github.com/hashicorp/consul/testutil/retry"
|
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
|
)
|
|
|
|
// verifySnapshot is a helper that does a snapshot and restore.
|
|
func verifySnapshot(t *testing.T, s *Server, dc, token string) {
|
|
codec := rpcClient(t, s)
|
|
defer codec.Close()
|
|
|
|
// Set a key to a before value.
|
|
{
|
|
args := structs.KVSRequest{
|
|
Datacenter: dc,
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "test",
|
|
Value: []byte("hello"),
|
|
},
|
|
WriteRequest: structs.WriteRequest{
|
|
Token: token,
|
|
},
|
|
}
|
|
var out bool
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Take a snapshot.
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: dc,
|
|
Token: token,
|
|
Op: structs.SnapshotSave,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer snap.Close()
|
|
|
|
// Read back the before value.
|
|
{
|
|
getR := structs.KeyRequest{
|
|
Datacenter: dc,
|
|
Key: "test",
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: token,
|
|
},
|
|
}
|
|
var dirent structs.IndexedDirEntries
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(dirent.Entries) != 1 {
|
|
t.Fatalf("Bad: %v", dirent)
|
|
}
|
|
d := dirent.Entries[0]
|
|
if string(d.Value) != "hello" {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
}
|
|
|
|
// Set a key to an after value.
|
|
{
|
|
args := structs.KVSRequest{
|
|
Datacenter: dc,
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "test",
|
|
Value: []byte("goodbye"),
|
|
},
|
|
WriteRequest: structs.WriteRequest{
|
|
Token: token,
|
|
},
|
|
}
|
|
var out bool
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Read back the before value.
|
|
{
|
|
getR := structs.KeyRequest{
|
|
Datacenter: dc,
|
|
Key: "test",
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: token,
|
|
},
|
|
}
|
|
var dirent structs.IndexedDirEntries
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(dirent.Entries) != 1 {
|
|
t.Fatalf("Bad: %v", dirent)
|
|
}
|
|
d := dirent.Entries[0]
|
|
if string(d.Value) != "goodbye" {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
}
|
|
|
|
// Restore the snapshot.
|
|
args.Op = structs.SnapshotRestore
|
|
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
|
|
&args, snap, &reply)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer restore.Close()
|
|
|
|
// Read back the before value post-snapshot.
|
|
{
|
|
getR := structs.KeyRequest{
|
|
Datacenter: dc,
|
|
Key: "test",
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: token,
|
|
},
|
|
}
|
|
var dirent structs.IndexedDirEntries
|
|
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(dirent.Entries) != 1 {
|
|
t.Fatalf("Bad: %v", dirent)
|
|
}
|
|
d := dirent.Entries[0]
|
|
if string(d.Value) != "hello" {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSnapshot(t *testing.T) {
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
verifySnapshot(t, s1, "dc1", "")
|
|
}
|
|
|
|
func TestSnapshot_LeaderState(t *testing.T) {
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
// Make a before session.
|
|
var before string
|
|
{
|
|
args := structs.SessionRequest{
|
|
Datacenter: s1.config.Datacenter,
|
|
Op: structs.SessionCreate,
|
|
Session: structs.Session{
|
|
Node: s1.config.NodeName,
|
|
TTL: "60s",
|
|
},
|
|
}
|
|
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &before); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Take a snapshot.
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: s1.config.Datacenter,
|
|
Op: structs.SnapshotSave,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer snap.Close()
|
|
|
|
// Make an after session.
|
|
var after string
|
|
{
|
|
args := structs.SessionRequest{
|
|
Datacenter: s1.config.Datacenter,
|
|
Op: structs.SessionCreate,
|
|
Session: structs.Session{
|
|
Node: s1.config.NodeName,
|
|
TTL: "60s",
|
|
},
|
|
}
|
|
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &args, &after); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Make sure the leader has timers setup.
|
|
if _, ok := s1.sessionTimers[before]; !ok {
|
|
t.Fatalf("missing session timer")
|
|
}
|
|
if _, ok := s1.sessionTimers[after]; !ok {
|
|
t.Fatalf("missing session timer")
|
|
}
|
|
|
|
// Restore the snapshot.
|
|
args.Op = structs.SnapshotRestore
|
|
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
|
|
&args, snap, &reply)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer restore.Close()
|
|
|
|
// Make sure the before time is still there, and that the after timer
|
|
// got reverted. This proves we fully cycled the leader state.
|
|
if _, ok := s1.sessionTimers[before]; !ok {
|
|
t.Fatalf("missing session timer")
|
|
}
|
|
if _, ok := s1.sessionTimers[after]; ok {
|
|
t.Fatalf("unexpected session timer")
|
|
}
|
|
}
|
|
|
|
func TestSnapshot_ACLDeny(t *testing.T) {
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Take a snapshot.
|
|
func() {
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.SnapshotSave,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Restore a snapshot.
|
|
func() {
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.SnapshotRestore,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}()
|
|
|
|
// With the token in place everything should go through.
|
|
verifySnapshot(t, s1, "dc1", "root")
|
|
}
|
|
|
|
func TestSnapshot_Forward_Leader(t *testing.T) {
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = true
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
})
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
// Try to join.
|
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
|
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
|
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
|
|
|
// Run against the leader and the follower to ensure we forward.
|
|
for _, s := range []*Server{s1, s2} {
|
|
verifySnapshot(t, s, "dc1", "")
|
|
verifySnapshot(t, s, "dc1", "")
|
|
}
|
|
}
|
|
|
|
func TestSnapshot_Forward_Datacenter(t *testing.T) {
|
|
dir1, s1 := testServerDC(t, "dc1")
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerDC(t, "dc2")
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
|
|
|
// Try to WAN join.
|
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
|
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
|
|
if _, err := s2.JoinWAN([]string{addr}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
retry.Run(t, func(r *retry.R) {
|
|
if got, want := len(s1.WANMembers()), 2; got < want {
|
|
r.Fatalf("got %d WAN members want at least %d", got, want)
|
|
}
|
|
})
|
|
|
|
// Run a snapshot from each server locally and remotely to ensure we
|
|
// forward.
|
|
for _, s := range []*Server{s1, s2} {
|
|
verifySnapshot(t, s, "dc1", "")
|
|
verifySnapshot(t, s, "dc2", "")
|
|
}
|
|
}
|
|
|
|
func TestSnapshot_AllowStale(t *testing.T) {
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
|
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
c.Bootstrap = false
|
|
})
|
|
defer os.RemoveAll(dir2)
|
|
defer s2.Shutdown()
|
|
|
|
// Run against the servers which aren't haven't been set up to establish
|
|
// a leader and make sure we get a no leader error.
|
|
for _, s := range []*Server{s1, s2} {
|
|
// Take a snapshot.
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
Op: structs.SnapshotSave,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Run in stale mode and make sure we get an error from Raft (snapshot
|
|
// was attempted), and not a no leader error.
|
|
for _, s := range []*Server{s1, s2} {
|
|
// Take a snapshot.
|
|
args := structs.SnapshotRequest{
|
|
Datacenter: s.config.Datacenter,
|
|
AllowStale: true,
|
|
Op: structs.SnapshotSave,
|
|
}
|
|
var reply structs.SnapshotResponse
|
|
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr,
|
|
&args, bytes.NewReader([]byte("")), &reply)
|
|
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
}
|