Performs basic plumbing of KVS transactions through all the layers.

This commit is contained in:
James Phillips 2016-05-06 17:50:58 -07:00
parent dca00c96f7
commit 471160d8f0
9 changed files with 494 additions and 63 deletions

View File

@ -23,6 +23,48 @@ type KVPair struct {
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair
const (
KVSet = "set"
KVDelete = "delete"
KVDeleteCAS = "delete-cas"
KVDeleteTree = "delete-tree"
KVCAS = "cas"
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)
// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Op string
Key string
Value []byte
Flags uint64
Index uint64
Session string
}
// KVTxn defines a set of operations to be performed inside a single transaction.
type KVTxn []KVTxnOp
// KVTxnError is used to return information about an operation in a
// transaction.
type KVTxnError struct {
OpIndex int
What string
}
// KVTxnErrors is a list of KVTxnError objects.
type KVTxnErrors []KVTxnError
// KVTxnResult is used to return the results of a transaction.
type KVTxnResult struct {
Errors KVTxnErrors
Results KVPairs
}
// KV is used to manipulate the K/V API
type KV struct {
c *Client
@ -238,3 +280,35 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}
// Txn is used to apply multiple KV operations in a single, atomic transaction.
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice.
func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) {
r := k.c.newRequest("PUT", "/v1/kv-txn")
r.setWriteOptions(q)
r.obj = txn
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var result KVTxnResult
if err := decodeBody(resp, &result); err != nil {
return false, nil, nil, err
}
return resp.StatusCode == http.StatusOK, &result, wm, nil
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}

View File

@ -3,6 +3,7 @@ package api
import (
"bytes"
"path"
"strings"
"testing"
"time"
)
@ -445,3 +446,93 @@ func TestClient_AcquireRelease(t *testing.T) {
t.Fatalf("unexpected value: %#v", meta)
}
}
func TestClient_Txn(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
kv := c.KV()
// Make a session.
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
// Acquire and get the key via a transaction, but don't supply a valid
// session.
key := testKey()
value := []byte("test")
txn := KVTxn{
KVTxnOp{
Op: KVLock,
Key: key,
Value: value,
},
KVTxnOp{
Op: KVGet,
Key: key,
},
}
ok, ret, _, err := kv.Txn(&txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if ok {
t.Fatalf("transaction should have failed")
}
if ret == nil || len(ret.Errors) != 1 || len(ret.Results) != 0 {
t.Fatalf("bad: %v", ret)
}
if ret.Errors[0].OpIndex != 0 ||
!strings.Contains(ret.Errors[0].What, "missing session") {
t.Fatalf("bad: %v", ret.Errors[0])
}
// Now poke in a real session and try again.
txn[0].Session = id
ok, ret, _, err = kv.Txn(&txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}
if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 2 {
t.Fatalf("bad: %v", ret)
}
for i, result := range ret.Results {
var expected []byte
if i == 1 {
expected = value
}
if result.Key != key ||
!bytes.Equal(result.Value, expected) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}
// Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}

View File

@ -242,6 +242,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList))
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
s.mux.HandleFunc("/v1/kv-txn", s.wrap(s.KVSTxn))
s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate))
s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy))
@ -342,21 +343,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return
}
prettyPrint := false
if _, ok := req.URL.Query()["pretty"]; ok {
prettyPrint = true
}
// Write out the JSON object
if obj != nil {
var buf []byte
if prettyPrint {
buf, err = json.MarshalIndent(obj, "", " ")
} else {
buf, err = json.Marshal(obj)
}
buf, err = s.marshalJSON(req, obj)
if err != nil {
goto HAS_ERR
}
resp.Header().Set("Content-Type", "application/json")
resp.Write(buf)
}
@ -364,6 +357,18 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return f
}
// marshalJSON marshals the object into JSON, respecting the user's pretty-ness
// configuration.
func (s *HTTPServer) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) {
if _, ok := req.URL.Query()["pretty"]; ok {
buf, err := json.MarshalIndent(obj, "", " ")
return buf, err
}
buf, err := json.Marshal(obj)
return buf, err
}
// Returns true if the UI is enabled.
func (s *HTTPServer) IsUIEnabled() bool {
return s.uiDir != "" || s.agent.config.EnableUi

View File

@ -2,12 +2,14 @@ package agent
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs"
)
@ -284,3 +286,114 @@ func conflictingFlags(resp http.ResponseWriter, req *http.Request, flags ...stri
return false
}
// fixupValues takes the raw decoded JSON and base64 decodes all the values,
// replacing them with byte arrays with the data.
func fixupValues(raw interface{}) error {
// decodeValue decodes the value member of the given operation.
decodeValue := func(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "value":
// Leave the byte slice nil if we have a nil
// value.
if v == nil {
return nil
}
// Otherwise, base64 decode it.
s, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected value type: %T", v)
}
decoded, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return fmt.Errorf("failed to decode value: %v", err)
}
rawMap[k] = decoded
return nil
}
}
return nil
}
rawSlice, ok := raw.([]interface{})
if !ok {
return fmt.Errorf("unexpected raw type: %t", raw)
}
for _, rawOp := range rawSlice {
if err := decodeValue(rawOp); err != nil {
return err
}
}
return nil
}
// KVSTxn handles requests to apply multiple KVS operations in a single, atomic
// transaction.
func (s *HTTPServer) KVSTxn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.KVSAtomicRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 500 since we don't have enough context to
// associate the error with a given operation.
var txn api.KVTxn
if err := decodeBody(req, &txn, fixupValues); err != nil {
return nil, fmt.Errorf("failed to parse body: %v", err)
}
// Convert the API format into the RPC format. Note that fixupValues
// above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over.
for _, in := range txn {
out := &structs.KVSAtomicOp{
Op: structs.KVSOp(in.Op),
DirEnt: structs.DirEntry{
Key: in.Key,
Value: in.Value,
Flags: in.Flags,
Session: in.Session,
RaftIndex: structs.RaftIndex{
ModifyIndex: in.Index,
},
},
}
args.Ops = append(args.Ops, out)
}
// Make the request and return a conflict status if there were errors
// reported from the transaction.
var reply structs.KVSAtomicResponse
if err := s.agent.RPC("KVS.AtomicApply", &args, &reply); err != nil {
return nil, err
}
if len(reply.Errors) > 0 {
var buf []byte
var err error
buf, err = s.marshalJSON(req, reply)
if err != nil {
return nil, err
}
resp.Header().Set("Content-Type", "application/json")
resp.WriteHeader(http.StatusConflict)
resp.Write(buf)
return nil, nil
}
// Otherwise, return the results of the successful transaction.
return reply, nil
}

View File

@ -83,6 +83,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index)
case structs.KVSAtomicRequestType:
return c.applyKVSAtomicOperation(buf[1:], log.Index)
case structs.SessionRequestType:
return c.applySessionOperation(buf[1:], log.Index)
case structs.ACLRequestType:
@ -193,6 +195,16 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
}
}
func (c *consulFSM) applyKVSAtomicOperation(buf []byte, index uint64) interface{} {
var req structs.KVSAtomicRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs-atomic"}, time.Now())
entries, errors := c.state.KVSAtomicUpdate(index, req.Ops)
return structs.KVSAtomicResponse{errors, entries}
}
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
)
@ -13,54 +14,70 @@ type KVS struct {
srv *Server
}
// Apply is used to apply a KVS request to the data store. This should
// only be used for operations that modify the data
// preApply does all the verification of a KVS update that is performed BEFORE
// we submit as a Raft log entry. This includes enforcing the lock delay which
// must only be done on the leader.
func (k *KVS) preApply(acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry) (bool, error) {
// Verify the entry.
if dirEnt.Key == "" && op != structs.KVSDeleteTree {
return false, fmt.Errorf("Must provide key")
}
// Apply the ACL policy if any.
if acl != nil {
switch op {
case structs.KVSDeleteTree:
if !acl.KeyWritePrefix(dirEnt.Key) {
return false, permissionDeniedErr
}
default:
if !acl.KeyWrite(dirEnt.Key) {
return false, permissionDeniedErr
}
}
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer would expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if op == structs.KVSLock {
state := k.srv.fsm.State()
expires := state.KVSLockDelay(dirEnt.Key)
if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
dirEnt.Key, expires)
return false, nil
}
}
return true, nil
}
// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
// Verify the args
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
return fmt.Errorf("Must provide key")
}
// Apply the ACL policy if any
// Perform the pre-apply checks.
acl, err := k.srv.resolveToken(args.Token)
if err != nil {
return err
} else if acl != nil {
switch args.Op {
case structs.KVSDeleteTree:
if !acl.KeyWritePrefix(args.DirEnt.Key) {
return permissionDeniedErr
}
default:
if !acl.KeyWrite(args.DirEnt.Key) {
return permissionDeniedErr
}
}
}
ok, err := k.preApply(acl, args.Op, &args.DirEnt)
if err != nil {
return err
}
if !ok {
*reply = false
return nil
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if args.Op == structs.KVSLock {
state := k.srv.fsm.State()
expires := state.KVSLockDelay(args.DirEnt.Key)
if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
args.DirEnt.Key, expires)
*reply = false
return nil
}
}
// Apply the update
// Apply the update.
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
@ -70,14 +87,60 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return respErr
}
// Check if the return type is a bool
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// Get is used to lookup a single key
// AtomicApply is used to apply multiple KVS operations in a single, atomic
// transaction.
func (k *KVS) AtomicApply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error {
if done, err := k.srv.forward("KVS.AtomicApply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply-atomic"}, time.Now())
// Perform the pre-apply checks on each of the operations.
acl, err := k.srv.resolveToken(args.Token)
if err != nil {
return err
}
for i, op := range args.Ops {
ok, err := k.preApply(acl, op.Op, &op.DirEnt)
if err != nil {
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
} else if !ok {
err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key)
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
}
}
if len(reply.Errors) > 0 {
return nil
}
// Apply the update.
resp, err := k.srv.raftApply(structs.KVSAtomicRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Convert the return type. This should be a cheap copy since we are
// just taking the two slices.
if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok {
*reply = respAtomic
} else {
return fmt.Errorf("unexpected return type %T", resp)
}
return nil
}
// Get is used to lookup a single key.
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
return err
@ -119,7 +182,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
})
}
// List is used to list all keys with a given prefix
// List is used to list all keys with a given prefix.
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.List", args, args, reply); done {
return err
@ -162,7 +225,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
})
}
// ListKeys is used to list all keys with a given prefix to a separator
// ListKeys is used to list all keys with a given prefix to a separator.
func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error {
if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done {
return err

View File

@ -581,13 +581,13 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
// KVSAtomicUpdate performs a series of updates atomically, all inside a single
// transaction that only succeeds if all the operations succeed.
func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.IndexedErrors) {
func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.KVSAtomicErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
// Dispatch all of the operations inside the transaction.
entries := make(structs.DirEntries, 0, len(ops))
errors := make(structs.IndexedErrors, 0, len(ops))
errors := make(structs.KVSAtomicErrors, 0, len(ops))
for i, op := range ops {
var entry *structs.DirEntry
var err error
@ -665,7 +665,7 @@ func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (stru
// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.IndexedError{i, err})
errors = append(errors, &structs.KVSAtomicError{i, err.Error()})
}
}
if len(errors) > 0 {

View File

@ -1626,7 +1626,7 @@ func TestStateStore_KVS_Atomic_Rollback(t *testing.T) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
if len(entries) != 0 {
t.Fatalf("bad len: %d != 0", len(entries), 0)
t.Fatalf("bad len: %d != 0", len(entries))
}
verifyStateStore("after")
@ -1648,8 +1648,8 @@ func TestStateStore_KVS_Atomic_Rollback(t *testing.T) {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error.Error(), msg) {
t.Fatalf("bad %i: %v", i, errors[i].Error.Error())
if !strings.Contains(errors[i].Error(), msg) {
t.Fatalf("bad %i: %v", i, errors[i].Error())
}
}
}
@ -1886,6 +1886,73 @@ func TestStateStore_KVS_Watches(t *testing.T) {
})
})
})
// Verify that a basic transaction triggers multiple watches. We call
// the same underlying methods that are called above so this is more
// of a sanity check.
verifyWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two"),
},
},
}
entries, errors := s.KVSAtomicUpdate(15, ops)
if len(entries) != len(ops) {
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
}
if len(errors) != 0 {
t.Fatalf("bad len: %d != 0", len(errors))
}
})
})
// Verify that a rolled back transaction doesn't trigger any watches.
verifyNoWatch(t, s.GetKVSWatch("multi/one"), func() {
verifyNoWatch(t, s.GetKVSWatch("multi/two"), func() {
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/one",
Value: []byte("one-updated"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "multi/two",
Value: []byte("two-updated"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "multi/nope",
Value: []byte("nope"),
},
},
}
entries, errors := s.KVSAtomicUpdate(16, ops)
if len(errors) != 1 {
t.Fatalf("bad len: %d != 1", len(errors))
}
if len(entries) != 0 {
t.Fatalf("bad len: %d != 0", len(entries))
}
})
})
}
func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {

View File

@ -569,25 +569,31 @@ type KVSAtomicOps []*KVSAtomicOp
type KVSAtomicRequest struct {
Datacenter string
Ops KVSAtomicOps
WriteRequest
}
func (r *KVSAtomicRequest) RequestDatacenter() string {
return r.Datacenter
}
// IndexedError is used to return information about an error for a specific
// KVSAtomicError is used to return information about an error for a specific
// operation.
type IndexedError struct {
type KVSAtomicError struct {
OpIndex int
Error error
What string
}
// IndexedErrors is a list of IndexedError entries.
type IndexedErrors []*IndexedError
// Error returns the string representation of an atomic error.
func (e KVSAtomicError) Error() string {
return fmt.Sprintf("op %d: %s", e.OpIndex, e.What)
}
// KVSAtomicErrors is a list of KVSAtomicError entries.
type KVSAtomicErrors []*KVSAtomicError
// KVSAtomicResponse is the structure returned by a KVSAtomicRequest.
type KVSAtomicResponse struct {
Errors IndexedErrors
Errors KVSAtomicErrors
Results DirEntries
}