Adds a read-only optimized path for transactions.

This commit is contained in:
James Phillips 2016-05-12 17:38:25 -07:00
parent 0c34ed078c
commit 2649a6336e
12 changed files with 625 additions and 56 deletions

View File

@ -308,6 +308,7 @@ type TxnResponse struct {
} }
// Txn is used to apply multiple KV operations in a single, atomic transaction. // Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values // Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a // automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure // list of operations to perform, using the KVOp constants and KVTxnOp structure
@ -318,6 +319,13 @@ type TxnResponse struct {
// transaction API client, but it will be easy to keep this KV-specific variant // transaction API client, but it will be easy to keep this KV-specific variant
// supported. // supported.
// //
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example: // Here's an example:
// //
// ops := KVTxnOps{ // ops := KVTxnOps{
@ -343,9 +351,9 @@ type TxnResponse struct {
// is a KVGet. If the transaction was rolled back, the Errors member will have // is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error // entries referencing the index of the operation that failed along with an error
// message. // message.
func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) { func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn") r := k.c.newRequest("PUT", "/v1/txn")
r.setWriteOptions(q) r.setQueryOptions(q)
// Convert into the internal format since this is an all-KV txn. // Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn)) ops := make(TxnOps, 0, len(txn))
@ -359,8 +367,9 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet
} }
defer resp.Body.Close() defer resp.Body.Close()
wm := &WriteMeta{} qm := &QueryMeta{}
wm.RequestTime = rtt parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse var txnResp TxnResponse
@ -375,7 +384,7 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet
for _, result := range txnResp.Results { for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV) kvResp.Results = append(kvResp.Results, result.KV)
} }
return resp.StatusCode == http.StatusOK, &kvResp, wm, nil return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
} }
var buf bytes.Buffer var buf bytes.Buffer

View File

@ -519,6 +519,32 @@ func TestClient_Txn(t *testing.T) {
} }
} }
// Run a read-only transaction.
txn = KVTxnOps{
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 1 {
t.Fatalf("bad: %v", ret)
}
for _, result := range ret.Results {
if result.Key != key ||
!bytes.Equal(result.Value, value) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}
// Sanity check using the regular GET API. // Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil) pair, meta, err := kv.Get(key, nil)
if err != nil { if err != nil {

View File

@ -75,18 +75,11 @@ func fixupKVOps(raw interface{}) error {
return nil return nil
} }
// Txn handles requests to apply multiple operations in a single, atomic // convertOps takes the incoming body in API format and converts it to the
// transaction. // internal RPC format. This returns a count of the number of write ops, and
func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // a boolean, that if false means an error response has been generated and
if req.Method != "PUT" { // processing should stop.
resp.WriteHeader(http.StatusMethodNotAllowed) func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
return nil, nil
}
var args structs.TxnRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Note the body is in API format, and not the RPC format. If we can't // Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to // decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation. // associate the error with a given operation.
@ -94,24 +87,31 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
if err := decodeBody(req, &ops, fixupKVOps); err != nil { if err := decodeBody(req, &ops, fixupKVOps); err != nil {
resp.WriteHeader(http.StatusBadRequest) resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err))) resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, nil return nil, 0, false
} }
// Convert the KV API format into the RPC format. Note that fixupKVOps // Convert the KV API format into the RPC format. Note that fixupKVOps
// above will have already converted the base64 encoded strings into // above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over. // byte arrays so we can assign right over.
var opsRPC structs.TxnOps
var writes int
for _, in := range ops { for _, in := range ops {
if in.KV != nil { if in.KV != nil {
if size := len(in.KV.Value); size > maxKVSize { if size := len(in.KV.Value); size > maxKVSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge) resp.WriteHeader(http.StatusRequestEntityTooLarge)
resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)", resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)",
in.KV.Key, size, maxKVSize))) in.KV.Key, size, maxKVSize)))
return nil, nil return nil, 0, false
}
verb := structs.KVSOp(in.KV.Verb)
if verb.IsWrite() {
writes += 1
} }
out := &structs.TxnOp{ out := &structs.TxnOp{
KV: &structs.TxnKVOp{ KV: &structs.TxnKVOp{
Verb: structs.KVSOp(in.KV.Verb), Verb: verb,
DirEnt: structs.DirEntry{ DirEnt: structs.DirEntry{
Key: in.KV.Key, Key: in.KV.Key,
Value: in.KV.Value, Value: in.KV.Value,
@ -123,20 +123,67 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
}, },
}, },
} }
args.Ops = append(args.Ops, out) opsRPC = append(opsRPC, out)
} }
} }
return opsRPC, writes, true
}
// Make the request and return a conflict status if there were errors // Txn handles requests to apply multiple operations in a single, atomic
// reported from the transaction. // transaction. A transaction consisting of only read operations will be fast-
var reply structs.TxnResponse // pathed to an endpoint that supports consistency modes (but not blocking),
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { // and everything else will be routed through Raft like a normal write.
return nil, err func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
} }
if len(reply.Errors) > 0 {
// Convert the ops from the API format to the internal format.
ops, writes, ok := s.convertOps(resp, req)
if !ok {
return nil, nil
}
// Fast-path a transaction with only reads to the read-only endpoint,
// which bypasses Raft, and allows for staleness.
conflict := false
var ret interface{}
if writes == 0 {
args := structs.TxnReadRequest{Ops: ops}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.TxnReadResponse
if err := s.agent.RPC("Txn.Read", &args, &reply); err != nil {
return nil, err
}
// Since we don't do blocking, we only add the relevant headers
// for metadata.
setLastContact(resp, reply.LastContact)
setKnownLeader(resp, reply.KnownLeader)
ret, conflict = reply, len(reply.Errors) > 0
} else {
args := structs.TxnRequest{Ops: ops}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
var reply structs.TxnResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err
}
ret, conflict = reply, len(reply.Errors) > 0
}
// If there was a conflict return the response object but set a special
// status code.
if conflict {
var buf []byte var buf []byte
var err error var err error
buf, err = s.marshalJSON(req, reply) buf, err = s.marshalJSON(req, ret)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -148,5 +195,5 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface
} }
// Otherwise, return the results of the successful transaction. // Otherwise, return the results of the successful transaction.
return reply, nil return ret, nil
} }

View File

@ -162,6 +162,76 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
} }
} }
// Do a read-only transaction that should get routed to the
// fast-path endpoint.
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "get",
"Key": "key"
}
}
]
`, index)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
header := resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("bad: %v", header)
}
header = resp.Header().Get("X-Consul-LastContact")
if header != "0" {
t.Fatalf("bad: %v", header)
}
txnResp, ok := obj.(structs.TxnReadResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(txnResp.Results) != 1 {
t.Fatalf("bad: %v", txnResp)
}
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
}
// Now that we have an index we can do a CAS to make sure the // Now that we have an index we can do a CAS to make sure the
// index field gets translated to the RPC format. // index field gets translated to the RPC format.
{ {

View File

@ -294,7 +294,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
results, errors := c.state.TxnRun(index, req.Ops) results, errors := c.state.TxnRW(index, req.Ops)
return structs.TxnResponse{results, errors} return structs.TxnResponse{results, errors}
} }

View File

@ -89,13 +89,8 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
return nil, nil return nil, nil
} }
// TxnRun tries to run the given operations all inside a single transaction. If // txnDispatch runs the given operations inside the state store transaction.
// any of the operations fail, the entire transaction will be rolled back. func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
// Dispatch all of the operations inside the transaction.
results := make(structs.TxnResults, 0, len(ops)) results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops { for i, op := range ops {
@ -118,6 +113,23 @@ func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults,
errors = append(errors, &structs.TxnError{i, err.Error()}) errors = append(errors, &structs.TxnError{i, err.Error()})
} }
} }
if len(errors) > 0 {
return nil, errors
}
return results, nil
}
// TxnRW tries to run the given operations all inside a single transaction. If
// any of the operations fail, the entire transaction will be rolled back. This
// is done in a full write transaction on the state store, so reads and writes
// are possible.
func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
results, errors := s.txnDispatch(tx, idx, ops)
if len(errors) > 0 { if len(errors) > 0 {
return nil, errors return nil, errors
} }
@ -125,3 +137,18 @@ func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults,
tx.Commit() tx.Commit()
return results, nil return results, nil
} }
// TxnRO dispatches the given operations inside a single read-only transaction
// on the state store. Callers must make sure that no write operations are
// present before calling this, otherwise the state store will report an error
// for them. On any error no results are returned, only the errors.
func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
	// A non-write transaction; it is always aborted since there is nothing
	// to commit.
	readTx := s.db.Txn(false)
	defer readTx.Abort()

	// There is no Raft index associated with a read, so 0 is passed along.
	res, errs := s.txnDispatch(readTx, 0, ops)
	if len(errs) == 0 {
		return res, nil
	}
	return nil, errs
}

View File

@ -11,7 +11,7 @@ import (
func TestStateStore_Txn_KVS(t *testing.T) { func TestStateStore_Txn_KVS(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Create kvs results in the state store. // Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar") testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz") testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip") testSetKey(t, s, 3, "foo/bar/zip", "zip")
@ -150,7 +150,7 @@ func TestStateStore_Txn_KVS(t *testing.T) {
}, },
}, },
} }
results, errors := s.TxnRun(8, ops) results, errors := s.TxnRW(8, ops)
if len(errors) > 0 { if len(errors) > 0 {
t.Fatalf("err: %v", errors) t.Fatalf("err: %v", errors)
} }
@ -321,7 +321,7 @@ func TestStateStore_Txn_KVS(t *testing.T) {
func TestStateStore_Txn_KVS_Rollback(t *testing.T) { func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Create kvs results in the state store. // Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar") testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale") testSetKey(t, s, 2, "foo/update", "stale")
@ -479,7 +479,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
}, },
}, },
} }
results, errors := s.TxnRun(7, ops) results, errors := s.TxnRW(7, ops)
if len(errors) != len(ops) { if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops)) t.Fatalf("bad len: %d != %d", len(errors), len(ops))
} }
@ -513,6 +513,158 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
} }
} }
// TestStateStore_Txn_KVS_RO runs one of each read-only KV verb (get,
// check-session and check-index) through TxnRO and verifies the entries that
// come back.
func TestStateStore_Txn_KVS_RO(t *testing.T) {
	s := testStateStore(t)

	// Create KV entries in the state store.
	testSetKey(t, s, 1, "foo", "bar")
	testSetKey(t, s, 2, "foo/bar/baz", "baz")
	testSetKey(t, s, 3, "foo/bar/zip", "zip")

	// Set up a transaction that hits all the read-only operations.
	ops := structs.TxnOps{
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSGet,
				DirEnt: structs.DirEntry{
					Key: "foo",
				},
			},
		},
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSCheckSession,
				DirEnt: structs.DirEntry{
					Key:     "foo/bar/baz",
					Session: "",
				},
			},
		},
		// This op supplies a modify index, so it has to use the check-index
		// verb; with check-session the index would be ignored and the
		// check-index path would never be exercised by this test.
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSCheckIndex,
				DirEnt: structs.DirEntry{
					Key: "foo/bar/zip",
					RaftIndex: structs.RaftIndex{
						ModifyIndex: 3,
					},
				},
			},
		},
	}
	results, errors := s.TxnRO(ops)
	if len(errors) > 0 {
		t.Fatalf("err: %v", errors)
	}
	if len(results) != len(ops) {
		t.Fatalf("bad len: %d != %d", len(results), len(ops))
	}

	// Make sure the response looks as expected. Only the get is expected to
	// carry the value; the check operations are expected to return the
	// entry without it.
	expected := structs.TxnResults{
		&structs.TxnResult{
			KV: &structs.DirEntry{
				Key:   "foo",
				Value: []byte("bar"),
				RaftIndex: structs.RaftIndex{
					CreateIndex: 1,
					ModifyIndex: 1,
				},
			},
		},
		&structs.TxnResult{
			KV: &structs.DirEntry{
				Key: "foo/bar/baz",
				RaftIndex: structs.RaftIndex{
					CreateIndex: 2,
					ModifyIndex: 2,
				},
			},
		},
		&structs.TxnResult{
			KV: &structs.DirEntry{
				Key: "foo/bar/zip",
				RaftIndex: structs.RaftIndex{
					CreateIndex: 3,
					ModifyIndex: 3,
				},
			},
		},
	}
	if len(results) != len(expected) {
		t.Fatalf("bad: %v", results)
	}
	for i := range results {
		if !reflect.DeepEqual(results[i], expected[i]) {
			t.Fatalf("bad %d", i)
		}
	}
}
// TestStateStore_Txn_KVS_RO_Safety feeds write operations to TxnRO and
// verifies that no results come back and that every operation is reported
// with a read-only transaction error.
func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
	s := testStateStore(t)

	// Create KV entries in the state store.
	testSetKey(t, s, 1, "foo", "bar")
	testSetKey(t, s, 2, "foo/bar/baz", "baz")
	testSetKey(t, s, 3, "foo/bar/zip", "zip")

	// Set up a transaction made up entirely of write operations, none of
	// which should be possible inside a read-only transaction.
	ops := structs.TxnOps{
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSSet,
				DirEnt: structs.DirEntry{
					Key:   "foo",
					Value: []byte("nope"),
				},
			},
		},
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSDelete,
				DirEnt: structs.DirEntry{
					Key: "foo/bar/baz",
				},
			},
		},
		&structs.TxnOp{
			KV: &structs.TxnKVOp{
				Verb: structs.KVSDeleteTree,
				DirEnt: structs.DirEntry{
					Key: "foo/bar",
				},
			},
		},
	}
	results, errors := s.TxnRO(ops)
	if len(results) > 0 {
		t.Fatalf("bad: %v", results)
	}
	if len(errors) != len(ops) {
		t.Fatalf("bad len: %d != %d", len(errors), len(ops))
	}

	// Make sure the errors look reasonable (tombstone inserts cause the
	// insert errors during the delete operations).
	expected := []string{
		"cannot insert in read-only transaction",
		"cannot insert in read-only transaction",
		"cannot insert in read-only transaction",
	}
	if len(errors) != len(expected) {
		t.Fatalf("bad len: %d != %d", len(errors), len(expected))
	}
	// Each error must point at the op in the same position and contain the
	// expected message.
	for i, msg := range expected {
		if errors[i].OpIndex != i {
			t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
		}
		if !strings.Contains(errors[i].Error(), msg) {
			t.Fatalf("bad %d: %v", i, errors[i].Error())
		}
	}
}
func TestStateStore_Txn_Watches(t *testing.T) { func TestStateStore_Txn_Watches(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
@ -541,7 +693,7 @@ func TestStateStore_Txn_Watches(t *testing.T) {
}, },
}, },
} }
results, errors := s.TxnRun(15, ops) results, errors := s.TxnRW(15, ops)
if len(results) != len(ops) { if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops)) t.Fatalf("bad len: %d != %d", len(results), len(ops))
} }
@ -583,7 +735,7 @@ func TestStateStore_Txn_Watches(t *testing.T) {
}, },
}, },
} }
results, errors := s.TxnRun(16, ops) results, errors := s.TxnRW(16, ops)
if len(errors) != 1 { if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors)) t.Fatalf("bad len: %d != 1", len(errors))
} }

View File

@ -542,6 +542,17 @@ const (
KVSCheckIndex = "check-index" // Check the modify index of the key. KVSCheckIndex = "check-index" // Check the modify index of the key.
) )
// IsWrite reports whether the given operation alters the state store. Only the
// get, check-session and check-index verbs count as reads; every other verb,
// including any value not recognized here, is reported as a write.
func (op KVSOp) IsWrite() bool {
	isRead := op == KVSGet || op == KVSCheckSession || op == KVSCheckIndex
	return !isRead
}
// KVSRequest is used to operate on the Key-Value store // KVSRequest is used to operate on the Key-Value store
type KVSRequest struct { type KVSRequest struct {
Datacenter string Datacenter string

View File

@ -36,6 +36,18 @@ func (r *TxnRequest) RequestDatacenter() string {
return r.Datacenter return r.Datacenter
} }
// TxnReadRequest is used as a fast path for read-only transactions that don't
// modify the state store.
type TxnReadRequest struct {
	// Datacenter is the value reported by RequestDatacenter.
	Datacenter string

	// Ops is the list of operations that make up the transaction.
	Ops TxnOps

	// QueryOptions is embedded, so its fields are available directly on the
	// request.
	QueryOptions
}
// RequestDatacenter returns the Datacenter field of the request.
func (r *TxnReadRequest) RequestDatacenter() string {
	return r.Datacenter
}
// TxnError is used to return information about an error for a specific // TxnError is used to return information about an error for a specific
// operation. // operation.
type TxnError struct { type TxnError struct {
@ -65,3 +77,9 @@ type TxnResponse struct {
Results TxnResults Results TxnResults
Errors TxnErrors Errors TxnErrors
} }
// TxnReadResponse is the structure returned by a TxnReadRequest. It embeds the
// regular TxnResponse (Results and Errors) together with QueryMeta.
type TxnReadResponse struct {
	TxnResponse
	QueryMeta
}

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -13,6 +14,27 @@ type Txn struct {
srv *Server srv *Server
} }
// preCheck verifies the incoming operations before any further processing
// takes place, which covers things like ACLs. It returns one TxnError per
// failing operation, tagged with that operation's position in ops, and nil
// if nothing failed.
func (t *Txn) preCheck(acl acl.ACL, ops structs.TxnOps) structs.TxnErrors {
	var failures structs.TxnErrors

	for idx, op := range ops {
		// Only KV operations have pre-apply checks.
		if op.KV == nil {
			continue
		}

		allowed, err := kvsPreApply(t.srv, acl, op.KV.Verb, &op.KV.DirEnt)
		switch {
		case err != nil:
			failures = append(failures, &structs.TxnError{idx, err.Error()})
		case !allowed:
			// A false result without an error is reported as a lock-delay
			// failure.
			msg := fmt.Sprintf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key)
			failures = append(failures, &structs.TxnError{idx, msg})
		}
	}

	return failures
}
// Apply is used to apply multiple operations in a single, atomic transaction. // Apply is used to apply multiple operations in a single, atomic transaction.
func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
@ -20,22 +42,12 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
} }
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
// Perform the pre-apply checks for any KV operations. // Run the pre-checks before we send the transaction into Raft.
acl, err := t.srv.resolveToken(args.Token) acl, err := t.srv.resolveToken(args.Token)
if err != nil { if err != nil {
return err return err
} }
for i, op := range args.Ops { reply.Errors = t.preCheck(acl, args.Ops)
if op.KV != nil {
ok, err := kvsPreApply(t.srv, acl, op.KV.Verb, &op.KV.DirEnt)
if err != nil {
reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
} else if !ok {
err = fmt.Errorf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key)
reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()})
}
}
}
if len(reply.Errors) > 0 { if len(reply.Errors) > 0 {
return nil return nil
} }
@ -59,3 +71,37 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
} }
return nil return nil
} }
// Read performs a read-only transaction that doesn't modify the state store.
// This is much more scalable since it doesn't go through Raft and supports
// staleness, so it should be preferred when only reads are being performed.
// Per-operation failures are reported in reply.Errors; the returned error is
// reserved for failures of the RPC itself.
func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error {
	if done, err := t.srv.forward("Txn.Read", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"consul", "txn", "read"}, time.Now())

	// This isn't a blocking RPC, so the query metadata has to be filled in
	// by hand.
	t.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if err := t.srv.consistentRead(); err != nil {
			return err
		}
	}

	// Resolve the token and run the pre-checks before performing the read.
	authz, err := t.srv.resolveToken(args.Token)
	if err != nil {
		return err
	}
	if reply.Errors = t.preCheck(authz, args.Ops); len(reply.Errors) > 0 {
		return nil
	}

	// Run the read transaction against the state store.
	reply.Results, reply.Errors = t.srv.fsm.State().TxnRO(args.Ops)
	return nil
}

View File

@ -216,7 +216,9 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
}, },
}, },
}, },
WriteRequest: structs.WriteRequest{Token: id}, WriteRequest: structs.WriteRequest{
Token: id,
},
} }
var out structs.TxnResponse var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
@ -232,6 +234,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
t.Fatalf("bad %v", out) t.Fatalf("bad %v", out)
} }
} }
func TestTxn_Apply_LockDelay(t *testing.T) { func TestTxn_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -316,3 +319,152 @@ func TestTxn_Apply_LockDelay(t *testing.T) {
} }
} }
} }
// TestTxn_Read stores a single key directly in the state store and reads it
// back through the Txn.Read RPC, checking both the result and the query
// metadata in the reply.
func TestTxn_Read(t *testing.T) {
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Put in a key to read back.
	state := s1.fsm.State()
	d := &structs.DirEntry{
		Key:   "test",
		Value: []byte("hello"),
	}
	if err := state.KVSSet(1, d); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Do a super basic request. The state store test covers the details so
	// we just need to be sure that the transaction is sent correctly and
	// the results are converted appropriately.
	arg := structs.TxnReadRequest{
		Datacenter: "dc1",
		Ops: structs.TxnOps{
			&structs.TxnOp{
				KV: &structs.TxnKVOp{
					Verb: structs.KVSGet,
					DirEnt: structs.DirEntry{
						Key: "test",
					},
				},
			},
		},
	}
	var out structs.TxnReadResponse
	if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Verify the transaction's return value. The key was written at index 1,
	// so both Raft indexes should be 1.
	expected := structs.TxnReadResponse{
		TxnResponse: structs.TxnResponse{
			Results: structs.TxnResults{
				&structs.TxnResult{
					KV: &structs.DirEntry{
						Key:   "test",
						Value: []byte("hello"),
						RaftIndex: structs.RaftIndex{
							CreateIndex: 1,
							ModifyIndex: 1,
						},
					},
				},
			},
		},
		QueryMeta: structs.QueryMeta{
			KnownLeader: true,
		},
	}
	if !reflect.DeepEqual(out, expected) {
		t.Fatalf("bad %v", out)
	}
}
// TestTxn_Read_ACLDeny runs a read-only transaction against a server with a
// default-deny ACL policy and verifies that every operation is rejected with
// a permission denied error.
func TestTxn_Read_ACLDeny(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
		c.ACLDefaultPolicy = "deny"
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testutil.WaitForLeader(t, s1.RPC, "dc1")

	// Create the ACL.
	var id string
	{
		arg := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: testListRules,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Set up a transaction where every operation should get blocked due to
	// ACLs.
	arg := structs.TxnReadRequest{
		Datacenter: "dc1",
		Ops: structs.TxnOps{
			&structs.TxnOp{
				KV: &structs.TxnKVOp{
					Verb: structs.KVSGet,
					DirEnt: structs.DirEntry{
						Key: "nope",
					},
				},
			},
			&structs.TxnOp{
				KV: &structs.TxnKVOp{
					Verb: structs.KVSCheckSession,
					DirEnt: structs.DirEntry{
						Key: "nope",
					},
				},
			},
			&structs.TxnOp{
				KV: &structs.TxnKVOp{
					Verb: structs.KVSCheckIndex,
					DirEnt: structs.DirEntry{
						Key: "nope",
					},
				},
			},
		},
		QueryOptions: structs.QueryOptions{
			Token: id,
		},
	}
	var out structs.TxnReadResponse
	if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Verify the transaction's return value: no results, and one permission
	// denied error per operation, in order.
	expected := structs.TxnReadResponse{
		QueryMeta: structs.QueryMeta{
			KnownLeader: true,
		},
	}
	for i, _ := range arg.Ops {
		expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()})
	}
	if !reflect.DeepEqual(out, expected) {
		t.Fatalf("bad %v", out)
	}
}

View File

@ -170,6 +170,17 @@ The `PUT` method lets you submit a list of operations to apply to the key/value
inside a transaction. If any operation fails, the transaction will be rolled back and inside a transaction. If any operation fails, the transaction will be rolled back and
none of the changes will be applied. none of the changes will be applied.
If the transaction doesn't contain any write operations then it will be fast-pathed
internally to an endpoint that works like other reads, except that blocking queries
are not currently supported. In this mode, you may supply the "?stale" or "?consistent"
query parameters with the request to control consistency. To support bounding the
acceptable staleness of data, read-only transaction responses provide the `X-Consul-LastContact`
header containing the time in milliseconds that a server was last contacted by the leader node.
The `X-Consul-KnownLeader` header also indicates if there is a known leader. These
won't be present if the transaction contains any write operations, and any consistency
query parameters will be ignored, since writes are always managed by the leader via
the Raft consensus protocol.
The body of the request should be a list of operations to perform inside the atomic The body of the request should be a list of operations to perform inside the atomic
transaction, which looks like this: transaction, which looks like this: