Adds a get-tree verb to KV transaction operations.

This commit is contained in:
James Phillips 2016-05-13 16:57:39 -07:00
parent 0f94a7a326
commit a11f32a1da
9 changed files with 129 additions and 15 deletions

View File

@ -35,6 +35,7 @@ const (
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVGetTree = "get-tree"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)

View File

@ -244,6 +244,12 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
"Verb": "get",
"Key": "key"
}
},
{
"KV": {
"Verb": "get-tree",
"Key": "key"
}
}
]
`))
@ -274,9 +280,6 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(txnResp.Results) != 1 {
t.Fatalf("bad: %v", txnResp)
}
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
@ -293,6 +296,19 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
QueryMeta: structs.QueryMeta{

View File

@ -31,7 +31,7 @@ func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.Dir
return false, permissionDeniedErr
}
case structs.KVSGet:
case structs.KVSGet, structs.KVSGetTree:
// Filtering for GETs is done on the output side.
case structs.KVSCheckSession, structs.KVSCheckIndex:

View File

@ -151,6 +151,12 @@ func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error)
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsListTxn(tx, prefix)
}
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.DirEntries, error) {
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")

View File

@ -60,6 +60,18 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
}
case structs.KVSGetTree:
var entries structs.DirEntries
_, entries, err = s.kvsListTxn(tx, op.DirEnt.Key)
if err == nil {
results := make(structs.TxnResults, 0, len(entries))
for _, e := range entries {
result := structs.TxnResult{KV: e}
results = append(results, &result)
}
return results, nil
}
case structs.KVSCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)

View File

@ -27,6 +27,14 @@ func TestStateStore_Txn_KVS(t *testing.T) {
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
@ -157,6 +165,26 @@ func TestStateStore_Txn_KVS(t *testing.T) {
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/baz",
Value: []byte("baz"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/zip",
Value: []byte("zip"),
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/new",
@ -517,6 +545,14 @@ func TestStateStore_Txn_KVS_RO(t *testing.T) {
// Set up a transaction that hits all the read-only operations.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
@ -550,12 +586,29 @@ func TestStateStore_Txn_KVS_RO(t *testing.T) {
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops))
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/baz",
Value: []byte("baz"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/zip",
Value: []byte("zip"),
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo",

View File

@ -538,6 +538,7 @@ const (
// The following operations are only available inside of atomic
// transactions via the Txn request.
KVSGet = "get" // Read the key during the transaction.
KVSGetTree = "get-tree" // Read all keys with the given prefix during the transaction.
KVSCheckSession = "check-session" // Check the session holds the key.
KVSCheckIndex = "check-index" // Check the modify index of the key.
)
@ -545,7 +546,7 @@ const (
// IsWrite returns true if the given operation alters the state store.
func (op KVSOp) IsWrite() bool {
switch op {
case KVSGet, KVSCheckSession, KVSCheckIndex:
case KVSGet, KVSGetTree, KVSCheckSession, KVSCheckIndex:
return false
default:

View File

@ -209,6 +209,14 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
@ -239,7 +247,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
var expected structs.TxnResponse
for i, op := range arg.Ops {
switch op.KV.Verb {
case structs.KVSGet:
case structs.KVSGet, structs.KVSGetTree:
// These get filtered but won't result in an error.
default:
@ -455,6 +463,14 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
@ -489,7 +505,7 @@ func TestTxn_Read_ACLDeny(t *testing.T) {
}
for i, op := range arg.Ops {
switch op.KV.Verb {
case structs.KVSGet:
case structs.KVSGet, structs.KVSGetTree:
// These get filtered but won't result in an error.
default:

View File

@ -262,7 +262,7 @@ operation ("X" means a field is required and "O" means it is optional):
</tr>
<tr>
<td>unlock</td>
<td>Unlocks the `Key` with the given `Session`. The `Key` will only release the lock if the `Session` is valid and currently has it locked..</td>
<td>Unlocks the `Key` with the given `Session`. The `Key` will only release the lock if the `Session` is valid and currently has it locked.</td>
<td align="center">X</td>
<td align="center">X</td>
<td align="center">O</td>
@ -271,7 +271,16 @@ operation ("X" means a field is required and "O" means it is optional):
</tr>
<tr>
<td>get</td>
<td>Gets the `Key` during the transaction. This fails the transaction if the `Key` doesn't exist.</td>
<td>Gets the `Key` during the transaction. This fails the transaction if the `Key` doesn't exist. The key may not be present in the results if ACLs do not permit it to be read.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>get-tree</td>
<td>Gets all keys with a prefix of `Key` during the transaction. This does not fail the transaction if the `Key` doesn't exist. Not all keys may be present in the results if ACLs do not permit them to be read.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
@ -355,9 +364,9 @@ back. If either of these status codes are returned, the response will look like
```
`Results` has entries for some operations if the transaction was successful. To save
space, the `Value` will be `null` for any `Verb` other than "get". Like the `/v1/kv/<key>`
endpoint, `Value` will be Base64-encoded if it is present. Also, no result entries will be
added for verbs that delete keys.
space, the `Value` will be `null` for any `Verb` other than "get" or "get-tree". Like
the `/v1/kv/<key>` endpoint, `Value` will be Base64-encoded if it is present. Also,
no result entries will be added for verbs that delete keys.
`Errors` has entries describing which operations failed if the transaction was rolled
back. The `OpIndex` gives the index of the failed operation in the transaction, and