diff --git a/api/kv.go b/api/kv.go index dd06b9101..d696f941d 100644 --- a/api/kv.go +++ b/api/kv.go @@ -308,6 +308,7 @@ type TxnResponse struct { } // Txn is used to apply multiple KV operations in a single, atomic transaction. +// // Note that Go will perform the required base64 encoding on the values // automatically because the type is a byte slice. Transactions are defined as a // list of operations to perform, using the KVOp constants and KVTxnOp structure @@ -318,6 +319,13 @@ type TxnResponse struct { // transaction API client, but it will be easy to keep this KV-specific variant // supported. // +// Even though this is generally a write operation, we take a QueryOptions input +// and return a QueryMeta output. If the transaction contains only read ops, then +// Consul will fast-path it to a different endpoint internally which supports +// consistency controls, but not blocking. If there are write operations then +// the request will always be routed through raft and any consistency settings +// will be ignored. +// // Here's an example: // // ops := KVTxnOps{ @@ -343,9 +351,9 @@ type TxnResponse struct { // is a KVGet. If the transaction was rolled back, the Errors member will have // entries referencing the index of the operation that failed along with an error // message. -func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) { +func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) { r := k.c.newRequest("PUT", "/v1/txn") - r.setWriteOptions(q) + r.setQueryOptions(q) // Convert into the internal format since this is an all-KV txn. ops := make(TxnOps, 0, len(txn)) @@ -359,8 +367,9 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet } defer resp.Body.Close() - wm := &WriteMeta{} - wm.RequestTime = rtt + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { var txnResp TxnResponse @@ -375,7 +384,7 @@ func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMet for _, result := range txnResp.Results { kvResp.Results = append(kvResp.Results, result.KV) } - return resp.StatusCode == http.StatusOK, &kvResp, wm, nil + return resp.StatusCode == http.StatusOK, &kvResp, qm, nil } var buf bytes.Buffer diff --git a/api/kv_test.go b/api/kv_test.go index 7e8b4cc18..bd9f2ef39 100644 --- a/api/kv_test.go +++ b/api/kv_test.go @@ -519,6 +519,32 @@ func TestClient_Txn(t *testing.T) { } } + // Run a read-only transaction. + txn = KVTxnOps{ + &KVTxnOp{ + Verb: KVGet, + Key: key, + }, + } + ok, ret, _, err = kv.Txn(txn, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("transaction failure") + } + + if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 1 { + t.Fatalf("bad: %v", ret) + } + for _, result := range ret.Results { + if result.Key != key || + !bytes.Equal(result.Value, value) || + result.Session != id || + result.LockIndex != 1 { + t.Fatalf("bad: %v", result) + } + } + // Sanity check using the regular GET API. pair, meta, err := kv.Get(key, nil) if err != nil { diff --git a/command/agent/txn_endpoint.go b/command/agent/txn_endpoint.go index 902f1bf69..00d511062 100644 --- a/command/agent/txn_endpoint.go +++ b/command/agent/txn_endpoint.go @@ -75,18 +75,11 @@ func fixupKVOps(raw interface{}) error { return nil } -// Txn handles requests to apply multiple operations in a single, atomic -// transaction. -func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "PUT" { - resp.WriteHeader(http.StatusMethodNotAllowed) - return nil, nil - } - - var args structs.TxnRequest - s.parseDC(req, &args.Datacenter) - s.parseToken(req, &args.Token) - +// convertOps takes the incoming body in API format and converts it to the +// internal RPC format. This returns a count of the number of write ops, and +// a boolean, that if false means an error response has been generated and +// processing should stop. +func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) { // Note the body is in API format, and not the RPC format. If we can't // decode it, we will return a 400 since we don't have enough context to // associate the error with a given operation. @@ -94,24 +87,31 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface if err := decodeBody(req, &ops, fixupKVOps); err != nil { resp.WriteHeader(http.StatusBadRequest) resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err))) - return nil, nil + return nil, 0, false } // Convert the KV API format into the RPC format. Note that fixupKVOps // above will have already converted the base64 encoded strings into // byte arrays so we can assign right over. + var opsRPC structs.TxnOps + var writes int for _, in := range ops { if in.KV != nil { if size := len(in.KV.Value); size > maxKVSize { resp.WriteHeader(http.StatusRequestEntityTooLarge) resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, maxKVSize))) - return nil, nil + return nil, 0, false + } + + verb := structs.KVSOp(in.KV.Verb) + if verb.IsWrite() { + writes += 1 } out := &structs.TxnOp{ KV: &structs.TxnKVOp{ - Verb: structs.KVSOp(in.KV.Verb), + Verb: verb, DirEnt: structs.DirEntry{ Key: in.KV.Key, Value: in.KV.Value, @@ -123,20 +123,67 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface }, }, } - args.Ops = append(args.Ops, out) + opsRPC = append(opsRPC, out) } } + return opsRPC, writes, true +} - // Make the request and return a conflict status if there were errors - // reported from the transaction. - var reply structs.TxnResponse - if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { - return nil, err +// Txn handles requests to apply multiple operations in a single, atomic +// transaction. A transaction consisting of only read operations will be fast- +// pathed to an endpoint that supports consistency modes (but not blocking), +// and everything else will be routed through Raft like a normal write. +func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil } - if len(reply.Errors) > 0 { + + // Convert the ops from the API format to the internal format. + ops, writes, ok := s.convertOps(resp, req) + if !ok { + return nil, nil + } + + // Fast-path a transaction with only writes to the read-only endpoint, + // which bypasses Raft, and allows for staleness. + conflict := false + var ret interface{} + if writes == 0 { + args := structs.TxnReadRequest{Ops: ops} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.TxnReadResponse + if err := s.agent.RPC("Txn.Read", &args, &reply); err != nil { + return nil, err + } + + // Since we don't do blocking, we only add the relevant headers + // for metadata. + setLastContact(resp, reply.LastContact) + setKnownLeader(resp, reply.KnownLeader) + + ret, conflict = reply, len(reply.Errors) > 0 + } else { + args := structs.TxnRequest{Ops: ops} + s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) + + var reply structs.TxnResponse + if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil { + return nil, err + } + ret, conflict = reply, len(reply.Errors) > 0 + } + + // If there was a conflict return the response object but set a special + // status code. + if conflict { var buf []byte var err error - buf, err = s.marshalJSON(req, reply) + buf, err = s.marshalJSON(req, ret) if err != nil { return nil, err } @@ -148,5 +195,5 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface } // Otherwise, return the results of the successful transaction. - return reply, nil + return ret, nil } diff --git a/command/agent/txn_endpoint_test.go b/command/agent/txn_endpoint_test.go index 4d40b5f0a..7f035c108 100644 --- a/command/agent/txn_endpoint_test.go +++ b/command/agent/txn_endpoint_test.go @@ -162,6 +162,76 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) { } } + // Do a read-only transaction that should get routed to the + // fast-path endpoint. + { + buf := bytes.NewBuffer([]byte(fmt.Sprintf(` +[ + { + "KV": { + "Verb": "get", + "Key": "key" + } + } +] +`, index))) + req, err := http.NewRequest("PUT", "/v1/txn", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.Txn(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("expected 200, got %d", resp.Code) + } + + header := resp.Header().Get("X-Consul-KnownLeader") + if header != "true" { + t.Fatalf("bad: %v", header) + } + header = resp.Header().Get("X-Consul-LastContact") + if header != "0" { + t.Fatalf("bad: %v", header) + } + + txnResp, ok := obj.(structs.TxnReadResponse) + if !ok { + t.Fatalf("bad type: %T", obj) + } + if len(txnResp.Results) != 1 { + t.Fatalf("bad: %v", txnResp) + } + expected := structs.TxnReadResponse{ + TxnResponse: structs.TxnResponse{ + Results: structs.TxnResults{ + &structs.TxnResult{ + KV: &structs.DirEntry{ + Key: "key", + Value: []byte("hello world"), + Flags: 23, + Session: id, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + QueryMeta: structs.QueryMeta{ + KnownLeader: true, + }, + } + if !reflect.DeepEqual(txnResp, expected) { + t.Fatalf("bad: %v", txnResp) + } + } + // Now that we have an index we can do a CAS to make sure the // index field gets translated to the RPC format. { diff --git a/consul/fsm.go b/consul/fsm.go index fab5b8c12..6694b87f7 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -294,7 +294,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now()) - results, errors := c.state.TxnRun(index, req.Ops) + results, errors := c.state.TxnRW(index, req.Ops) return structs.TxnResponse{results, errors} } diff --git a/consul/state/txn.go b/consul/state/txn.go index ff9f182e5..43ad8b8c0 100644 --- a/consul/state/txn.go +++ b/consul/state/txn.go @@ -89,13 +89,8 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str return nil, nil } -// TxnRun tries to run the given operations all inside a single transaction. If -// any of the operations fail, the entire transaction will be rolled back. -func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Dispatch all of the operations inside the transaction. +// txnDispatch runs the given operations inside the state store transaction. +func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops)) for i, op := range ops { @@ -118,6 +113,23 @@ func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, errors = append(errors, &structs.TxnError{i, err.Error()}) } } + + if len(errors) > 0 { + return nil, errors + } + + return results, nil +} + +// TxnRW tries to run the given operations all inside a single transaction. If +// any of the operations fail, the entire transaction will be rolled back. This +// is done in a full write transaction on the state store, so reads and writes +// are possible +func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { + tx := s.db.Txn(true) + defer tx.Abort() + + results, errors := s.txnDispatch(tx, idx, ops) if len(errors) > 0 { return nil, errors } @@ -125,3 +137,18 @@ func (s *StateStore) TxnRun(idx uint64, ops structs.TxnOps) (structs.TxnResults, tx.Commit() return results, nil } + +// TxnRO runs the given operations inside a single read transaction in the state +// store. You must verify outside this function that no write operations are +// present, otherwise you'll get an error from the state store. +func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { + tx := s.db.Txn(false) + defer tx.Abort() + + results, errors := s.txnDispatch(tx, 0, ops) + if len(errors) > 0 { + return nil, errors + } + + return results, nil +} diff --git a/consul/state/txn_test.go b/consul/state/txn_test.go index 9e8f88234..c8c14bda7 100644 --- a/consul/state/txn_test.go +++ b/consul/state/txn_test.go @@ -11,7 +11,7 @@ import ( func TestStateStore_Txn_KVS(t *testing.T) { s := testStateStore(t) - // Create kvs results in the state store. + // Create KV entries in the state store. testSetKey(t, s, 1, "foo/delete", "bar") testSetKey(t, s, 2, "foo/bar/baz", "baz") testSetKey(t, s, 3, "foo/bar/zip", "zip") @@ -150,7 +150,7 @@ func TestStateStore_Txn_KVS(t *testing.T) { }, }, } - results, errors := s.TxnRun(8, ops) + results, errors := s.TxnRW(8, ops) if len(errors) > 0 { t.Fatalf("err: %v", errors) } @@ -321,7 +321,7 @@ func TestStateStore_Txn_KVS(t *testing.T) { func TestStateStore_Txn_KVS_Rollback(t *testing.T) { s := testStateStore(t) - // Create kvs results in the state store. + // Create KV entries in the state store. testSetKey(t, s, 1, "foo/delete", "bar") testSetKey(t, s, 2, "foo/update", "stale") @@ -479,7 +479,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) { }, }, } - results, errors := s.TxnRun(7, ops) + results, errors := s.TxnRW(7, ops) if len(errors) != len(ops) { t.Fatalf("bad len: %d != %d", len(errors), len(ops)) } @@ -513,6 +513,158 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) { } } +func TestStateStore_Txn_KVS_RO(t *testing.T) { + s := testStateStore(t) + + // Create KV entries in the state store. + testSetKey(t, s, 1, "foo", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + + // Set up a transaction that hits all the read-only operations. + ops := structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/bar/baz", + Session: "", + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/bar/zip", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 3, + }, + }, + }, + }, + } + results, errors := s.TxnRO(ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + if len(results) != len(ops) { + t.Fatalf("bad len: %d != %d", len(results), len(ops)) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + KV: &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + }, + &structs.TxnResult{ + KV: &structs.DirEntry{ + Key: "foo/bar/baz", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + }, + &structs.TxnResult{ + KV: &structs.DirEntry{ + Key: "foo/bar/zip", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 3, + }, + }, + }, + } + if len(results) != len(expected) { + t.Fatalf("bad: %v", results) + } + for i, _ := range results { + if !reflect.DeepEqual(results[i], expected[i]) { + t.Fatalf("bad %d", i) + } + } +} + +func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) { + s := testStateStore(t) + + // Create KV entries in the state store. + testSetKey(t, s, 1, "foo", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + + // Set up a transaction that hits all the read-only operations. + ops := structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo", + Value: []byte("nope"), + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "foo/bar/baz", + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSDeleteTree, + DirEnt: structs.DirEntry{ + Key: "foo/bar", + }, + }, + }, + } + results, errors := s.TxnRO(ops) + if len(results) > 0 { + t.Fatalf("bad: %v", results) + } + if len(errors) != len(ops) { + t.Fatalf("bad len: %d != %d", len(errors), len(ops)) + } + + // Make sure the errors look reasonable (tombstone inserts cause the + // insert errors during the delete operations). + expected := []string{ + "cannot insert in read-only transaction", + "cannot insert in read-only transaction", + "cannot insert in read-only transaction", + } + if len(errors) != len(expected) { + t.Fatalf("bad len: %d != %d", len(errors), len(expected)) + } + for i, msg := range expected { + if errors[i].OpIndex != i { + t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) + } + if !strings.Contains(errors[i].Error(), msg) { + t.Fatalf("bad %d: %v", i, errors[i].Error()) + } + } +} + func TestStateStore_Txn_Watches(t *testing.T) { s := testStateStore(t) @@ -541,7 +693,7 @@ func TestStateStore_Txn_Watches(t *testing.T) { }, }, } - results, errors := s.TxnRun(15, ops) + results, errors := s.TxnRW(15, ops) if len(results) != len(ops) { t.Fatalf("bad len: %d != %d", len(results), len(ops)) } @@ -583,7 +735,7 @@ func TestStateStore_Txn_Watches(t *testing.T) { }, }, } - results, errors := s.TxnRun(16, ops) + results, errors := s.TxnRW(16, ops) if len(errors) != 1 { t.Fatalf("bad len: %d != 1", len(errors)) } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 1ea930081..f2f86604b 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -542,6 +542,17 @@ const ( KVSCheckIndex = "check-index" // Check the modify index of the key. ) +// IsWrite returns true if the given operation alters the state store. +func (op KVSOp) IsWrite() bool { + switch op { + case KVSGet, KVSCheckSession, KVSCheckIndex: + return false + + default: + return true + } +} + // KVSRequest is used to operate on the Key-Value store type KVSRequest struct { Datacenter string diff --git a/consul/structs/txn.go b/consul/structs/txn.go index f815f5b7e..3f8035b97 100644 --- a/consul/structs/txn.go +++ b/consul/structs/txn.go @@ -36,6 +36,18 @@ func (r *TxnRequest) RequestDatacenter() string { return r.Datacenter } +// TxnReadRequest is used as a fast path for read-only transactions that don't +// modify the state store. +type TxnReadRequest struct { + Datacenter string + Ops TxnOps + QueryOptions +} + +func (r *TxnReadRequest) RequestDatacenter() string { + return r.Datacenter +} + // TxnError is used to return information about an error for a specific // operation. type TxnError struct { @@ -65,3 +77,9 @@ type TxnResponse struct { Results TxnResults Errors TxnErrors } + +// TxnReadResponse is the structure returned by a TxnReadRequest. +type TxnReadResponse struct { + TxnResponse + QueryMeta +} diff --git a/consul/txn_endpoint.go b/consul/txn_endpoint.go index 662b1f683..72f81f729 100644 --- a/consul/txn_endpoint.go +++ b/consul/txn_endpoint.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/consul/structs" ) @@ -13,6 +14,27 @@ type Txn struct { srv *Server } +// preCheck is used to verify the incoming operations before any further +// processing takes place. This checks things like ACLs. +func (t *Txn) preCheck(acl acl.ACL, ops structs.TxnOps) structs.TxnErrors { + var errors structs.TxnErrors + + // Perform the pre-apply checks for any KV operations. + for i, op := range ops { + if op.KV != nil { + ok, err := kvsPreApply(t.srv, acl, op.KV.Verb, &op.KV.DirEnt) + if err != nil { + errors = append(errors, &structs.TxnError{i, err.Error()}) + } else if !ok { + err = fmt.Errorf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key) + errors = append(errors, &structs.TxnError{i, err.Error()}) + } + } + } + + return errors +} + // Apply is used to apply multiple operations in a single, atomic transaction. func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { @@ -20,22 +42,12 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error } defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now()) - // Perform the pre-apply checks for any KV operations. + // Run the pre-checks before we send the transaction into Raft. acl, err := t.srv.resolveToken(args.Token) if err != nil { return err } - for i, op := range args.Ops { - if op.KV != nil { - ok, err := kvsPreApply(t.srv, acl, op.KV.Verb, &op.KV.DirEnt) - if err != nil { - reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()}) - } else if !ok { - err = fmt.Errorf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key) - reply.Errors = append(reply.Errors, &structs.TxnError{i, err.Error()}) - } - } - } + reply.Errors = t.preCheck(acl, args.Ops) if len(reply.Errors) > 0 { return nil } @@ -59,3 +71,37 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error } return nil } + +// Read is used to perform a read-only transaction that doesn't modify the state +// store. This is much more scaleable since it doesn't go through Raft and +// supports staleness, so this should be preferred if you're just performing +// reads. +func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error { + if done, err := t.srv.forward("Txn.Read", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "txn", "read"}, time.Now()) + + // We have to do this ourselves since we are not doing a blocking RPC. + t.srv.setQueryMeta(&reply.QueryMeta) + if args.RequireConsistent { + if err := t.srv.consistentRead(); err != nil { + return err + } + } + + // Run the pre-checks before we perform the read. + acl, err := t.srv.resolveToken(args.Token) + if err != nil { + return err + } + reply.Errors = t.preCheck(acl, args.Ops) + if len(reply.Errors) > 0 { + return nil + } + + // Run the read transaction. + state := t.srv.fsm.State() + reply.Results, reply.Errors = state.TxnRO(args.Ops) + return nil +} diff --git a/consul/txn_endpoint_test.go b/consul/txn_endpoint_test.go index 97098f558..e15e42c4d 100644 --- a/consul/txn_endpoint_test.go +++ b/consul/txn_endpoint_test.go @@ -216,7 +216,9 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { }, }, }, - WriteRequest: structs.WriteRequest{Token: id}, + WriteRequest: structs.WriteRequest{ + Token: id, + }, } var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { @@ -232,6 +234,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { t.Fatalf("bad %v", out) } } + func TestTxn_Apply_LockDelay(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -316,3 +319,152 @@ func TestTxn_Apply_LockDelay(t *testing.T) { } } } + +func TestTxn_Read(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Put in a key to read back. + state := s1.fsm.State() + d := &structs.DirEntry{ + Key: "test", + Value: []byte("hello"), + } + if err := state.KVSSet(1, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Do a super basic request. The state store test covers the details so + // we just need to be sure that the transaction is sent correctly and + // the results are converted appropriately. + arg := structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "test", + }, + }, + }, + }, + } + var out structs.TxnReadResponse + if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the transaction's return value. + expected := structs.TxnReadResponse{ + TxnResponse: structs.TxnResponse{ + Results: structs.TxnResults{ + &structs.TxnResult{ + KV: &structs.DirEntry{ + Key: "test", + Value: []byte("hello"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + }, + }, + }, + QueryMeta: structs.QueryMeta{ + KnownLeader: true, + }, + } + if !reflect.DeepEqual(out, expected) { + t.Fatalf("bad %v", out) + } +} + +func TestTxn_Read_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create the ACL. + var id string + { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testListRules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Set up a transaction where every operation should get blocked due to + // ACLs. + arg := structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSGet, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSCheckSession, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + }, + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: structs.KVSCheckIndex, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + }, + }, + QueryOptions: structs.QueryOptions{ + Token: id, + }, + } + var out structs.TxnReadResponse + if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the transaction's return value. + expected := structs.TxnReadResponse{ + QueryMeta: structs.QueryMeta{ + KnownLeader: true, + }, + } + for i, _ := range arg.Ops { + expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()}) + } + if !reflect.DeepEqual(out, expected) { + t.Fatalf("bad %v", out) + } +} diff --git a/website/source/docs/agent/http/kv.html.markdown b/website/source/docs/agent/http/kv.html.markdown index 9d2f2def5..09fa796a7 100644 --- a/website/source/docs/agent/http/kv.html.markdown +++ b/website/source/docs/agent/http/kv.html.markdown @@ -170,6 +170,17 @@ The `PUT` method lets you submit a list of operations to apply to the key/value inside a transaction. If any operation fails, the transaction will be rolled back and none of the changes will be applied. +If the transaction doesn't contain any write operations then it will be fast-pathed +internally to an endpoint that works like other reads, except that blocking queries +are not currently supported. In this mode, you may supply the "?stale" or "?consistent" +query parameters with the request to control consistency. To support bounding the +acceptable staleness of data, read-only transaction responses provide the `X-Consul-LastContact` +header containing the time in milliseconds that a server was last contacted by the leader node. +The `X-Consul-KnownLeader` header also indicates if there is a known leader. These +won't be present if the transaction contains any write operations, and any consistency +query parameters will be ignored, since writes are always managed by the leader via +the Raft consensus protocol. + The body of the request should be a list of operations to perform inside the atomic transaction, which looks like this: