open-consul/agent/consul/txn_endpoint_test.go

590 lines
12 KiB
Go

package consul
import (
"bytes"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestTxn_CheckNotExists(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
apply := func(arg *structs.TxnRequest) (*structs.TxnResponse, error) {
out := new(structs.TxnResponse)
err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", arg, out)
return out, err
}
checkKeyNotExists := &structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
{
KV: &structs.TxnKVOp{
Verb: api.KVCheckNotExists,
DirEnt: structs.DirEntry{Key: "test"},
},
},
},
}
createKey := &structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
{
KV: &structs.TxnKVOp{
Verb: api.KVSet,
DirEnt: structs.DirEntry{Key: "test"},
},
},
},
}
if _, err := apply(checkKeyNotExists); err != nil {
t.Fatalf("testing for non-existent key failed: %s", err)
}
if _, err := apply(createKey); err != nil {
t.Fatalf("creating new key failed: %s", err)
}
out, err := apply(checkKeyNotExists)
if err != nil || out == nil || len(out.Errors) != 1 || out.Errors[0].Error() != `op 0: key "test" exists` {
t.Fatalf("testing for existent key failed: %#v", out)
}
}
func TestTxn_Apply(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
},
}
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet(nil, "test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
if d.Flags != 42 ||
!bytes.Equal(d.Value, []byte("test")) {
t.Fatalf("bad: %v", d)
}
// Verify the transaction's return value.
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Flags: 42,
Value: nil,
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_ACLDeny(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "nope",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVSet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVDelete,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVDeleteCAS,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVDeleteTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCAS,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVLock,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVUnlock,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCheckNotExists,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
},
WriteRequest: structs.WriteRequest{
Token: id,
},
}
var out structs.TxnResponse
if err := s1.RPC("Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
var expected structs.TxnResponse
for i, op := range arg.Ops {
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_LockDelay(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validID := session.ID
// Make a lock request via an atomic transaction.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validID,
},
},
},
},
}
{
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 0 ||
len(out.Errors) != 1 ||
out.Errors[0].OpIndex != 0 ||
!strings.Contains(out.Errors[0].What, "due to lock delay") {
t.Fatalf("bad: %v", out)
}
}
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire.
{
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 1 ||
len(out.Errors) != 0 ||
out.Results[0].KV.LockIndex != 2 {
t.Fatalf("bad: %v", out)
}
}
}
func TestTxn_Read(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "test",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.TxnReadRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
},
}
var out structs.TxnReadResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Value: []byte("hello"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Read_ACLDeny(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "nope",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.TxnReadRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
},
QueryOptions: structs.QueryOptions{
Token: id,
},
}
var out structs.TxnReadResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
expected := structs.TxnReadResponse{
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
for i, op := range arg.Ops {
switch op.KV.Verb {
case api.KVGet, api.KVGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{
OpIndex: i,
What: acl.ErrPermissionDenied.Error(),
})
}
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}