Adds state store support for atomic KVS ops.

This commit is contained in:
James Phillips 2016-05-05 15:46:59 -07:00
parent a1a59bee73
commit dca00c96f7
3 changed files with 620 additions and 1 deletions

View file

@ -578,3 +578,138 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
}
return true, nil
}
// KVSAtomicUpdate performs a series of updates atomically, all inside a single
// transaction that only succeeds if all the operations succeed.
func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.IndexedErrors) {
tx := s.db.Txn(true)
defer tx.Abort()
// Dispatch all of the operations inside the transaction.
entries := make(structs.DirEntries, 0, len(ops))
errors := make(structs.IndexedErrors, 0, len(ops))
for i, op := range ops {
var entry *structs.DirEntry
var err error
switch op.Op {
case structs.KVSSet:
entry = &op.DirEnt
err = s.kvsSetTxn(tx, idx, entry, false)
case structs.KVSDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
case structs.KVSDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
case structs.KVSCAS:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsSetCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
}
case structs.KVSLock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsLockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
}
case structs.KVSUnlock:
var ok bool
entry = &op.DirEnt
ok, err = s.kvsUnlockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
}
case structs.KVSAtomicGet:
_, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key)
case structs.KVSAtomicCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
case structs.KVSAtomicCheckIndex:
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
default:
err = fmt.Errorf("unknown operation %q", op.Op)
}
// Accumulate the entries. For a GET we keep the value, otherwise
// we clone and blank out the value (we have to clone so we don't
// modify the entry being used by the state store).
if entry != nil {
if op.Op == structs.KVSAtomicGet {
entries = append(entries, entry)
} else {
clone := entry.Clone()
clone.Value = nil
entries = append(entries, clone)
}
} else {
entries = append(entries, nil)
}
// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.IndexedError{i, err})
}
}
if len(errors) > 0 {
return nil, errors
}
tx.Commit()
return entries, nil
}
// kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key.
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil, fmt.Errorf("failed to check session, key %q doesn't exist", key)
}
e := entry.(*structs.DirEntry)
if e.Session != session {
return nil, fmt.Errorf("failed session check for key %q, current session %q != %q", key, e.Session, session)
}
return e, nil
}
// kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key.
func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
if entry == nil {
return nil, fmt.Errorf("failed to check index, key %q doesn't exist", key)
}
e := entry.(*structs.DirEntry)
if e.ModifyIndex != cidx {
return nil, fmt.Errorf("failed index check for key %q, current modify index %d != %d", key, e.ModifyIndex, cidx)
}
return e, nil
}

View file

@ -890,7 +890,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
func TestStateStore_KVSDeleteTree(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/bar", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
@ -1214,6 +1214,446 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
}
func TestStateStore_KVS_Atomic(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 5, "foo/update", "stale")
// Make a real session.
testRegisterNode(t, s, 6, "node1")
session := testUUID()
if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// Set up a transaction that hits every operation.
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo/zorp",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo/delete",
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo/bar",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 5,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/update",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
ModifyIndex: 8,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "foo/lock",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: session,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: "",
},
},
}
entries, errors := s.KVSAtomicUpdate(8, ops)
if len(errors) > 0 {
t.Fatalf("err: %v", errors)
}
if len(entries) != len(ops) {
t.Fatalf("bad len: %d != %d", len(entries), len(ops))
}
// Make sure the response looks as expected.
expected := structs.DirEntries{
&structs.DirEntry{
Key: "foo/new",
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
nil, // delete
nil, // delete tree
nil, // delete CAS
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
nil, // get on foo/lock before it's created
&structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
Session: session,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
}
if len(entries) != len(expected) {
t.Fatalf("bad: %v", entries)
}
for i, _ := range entries {
if !reflect.DeepEqual(entries[i], expected[i]) {
t.Fatalf("bad %d: %v != %v", i, *(entries[i]), *(expected[i]))
}
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Make sure it looks as expected.
expected = structs.DirEntries{
&structs.DirEntry{
Key: "foo/lock",
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/new",
Value: []byte("one"),
RaftIndex: structs.RaftIndex{
CreateIndex: 8,
ModifyIndex: 8,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 8,
},
},
}
if len(actual) != len(expected) {
t.Fatalf("bad len: %d != %d", len(actual), len(expected))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], expected[i]) {
t.Fatalf("bad %d: %v != %v", i, *(actual[i]), *(expected[i]))
}
}
}
func TestStateStore_KVS_Atomic_Rollback(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale")
testRegisterNode(t, s, 3, "node1")
session := testUUID()
if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session})
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
bogus := testUUID()
if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil {
t.Fatalf("err: %s", err)
}
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList("")
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}
if idx != 5 {
t.Fatalf("bad index (%s): %d", desc, idx)
}
// Make sure it looks as expected.
expected := structs.DirEntries{
&structs.DirEntry{
Key: "foo/delete",
Value: []byte("bar"),
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
},
&structs.DirEntry{
Key: "foo/lock",
Value: []byte("foo"),
LockIndex: 1,
Session: session,
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,
},
},
&structs.DirEntry{
Key: "foo/update",
Value: []byte("stale"),
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
},
}
if len(actual) != len(expected) {
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(expected))
}
for i, _ := range actual {
if !reflect.DeepEqual(actual[i], expected[i]) {
t.Fatalf("bad %d (%s): %v != %v", desc, i, *(actual[i]), *(expected[i]))
}
}
}
verifyStateStore("initial")
// Set up a transaction that fails every operation.
ops := structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo/update",
Value: []byte("new"),
RaftIndex: structs.RaftIndex{
ModifyIndex: 1,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "foo/lock",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
Session: bogus,
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "foo/lock",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
RaftIndex: structs.RaftIndex{
ModifyIndex: 6,
},
},
},
&structs.KVSAtomicOp{
Op: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
}
entries, errors := s.KVSAtomicUpdate(7, ops)
if len(errors) != len(ops) {
t.Fatalf("bad len: %d != %d", len(errors), len(ops))
}
if len(entries) != 0 {
t.Fatalf("bad len: %d != 0", len(entries), 0)
}
verifyStateStore("after")
// Make sure the errors look reasonable.
expected := []string{
"index is stale",
"lock is already held",
"lock isn't held, or is held by another session",
"current session",
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown operation",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
}
for i, msg := range expected {
if errors[i].OpIndex != i {
t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex)
}
if !strings.Contains(errors[i].Error.Error(), msg) {
t.Fatalf("bad %i: %v", i, errors[i].Error.Error())
}
}
}
func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)

View file

@ -36,6 +36,7 @@ const (
TombstoneRequestType
CoordinateBatchUpdateType
PreparedQueryRequestType
KVSAtomicRequestType
)
const (
@ -533,6 +534,12 @@ const (
KVSCAS = "cas" // Check-and-set
KVSLock = "lock" // Lock a key
KVSUnlock = "unlock" // Unlock a key
// KVSAtomic* operations are only available in KVSAtomicRequest
// transactions.
KVSAtomicGet = "get" // Read the key during the transaction.
KVSAtomicCheckSession = "check-session" // Check the session holds the key.
KVSAtomicCheckIndex = "check-index" // Check the modify index of the key.
)
// KVSRequest is used to operate on the Key-Value store
@ -547,6 +554,43 @@ func (r *KVSRequest) RequestDatacenter() string {
return r.Datacenter
}
// KVSAtomicOp is used to define a single operation within an multi-key
// transaction.
type KVSAtomicOp struct {
Op KVSOp
DirEnt DirEntry
}
// KVSAtomicOps is a list of atomic operations.
type KVSAtomicOps []*KVSAtomicOp
// KVSAtomicRequest is used to perform atomic multi-key operations on the
// Key-Value store.
type KVSAtomicRequest struct {
Datacenter string
Ops KVSAtomicOps
}
func (r *KVSAtomicRequest) RequestDatacenter() string {
return r.Datacenter
}
// IndexedError is used to return information about an error for a specific
// operation.
type IndexedError struct {
OpIndex int
Error error
}
// IndexedErrors is a list of IndexedError entries.
type IndexedErrors []*IndexedError
// KVSAtomicResponse is the structure returned by a KVSAtomicRequest.
type KVSAtomicResponse struct {
Errors IndexedErrors
Results DirEntries
}
// KeyRequest is used to request a key, or key prefix
type KeyRequest struct {
Datacenter string