From dca00c96f72eb8a2eaacc482608d6a5bf24d4e3f Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 5 May 2016 15:46:59 -0700 Subject: [PATCH] Adds state store support for atomic KVS ops. --- consul/state/kvs.go | 135 ++++++++++++ consul/state/kvs_test.go | 442 +++++++++++++++++++++++++++++++++++++- consul/structs/structs.go | 44 ++++ 3 files changed, 620 insertions(+), 1 deletion(-) diff --git a/consul/state/kvs.go b/consul/state/kvs.go index 43d08e33c..29771e159 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -578,3 +578,138 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE } return true, nil } + +// KVSAtomicUpdate performs a series of updates atomically, all inside a single +// transaction that only succeeds if all the operations succeed. +func (s *StateStore) KVSAtomicUpdate(idx uint64, ops structs.KVSAtomicOps) (structs.DirEntries, structs.IndexedErrors) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Dispatch all of the operations inside the transaction. + entries := make(structs.DirEntries, 0, len(ops)) + errors := make(structs.IndexedErrors, 0, len(ops)) + for i, op := range ops { + var entry *structs.DirEntry + var err error + + switch op.Op { + case structs.KVSSet: + entry = &op.DirEnt + err = s.kvsSetTxn(tx, idx, entry, false) + + case structs.KVSDelete: + err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key) + + case structs.KVSDeleteCAS: + var ok bool + ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key) + if !ok && err == nil { + err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key) + } + + case structs.KVSDeleteTree: + err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key) + + case structs.KVSCAS: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsSetCASTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key) + } + + case structs.KVSLock: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsLockTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key) + } + + case structs.KVSUnlock: + var ok bool + entry = &op.DirEnt + ok, err = s.kvsUnlockTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key) + } + + case structs.KVSAtomicGet: + _, entry, err = s.kvsGetTxn(tx, op.DirEnt.Key) + + case structs.KVSAtomicCheckSession: + entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session) + + case structs.KVSAtomicCheckIndex: + entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex) + + default: + err = fmt.Errorf("unknown operation %q", op.Op) + } + + // Accumulate the entries. For a GET we keep the value, otherwise + // we clone and blank out the value (we have to clone so we don't + // modify the entry being used by the state store). + if entry != nil { + if op.Op == structs.KVSAtomicGet { + entries = append(entries, entry) + } else { + clone := entry.Clone() + clone.Value = nil + entries = append(entries, clone) + } + } else { + entries = append(entries, nil) + } + + // Capture any error along with the index of the operation that + // failed. + if err != nil { + errors = append(errors, &structs.IndexedError{i, err}) + } + } + if len(errors) > 0 { + return nil, errors + } + + tx.Commit() + return entries, nil +} + +// kvsCheckSessionTxn checks to see if the given session matches the current +// entry for a key. +func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) { + entry, err := tx.First("kvs", "id", key) + if err != nil { + return nil, fmt.Errorf("failed kvs lookup: %s", err) + } + if entry == nil { + return nil, fmt.Errorf("failed to check session, key %q doesn't exist", key) + } + + e := entry.(*structs.DirEntry) + if e.Session != session { + return nil, fmt.Errorf("failed session check for key %q, current session %q != %q", key, e.Session, session) + } + + return e, nil +} + +// kvsCheckIndexTxn checks to see if the given modify index matches the current +// entry for a key. +func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) { + entry, err := tx.First("kvs", "id", key) + if err != nil { + return nil, fmt.Errorf("failed kvs lookup: %s", err) + } + if entry == nil { + return nil, fmt.Errorf("failed to check index, key %q doesn't exist", key) + } + + e := entry.(*structs.DirEntry) + if e.ModifyIndex != cidx { + return nil, fmt.Errorf("failed index check for key %q, current modify index %d != %d", key, e.ModifyIndex, cidx) + } + + return e, nil +} diff --git a/consul/state/kvs_test.go b/consul/state/kvs_test.go index 7da1fec89..56ec618a7 100644 --- a/consul/state/kvs_test.go +++ b/consul/state/kvs_test.go @@ -890,7 +890,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) { func TestStateStore_KVSDeleteTree(t *testing.T) { s := testStateStore(t) - // Create kvs entries in the state store + // Create kvs entries in the state store. testSetKey(t, s, 1, "foo/bar", "bar") testSetKey(t, s, 2, "foo/bar/baz", "baz") testSetKey(t, s, 3, "foo/bar/zip", "zip") @@ -1214,6 +1214,446 @@ func TestStateStore_KVSUnlock(t *testing.T) { } } +func TestStateStore_KVS_Atomic(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store. + testSetKey(t, s, 1, "foo/delete", "bar") + testSetKey(t, s, 2, "foo/bar/baz", "baz") + testSetKey(t, s, 3, "foo/bar/zip", "zip") + testSetKey(t, s, 4, "foo/zorp", "zorp") + testSetKey(t, s, 5, "foo/update", "stale") + + // Make a real session. + testRegisterNode(t, s, 6, "node1") + session := testUUID() + if err := s.SessionCreate(7, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // Set up a transaction that hits every operation. + ops := structs.KVSAtomicOps{ + &structs.KVSAtomicOp{ + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo/new", + Value: []byte("one"), + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "foo/zorp", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDeleteCAS, + DirEnt: structs.DirEntry{ + Key: "foo/delete", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 1, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDeleteTree, + DirEnt: structs.DirEntry{ + Key: "foo/bar", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicGet, + DirEnt: structs.DirEntry{ + Key: "foo/update", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 5, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + ModifyIndex: 5, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicGet, + DirEnt: structs.DirEntry{ + Key: "foo/update", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 8, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicGet, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: session, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: "", + }, + }, + } + entries, errors := s.KVSAtomicUpdate(8, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + if len(entries) != len(ops) { + t.Fatalf("bad len: %d != %d", len(entries), len(ops)) + } + + // Make sure the response looks as expected. + expected := structs.DirEntries{ + &structs.DirEntry{ + Key: "foo/new", + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + nil, // delete + nil, // delete tree + nil, // delete CAS + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("stale"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + nil, // get on foo/lock before it's created + &structs.DirEntry{ + Key: "foo/lock", + Session: session, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/lock", + Session: session, + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + } + if len(entries) != len(expected) { + t.Fatalf("bad: %v", entries) + } + for i, _ := range entries { + if !reflect.DeepEqual(entries[i], expected[i]) { + t.Fatalf("bad %d: %v != %v", i, *(entries[i]), *(expected[i])) + } + } + + // Pull the resulting state store contents. + idx, actual, err := s.KVSList("") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expected = structs.DirEntries{ + &structs.DirEntry{ + Key: "foo/lock", + LockIndex: 1, + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/new", + Value: []byte("one"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 8, + ModifyIndex: 8, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 8, + }, + }, + } + if len(actual) != len(expected) { + t.Fatalf("bad len: %d != %d", len(actual), len(expected)) + } + for i, _ := range actual { + if !reflect.DeepEqual(actual[i], expected[i]) { + t.Fatalf("bad %d: %v != %v", i, *(actual[i]), *(expected[i])) + } + } +} + +func TestStateStore_KVS_Atomic_Rollback(t *testing.T) { + s := testStateStore(t) + + // Create kvs entries in the state store. + testSetKey(t, s, 1, "foo/delete", "bar") + testSetKey(t, s, 2, "foo/update", "stale") + + testRegisterNode(t, s, 3, "node1") + session := testUUID() + if err := s.SessionCreate(4, &structs.Session{ID: session, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + ok, err := s.KVSLock(5, &structs.DirEntry{Key: "foo/lock", Value: []byte("foo"), Session: session}) + if !ok || err != nil { + t.Fatalf("didn't get the lock: %v %s", ok, err) + } + + bogus := testUUID() + if err := s.SessionCreate(6, &structs.Session{ID: bogus, Node: "node1"}); err != nil { + t.Fatalf("err: %s", err) + } + + // This function verifies that the state store wasn't changed. + verifyStateStore := func(desc string) { + idx, actual, err := s.KVSList("") + if err != nil { + t.Fatalf("err (%s): %s", desc, err) + } + if idx != 5 { + t.Fatalf("bad index (%s): %d", desc, idx) + } + + // Make sure it looks as expected. + expected := structs.DirEntries{ + &structs.DirEntry{ + Key: "foo/delete", + Value: []byte("bar"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + &structs.DirEntry{ + Key: "foo/lock", + Value: []byte("foo"), + LockIndex: 1, + Session: session, + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + &structs.DirEntry{ + Key: "foo/update", + Value: []byte("stale"), + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + } + if len(actual) != len(expected) { + t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(expected)) + } + for i, _ := range actual { + if !reflect.DeepEqual(actual[i], expected[i]) { + t.Fatalf("bad %d (%s): %v != %v", desc, i, *(actual[i]), *(expected[i])) + } + } + } + verifyStateStore("initial") + + // Set up a transaction that fails every operation. + ops := structs.KVSAtomicOps{ + &structs.KVSAtomicOp{ + Op: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo/update", + Value: []byte("new"), + RaftIndex: structs.RaftIndex{ + ModifyIndex: 1, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckSession, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + Session: bogus, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckSession, + DirEnt: structs.DirEntry{ + Key: "nope", + Session: bogus, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckIndex, + DirEnt: structs.DirEntry{ + Key: "foo/lock", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 6, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckIndex, + DirEnt: structs.DirEntry{ + Key: "nope", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 6, + }, + }, + }, + &structs.KVSAtomicOp{ + Op: "nope", + DirEnt: structs.DirEntry{ + Key: "foo/delete", + }, + }, + } + entries, errors := s.KVSAtomicUpdate(7, ops) + if len(errors) != len(ops) { + t.Fatalf("bad len: %d != %d", len(errors), len(ops)) + } + if len(entries) != 0 { + t.Fatalf("bad len: %d != 0", len(entries), 0) + } + verifyStateStore("after") + + // Make sure the errors look reasonable. + expected := []string{ + "index is stale", + "lock is already held", + "lock isn't held, or is held by another session", + "current session", + `key "nope" doesn't exist`, + "current modify index", + `key "nope" doesn't exist`, + "unknown operation", + } + if len(errors) != len(expected) { + t.Fatalf("bad len: %d != %d", len(errors), len(expected)) + } + for i, msg := range expected { + if errors[i].OpIndex != i { + t.Fatalf("bad index: %d != %d", i, errors[i].OpIndex) + } + if !strings.Contains(errors[i].Error.Error(), msg) { + t.Fatalf("bad %i: %v", i, errors[i].Error.Error()) + } + } +} + func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { s := testStateStore(t) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 1979b9b9e..238227703 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -36,6 +36,7 @@ const ( TombstoneRequestType CoordinateBatchUpdateType PreparedQueryRequestType + KVSAtomicRequestType ) const ( @@ -533,6 +534,12 @@ const ( KVSCAS = "cas" // Check-and-set KVSLock = "lock" // Lock a key KVSUnlock = "unlock" // Unlock a key + + // KVSAtomic* operations are only available in KVSAtomicRequest + // transactions. + KVSAtomicGet = "get" // Read the key during the transaction. + KVSAtomicCheckSession = "check-session" // Check the session holds the key. + KVSAtomicCheckIndex = "check-index" // Check the modify index of the key. ) // KVSRequest is used to operate on the Key-Value store @@ -547,6 +554,43 @@ func (r *KVSRequest) RequestDatacenter() string { return r.Datacenter } +// KVSAtomicOp is used to define a single operation within an multi-key +// transaction. +type KVSAtomicOp struct { + Op KVSOp + DirEnt DirEntry +} + +// KVSAtomicOps is a list of atomic operations. +type KVSAtomicOps []*KVSAtomicOp + +// KVSAtomicRequest is used to perform atomic multi-key operations on the +// Key-Value store. +type KVSAtomicRequest struct { + Datacenter string + Ops KVSAtomicOps +} + +func (r *KVSAtomicRequest) RequestDatacenter() string { + return r.Datacenter +} + +// IndexedError is used to return information about an error for a specific +// operation. +type IndexedError struct { + OpIndex int + Error error +} + +// IndexedErrors is a list of IndexedError entries. +type IndexedErrors []*IndexedError + +// KVSAtomicResponse is the structure returned by a KVSAtomicRequest. +type KVSAtomicResponse struct { + Errors IndexedErrors + Results DirEntries +} + // KeyRequest is used to request a key, or key prefix type KeyRequest struct { Datacenter string