Refactors TxnRequest/TxnResponse into a form that will allow non-KV ops.

This isn't needed/used yet, but it's a good hook to get in there so we
can add more atomic operations in the future. The Go API hides this detail
so that feels like a KV-specific API. The implications on the REST API are
pretty minimal.
This commit is contained in:
James Phillips 2016-05-11 01:35:27 -07:00
parent d980cbcd9d
commit dc662f7e35
15 changed files with 1270 additions and 904 deletions

108
api/kv.go
View file

@ -41,7 +41,7 @@ const (
// KVTxnOp defines a single operation inside a transaction. // KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct { type KVTxnOp struct {
Op string Verb string
Key string Key string
Value []byte Value []byte
Flags uint64 Flags uint64
@ -49,23 +49,14 @@ type KVTxnOp struct {
Session string Session string
} }
// KVTxn defines a set of operations to be performed inside a single transaction. // KVTxnOps defines a set of operations to be performed inside a single
type KVTxn []KVTxnOp
// KVTxnError is used to return information about an operation in a
// transaction. // transaction.
type KVTxnError struct { type KVTxnOps []*KVTxnOp
OpIndex int
What string
}
// KVTxnErrors is a list of KVTxnError objects. // KVTxnResponse has the outcome of a transaction.
type KVTxnErrors []KVTxnError type KVTxnResponse struct {
Results []*KVPair
// KVTxnResult is used to return the results of a transaction. Errors TxnErrors
type KVTxnResult struct {
Errors KVTxnErrors
Results KVPairs
} }
// KV is used to manipulate the K/V API // KV is used to manipulate the K/V API
@ -284,43 +275,84 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
return res, qm, nil return res, qm, nil
} }
// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KVS *KVTxnOp
}
// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp
// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KVS *struct{ DirEnt *KVPair }
}
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// Txn is used to apply multiple KV operations in a single, atomic transaction. // Txn is used to apply multiple KV operations in a single, atomic transaction.
// Note that Go will perform the required base64 encoding on the values // Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a // automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure // list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied // to define operations. If any operation fails, none of the changes are applied
// to the state store. // to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
// //
// Here's an example: // Here's an example:
// //
// txn := KVTxn{ // ops := KVTxnOps{
// KVTxnOp{ // &KVTxnOp{
// Op: KVLock, // Verb: KVLock,
// Key: "test/lock", // Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", // Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"), // Value: []byte("hello"),
// }, // },
// KVTxnOp{ // &KVTxnOp{
// Op: KVGet, // Verb: KVGet,
// Key: "another/key", // Key: "another/key",
// }, // },
// } // }
// ok, result, _, err := kv.Txn(&txn, nil) // ok, response, _, err := kv.Txn(&ops, nil)
// //
// If there is a problem making the transaction request then an error will be // If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded // returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The result is a structured return value which // or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries // will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save // for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation // space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have // is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error // entries referencing the index of the operation that failed along with an error
// message. // message.
func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) { func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn") r := k.c.newRequest("PUT", "/v1/txn")
r.setWriteOptions(q) r.setWriteOptions(q)
r.obj = txn // Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvsOp := range txn {
ops = append(ops, &TxnOp{KVS: kvsOp})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r) rtt, resp, err := k.c.doRequest(r)
if err != nil { if err != nil {
return false, nil, nil, err return false, nil, nil, err
@ -331,11 +363,23 @@ func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, e
wm.RequestTime = rtt wm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var result KVTxnResult var txnResp TxnResponse
if err := decodeBody(resp, &result); err != nil { if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err return false, nil, nil, err
} }
return resp.StatusCode == http.StatusOK, &result, wm, nil
// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
var entry *KVPair
if result.KVS != nil {
entry = result.KVS.DirEnt
}
kvResp.Results = append(kvResp.Results, entry)
}
return resp.StatusCode == http.StatusOK, &kvResp, wm, nil
} }
var buf bytes.Buffer var buf bytes.Buffer

View file

@ -466,18 +466,18 @@ func TestClient_Txn(t *testing.T) {
// session. // session.
key := testKey() key := testKey()
value := []byte("test") value := []byte("test")
txn := KVTxn{ txn := KVTxnOps{
KVTxnOp{ &KVTxnOp{
Op: KVLock, Verb: KVLock,
Key: key, Key: key,
Value: value, Value: value,
}, },
KVTxnOp{ &KVTxnOp{
Op: KVGet, Verb: KVGet,
Key: key, Key: key,
}, },
} }
ok, ret, _, err := kv.Txn(&txn, nil) ok, ret, _, err := kv.Txn(txn, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} else if ok { } else if ok {
@ -494,7 +494,7 @@ func TestClient_Txn(t *testing.T) {
// Now poke in a real session and try again. // Now poke in a real session and try again.
txn[0].Session = id txn[0].Session = id
ok, ret, _, err = kv.Txn(&txn, nil) ok, ret, _, err = kv.Txn(txn, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} else if !ok { } else if !ok {

View file

@ -10,14 +10,14 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
// fixupValues takes the raw decoded JSON and base64 decodes all the values, // fixupKVSOps takes the raw decoded JSON and base64 decodes all the KVS values,
// replacing them with byte arrays with the data. // replacing them with byte arrays with the data.
func fixupValues(raw interface{}) error { func fixupKVSOps(raw interface{}) error {
// decodeValue decodes the value member of the given operation. // decodeValue decodes the value member of the given operation.
decodeValue := func(rawOp interface{}) error { decodeValue := func(rawKVS interface{}) error {
rawMap, ok := rawOp.(map[string]interface{}) rawMap, ok := rawKVS.(map[string]interface{})
if !ok { if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp) return fmt.Errorf("unexpected raw KVS type: %T", rawKVS)
} }
for k, v := range rawMap { for k, v := range rawMap {
switch strings.ToLower(k) { switch strings.ToLower(k) {
@ -41,7 +41,25 @@ func fixupValues(raw interface{}) error {
return nil return nil
} }
} }
return nil
}
// fixupKVSOp looks for non-nil KVS operations and passes them on for
// value conversion.
fixupKVSOp := func(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "kvs":
if v == nil {
return nil
}
return decodeValue(v)
}
}
return nil return nil
} }
@ -50,11 +68,10 @@ func fixupValues(raw interface{}) error {
return fmt.Errorf("unexpected raw type: %t", raw) return fmt.Errorf("unexpected raw type: %t", raw)
} }
for _, rawOp := range rawSlice { for _, rawOp := range rawSlice {
if err := decodeValue(rawOp); err != nil { if err := fixupKVSOp(rawOp); err != nil {
return err return err
} }
} }
return nil return nil
} }
@ -66,44 +83,53 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
return nil, nil return nil, nil
} }
var args structs.KVSAtomicRequest var args structs.TxnRequest
s.parseDC(req, &args.Datacenter) s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token) s.parseToken(req, &args.Token)
// Note the body is in API format, and not the RPC format. If we can't // Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to // decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation. // associate the error with a given operation.
var txn api.KVTxn var ops api.TxnOps
if err := decodeBody(req, &txn, fixupValues); err != nil { if err := decodeBody(req, &ops, fixupKVSOps); err != nil {
resp.WriteHeader(http.StatusBadRequest) resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err))) resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, nil return nil, nil
} }
// Convert the API format into the RPC format. Note that fixupValues // Convert the KVS API format into the RPC format. Note that fixupKVSOps
// above will have already converted the base64 encoded strings into // above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over. // byte arrays so we can assign right over.
for _, in := range txn { for _, in := range ops {
// TODO @slackpad - Verify the size here, or move that down into if in.KVS != nil {
// the endpoint. if size := len(in.KVS.Value); size > maxKVSize {
out := &structs.KVSAtomicOp{ resp.WriteHeader(http.StatusRequestEntityTooLarge)
Op: structs.KVSOp(in.Op), resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)",
DirEnt: structs.DirEntry{ in.KVS.Key, size, maxKVSize)))
Key: in.Key, return nil, nil
Value: in.Value, }
Flags: in.Flags,
Session: in.Session, out := &structs.TxnOp{
RaftIndex: structs.RaftIndex{ KVS: &structs.TxnKVSOp{
ModifyIndex: in.Index, Verb: structs.KVSOp(in.KVS.Verb),
DirEnt: structs.DirEntry{
Key: in.KVS.Key,
Value: in.KVS.Value,
Flags: in.KVS.Flags,
Session: in.KVS.Session,
RaftIndex: structs.RaftIndex{
ModifyIndex: in.KVS.Index,
},
},
}, },
}, }
args.Ops = append(args.Ops, out)
} }
args.Ops = append(args.Ops, out)
} }
// Make the request and return a conflict status if there were errors // Make the request and return a conflict status if there were errors
// reported from the transaction. // reported from the transaction.
var reply structs.KVSAtomicResponse var reply structs.TxnResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err return nil, err
} }

View file

@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
@ -50,6 +51,34 @@ func TestTxnEndpoint_Bad_Method(t *testing.T) {
}) })
} }
func TestTxnEndpoint_Bad_Size(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KVS": {
"Verb": "set",
"Key": "key",
"Value": %q
}
}
]
`, strings.Repeat("bad", 2*maxKVSize))))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 413 {
t.Fatalf("expected 413, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_KVS_Actions(t *testing.T) { func TestTxnEndpoint_KVS_Actions(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
// Make sure all incoming fields get converted properly to the internal // Make sure all incoming fields get converted properly to the internal
@ -60,15 +89,19 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[ [
{ {
"Op": "lock", "KVS": {
"Key": "key", "Verb": "lock",
"Value": "aGVsbG8gd29ybGQ=", "Key": "key",
"Flags": 23, "Value": "aGVsbG8gd29ybGQ=",
"Session": %q "Flags": 23,
"Session": %q
}
}, },
{ {
"Op": "get", "KVS": {
"Key": "key" "Verb": "get",
"Key": "key"
}
} }
] ]
`, id))) `, id)))
@ -86,42 +119,50 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
t.Fatalf("expected 200, got %d", resp.Code) t.Fatalf("expected 200, got %d", resp.Code)
} }
atomic, ok := obj.(structs.KVSAtomicResponse) txnResp, ok := obj.(structs.TxnResponse)
if !ok { if !ok {
t.Fatalf("bad type: %T", obj) t.Fatalf("bad type: %T", obj)
} }
if len(atomic.Results) != 2 { if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", atomic) t.Fatalf("bad: %v", txnResp)
} }
index = atomic.Results[0].ModifyIndex index = txnResp.Results[0].KVS.DirEnt.ModifyIndex
expected := structs.KVSAtomicResponse{ expected := structs.TxnResponse{
Results: structs.DirEntries{ Results: structs.TxnResults{
&structs.DirEntry{ &structs.TxnResult{
Key: "key", KVS: &structs.TxnKVSResult{
Value: nil, DirEnt: &structs.DirEntry{
Flags: 23, Key: "key",
Session: id, Value: nil,
LockIndex: 1, Flags: 23,
RaftIndex: structs.RaftIndex{ Session: id,
CreateIndex: index, LockIndex: 1,
ModifyIndex: index, RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
}, },
}, },
&structs.DirEntry{ &structs.TxnResult{
Key: "key", KVS: &structs.TxnKVSResult{
Value: []byte("hello world"), DirEnt: &structs.DirEntry{
Flags: 23, Key: "key",
Session: id, Value: []byte("hello world"),
LockIndex: 1, Flags: 23,
RaftIndex: structs.RaftIndex{ Session: id,
CreateIndex: index, LockIndex: 1,
ModifyIndex: index, RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
}, },
}, },
}, },
} }
if !reflect.DeepEqual(atomic, expected) { if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", atomic) t.Fatalf("bad: %v", txnResp)
} }
} }
@ -131,14 +172,18 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[ [
{ {
"Op": "cas", "KVS": {
"Key": "key", "Verb": "cas",
"Value": "Z29vZGJ5ZSB3b3JsZA==", "Key": "key",
"Index": %d "Value": "Z29vZGJ5ZSB3b3JsZA==",
"Index": %d
}
}, },
{ {
"Op": "get", "KVS": {
"Key": "key" "Verb": "get",
"Key": "key"
}
} }
] ]
`, index))) `, index)))
@ -156,38 +201,46 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
t.Fatalf("expected 200, got %d", resp.Code) t.Fatalf("expected 200, got %d", resp.Code)
} }
atomic, ok := obj.(structs.KVSAtomicResponse) txnResp, ok := obj.(structs.TxnResponse)
if !ok { if !ok {
t.Fatalf("bad type: %T", obj) t.Fatalf("bad type: %T", obj)
} }
if len(atomic.Results) != 2 { if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", atomic) t.Fatalf("bad: %v", txnResp)
} }
modIndex := atomic.Results[0].ModifyIndex modIndex := txnResp.Results[0].KVS.DirEnt.ModifyIndex
expected := structs.KVSAtomicResponse{ expected := structs.TxnResponse{
Results: structs.DirEntries{ Results: structs.TxnResults{
&structs.DirEntry{ &structs.TxnResult{
Key: "key", KVS: &structs.TxnKVSResult{
Value: nil, DirEnt: &structs.DirEntry{
Session: id, Key: "key",
RaftIndex: structs.RaftIndex{ Value: nil,
CreateIndex: index, Session: id,
ModifyIndex: modIndex, RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
}, },
}, },
&structs.DirEntry{ &structs.TxnResult{
Key: "key", KVS: &structs.TxnKVSResult{
Value: []byte("goodbye world"), DirEnt: &structs.DirEntry{
Session: id, Key: "key",
RaftIndex: structs.RaftIndex{ Value: []byte("goodbye world"),
CreateIndex: index, Session: id,
ModifyIndex: modIndex, RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
}, },
}, },
}, },
} }
if !reflect.DeepEqual(atomic, expected) { if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", atomic) t.Fatalf("bad: %v", txnResp)
} }
} }
}) })
@ -197,14 +250,18 @@ func TestTxnEndpoint_KVS_Actions(t *testing.T) {
buf := bytes.NewBuffer([]byte(` buf := bytes.NewBuffer([]byte(`
[ [
{ {
"Op": "lock", "KVS": {
"Key": "key", "Verb": "lock",
"Value": "aGVsbG8gd29ybGQ=", "Key": "key",
"Session": "nope" "Value": "aGVsbG8gd29ybGQ=",
"Session": "nope"
}
}, },
{ {
"Op": "get", "KVS": {
"Key": "key" "Verb": "get",
"Key": "key"
}
} }
] ]
`)) `))

View file

@ -83,8 +83,6 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyDeregister(buf[1:], log.Index) return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType: case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index) return c.applyKVSOperation(buf[1:], log.Index)
case structs.KVSAtomicRequestType:
return c.applyKVSAtomicOperation(buf[1:], log.Index)
case structs.SessionRequestType: case structs.SessionRequestType:
return c.applySessionOperation(buf[1:], log.Index) return c.applySessionOperation(buf[1:], log.Index)
case structs.ACLRequestType: case structs.ACLRequestType:
@ -95,6 +93,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyCoordinateBatchUpdate(buf[1:], log.Index) return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
case structs.PreparedQueryRequestType: case structs.PreparedQueryRequestType:
return c.applyPreparedQueryOperation(buf[1:], log.Index) return c.applyPreparedQueryOperation(buf[1:], log.Index)
case structs.TxnRequestType:
return c.applyTxn(buf[1:], log.Index)
default: default:
if ignoreUnknown { if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -195,16 +195,6 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
} }
} }
func (c *consulFSM) applyKVSAtomicOperation(buf []byte, index uint64) interface{} {
var req structs.KVSAtomicRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs-atomic"}, time.Now())
entries, errors := c.state.KVSAtomicUpdate(index, req.Ops)
return structs.KVSAtomicResponse{errors, entries}
}
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} { func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
@ -298,6 +288,16 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
} }
} }
func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
results, errors := c.state.TxnRun(index, req.Ops)
return structs.TxnResponse{results, errors}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) { defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))

View file

@ -1241,6 +1241,47 @@ func TestFSM_TombstoneReap(t *testing.T) {
} }
} }
func TestFSM_Txn(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set a key using a transaction.
req := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
},
},
},
}
buf, err := structs.Encode(structs.TxnRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(structs.TxnResponse); !ok {
t.Fatalf("bad response type: %T", resp)
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
}
func TestFSM_IgnoreUnknown(t *testing.T) { func TestFSM_IgnoreUnknown(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr) fsm, err := NewFSM(nil, os.Stderr)
if err != nil { if err != nil {

View file

@ -31,9 +31,9 @@ func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.Dir
return false, permissionDeniedErr return false, permissionDeniedErr
} }
case structs.KVSAtomicGet, case structs.KVSGet,
structs.KVSAtomicCheckSession, structs.KVSCheckSession,
structs.KVSAtomicCheckIndex: structs.KVSCheckIndex:
if !acl.KeyRead(dirEnt.Key) { if !acl.KeyRead(dirEnt.Key) {
return false, permissionDeniedErr return false, permissionDeniedErr
} }

View file

@ -579,103 +579,6 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
return true, nil return true, nil
} }
// KVSAtomicUpdate performs a series of updates atomically, all inside a single
// transaction that only succeeds if all the operations succeed.
func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.KVSAtomicErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
// Dispatch all of the operations inside the transaction.
entries := make(structs.DirEntries, 0, len(ops))
errors := make(structs.KVSAtomicErrors, 0, len(ops))
for i, op := range ops {
var entry *structs.DirEntry
var err error
switch op.Op {
case structs.KVSSet:
entry = &op.DirEnt
err = s.kvsSetTxn(tx, idx, entry, false)
case structs.KVSDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
case structs.KVSDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
case structs.KVSCAS:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsSetCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSLock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsLockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
}
case structs.KVSUnlock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsUnlockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
}
case structs.KVSAtomicGet:
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
case structs.KVSAtomicCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
case structs.KVSAtomicCheckIndex:
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
default:
err = fmt.Errorf("unknown operation %q", op.Op)
}
// Accumulate the entries. For a GET we keep the value, otherwise
// we clone and blank out the value (we have to clone so we don't
// modify the entry being used by the state store).
if entry != nil {
if op.Op == structs.KVSAtomicGet {
entries = append(entries, entry)
} else {
clone := entry.Clone()
clone.Value = nil
entries = append(entries, clone)
}
} else {
entries = append(entries, nil)
}
// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.KVSAtomicError{i, err.Error()})
}
}
if len(errors) > 0 {
return nil, errors
}
tx.Commit()
return entries, nil
}
// kvsCheckSessionTxn checks to see if the given session matches the current // kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key. // entry for a key.
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) { func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {

View file

@ -1214,453 +1214,6 @@ func TestStateStore_KVSUnlock(t *testing.T) {
} }
} }
func TestStateStore_KVS_Atomic(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 5, "foo/update", "stale")
// Make a real session.
testRegisterNode(t, s, 6, "node1")
session := testUUID()
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// Set up a transaction that hits every operation.
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo/zorp",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo/delete",
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "not/there",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 8,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/lock",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: "",
},
},
}
entries, errors := s.KVSAtomicUpdate(8, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
if len(entries) != len(ops) {
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
}
// Make sure the response looks as expected.
expected := structs.DirEntries{
&structs.DirEntry{
Key: "foo/new",
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
nil, // delete
nil, // delete tree
nil, // delete CAS
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
nil, // get on not/there
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
nil, // get on foo/lock before it's created
&structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
}
if len(entries) != len(expected) {
t.Fatalf("bad: %v", entries)
}
for i, _ := range entries {
if !reflect.DeepEqual(entries[i], expected[i]) {
t.Fatalf("bad %d: %v != %v", i, *(entries[i]), *(expected[i]))
}
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
expected = structs.DirEntries{
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
}
if len(actual) != len(expected) {
t.Fatalf("bad len: %d != %d", len(actual), len(expected))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], expected[i]) {
t.Fatalf("bad %d: %v != %v", i, *(actual[i]), *(expected[i]))
}
}
}
func TestStateStore_KVS_Atomic_Rollback(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale")
testRegisterNode(t, s, 3, "node1")
session := testUUID()
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
bogus := testUUID()
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}
if idx != 5 {
t.Fatalf("bad index (%s): %d", desc, idx)
}
// Make sure it looks as expected.
expected := structs.DirEntries{
&structs.DirEntry{
Key: "foo/delete",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.DirEntry{
Key: "foo/lock",
Value: []byte("foo"),
LockIndex: 1,
Session: session,
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
if len(actual) != len(expected) {
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(expected))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], expected[i]) {
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(expected[i]))
}
}
}
verifyStateStore("initial")
// Set up a transaction that fails every operation.
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/lock",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
&structs.KVSAtomicOp{
Op: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
}
entries, errors := s.KVSAtomicUpdate(7, ops)
if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
if len(entries) != 0 {
t.Fatalf("bad len: %d != 0", len(entries))
}
verifyStateStore("after")
// Make sure the errors look reasonable.
expected := []string{
"index is stale",
"lock is already held",
"lock isn't held, or is held by another session",
"current session",
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown operation",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
}
for i, msg := range expected {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error(), msg) {
t.Fatalf("bad %d: %v", i, errors[i].Error())
}
}
}
func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -1893,73 +1446,6 @@ func TestStateStore_KVS_Watches(t *testing.T) {
}) })
}) })
}) })
// Verify that a basic transaction triggers multiple watches. We call
// the same underlying methods that are called above so this is more
// of a sanity check.
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two"),
},
},
}
entries, errors := s.KVSAtomicUpdate(15, ops)
if len(entries) != len(ops) {
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
}
if len(errors) != 0 {
t.Fatalf("bad len: %d != 0", len(errors))
}
})
})
// Verify that a rolled back transaction doesn't trigger any watches.
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one-updated"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two-updated"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "multi/nope",
Value: []byte("nope"),
},
},
}
entries, errors := s.KVSAtomicUpdate(16, ops)
if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors))
}
if len(entries) != 0 {
t.Fatalf("bad len: %d != 0", len(entries))
}
})
})
} }
func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) { func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {

123
consul/state/txn.go Normal file
View file

@ -0,0 +1,123 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVSOp) (*structs.TxnKVSResult, error) {
var entry *structs.DirEntry
var err error
switch op.Verb {
case structs.KVSSet:
entry = &op.DirEnt
err = s.kvsSetTxn(tx, idx, entry, false)
case structs.KVSDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
case structs.KVSDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
case structs.KVSCAS:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsSetCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSLock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsLockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
}
case structs.KVSUnlock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsUnlockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
}
case structs.KVSGet:
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
case structs.KVSCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
case structs.KVSCheckIndex:
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
default:
err = fmt.Errorf("unknown KVS verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == structs.KVSGet {
return &structs.TxnKVSResult{entry}, nil
}
clone := entry.Clone()
clone.Value = nil
return &structs.TxnKVSResult{clone}, nil
}
return nil, nil
}
// TxnRun tries to run the given operations all inside a single transaction. If
// any of the operations fail, the entire transaction will be rolled back.
func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
// Dispatch all of the operations inside the transaction.
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
var result structs.TxnResult
var err error
// Dispatch based on the type of operation.
if op.KVS != nil {
result.KVS, err = s.txnKVS(tx, idx, op.KVS)
} else {
err = fmt.Errorf("no operation specified")
}
// Accumulate the results.
results = append(results, &result)
// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.TxnError{i, err.Error()})
}
}
if len(errors) > 0 {
return nil, errors
}
tx.Commit()
return results, nil
}

624
consul/state/txn_test.go Normal file
View file

@ -0,0 +1,624 @@
package state
import (
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_Txn_KVS(t *testing.T) {
s := testStateStore(t)
// Create kvs results in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 5, "foo/update", "stale")
// Make a real session.
testRegisterNode(t, s, 6, "node1")
session := testUUID()
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo/zorp",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo/delete",
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "not/there",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 8,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo/lock",
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: "",
},
},
},
}
results, errors := s.TxnRun(8, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops))
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/new",
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{}, // delete
&structs.TxnResult{}, // delete tree
&structs.TxnResult{}, // delete CAS
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{}, // get on not/there
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{}, // get on foo/lock before it's created
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
},
&structs.TxnResult{
KVS: &structs.TxnKVSResult{
DirEnt: &structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
},
}
if len(results) != len(expected) {
t.Fatalf("bad: %v", results)
}
for i, _ := range results {
if !reflect.DeepEqual(results[i], expected[i]) {
t.Fatalf("bad %d", i)
}
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
entries := structs.DirEntries{
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
}
if len(actual) != len(entries) {
t.Fatalf("bad len: %d != %d", len(actual), len(entries))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad %d", i)
}
}
}
func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
s := testStateStore(t)
// Create kvs results in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale")
testRegisterNode(t, s, 3, "node1")
session := testUUID()
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
bogus := testUUID()
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}
if idx != 5 {
t.Fatalf("bad index (%s): %d", desc, idx)
}
// Make sure it looks as expected.
entries := structs.DirEntries{
&structs.DirEntry{
Key: "foo/delete",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.DirEntry{
Key: "foo/lock",
Value: []byte("foo"),
LockIndex: 1,
Session: session,
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
if len(actual) != len(entries) {
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(entries[i]))
}
}
}
verifyStateStore("initial")
// Set up a transaction that fails every operation.
ops := structs.TxnOps{
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
Session: bogus,
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/lock",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
},
}
results, errors := s.TxnRun(7, ops)
if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
if len(results) != 0 {
t.Fatalf("bad len: %d != 0", len(results))
}
verifyStateStore("after")
// Make sure the errors look reasonable.
expected := []string{
"index is stale",
"lock is already held",
"lock isn't held, or is held by another session",
"current session",
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown KVS verb",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
}
for i, msg := range expected {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error(), msg) {
t.Fatalf("bad %d: %v", i, errors[i].Error())
}
}
}
func TestStateStore_Txn_Watches(t *testing.T) {
s := testStateStore(t)
// Verify that a basic transaction triggers multiple watches. We call
// the same underlying methods that are called above so this is more
// of a sanity check.
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one"),
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two"),
},
},
},
}
results, errors := s.TxnRun(15, ops)
if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops))
}
if len(errors) != 0 {
t.Fatalf("bad len: %d != 0", len(errors))
}
})
})
// Verify that a rolled back transaction doesn't trigger any watches.
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one-updated"),
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two-updated"),
},
},
},
&structs.TxnOp{
KVS: &structs.TxnKVSOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "multi/nope",
Value: []byte("nope"),
},
},
},
}
results, errors := s.TxnRun(16, ops)
if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors))
}
if len(results) != 0 {
t.Fatalf("bad len: %d != 0", len(results))
}
})
})
}

View file

@ -36,7 +36,7 @@ const (
TombstoneRequestType TombstoneRequestType
CoordinateBatchUpdateType CoordinateBatchUpdateType
PreparedQueryRequestType PreparedQueryRequestType
KVSAtomicRequestType TxnRequestType
) )
const ( const (
@ -535,11 +535,11 @@ const (
KVSLock = "lock" // Lock a key KVSLock = "lock" // Lock a key
KVSUnlock = "unlock" // Unlock a key KVSUnlock = "unlock" // Unlock a key
// KVSAtomic* operations are only available in KVSAtomicRequest // The following operations are only available inside of atomic
// transactions. // transactions via the Txn request.
KVSAtomicGet = "get" // Read the key during the transaction. KVSGet = "get" // Read the key during the transaction.
KVSAtomicCheckSession = "check-session" // Check the session holds the key. KVSCheckSession = "check-session" // Check the session holds the key.
KVSAtomicCheckIndex = "check-index" // Check the modify index of the key. KVSCheckIndex = "check-index" // Check the modify index of the key.
) )
// KVSRequest is used to operate on the Key-Value store // KVSRequest is used to operate on the Key-Value store
@ -554,49 +554,6 @@ func (r *KVSRequest) RequestDatacenter() string {
return r.Datacenter return r.Datacenter
} }
// KVSAtomicOp is used to define a single operation within an multi-key
// transaction.
type KVSAtomicOp struct {
Op KVSOp
DirEnt DirEntry
}
// KVSAtomicOps is a list of atomic operations.
type KVSAtomicOps []*KVSAtomicOp
// KVSAtomicRequest is used to perform atomic multi-key operations on the
// Key-Value store.
type KVSAtomicRequest struct {
Datacenter string
Ops KVSAtomicOps
WriteRequest
}
func (r *KVSAtomicRequest) RequestDatacenter() string {
return r.Datacenter
}
// KVSAtomicError is used to return information about an error for a specific
// operation.
type KVSAtomicError struct {
OpIndex int
What string
}
// Error returns the string representation of an atomic error.
func (e KVSAtomicError) Error() string {
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
}
// KVSAtomicErrors is a list of KVSAtomicError entries.
type KVSAtomicErrors []*KVSAtomicError
// KVSAtomicResponse is the structure returned by a KVSAtomicRequest.
type KVSAtomicResponse struct {
Errors KVSAtomicErrors
Results DirEntries
}
// KeyRequest is used to request a key, or key prefix // KeyRequest is used to request a key, or key prefix
type KeyRequest struct { type KeyRequest struct {
Datacenter string Datacenter string

69
consul/structs/txn.go Normal file
View file

@ -0,0 +1,69 @@
package structs
import (
"fmt"
)
// TxnKVSOp is used to define a single operation on the KVS inside a
// transaction
type TxnKVSOp struct {
Verb KVSOp
DirEnt DirEntry
}
// TxnKVSResult is used to define the result of a single operation on the KVS
// inside a transaction.
type TxnKVSResult struct {
DirEnt *DirEntry
}
// TxnOp is used to define a single operation inside a transaction. Only one
// of the types should be filled out per entry.
type TxnOp struct {
KVS *TxnKVSOp
}
// TxnOps is a list of operations within a transaction.
type TxnOps []*TxnOp
// TxnRequest is used to apply multiple operations to the state store in a
// single transaction
type TxnRequest struct {
Datacenter string
Ops TxnOps
WriteRequest
}
func (r *TxnRequest) RequestDatacenter() string {
return r.Datacenter
}
// TxnError is used to return information about an error for a specific
// operation.
type TxnError struct {
OpIndex int
What string
}
// Error returns the string representation of an atomic error.
func (e TxnError) Error() string {
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
}
// TxnErrors is a list of TxnError entries.
type TxnErrors []*TxnError
// TxnResult is used to define the result of a given operation inside a
// transaction. Only one of the types should be filled out per entry.
type TxnResult struct {
KVS *TxnKVSResult
}
// TxnResults is a list of TxnResult entries.
type TxnResults []*TxnResult
// TxnResponse is the structure returned by a TxnRequest.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}

View file

@ -14,24 +14,26 @@ type Txn struct {
} }
// Apply is used to apply multiple operations in a single, atomic transaction. // Apply is used to apply multiple operations in a single, atomic transaction.
func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error { func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
// Perform the pre-apply checks on each of the operations. // Perform the pre-apply checks for any KVS operations.
acl, err := t.srv.resolveToken(args.Token) acl, err := t.srv.resolveToken(args.Token)
if err != nil { if err != nil {
return err return err
} }
for i, op := range args.Ops { for i, op := range args.Ops {
ok, err := kvsPreApply(t.srv, acl, op.Op, &op.DirEnt) if op.KVS != nil {
if err != nil { ok, err := kvsPreApply(t.srv, acl, op.KVS.Verb, &op.KVS.DirEnt)
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()}) if err != nil {
} else if !ok { reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key) } else if !ok {
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()}) err = fmt.Errorf("failed to lock key %q due to lock delay", op.KVS.DirEnt.Key)
reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
}
} }
} }
if len(reply.Errors) > 0 { if len(reply.Errors) > 0 {
@ -39,9 +41,9 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp
} }
// Apply the update. // Apply the update.
resp, err := t.srv.raftApply(structs.KVSAtomicRequestType, args) resp, err := t.srv.raftApply(structs.TxnRequestType, args)
if err != nil { if err != nil {
t.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err) t.srv.logger.Printf("[ERR] consul.txn: Apply failed: %v", err)
return err return err
} }
if respErr, ok := resp.(error); ok { if respErr, ok := resp.(error); ok {
@ -50,8 +52,8 @@ func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResp
// Convert the return type. This should be a cheap copy since we are // Convert the return type. This should be a cheap copy since we are
// just taking the two slices. // just taking the two slices.
if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok { if txnResp, ok := resp.(structs.TxnResponse); ok {
*reply = respAtomic *reply = txnResp
} else { } else {
return fmt.Errorf("unexpected return type %T", resp) return fmt.Errorf("unexpected return type %T", resp)
} }

View file

@ -25,26 +25,30 @@ func TestTxn_Apply(t *testing.T) {
// Do a super basic request. The state store test covers the details so // Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and // we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately. // the results are converted appropriately.
arg := structs.KVSAtomicRequest{ arg := structs.TxnRequest{
Datacenter: "dc1", Datacenter: "dc1",
Ops: structs.KVSAtomicOps{ Ops: structs.TxnOps{
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSSet, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSSet,
Key: "test", DirEnt: structs.DirEntry{
Flags: 42, Key: "test",
Value: []byte("test"), Flags: 42,
Value: []byte("test"),
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSAtomicGet, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSGet,
Key: "test", DirEnt: structs.DirEntry{
Key: "test",
},
}, },
}, },
}, },
} }
var out structs.KVSAtomicResponse var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -64,24 +68,32 @@ func TestTxn_Apply(t *testing.T) {
} }
// Verify the transaction's return value. // Verify the transaction's return value.
expected := structs.KVSAtomicResponse{ expected := structs.TxnResponse{
Results: structs.DirEntries{ Results: structs.TxnResults{
&structs.DirEntry{ &structs.TxnResult{
Key: "test", KVS: &structs.TxnKVSResult{
Flags: 42, DirEnt: &structs.DirEntry{
Value: nil, Key: "test",
RaftIndex: structs.RaftIndex{ Flags: 42,
CreateIndex: d.CreateIndex, Value: nil,
ModifyIndex: d.ModifyIndex, RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
}, },
}, },
&structs.DirEntry{ &structs.TxnResult{
Key: "test", KVS: &structs.TxnKVSResult{
Flags: 42, DirEnt: &structs.DirEntry{
Value: []byte("test"), Key: "test",
RaftIndex: structs.RaftIndex{ Flags: 42,
CreateIndex: d.CreateIndex, Value: []byte("test"),
ModifyIndex: d.ModifyIndex, RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
}, },
}, },
}, },
@ -124,81 +136,101 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
// Set up a transaction where every operation should get blocked due to // Set up a transaction where every operation should get blocked due to
// ACLs. // ACLs.
arg := structs.KVSAtomicRequest{ arg := structs.TxnRequest{
Datacenter: "dc1", Datacenter: "dc1",
Ops: structs.KVSAtomicOps{ Ops: structs.TxnOps{
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSSet, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSSet,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSDelete, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSDelete,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSDeleteCAS, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSDeleteCAS,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSDeleteTree, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSDeleteTree,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSCAS, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSCAS,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSLock, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSLock,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSUnlock, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSUnlock,
Key: "foo", DirEnt: structs.DirEntry{
Key: "foo",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSAtomicGet, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSGet,
Key: "nope", DirEnt: structs.DirEntry{
Key: "nope",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSAtomicCheckSession, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSCheckSession,
Key: "nope", DirEnt: structs.DirEntry{
Key: "nope",
},
}, },
}, },
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSAtomicCheckIndex, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSCheckIndex,
Key: "nope", DirEnt: structs.DirEntry{
Key: "nope",
},
}, },
}, },
}, },
WriteRequest: structs.WriteRequest{Token: id}, WriteRequest: structs.WriteRequest{Token: id},
} }
var out structs.KVSAtomicResponse var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Verify the transaction's return value. // Verify the transaction's return value.
var expected structs.KVSAtomicResponse var expected structs.TxnResponse
for i, _ := range arg.Ops { for i, _ := range arg.Ops {
expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()}) expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()})
} }
if !reflect.DeepEqual(out, expected) { if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out) t.Fatalf("bad %v", out)
@ -245,20 +277,22 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
validId := session.ID validId := session.ID
// Make a lock request via an atomic transaction. // Make a lock request via an atomic transaction.
arg := structs.KVSAtomicRequest{ arg := structs.TxnRequest{
Datacenter: "dc1", Datacenter: "dc1",
Ops: structs.KVSAtomicOps{ Ops: structs.TxnOps{
&structs.KVSAtomicOp{ &structs.TxnOp{
Op: structs.KVSLock, KVS: &structs.TxnKVSOp{
DirEnt: structs.DirEntry{ Verb: structs.KVSLock,
Key: "test", DirEnt: structs.DirEntry{
Session: validId, Key: "test",
Session: validId,
},
}, },
}, },
}, },
} }
{ {
var out structs.KVSAtomicResponse var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -275,13 +309,13 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
// Should acquire. // Should acquire.
{ {
var out structs.KVSAtomicResponse var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if len(out.Results) != 1 || if len(out.Results) != 1 ||
len(out.Errors) != 0 || len(out.Errors) != 0 ||
out.Results[0].LockIndex != 2 { out.Results[0].KVS.DirEnt.LockIndex != 2 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }