consul: Support KVSLock and KVSUnlock
This commit is contained in:
parent
35996042ae
commit
cd7e3967be
|
@ -23,6 +23,17 @@ const (
|
|||
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
|
||||
)
|
||||
|
||||
// kvMode is used internally to control which type of set
|
||||
// operation we are performing
|
||||
type kvMode int
|
||||
|
||||
const (
|
||||
kvSet kvMode = iota
|
||||
kvCAS
|
||||
kvLock
|
||||
kvUnlock
|
||||
)
|
||||
|
||||
// The StateStore is responsible for maintaining all the Consul
|
||||
// state. It is manipulated by the FSM which maintains consistency
|
||||
// through the use of Raft. The goals of the StateStore are to provide
|
||||
|
@ -919,35 +930,8 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str
|
|||
|
||||
// KVSSet is used to create or update a KV entry
|
||||
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing node
|
||||
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the create and modify times
|
||||
if len(res) == 0 {
|
||||
d.CreateIndex = index
|
||||
} else {
|
||||
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
|
||||
}
|
||||
d.ModifyIndex = index
|
||||
|
||||
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
|
||||
return tx.Commit()
|
||||
_, err := s.kvsSet(index, d, kvSet)
|
||||
return err
|
||||
}
|
||||
|
||||
// KVSRestore is used to restore a DirEntry. It should only be used when
|
||||
|
@ -1075,8 +1059,26 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts .
|
|||
|
||||
// KVSCheckAndSet is used to perform an atomic check-and-set
|
||||
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
return s.kvsSet(index, d, kvCAS)
|
||||
}
|
||||
|
||||
// KVSLock works like KVSSet but only writes if the lock can be acquired
|
||||
func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
return s.kvsSet(index, d, kvLock)
|
||||
}
|
||||
|
||||
// KVSUnlock works like KVSSet but only writes if the lock can be unlocked
|
||||
func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
return s.kvsSet(index, d, kvUnlock)
|
||||
}
|
||||
|
||||
// kvsSet is the internal setter
|
||||
func (s *StateStore) kvsSet(
|
||||
index uint64,
|
||||
d *structs.DirEntry,
|
||||
mode kvMode) (bool, error) {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
tx, err := s.tables.StartTxn(false)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -1097,10 +1099,51 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
|
|||
// Use the ModifyIndex as the constraint. A modify of time of 0
|
||||
// means we are doing a set-if-not-exists, while any other value
|
||||
// means we expect that modify time.
|
||||
if d.ModifyIndex == 0 && exist != nil {
|
||||
return false, nil
|
||||
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
||||
return false, nil
|
||||
if mode == kvCAS {
|
||||
if d.ModifyIndex == 0 && exist != nil {
|
||||
return false, nil
|
||||
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// If attempting to lock, check this is possible
|
||||
if mode == kvLock {
|
||||
// Verify we have a session
|
||||
if d.Session == "" {
|
||||
return false, fmt.Errorf("Missing session")
|
||||
}
|
||||
|
||||
// Bail if it is already locked
|
||||
if exist != nil && exist.Session != "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Verify the session exists
|
||||
res, err := s.sessionTable.GetTxn(tx, "id", d.Session)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(res) == 0 {
|
||||
return false, fmt.Errorf("Invalid session")
|
||||
}
|
||||
|
||||
// Update the lock index
|
||||
if exist != nil {
|
||||
exist.LockIndex++
|
||||
exist.Session = d.Session
|
||||
} else {
|
||||
d.LockIndex = 1
|
||||
}
|
||||
}
|
||||
|
||||
// If attempting to unlock, verify the key exists and is held
|
||||
if mode == kvUnlock {
|
||||
if exist == nil || exist.Session != d.Session {
|
||||
return false, nil
|
||||
}
|
||||
// Clear the session to unlock
|
||||
exist.Session = ""
|
||||
}
|
||||
|
||||
// Set the create and modify times
|
||||
|
@ -1108,6 +1151,9 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
|
|||
d.CreateIndex = index
|
||||
} else {
|
||||
d.CreateIndex = exist.CreateIndex
|
||||
d.LockIndex = exist.LockIndex
|
||||
d.Session = exist.Session
|
||||
|
||||
}
|
||||
d.ModifyIndex = index
|
||||
|
||||
|
|
|
@ -1884,3 +1884,134 @@ func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
|
|||
t.Fatalf("session should be invalidated")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSLock(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
session := &structs.Session{Node: "foo"}
|
||||
if err := store.SessionCreate(4, session); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Lock with a non-existing keys should work
|
||||
d := &structs.DirEntry{
|
||||
Key: "/foo",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
Session: session.ID,
|
||||
}
|
||||
ok, err := store.KVSLock(5, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("unexpected fail")
|
||||
}
|
||||
if d.LockIndex != 1 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Re-locking should fail
|
||||
ok, err = store.KVSLock(6, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("expected fail")
|
||||
}
|
||||
|
||||
// Set a normal key
|
||||
k1 := &structs.DirEntry{
|
||||
Key: "/bar",
|
||||
Flags: 0,
|
||||
Value: []byte("asdf"),
|
||||
}
|
||||
if err := store.KVSSet(7, k1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should acquire the lock
|
||||
k1.Session = session.ID
|
||||
ok, err = store.KVSLock(8, k1)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("unexpected fail")
|
||||
}
|
||||
|
||||
// Re-acquire should fail
|
||||
ok, err = store.KVSLock(9, k1)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("expected fail")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestKVSUnlock(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
session := &structs.Session{Node: "foo"}
|
||||
if err := store.SessionCreate(4, session); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Unlock with a non-existing keys should fail
|
||||
d := &structs.DirEntry{
|
||||
Key: "/foo",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
Session: session.ID,
|
||||
}
|
||||
ok, err := store.KVSUnlock(5, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("expected fail")
|
||||
}
|
||||
|
||||
// Lock should work
|
||||
d.Session = session.ID
|
||||
if ok, _ := store.KVSLock(6, d); !ok {
|
||||
t.Fatalf("expected lock")
|
||||
}
|
||||
|
||||
// Unlock should work
|
||||
d.Session = session.ID
|
||||
ok, err = store.KVSUnlock(7, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("unexpected fail")
|
||||
}
|
||||
|
||||
// Re-lock should work
|
||||
d.Session = session.ID
|
||||
if ok, err := store.KVSLock(8, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
} else if !ok {
|
||||
t.Fatalf("expected lock")
|
||||
}
|
||||
if d.LockIndex != 2 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -275,9 +275,11 @@ type IndexedNodeDump struct {
|
|||
type DirEntry struct {
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
LockIndex uint64
|
||||
Key string
|
||||
Flags uint64
|
||||
Value []byte
|
||||
Session string `json:",omitempty"`
|
||||
}
|
||||
type DirEntries []*DirEntry
|
||||
|
||||
|
@ -287,7 +289,9 @@ const (
|
|||
KVSSet KVSOp = "set"
|
||||
KVSDelete = "delete"
|
||||
KVSDeleteTree = "delete-tree"
|
||||
KVSCAS = "cas" // Check-and-set
|
||||
KVSCAS = "cas" // Check-and-set
|
||||
KVSLock = "lock" // Lock a key
|
||||
KVSUnlock = "unlock" // Unlock a key
|
||||
)
|
||||
|
||||
// KVSRequest is used to operate on the Key-Value store
|
||||
|
|
Loading…
Reference in New Issue