Merge pull request #2028 from hashicorp/f-atomic-kv

Adds support for atomic transactions spanning multiple KV entries.
This commit is contained in:
James Phillips 2016-05-15 13:46:05 -07:00
commit ffcba3df58
26 changed files with 5280 additions and 2121 deletions

156
api/kv.go
View File

@ -23,6 +23,43 @@ type KVPair struct {
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair
// KVOp constants give possible operations available in a KVTxn.
type KVOp string
const (
KVSet KVOp = "set"
KVDelete = "delete"
KVDeleteCAS = "delete-cas"
KVDeleteTree = "delete-tree"
KVCAS = "cas"
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVGetTree = "get-tree"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)
// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb string
Key string
Value []byte
Flags uint64
Index uint64
Session string
}
// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp
// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}
// KV is used to manipulate the K/V API
type KV struct {
c *Client
@ -238,3 +275,122 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}
// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KV *KVTxnOp
}
// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp
// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
}
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setQueryOptions(q)
// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvOp := range txn {
ops = append(ops, &TxnOp{KV: kvOp})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}
// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}

View File

@ -3,6 +3,7 @@ package api
import (
"bytes"
"path"
"strings"
"testing"
"time"
)
@ -445,3 +446,120 @@ func TestClient_AcquireRelease(t *testing.T) {
t.Fatalf("unexpected value: %#v", meta)
}
}
func TestClient_Txn(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
kv := c.KV()
// Make a session.
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
// Acquire and get the key via a transaction, but don't supply a valid
// session.
key := testKey()
value := []byte("test")
txn := KVTxnOps{
&KVTxnOp{
Verb: KVLock,
Key: key,
Value: value,
},
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err := kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if ok {
t.Fatalf("transaction should have failed")
}
if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 {
t.Fatalf("bad: %v", ret)
}
if ret.Errors[0].OpIndex != 0 ||
!strings.Contains(ret.Errors[0].What, "missing session") ||
!strings.Contains(ret.Errors[1].What, "doesn't exist") {
t.Fatalf("bad: %v", ret.Errors[0])
}
// Now poke in a real session and try again.
txn[0].Session = id
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 2 {
t.Fatalf("bad: %v", ret)
}
for i, result := range ret.Results {
var expected []byte
if i == 1 {
expected = value
}
if result.Key != key ||
!bytes.Equal(result.Value, expected) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}
// Run a read-only transaction.
txn = KVTxnOps{
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 1 {
t.Fatalf("bad: %v", ret)
}
for _, result := range ret.Results {
if result.Key != key ||
!bytes.Equal(result.Value, value) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}
// Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}

View File

@ -269,6 +269,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.mux.HandleFunc("/v1/txn", s.wrap(s.Txn))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@ -342,21 +344,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return
}
prettyPrint := false
if _, ok := req.URL.Query()["pretty"]; ok {
prettyPrint = true
}
// Write out the JSON object
if obj != nil {
var buf []byte
if prettyPrint {
buf, err = json.MarshalIndent(obj, "", " ")
} else {
buf, err = json.Marshal(obj)
}
buf, err = s.marshalJSON(req, obj)
if err != nil {
goto HAS_ERR
}
resp.Header().Set("Content-Type", "application/json")
resp.Write(buf)
}
@ -364,6 +358,25 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return f
}
// marshalJSON marshals the object into JSON, respecting the user's pretty-ness
// configuration.
func (s *HTTPServer) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) {
if _, ok := req.URL.Query()["pretty"]; ok {
buf, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return nil, err
}
buf = append(buf, "\n"...)
return buf, nil
}
buf, err := json.Marshal(obj)
if err != nil {
return nil, err
}
return buf, err
}
// Returns true if the UI is enabled.
func (s *HTTPServer) IsUIEnabled() bool {
return s.uiDir != "" || s.agent.config.EnableUi

View File

@ -328,6 +328,7 @@ func testPrettyPrint(pretty string, t *testing.T) {
srv.wrap(handler)(resp, req)
expected, _ := json.MarshalIndent(r, "", " ")
expected = append(expected, "\n"...)
actual, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("err: %s", err)

View File

@ -0,0 +1,227 @@
package agent
import (
"encoding/base64"
"fmt"
"net/http"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs"
)
const (
// maxTxnOps is used to set an upper limit on the number of operations
// inside a transaction. If there are more operations than this, then the
// client is likely abusing transactions.
maxTxnOps = 64
)
// decodeValue decodes the value member of the given operation.
func decodeValue(rawKV interface{}) error {
rawMap, ok := rawKV.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw KV type: %T", rawKV)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "value":
// Leave the byte slice nil if we have a nil
// value.
if v == nil {
return nil
}
// Otherwise, base64 decode it.
s, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected value type: %T", v)
}
decoded, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return fmt.Errorf("failed to decode value: %v", err)
}
rawMap[k] = decoded
return nil
}
}
return nil
}
// fixupKVOp looks for non-nil KV operations and passes them on for
// value conversion.
func fixupKVOp(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "kv":
if v == nil {
return nil
}
return decodeValue(v)
}
}
return nil
}
// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops,
// replacing them with byte arrays.
func fixupKVOps(raw interface{}) error {
rawSlice, ok := raw.([]interface{})
if !ok {
return fmt.Errorf("unexpected raw type: %t", raw)
}
for _, rawOp := range rawSlice {
if err := fixupKVOp(rawOp); err != nil {
return err
}
}
return nil
}
// convertOps takes the incoming body in API format and converts it to the
// internal RPC format. This returns a count of the number of write ops, and
// a boolean, that if false means an error response has been generated and
// processing should stop.
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var ops api.TxnOps
if err := decodeBody(req, &ops, fixupKVOps); err != nil {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, 0, false
}
// Enforce a reasonable upper limit on the number of operations in a
// transaction in order to curb abuse.
if size := len(ops); size > maxTxnOps {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
resp.Write([]byte(fmt.Sprintf("Transaction contains too many operations (%d > %d)",
size, maxTxnOps)))
return nil, 0, false
}
// Convert the KV API format into the RPC format. Note that fixupKVOps
// above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over.
var opsRPC structs.TxnOps
var writes int
var netKVSize int
for _, in := range ops {
if in.KV != nil {
if size := len(in.KV.Value); size > maxKVSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
resp.Write([]byte(fmt.Sprintf("Value for key %q is too large (%d > %d bytes)",
in.KV.Key, size, maxKVSize)))
return nil, 0, false
} else {
netKVSize += size
}
verb := structs.KVSOp(in.KV.Verb)
if verb.IsWrite() {
writes += 1
}
out := &structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: verb,
DirEnt: structs.DirEntry{
Key: in.KV.Key,
Value: in.KV.Value,
Flags: in.KV.Flags,
Session: in.KV.Session,
RaftIndex: structs.RaftIndex{
ModifyIndex: in.KV.Index,
},
},
},
}
opsRPC = append(opsRPC, out)
}
}
// Enforce an overall size limit to help prevent abuse.
if netKVSize > maxKVSize {
resp.WriteHeader(http.StatusRequestEntityTooLarge)
resp.Write([]byte(fmt.Sprintf("Cumulative size of key data is too large (%d > %d bytes)",
netKVSize, maxKVSize)))
return nil, 0, false
}
return opsRPC, writes, true
}
// Txn handles requests to apply multiple operations in a single, atomic
// transaction. A transaction consisting of only read operations will be fast-
// pathed to an endpoint that supports consistency modes (but not blocking),
// and everything else will be routed through Raft like a normal write.
func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
// Convert the ops from the API format to the internal format.
ops, writes, ok := s.convertOps(resp, req)
if !ok {
return nil, nil
}
// Fast-path a transaction with only writes to the read-only endpoint,
// which bypasses Raft, and allows for staleness.
conflict := false
var ret interface{}
if writes == 0 {
args := structs.TxnReadRequest{Ops: ops}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var reply structs.TxnReadResponse
if err := s.agent.RPC("Txn.Read", &args, &reply); err != nil {
return nil, err
}
// Since we don't do blocking, we only add the relevant headers
// for metadata.
setLastContact(resp, reply.LastContact)
setKnownLeader(resp, reply.KnownLeader)
ret, conflict = reply, len(reply.Errors) > 0
} else {
args := structs.TxnRequest{Ops: ops}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
var reply structs.TxnResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err
}
ret, conflict = reply, len(reply.Errors) > 0
}
// If there was a conflict return the response object but set a special
// status code.
if conflict {
var buf []byte
var err error
buf, err = s.marshalJSON(req, ret)
if err != nil {
return nil, err
}
resp.Header().Set("Content-Type", "application/json")
resp.WriteHeader(http.StatusConflict)
resp.Write(buf)
return nil, nil
}
// Otherwise, return the results of the successful transaction.
return ret, nil
}

View File

@ -0,0 +1,434 @@
package agent
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{"))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 400 {
t.Fatalf("expected 400, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("Failed to parse")) {
t.Fatalf("expected conflicting args error")
}
})
}
func TestTxnEndpoint_Bad_Method(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{}"))
req, err := http.NewRequest("GET", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("expected 405, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "set",
"Key": "key",
"Value": %q
}
}
]
`, strings.Repeat("bad", 2*maxKVSize))))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 413 {
t.Fatalf("expected 413, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_Bad_Size_Net(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
value := strings.Repeat("X", maxKVSize/2)
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "set",
"Key": "key1",
"Value": %q
}
},
{
"KV": {
"Verb": "set",
"Key": "key1",
"Value": %q
}
},
{
"KV": {
"Verb": "set",
"Key": "key1",
"Value": %q
}
}
]
`, value, value, value)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 413 {
t.Fatalf("expected 413, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_Bad_Size_Ops(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
%s
{
"KV": {
"Verb": "set",
"Key": "key",
"Value": ""
}
}
]
`, strings.Repeat(`{ "KV": { "Verb": "get", "Key": "key" } },`, 2*maxTxnOps))))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 413 {
t.Fatalf("expected 413, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_KV_Actions(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Make sure all incoming fields get converted properly to the internal
// RPC format.
var index uint64
id := makeTestSession(t, srv)
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Flags": 23,
"Session": %q
}
},
{
"KV": {
"Verb": "get",
"Key": "key"
}
}
]
`, id)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
txnResp, ok := obj.(structs.TxnResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", txnResp)
}
index = txnResp.Results[0].KV.ModifyIndex
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: nil,
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
}
// Do a read-only transaction that should get routed to the
// fast-path endpoint.
{
buf := bytes.NewBuffer([]byte(`
[
{
"KV": {
"Verb": "get",
"Key": "key"
}
},
{
"KV": {
"Verb": "get-tree",
"Key": "key"
}
}
]
`))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
header := resp.Header().Get("X-Consul-KnownLeader")
if header != "true" {
t.Fatalf("bad: %v", header)
}
header = resp.Header().Get("X-Consul-LastContact")
if header != "0" {
t.Fatalf("bad: %v", header)
}
txnResp, ok := obj.(structs.TxnReadResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
}
// Now that we have an index we can do a CAS to make sure the
// index field gets translated to the RPC format.
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"KV": {
"Verb": "cas",
"Key": "key",
"Value": "Z29vZGJ5ZSB3b3JsZA==",
"Index": %d
}
},
{
"KV": {
"Verb": "get",
"Key": "key"
}
}
]
`, index)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
txnResp, ok := obj.(structs.TxnResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", txnResp)
}
modIndex := txnResp.Results[0].KV.ModifyIndex
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: nil,
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "key",
Value: []byte("goodbye world"),
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
},
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
}
})
// Verify an error inside a transaction.
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(`
[
{
"KV": {
"Verb": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Session": "nope"
}
},
{
"KV": {
"Verb": "get",
"Key": "key"
}
}
]
`))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 409 {
t.Fatalf("expected 409, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) {
t.Fatalf("bad: %s", resp.Body.String())
}
})
}

View File

@ -50,6 +50,35 @@ func FilterKeys(acl acl.ACL, keys []string) []string {
return keys[:FilterEntries(&kf)]
}
type txnResultsFilter struct {
acl acl.ACL
results structs.TxnResults
}
func (t *txnResultsFilter) Len() int {
return len(t.results)
}
func (t *txnResultsFilter) Filter(i int) bool {
result := t.results[i]
if result.KV != nil {
return !t.acl.KeyRead(result.KV.Key)
} else {
return false
}
}
func (t *txnResultsFilter) Move(dst, src, span int) {
copy(t.results[dst:dst+span], t.results[src:src+span])
}
// FilterTxnResults is used to filter a list of transaction results by
// applying an ACL policy.
func FilterTxnResults(acl acl.ACL, results structs.TxnResults) structs.TxnResults {
rf := txnResultsFilter{acl: acl, results: results}
return results[:FilterEntries(&rf)]
}
// Filter interface is used with FilterEntries to do an
// in-place filter of a slice.
type Filter interface {

View File

@ -8,7 +8,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
)
func TestFilterDirEnt(t *testing.T) {
func TestFilter_DirEnt(t *testing.T) {
policy, _ := acl.Parse(testFilterRules)
aclR, _ := acl.New(acl.DenyAll(), policy)
@ -49,7 +49,7 @@ func TestFilterDirEnt(t *testing.T) {
}
}
func TestKeys(t *testing.T) {
func TestFilter_Keys(t *testing.T) {
policy, _ := acl.Parse(testFilterRules)
aclR, _ := acl.New(acl.DenyAll(), policy)
@ -80,6 +80,55 @@ func TestKeys(t *testing.T) {
}
}
func TestFilter_TxnResults(t *testing.T) {
policy, _ := acl.Parse(testFilterRules)
aclR, _ := acl.New(acl.DenyAll(), policy)
type tcase struct {
in []string
out []string
}
cases := []tcase{
tcase{
in: []string{"foo/test", "foo/priv/nope", "foo/other", "zoo"},
out: []string{"foo/test", "foo/other"},
},
tcase{
in: []string{"abe", "lincoln"},
out: nil,
},
tcase{
in: []string{"abe", "foo/1", "foo/2", "foo/3", "nope"},
out: []string{"foo/1", "foo/2", "foo/3"},
},
}
for _, tc := range cases {
results := structs.TxnResults{}
for _, in := range tc.in {
results = append(results, &structs.TxnResult{KV: &structs.DirEntry{Key: in}})
}
results = FilterTxnResults(aclR, results)
var outL []string
for _, r := range results {
outL = append(outL, r.KV.Key)
}
if !reflect.DeepEqual(outL, tc.out) {
t.Fatalf("bad: %#v %#v", outL, tc.out)
}
}
// Run a non-KV result.
results := structs.TxnResults{}
results = append(results, &structs.TxnResult{})
results = FilterTxnResults(aclR, results)
if len(results) != 1 {
t.Fatalf("should not have filtered non-KV result")
}
}
var testFilterRules = `
key "" {
policy = "deny"

View File

@ -93,6 +93,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
case structs.PreparedQueryRequestType:
return c.applyPreparedQueryOperation(buf[1:], log.Index)
case structs.TxnRequestType:
return c.applyTxn(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -286,6 +288,16 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
}
}
func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
results, errors := c.state.TxnRW(index, req.Ops)
return structs.TxnResponse{results, errors}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))

View File

@ -1241,6 +1241,47 @@ func TestFSM_TombstoneReap(t *testing.T) {
}
}
func TestFSM_Txn(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set a key using a transaction.
req := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
},
},
},
}
buf, err := structs.Encode(structs.TxnRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(structs.TxnResponse); !ok {
t.Fatalf("bad response type: %T", resp)
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
}
func TestFSM_IgnoreUnknown(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
)
@ -13,54 +14,82 @@ type KVS struct {
srv *Server
}
// Apply is used to apply a KVS request to the data store. This should
// only be used for operations that modify the data
// preApply does all the verification of a KVS update that is performed BEFORE
// we submit as a Raft log entry. This includes enforcing the lock delay which
// must only be done on the leader.
func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry) (bool, error) {
// Verify the entry.
if dirEnt.Key == "" && op != structs.KVSDeleteTree {
return false, fmt.Errorf("Must provide key")
}
// Apply the ACL policy if any.
if acl != nil {
switch op {
case structs.KVSDeleteTree:
if !acl.KeyWritePrefix(dirEnt.Key) {
return false, permissionDeniedErr
}
case structs.KVSGet, structs.KVSGetTree:
// Filtering for GETs is done on the output side.
case structs.KVSCheckSession, structs.KVSCheckIndex:
// These could reveal information based on the outcome
// of the transaction, and they operate on individual
// keys so we check them here.
if !acl.KeyRead(dirEnt.Key) {
return false, permissionDeniedErr
}
default:
if !acl.KeyWrite(dirEnt.Key) {
return false, permissionDeniedErr
}
}
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer would expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if op == structs.KVSLock {
state := srv.fsm.State()
expires := state.KVSLockDelay(dirEnt.Key)
if expires.After(time.Now()) {
srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
dirEnt.Key, expires)
return false, nil
}
}
return true, nil
}
// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
// Verify the args
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
return fmt.Errorf("Must provide key")
}
// Apply the ACL policy if any
// Perform the pre-apply checks.
acl, err := k.srv.resolveToken(args.Token)
if err != nil {
return err
} else if acl != nil {
switch args.Op {
case structs.KVSDeleteTree:
if !acl.KeyWritePrefix(args.DirEnt.Key) {
return permissionDeniedErr
}
default:
if !acl.KeyWrite(args.DirEnt.Key) {
return permissionDeniedErr
}
}
}
ok, err := kvsPreApply(k.srv, acl, args.Op, &args.DirEnt)
if err != nil {
return err
}
if !ok {
*reply = false
return nil
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if args.Op == structs.KVSLock {
state := k.srv.fsm.State()
expires := state.KVSLockDelay(args.DirEnt.Key)
if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
args.DirEnt.Key, expires)
*reply = false
return nil
}
}
// Apply the update
// Apply the update.
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
@ -70,14 +99,14 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return respErr
}
// Check if the return type is a bool
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// Get is used to lookup a single key
// Get is used to lookup a single key.
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
return err
@ -119,7 +148,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
})
}
// List is used to list all keys with a given prefix
// List is used to list all keys with a given prefix.
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.List", args, args, reply); done {
return err
@ -162,7 +191,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
})
}
// ListKeys is used to list all keys with a given prefix to a separator
// ListKeys is used to list all keys with a given prefix to a separator.
func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error {
if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done {
return err

View File

@ -627,7 +627,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
@ -652,13 +652,13 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request
// Make a lock request.
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
@ -675,10 +675,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("should not acquire")
}
// Wait for lock-delay
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire
// Should acquire.
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -165,6 +165,7 @@ type endpoints struct {
ACL *ACL
Coordinate *Coordinate
PreparedQuery *PreparedQuery
Txn *Txn
}
// NewServer is used to construct a new Consul server from the
@ -438,6 +439,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.PreparedQuery = &PreparedQuery{s}
s.endpoints.Txn = &Txn{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
@ -449,6 +451,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.PreparedQuery)
s.rpcServer.Register(s.endpoints.Txn)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

624
consul/state/kvs.go Normal file
View File

@ -0,0 +1,624 @@
package state
import (
"fmt"
"strings"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// KVs is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil {
return nil, err
}
return iter, nil
}
// Tombstones is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
return s.store.kvsGraveyard.DumpTxn(s.tx)
}
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
if err := s.tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// We have a single top-level KVS watch trigger instead of doing
// tons of prefix watches.
return nil
}
// Tombstone is used when restoring from a snapshot. For general inserts, use
// Graveyard.InsertTxn.
func (s *StateRestore) Tombstone(stone *Tombstone) error {
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
return fmt.Errorf("failed restoring tombstone: %s", err)
}
return nil
}
// ReapTombstones is used to delete all the tombstones with an index
// less than or equal to the given index. This is used to prevent
// unbounded storage growth of the tombstones.
func (s *StateStore) ReapTombstones(index uint64) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.kvsGraveyard.ReapTxn(tx, index); err != nil {
return fmt.Errorf("failed to reap kvs tombstones: %s", err)
}
tx.Commit()
return nil
}
// KVSSet is used to store a key/value pair.
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual set.
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsSetTxn is used to insert or update a key/value pair in the state
// store. It is the inner method used and handles only the actual storage.
// If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is.
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Set the indexes.
if existing != nil {
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
} else {
entry.CreateIndex = idx
}
entry.ModifyIndex = idx
// Preserve the existing session unless told otherwise. The "existing"
// session for a new entry is "no session".
if !updateSession {
if existing != nil {
entry.Session = existing.(*structs.DirEntry).Session
} else {
entry.Session = ""
}
}
// Store the kv pair in the state store and update the index.
if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
return nil
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsGetTxn(tx, key)
}
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, key string) (uint64, *structs.DirEntry, error) {
// Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Retrieve the key.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry != nil {
return idx, entry.(*structs.DirEntry), nil
}
return idx, nil, nil
}
// KVSList is used to list out all keys under a given prefix. If the
// prefix is left empty, all keys in the KVS will be returned. The returned
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsListTxn(tx, prefix)
}
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *StateStore) kvsListTxn(tx *memdb.Txn, prefix string) (uint64, structs.DirEntries, error) {
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Query the prefix and list the available keys
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
// Gather all of the keys found in the store
var ents structs.DirEntries
var lindex uint64
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
if gindex > lindex {
lindex = gindex
}
} else {
lindex = idx
}
// Use the sub index if it was set and there are entries, otherwise use
// the full table index from above.
if lindex != 0 {
idx = lindex
}
return idx, ents, nil
}
// KVSListKeys is used to query the KV store for keys matching the given prefix.
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Fetch keys using the specified prefix
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
prefixLen := len(prefix)
sepLen := len(sep)
var keys []string
var lindex uint64
var last string
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
// Accumulate the high index
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
// Always accumulate if no separator provided
if sepLen == 0 {
keys = append(keys, e.Key)
continue
}
// Parse and de-duplicate the returned keys based on the
// key separator, if provided.
after := e.Key[prefixLen:]
sepIdx := strings.Index(after, sep)
if sepIdx > -1 {
key := e.Key[:prefixLen+sepIdx+sepLen]
if key != last {
keys = append(keys, key)
last = key
}
} else {
keys = append(keys, e.Key)
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
if gindex > lindex {
lindex = gindex
}
} else {
lindex = idx
}
// Use the sub index if it was set and there are entries, otherwise use
// the full table index from above.
if lindex != 0 {
idx = lindex
}
return idx, keys, nil
}
// KVSDelete is used to perform a shallow delete on a single key in the
// the state store.
func (s *StateStore) KVSDelete(idx uint64, key string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual delete
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
// Look up the entry in the state store.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil
}
// Create a tombstone.
if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
// Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(key, false) })
return nil
}
// KVSDeleteCAS is used to try doing a KV delete operation with a given
// raft index. If the CAS index specified is not equal to the last
// observed index for the given key, then the call is a noop, otherwise
// a normal KV delete is invoked.
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
set, err := s.kvsDeleteCASTxn(tx, idx, cidx, key)
if !set || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction.
func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
// Retrieve the existing kvs entry, if any exists.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := entry.(*structs.DirEntry)
if !ok || e.ModifyIndex != cidx {
return entry == nil, nil
}
// Call the actual deletion if the above passed.
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
return false, err
}
return true, nil
}
// KVSSetCAS is used to do a check-and-set operation on a KV entry. The
// ModifyIndex in the provided entry is used to determine if we should
// write the entry to the state store or bail. Returns a bool indicating
// if a write happened and any error.
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
set, err := s.kvsSetCASTxn(tx, idx, entry)
if !set || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
// transaction.
func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Check if the we should do the set. A ModifyIndex of 0 means that
// we are doing a set-if-not-exists.
if entry.ModifyIndex == 0 && existing != nil {
return false, nil
}
if entry.ModifyIndex != 0 && existing == nil {
return false, nil
}
e, ok := existing.(*structs.DirEntry)
if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex {
return false, nil
}
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
return false, err
}
return true, nil
}
// KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op.
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.kvsDeleteTreeTxn(tx, idx, prefix); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
// Get an iterator over all of the keys with the given prefix.
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Go over all of the keys and remove them. We call the delete
// directly so that we only update the index once. We also add
// tombstones as we go.
var modified bool
var objs []interface{}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
objs = append(objs, entry)
modified = true
}
// Do the actual deletes in a separate loop so we don't trash the
// iterator as we go.
for _, obj := range objs {
if err := tx.Delete("kvs", obj); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
}
// Update the index
if modified {
tx.Defer(func() { s.kvsWatch.Notify(prefix, true) })
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
return nil
}
// KVSLockDelay returns the expiration time for any lock delay associated with
// the given key.
func (s *StateStore) KVSLockDelay(key string) time.Time {
return s.lockDelay.GetExpiration(key)
}
// KVSLock is similar to KVSSet but only performs the set if the lock can be
// acquired.
func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
locked, err := s.kvsLockTxn(tx, idx, entry)
if !locked || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsLockTxn is the inner method that does a lock inside an existing
// transaction.
func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
}
// Verify that the session exists.
sess, err := tx.First("sessions", "id", entry.Session)
if err != nil {
return false, fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return false, fmt.Errorf("invalid session %#v", entry.Session)
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Set up the entry, using the existing entry if present.
if existing != nil {
e := existing.(*structs.DirEntry)
if e.Session == entry.Session {
// We already hold this lock, good to go.
entry.CreateIndex = e.CreateIndex
entry.LockIndex = e.LockIndex
} else if e.Session != "" {
// Bail out, someone else holds this lock.
return false, nil
} else {
// Set up a new lock with this session.
entry.CreateIndex = e.CreateIndex
entry.LockIndex = e.LockIndex + 1
}
} else {
entry.CreateIndex = idx
entry.LockIndex = 1
}
entry.ModifyIndex = idx
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err
}
return true, nil
}
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
// unlocked (the key must already exist and be locked).
func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
unlocked, err := s.kvsUnlockTxn(tx, idx, entry)
if !unlocked || err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// kvsUnlockTxn is the inner method that does an unlock inside an existing
// transaction.
func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Bail if there's no existing key.
if existing == nil {
return false, nil
}
// Make sure the given session is the lock holder.
e := existing.(*structs.DirEntry)
if e.Session != entry.Session {
return false, nil
}
// Clear the lock and update the entry.
entry.Session = ""
entry.LockIndex = e.LockIndex
entry.CreateIndex = e.CreateIndex
entry.ModifyIndex = idx
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err
}
return true, nil
}
// kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key.
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil, fmt.Errorf("failed to check session, key %q doesn't exist", key)
}
e := entry.(*structs.DirEntry)
if e.Session != session {
return nil, fmt.Errorf("failed session check for key %q, current session %q != %q", key, e.Session, session)
}
return e, nil
}
// kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key.
func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil, fmt.Errorf("failed to check index, key %q doesn't exist", key)
}
e := entry.(*structs.DirEntry)
if e.ModifyIndex != cidx {
return nil, fmt.Errorf("failed index check for key %q, current modify index %d != %d", key, e.ModifyIndex, cidx)
}
return e, nil
}

1540
consul/state/kvs_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -169,20 +169,6 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
return iter, nil
}
// KVs is used to pull the full list of KVS entries for use during snapshots.
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("kvs", "id_prefix")
if err != nil {
return nil, err
}
return iter, nil
}
// Tombstones is used to pull all the tombstones from the graveyard.
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
return s.store.kvsGraveyard.DumpTxn(s.tx)
}
// Sessions is used to pull the full list of sessions for use during snapshots.
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
@ -246,30 +232,6 @@ func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) er
return nil
}
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
if err := s.tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// We have a single top-level KVS watch trigger instead of doing
// tons of prefix watches.
return nil
}
// Tombstone is used when restoring from a snapshot. For general inserts, use
// Graveyard.InsertTxn.
func (s *StateRestore) Tombstone(stone *Tombstone) error {
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
return fmt.Errorf("failed restoring tombstone: %s", err)
}
return nil
}
// Session is used when restoring from a snapshot. For general inserts, use
// SessionCreate.
func (s *StateRestore) Session(sess *structs.Session) error {
@ -377,21 +339,6 @@ func indexUpdateMaxTxn(tx *memdb.Txn, idx uint64, table string) error {
return nil
}
// ReapTombstones is used to delete all the tombstones with an index
// less than or equal to the given index. This is used to prevent
// unbounded storage growth of the tombstones.
func (s *StateStore) ReapTombstones(index uint64) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.kvsGraveyard.ReapTxn(tx, index); err != nil {
return fmt.Errorf("failed to reap kvs tombstones: %s", err)
}
tx.Commit()
return nil
}
// getWatchTables returns the list of tables that should be watched and used for
// max index calculations for the given query method. This is used for all
// methods except for KVS. This will panic if the method is unknown.
@ -1408,468 +1355,6 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
return idx, results, nil
}
// KVSSet is used to store a key/value pair.
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual set.
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsSetTxn is used to insert or update a key/value pair in the state
// store. It is the inner method used and handles only the actual storage.
// If updateSession is true, then the incoming entry will set the new
// session (should be validated before calling this). Otherwise, we will keep
// whatever the existing session is.
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Set the indexes.
if existing != nil {
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex
} else {
entry.CreateIndex = idx
}
entry.ModifyIndex = idx
// Preserve the existing session unless told otherwise. The "existing"
// session for a new entry is "no session".
if !updateSession {
if existing != nil {
entry.Session = existing.(*structs.DirEntry).Session
} else {
entry.Session = ""
}
}
// Store the kv pair in the state store and update the index.
if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(entry.Key, false) })
return nil
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Retrieve the key.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry != nil {
return idx, entry.(*structs.DirEntry), nil
}
return idx, nil, nil
}
// KVSList is used to list out all keys under a given prefix. If the
// prefix is left empty, all keys in the KVS will be returned. The returned
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Query the prefix and list the available keys
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
// Gather all of the keys found in the store
var ents structs.DirEntries
var lindex uint64
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
if gindex > lindex {
lindex = gindex
}
} else {
lindex = idx
}
// Use the sub index if it was set and there are entries, otherwise use
// the full table index from above.
if lindex != 0 {
idx = lindex
}
return idx, ents, nil
}
// KVSListKeys is used to query the KV store for keys matching the given prefix.
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (s *StateStore) KVSListKeys(prefix, sep string) (uint64, []string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Fetch keys using the specified prefix
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
prefixLen := len(prefix)
sepLen := len(sep)
var keys []string
var lindex uint64
var last string
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
// Accumulate the high index
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
// Always accumulate if no separator provided
if sepLen == 0 {
keys = append(keys, e.Key)
continue
}
// Parse and de-duplicate the returned keys based on the
// key separator, if provided.
after := e.Key[prefixLen:]
sepIdx := strings.Index(after, sep)
if sepIdx > -1 {
key := e.Key[:prefixLen+sepIdx+sepLen]
if key != last {
keys = append(keys, key)
last = key
}
} else {
keys = append(keys, e.Key)
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
if gindex > lindex {
lindex = gindex
}
} else {
lindex = idx
}
// Use the sub index if it was set and there are entries, otherwise use
// the full table index from above.
if lindex != 0 {
idx = lindex
}
return idx, keys, nil
}
// KVSDelete is used to perform a shallow delete on a single key in the
// the state store.
func (s *StateStore) KVSDelete(idx uint64, key string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual delete
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
return err
}
tx.Commit()
return nil
}
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
// Look up the entry in the state store.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil
}
// Create a tombstone.
if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
// Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Defer(func() { s.kvsWatch.Notify(key, false) })
return nil
}
// KVSDeleteCAS is used to try doing a KV delete operation with a given
// raft index. If the CAS index specified is not equal to the last
// observed index for the given key, then the call is a noop, otherwise
// a normal KV delete is invoked.
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Retrieve the existing kvs entry, if any exists.
entry, err := tx.First("kvs", "id", key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// If the existing index does not match the provided CAS
// index arg, then we shouldn't update anything and can safely
// return early here.
e, ok := entry.(*structs.DirEntry)
if !ok || e.ModifyIndex != cidx {
return entry == nil, nil
}
// Call the actual deletion if the above passed.
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// KVSSetCAS is used to do a check-and-set operation on a KV entry. The
// ModifyIndex in the provided entry is used to determine if we should
// write the entry to the state store or bail. Returns a bool indicating
// if a write happened and any error.
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Check if the we should do the set. A ModifyIndex of 0 means that
// we are doing a set-if-not-exists.
if entry.ModifyIndex == 0 && existing != nil {
return false, nil
}
if entry.ModifyIndex != 0 && existing == nil {
return false, nil
}
e, ok := existing.(*structs.DirEntry)
if ok && entry.ModifyIndex != 0 && entry.ModifyIndex != e.ModifyIndex {
return false, nil
}
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, false); err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op.
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Get an iterator over all of the keys with the given prefix.
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
// Go over all of the keys and remove them. We call the delete
// directly so that we only update the index once. We also add
// tombstones as we go.
var modified bool
var objs []interface{}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
if err := s.kvsGraveyard.InsertTxn(tx, e.Key, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
objs = append(objs, entry)
modified = true
}
// Do the actual deletes in a separate loop so we don't trash the
// iterator as we go.
for _, obj := range objs {
if err := tx.Delete("kvs", obj); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
}
// Update the index
if modified {
tx.Defer(func() { s.kvsWatch.Notify(prefix, true) })
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
tx.Commit()
return nil
}
// KVSLockDelay returns the expiration time for any lock delay associated with
// the given key.
func (s *StateStore) KVSLockDelay(key string) time.Time {
return s.lockDelay.GetExpiration(key)
}
// KVSLock is similar to KVSSet but only performs the set if the lock can be
// acquired.
func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
}
// Verify that the session exists.
sess, err := tx.First("sessions", "id", entry.Session)
if err != nil {
return false, fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return false, fmt.Errorf("invalid session %#v", entry.Session)
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Set up the entry, using the existing entry if present.
if existing != nil {
e := existing.(*structs.DirEntry)
if e.Session == entry.Session {
// We already hold this lock, good to go.
entry.CreateIndex = e.CreateIndex
entry.LockIndex = e.LockIndex
} else if e.Session != "" {
// Bail out, someone else holds this lock.
return false, nil
} else {
// Set up a new lock with this session.
entry.CreateIndex = e.CreateIndex
entry.LockIndex = e.LockIndex + 1
}
} else {
entry.CreateIndex = idx
entry.LockIndex = 1
}
entry.ModifyIndex = idx
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
// unlocked (the key must already exist and be locked).
func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Verify that a session is present.
if entry.Session == "" {
return false, fmt.Errorf("missing session")
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
// Bail if there's no existing key.
if existing == nil {
return false, nil
}
// Make sure the given session is the lock holder.
e := existing.(*structs.DirEntry)
if e.Session != entry.Session {
return false, nil
}
// Clear the lock and update the entry.
entry.Session = ""
entry.LockIndex = e.LockIndex
entry.CreateIndex = e.CreateIndex
entry.ModifyIndex = idx
// If we made it this far, we should perform the set.
if err := s.kvsSetTxn(tx, idx, entry, true); err != nil {
return false, err
}
tx.Commit()
return true, nil
}
// SessionCreate is used to register a new session in the state store.
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true)

File diff suppressed because it is too large Load Diff

168
consul/state/txn.go Normal file
View File

@ -0,0 +1,168 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
)
// txnKVS handles all KV-related operations.
func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error
switch op.Verb {
case structs.KVSSet:
entry = &op.DirEnt
err = s.kvsSetTxn(tx, idx, entry, false)
case structs.KVSDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
case structs.KVSDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
case structs.KVSCAS:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsSetCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSLock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsLockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
}
case structs.KVSUnlock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsUnlockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
}
case structs.KVSGet:
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
if entry == nil && err == nil {
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
}
case structs.KVSGetTree:
var entries structs.DirEntries
_, entries, err = s.kvsListTxn(tx, op.DirEnt.Key)
if err == nil {
results := make(structs.TxnResults, 0, len(entries))
for _, e := range entries {
result := structs.TxnResult{KV: e}
results = append(results, &result)
}
return results, nil
}
case structs.KVSCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
case structs.KVSCheckIndex:
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
default:
err = fmt.Errorf("unknown KV verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == structs.KVSGet {
result := structs.TxnResult{KV: entry}
return structs.TxnResults{&result}, nil
}
clone := entry.Clone()
clone.Value = nil
result := structs.TxnResult{KV: clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnDispatch runs the given operations inside the state store transaction.
func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
var ret structs.TxnResults
var err error
// Dispatch based on the type of operation.
if op.KV != nil {
ret, err = s.txnKVS(tx, idx, op.KV)
} else {
err = fmt.Errorf("no operation specified")
}
// Accumulate the results.
results = append(results, ret...)
// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.TxnError{i, err.Error()})
}
}
if len(errors) > 0 {
return nil, errors
}
return results, nil
}
// TxnRW tries to run the given operations all inside a single transaction. If
// any of the operations fail, the entire transaction will be rolled back. This
// is done in a full write transaction on the state store, so reads and writes
// are possible
func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
results, errors := s.txnDispatch(tx, idx, ops)
if len(errors) > 0 {
return nil, errors
}
tx.Commit()
return results, nil
}
// TxnRO runs the given operations inside a single read transaction in the state
// store. You must verify outside this function that no write operations are
// present, otherwise you'll get an error from the state store.
func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(false)
defer tx.Abort()
results, errors := s.txnDispatch(tx, 0, ops)
if len(errors) > 0 {
return nil, errors
}
return results, nil
}

794
consul/state/txn_test.go Normal file
View File

@ -0,0 +1,794 @@
package state
import (
"reflect"
"strings"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestStateStore_Txn_KVS(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 5, "foo/update", "stale")
// Make a real session.
testRegisterNode(t, s, 6, "node1")
session := testUUID()
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// Set up a transaction that hits every operation.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo/zorp",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo/delete",
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 8,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: "",
},
},
},
}
results, errors := s.TxnRW(8, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/baz",
Value: []byte("baz"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/zip",
Value: []byte("zip"),
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/new",
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
},
}
if len(results) != len(expected) {
t.Fatalf("bad: %v", results)
}
for i, _ := range results {
if !reflect.DeepEqual(results[i], expected[i]) {
t.Fatalf("bad %d", i)
}
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
entries := structs.DirEntries{
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
}
if len(actual) != len(entries) {
t.Fatalf("bad len: %d != %d", len(actual), len(entries))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad %d", i)
}
}
}
func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale")
testRegisterNode(t, s, 3, "node1")
session := testUUID()
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
bogus := testUUID()
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}
if idx != 5 {
t.Fatalf("bad index (%s): %d", desc, idx)
}
// Make sure it looks as expected.
entries := structs.DirEntries{
&structs.DirEntry{
Key: "foo/delete",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.DirEntry{
Key: "foo/lock",
Value: []byte("foo"),
LockIndex: 1,
Session: session,
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
if len(actual) != len(entries) {
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(entries[i]))
}
}
}
verifyStateStore("initial")
// Set up a transaction that fails every operation.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
Session: bogus,
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/lock",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
},
}
results, errors := s.TxnRW(7, ops)
if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
if len(results) != 0 {
t.Fatalf("bad len: %d != 0", len(results))
}
verifyStateStore("after")
// Make sure the errors look reasonable.
expected := []string{
"index is stale",
"lock is already held",
"lock isn't held, or is held by another session",
"current session",
`key "nope" doesn't exist`,
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown KV verb",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
}
for i, msg := range expected {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error(), msg) {
t.Fatalf("bad %d: %v", i, errors[i].Error())
}
}
}
func TestStateStore_Txn_KVS_RO(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
// Set up a transaction that hits all the read-only operations.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/bar/baz",
Session: "",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/bar/zip",
RaftIndex: structs.RaftIndex{
ModifyIndex: 3,
},
},
},
},
}
results, errors := s.TxnRO(ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
// Make sure the response looks as expected.
expected := structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/baz",
Value: []byte("baz"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/zip",
Value: []byte("zip"),
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/baz",
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "foo/bar/zip",
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
},
},
},
}
if len(results) != len(expected) {
t.Fatalf("bad: %v", results)
}
for i, _ := range results {
if !reflect.DeepEqual(results[i], expected[i]) {
t.Fatalf("bad %d", i)
}
}
}
func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
// Set up a transaction that hits all the read-only operations.
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo",
Value: []byte("nope"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo/bar/baz",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
},
}
results, errors := s.TxnRO(ops)
if len(results) > 0 {
t.Fatalf("bad: %v", results)
}
if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
// Make sure the errors look reasonable (tombstone inserts cause the
// insert errors during the delete operations).
expected := []string{
"cannot insert in read-only transaction",
"cannot insert in read-only transaction",
"cannot insert in read-only transaction",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
}
for i, msg := range expected {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error(), msg) {
t.Fatalf("bad %d: %v", i, errors[i].Error())
}
}
}
func TestStateStore_Txn_Watches(t *testing.T) {
s := testStateStore(t)
// Verify that a basic transaction triggers multiple watches. We call
// the same underlying methods that are called above so this is more
// of a sanity check.
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two"),
},
},
},
}
results, errors := s.TxnRW(15, ops)
if len(results) != len(ops) {
t.Fatalf("bad len: %d != %d", len(results), len(ops))
}
if len(errors) != 0 {
t.Fatalf("bad len: %d != 0", len(errors))
}
})
})
// Verify that a rolled back transaction doesn't trigger any watches.
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one-updated"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two-updated"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "multi/nope",
Value: []byte("nope"),
},
},
},
}
results, errors := s.TxnRW(16, ops)
if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors))
}
if len(results) != 0 {
t.Fatalf("bad len: %d != 0", len(results))
}
})
})
}

View File

@ -36,6 +36,7 @@ const (
TombstoneRequestType
CoordinateBatchUpdateType
PreparedQueryRequestType
TxnRequestType
)
const (
@ -533,8 +534,26 @@ const (
KVSCAS = "cas" // Check-and-set
KVSLock = "lock" // Lock a key
KVSUnlock = "unlock" // Unlock a key
// The following operations are only available inside of atomic
// transactions via the Txn request.
KVSGet = "get" // Read the key during the transaction.
KVSGetTree = "get-tree" // Read all keys with the given prefix during the transaction.
KVSCheckSession = "check-session" // Check the session holds the key.
KVSCheckIndex = "check-index" // Check the modify index of the key.
)
// IsWrite returns true if the given operation alters the state store.
func (op KVSOp) IsWrite() bool {
switch op {
case KVSGet, KVSGetTree, KVSCheckSession, KVSCheckIndex:
return false
default:
return true
}
}
// KVSRequest is used to operate on the Key-Value store
type KVSRequest struct {
Datacenter string

85
consul/structs/txn.go Normal file
View File

@ -0,0 +1,85 @@
package structs
import (
"fmt"
)
// TxnKVOp is used to define a single operation on the KVS inside a
// transaction
type TxnKVOp struct {
Verb KVSOp
DirEnt DirEntry
}
// TxnKVResult is used to define the result of a single operation on the KVS
// inside a transaction.
type TxnKVResult *DirEntry
// TxnOp is used to define a single operation inside a transaction. Only one
// of the types should be filled out per entry.
type TxnOp struct {
KV *TxnKVOp
}
// TxnOps is a list of operations within a transaction.
type TxnOps []*TxnOp
// TxnRequest is used to apply multiple operations to the state store in a
// single transaction
type TxnRequest struct {
Datacenter string
Ops TxnOps
WriteRequest
}
func (r *TxnRequest) RequestDatacenter() string {
return r.Datacenter
}
// TxnReadRequest is used as a fast path for read-only transactions that don't
// modify the state store.
type TxnReadRequest struct {
Datacenter string
Ops TxnOps
QueryOptions
}
func (r *TxnReadRequest) RequestDatacenter() string {
return r.Datacenter
}
// TxnError is used to return information about an error for a specific
// operation.
type TxnError struct {
OpIndex int
What string
}
// Error returns the string representation of an atomic error.
func (e TxnError) Error() string {
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
}
// TxnErrors is a list of TxnError entries.
type TxnErrors []*TxnError
// TxnResult is used to define the result of a given operation inside a
// transaction. Only one of the types should be filled out per entry.
type TxnResult struct {
KV TxnKVResult
}
// TxnResults is a list of TxnResult entries.
type TxnResults []*TxnResult
// TxnResponse is the structure returned by a TxnRequest.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// TxnReadResponse is the structure returned by a TxnReadRequest.
type TxnReadResponse struct {
TxnResponse
QueryMeta
}

113
consul/txn_endpoint.go Normal file
View File

@ -0,0 +1,113 @@
package consul
import (
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
)
// Txn endpoint is used to perform multi-object atomic transactions.
type Txn struct {
srv *Server
}
// preCheck is used to verify the incoming operations before any further
// processing takes place. This checks things like ACLs.
func (t *Txn) preCheck(acl acl.ACL, ops structs.TxnOps) structs.TxnErrors {
var errors structs.TxnErrors
// Perform the pre-apply checks for any KV operations.
for i, op := range ops {
if op.KV != nil {
ok, err := kvsPreApply(t.srv, acl, op.KV.Verb, &op.KV.DirEnt)
if err != nil {
errors = append(errors, &structs.TxnError{i, err.Error()})
} else if !ok {
err = fmt.Errorf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key)
errors = append(errors, &structs.TxnError{i, err.Error()})
}
}
}
return errors
}
// Apply is used to apply multiple operations in a single, atomic transaction.
func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
// Run the pre-checks before we send the transaction into Raft.
acl, err := t.srv.resolveToken(args.Token)
if err != nil {
return err
}
reply.Errors = t.preCheck(acl, args.Ops)
if len(reply.Errors) > 0 {
return nil
}
// Apply the update.
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
if err != nil {
t.srv.logger.Printf("[ERR] consul.txn: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Convert the return type. This should be a cheap copy since we are
// just taking the two slices.
if txnResp, ok := resp.(structs.TxnResponse); ok {
if acl != nil {
txnResp.Results = FilterTxnResults(acl, txnResp.Results)
}
*reply = txnResp
} else {
return fmt.Errorf("unexpected return type %T", resp)
}
return nil
}
// Read is used to perform a read-only transaction that doesn't modify the state
// store. This is much more scaleable since it doesn't go through Raft and
// supports staleness, so this should be preferred if you're just performing
// reads.
func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error {
if done, err := t.srv.forward("Txn.Read", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "txn", "read"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
t.srv.setQueryMeta(&reply.QueryMeta)
if args.RequireConsistent {
if err := t.srv.consistentRead(); err != nil {
return err
}
}
// Run the pre-checks before we perform the read.
acl, err := t.srv.resolveToken(args.Token)
if err != nil {
return err
}
reply.Errors = t.preCheck(acl, args.Ops)
if len(reply.Errors) > 0 {
return nil
}
// Run the read transaction.
state := t.srv.fsm.State()
reply.Results, reply.Errors = state.TxnRO(args.Ops)
if acl != nil {
reply.Results = FilterTxnResults(acl, reply.Results)
}
return nil
}

518
consul/txn_endpoint_test.go Normal file
View File

@ -0,0 +1,518 @@
package consul
import (
"bytes"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestTxn_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
},
}
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
if d.Flags != 42 ||
!bytes.Equal(d.Value, []byte("test")) {
t.Fatalf("bad: %v", d)
}
// Verify the transaction's return value.
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Flags: 42,
Value: nil,
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "nope",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
},
WriteRequest: structs.WriteRequest{
Token: id,
},
}
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
var expected structs.TxnResponse
for i, op := range arg.Ops {
switch op.KV.Verb {
case structs.KVSGet, structs.KVSGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()})
}
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request via an atomic transaction.
arg := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
},
},
},
}
{
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 0 ||
len(out.Errors) != 1 ||
out.Errors[0].OpIndex != 0 ||
!strings.Contains(out.Errors[0].What, "due to lock delay") {
t.Fatalf("bad: %v", out)
}
}
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire.
{
var out structs.TxnResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 1 ||
len(out.Errors) != 0 ||
out.Results[0].KV.LockIndex != 2 {
t.Fatalf("bad: %v", out)
}
}
}
func TestTxn_Read(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "test",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.TxnReadRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
},
}
var out structs.TxnReadResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
KV: &structs.DirEntry{
Key: "test",
Value: []byte("hello"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
},
},
},
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Read_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Put in a key to read back.
state := s1.fsm.State()
d := &structs.DirEntry{
Key: "nope",
Value: []byte("hello"),
}
if err := state.KVSSet(1, d); err != nil {
t.Fatalf("err: %v", err)
}
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.TxnReadRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSGetTree,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: structs.KVSCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
},
QueryOptions: structs.QueryOptions{
Token: id,
},
}
var out structs.TxnReadResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
expected := structs.TxnReadResponse{
QueryMeta: structs.QueryMeta{
KnownLeader: true,
},
}
for i, op := range arg.Ops {
switch op.KV.Verb {
case structs.KVSGet, structs.KVSGetTree:
// These get filtered but won't result in an error.
default:
expected.Errors = append(expected.Errors, &structs.TxnError{i, permissionDeniedErr.Error()})
}
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}

View File

@ -16,14 +16,14 @@ Each endpoint manages a different aspect of Consul:
* [acl](http/acl.html) - Access Control Lists
* [agent](http/agent.html) - Consul Agent
* [catalog](http/catalog.html) - Nodes and services
* [coordinate](http/coordinate.html) - Network coordinates
* [catalog](http/catalog.html) - Nodes and Services
* [coordinate](http/coordinate.html) - Network Coordinates
* [event](http/event.html) - User Events
* [health](http/health.html) - Health checks
* [kv](http/kv.html) - Key/Value store
* [health](http/health.html) - Health Checks
* [kv](http/kv.html) - Key/Value Store
* [query](http/query.html) - Prepared Queries
* [session](http/session.html) - Sessions
* [status](http/status.html) - Consul system status
* [status](http/status.html) - Consul System Status
Each of these is documented in detail at the links above. Consul also has a number
of internal APIs which are purposely undocumented and subject to change.

View File

@ -1,22 +1,29 @@
---
layout: "docs"
page_title: "Key/Value store (HTTP)"
page_title: "Key/Value Store (HTTP)"
sidebar_current: "docs-agent-http-kv"
description: >
The KV endpoint is used to access Consul's simple key/value store, useful for storing
The KV endpoints are used to access Consul's simple key/value store, useful for storing
service configuration or other metadata.
---
# Key/Value HTTP Endpoint
# Key/Value Store Endpoints
The KV endpoint is used to access Consul's simple key/value store, useful for storing
The KV endpoints are used to access Consul's simple key/value store, useful for storing
service configuration or other metadata.
It has only a single endpoint:
The following endpoints are supported:
/v1/kv/<key>
* [`/v1/kv/<key>`](#single): Manages updates of individual keys, deletes of individual
keys or key prefixes, and fetches of individual keys or key prefixes
* [`/v1/txn`](#txn): Manages updates or fetches of multiple keys inside a single,
atomic transaction
The `GET`, `PUT` and `DELETE` methods are all supported.
### <a name="single"></a> /v1/kv/&lt;key&gt;
This endpoint manages updates of individual keys, deletes of individual keys or key
prefixes, and fetches of individual keys or key prefixes. The `GET`, `PUT` and
`DELETE` methods are all supported.
By default, the datacenter of the agent is queried; however, the dc can be provided
using the "?dc=" query parameter. It is important to note that each datacenter has
@ -24,9 +31,9 @@ its own KV store, and there is no built-in replication between datacenters. If y
are interested in replication between datacenters, look at the
[Consul Replicate project](https://github.com/hashicorp/consul-replicate).
The KV endpoint supports the use of ACL tokens.
The KV endpoint supports the use of ACL tokens using the "?token=" query parameter.
### GET Method
#### GET Method
When using the `GET` method, Consul will return the specified key.
If the "?recurse" query parameter is provided, it will return
@ -67,7 +74,7 @@ the lock.
`Key` is simply the full path of the entry.
`Flags` are an opaque unsigned integer that can be attached to each entry. Clients
`Flags` is an opaque unsigned integer that can be attached to each entry. Clients
can choose to use this however makes sense for their application.
`Value` is a Base64-encoded blob of data. Note that values cannot be larger than
@ -96,7 +103,7 @@ encoding.
If no entries are found, a 404 code is returned.
### PUT method
#### PUT method
When using the `PUT` method, Consul expects the request body to be the
value corresponding to the key. There are a number of query parameters that can
@ -128,7 +135,7 @@ be used with a PUT request:
The return value is either `true` or `false`. If `false` is returned,
the update has not taken place.
### DELETE method
#### DELETE method
The `DELETE` method can be used to delete a single key or all keys sharing
a prefix. There are a few query parameters that can be used with a
@ -142,3 +149,227 @@ DELETE request:
synchronization primitives. Unlike `PUT`, the index must be greater than 0
for Consul to take any action: a 0 index will not delete the key. If the index
is non-zero, the key is only deleted if the index matches the `ModifyIndex` of that key.
### <a name="txn"></a> /v1/txn
Available in Consul 0.7 and later, this endpoint manages updates or fetches of
multiple keys inside a single, atomic transaction. Only the `PUT` method is supported.
By default, the datacenter of the agent receives the transaction; however, the dc
can be provided using the "?dc=" query parameter. It is important to note that each
datacenter has its own KV store, and there is no built-in replication between
datacenters. If you are interested in replication between datacenters, look at the
[Consul Replicate project](https://github.com/hashicorp/consul-replicate).
The transaction endpoint supports the use of ACL tokens using the "?token=" query
parameter.
#### PUT Method
The `PUT` method lets you submit a list of operations to apply to the key/value store
inside a transaction. If any operation fails, the transaction will be rolled back and
none of the changes will be applied.
If the transaction doesn't contain any write operations then it will be fast-pathed
internally to an endpoint that works like other reads, except that blocking queries
are not currently supported. In this mode, you may supply the "?stale" or "?consistent"
query parameters with the request to control consistency. To support bounding the
acceptable staleness of data, read-only transaction responses provide the `X-Consul-LastContact`
header containing the time in milliseconds that a server was last contacted by the leader node.
The `X-Consul-KnownLeader` header also indicates if there is a known leader. These
won't be present if the transaction contains any write operations, and any consistency
query parameters will be ignored, since writes are always managed by the leader via
the Raft consensus protocol.
The body of the request should be a list of operations to perform inside the atomic
transaction. Up to 64 operations may be present in a single transaction. Operations
look like this:
```javascript
[
{
"KV": {
"Verb": "<verb>",
"Key": "<key>",
"Value": "<Base64-encoded blob of data>",
"Flags": <flags>,
"Index": <index>,
"Session": "<session id>"
}
},
...
]
```
`KV` is the only available operation type, though other types of operations may be added
in future versions of Consul to be mixed with key/value operations. The following fields
are available:
* `Verb` is the type of operation to perform. Please see the table below for
available verbs.
* `Key` is simply the full path of the entry.
* `Value` is a Base64-encoded blob of data. Note that values cannot be larger than
512kB.
* `Flags` is an opaque unsigned integer that can be attached to each entry. Clients
can choose to use this however makes sense for their application.
* `Index` and `Session` are used for locking, unlocking, and check-and-set operations.
Please see the table below for details on how they are used.
The following table summarizes the available verbs and the fields that apply to that
operation ("X" means a field is required and "O" means it is optional):
<table class="table table-bordered table-striped">
<tr>
<th>Verb</th>
<th>Operation</th>
<th>Key</th>
<th>Value</th>
<th>Flags</th>
<th>Index</th>
<th>Session</th>
</tr>
<tr>
<td>set</td>
<td>Sets the `Key` to the given `Value`.</td>
<td align="center">X</td>
<td align="center">X</td>
<td align="center">O</td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>cas</td>
<td>Sets the `Key` to the given `Value` with check-and-set semantics. The `Key` will only be set if its current modify index matches the supplied `Index`.</td>
<td align="center">X</td>
<td align="center">X</td>
<td align="center">O</td>
<td align="center">X</td>
<td align="center"></td>
</tr>
<tr>
<td>lock</td>
<td>Locks the `Key` with the given `Session`. The `Key` will only obtain the lock if the `Session` is valid, and no other session has it locked.</td>
<td align="center">X</td>
<td align="center">X</td>
<td align="center">O</td>
<td align="center"></td>
<td align="center">X</td>
</tr>
<tr>
<td>unlock</td>
<td>Unlocks the `Key` with the given `Session`. The `Key` will only release the lock if the `Session` is valid and currently has it locked.</td>
<td align="center">X</td>
<td align="center">X</td>
<td align="center">O</td>
<td align="center"></td>
<td align="center">X</td>
</tr>
<tr>
<td>get</td>
<td>Gets the `Key` during the transaction. This fails the transaction if the `Key` doesn't exist. The key may not be present in the results if ACLs do not permit it to be read.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>get-tree</td>
<td>Gets all keys with a prefix of `Key` during the transaction. This does not fail the transaction if the `Key` doesn't exist. Not all keys may be present in the results if ACLs do not permit them to be read.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>check-index</td>
<td>Fails the transaction if `Key` does not have a modify index equal to `Index`.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center">X</td>
<td align="center"></td>
</tr>
<tr>
<td>check-session</td>
<td>Fails the transaction if `Key` is not currently locked by `Session`.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center">X</td>
</tr>
<tr>
<td>delete</td>
<td>Deletes the `Key`.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>delete-tree</td>
<td>Deletes all keys with a prefix of`Key`.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
<td align="center"></td>
</tr>
<tr>
<td>delete-cas</td>
<td>Deletes the `Key` with check-and-set semantics. The `Key` will only be deleted if its current modify index matches the supplied `Index`.</td>
<td align="center">X</td>
<td align="center"></td>
<td align="center"></td>
<td align="center">X</td>
<td align="center"></td>
</tr>
</table>
If the transaction can be processed, a status code of 200 will be returned if it
was successfully applied, or a status code of 409 will be returned if it was rolled
back. If either of these status codes are returned, the response will look like this:
```javascript
{
"Results": [
{
"KV": {
"LockIndex": <lock index>,
"Key": "<key>",
"Flags": <flags>,
"Value": "<Base64-encoded blob of data, or null>",
"CreateIndex": <index>,
"ModifyIndex": <index>
}
},
...
],
"Errors": [
{
"OpIndex": <index of failed operation>,
"What": "<error message for failed operation>"
},
...
]
}
```
`Results` has entries for some operations if the transaction was successful. To save
space, the `Value` will be `null` for any `Verb` other than "get" or "get-tree". Like
the `/v1/kv/<key>` endpoint, `Value` will be Base64-encoded if it is present. Also,
no result entries will be added for verbs that delete keys.
`Errors` has entries describing which operations failed if the transaction was rolled
back. The `OpIndex` gives the index of the failed operation in the transaction, and
`What` is a string with an error message about why that operation failed.
If any other status code is returned, such as 400 or 500, then the body of the response
will simply be an unstructured error message about what happened.

View File

@ -171,7 +171,7 @@
</li>
<li<%= sidebar_current("docs-agent-http-kv") %>>
<a href="/docs/agent/http/kv.html">Key/Value store</a>
<a href="/docs/agent/http/kv.html">Key/Value Store</a>
</li>
<li<%= sidebar_current("docs-agent-http-coordinate") %>>