diff --git a/consul/state_store.go b/consul/state_store.go index f2b47a69f..92b8f1d43 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -23,6 +23,17 @@ const ( dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size ) +// kvMode is used internally to control which type of set +// operation we are performing +type kvMode int + +const ( + kvSet kvMode = iota + kvCAS + kvLock + kvUnlock +) + // The StateStore is responsible for maintaining all the Consul // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide @@ -919,35 +930,8 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str // KVSSet is used to create or update a KV entry func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { - // Start a new txn - tx, err := s.kvsTable.StartTxn(false, nil) - if err != nil { - return err - } - defer tx.Abort() - - // Get the existing node - res, err := s.kvsTable.GetTxn(tx, "id", d.Key) - if err != nil { - return err - } - - // Set the create and modify times - if len(res) == 0 { - d.CreateIndex = index - } else { - d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex - } - d.ModifyIndex = index - - if err := s.kvsTable.InsertTxn(tx, d); err != nil { - return err - } - if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { - return err - } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) - return tx.Commit() + _, err := s.kvsSet(index, d, kvSet) + return err } // KVSRestore is used to restore a DirEntry. It should only be used when @@ -1075,8 +1059,26 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvCAS) +} + +// KVSLock works like KVSSet but only writes if the lock can be acquired +func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvLock) +} + +// KVSUnlock works like KVSSet but only writes if the lock can be unlocked +func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) { + return s.kvsSet(index, d, kvUnlock) +} + +// kvsSet is the internal setter +func (s *StateStore) kvsSet( + index uint64, + d *structs.DirEntry, + mode kvMode) (bool, error) { // Start a new txn - tx, err := s.kvsTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return false, err } @@ -1097,10 +1099,51 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er // Use the ModifyIndex as the constraint. A modify of time of 0 // means we are doing a set-if-not-exists, while any other value // means we expect that modify time. - if d.ModifyIndex == 0 && exist != nil { - return false, nil - } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { - return false, nil + if mode == kvCAS { + if d.ModifyIndex == 0 && exist != nil { + return false, nil + } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { + return false, nil + } + } + + // If attempting to lock, check this is possible + if mode == kvLock { + // Verify we have a session + if d.Session == "" { + return false, fmt.Errorf("Missing session") + } + + // Bail if it is already locked + if exist != nil && exist.Session != "" { + return false, nil + } + + // Verify the session exists + res, err := s.sessionTable.GetTxn(tx, "id", d.Session) + if err != nil { + return false, err + } + if len(res) == 0 { + return false, fmt.Errorf("Invalid session") + } + + // Update the lock index + if exist != nil { + exist.LockIndex++ + exist.Session = d.Session + } else { + d.LockIndex = 1 + } + } + + // If attempting to unlock, verify the key exists and is held + if mode == kvUnlock { + if exist == nil || exist.Session != d.Session { + return false, nil + } + // Clear the session to unlock + exist.Session = "" } // Set the create and modify times @@ -1108,6 +1151,9 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er d.CreateIndex = index } else { d.CreateIndex = exist.CreateIndex + d.LockIndex = exist.LockIndex + d.Session = exist.Session + } d.ModifyIndex = index diff --git a/consul/state_store_test.go b/consul/state_store_test.go index d048b2fc2..d46bd6b4b 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1884,3 +1884,134 @@ func TestSessionInvalidate_DeleteNodeService(t *testing.T) { t.Fatalf("session should be invalidated") } } + +func TestKVSLock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Lock with a non-existing keys should work + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSLock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + if d.LockIndex != 1 { + t.Fatalf("bad: %v", d) + } + + // Re-locking should fail + ok, err = store.KVSLock(6, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + + // Set a normal key + k1 := &structs.DirEntry{ + Key: "/bar", + Flags: 0, + Value: []byte("asdf"), + } + if err := store.KVSSet(7, k1); err != nil { + t.Fatalf("err: %v", err) + } + + // Should acquire the lock + k1.Session = session.ID + ok, err = store.KVSLock(8, k1) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Re-acquire should fail + ok, err = store.KVSLock(9, k1) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + +} + +func TestKVSUnlock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Unlock with a non-existing keys should fail + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSUnlock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("expected fail") + } + + // Lock should work + d.Session = session.ID + if ok, _ := store.KVSLock(6, d); !ok { + t.Fatalf("expected lock") + } + + // Unlock should work + d.Session = session.ID + ok, err = store.KVSUnlock(7, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Re-lock should work + d.Session = session.ID + if ok, err := store.KVSLock(8, d); err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("expected lock") + } + if d.LockIndex != 2 { + t.Fatalf("bad: %v", d) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index f1bbd60bd..9f9116612 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -275,9 +275,11 @@ type IndexedNodeDump struct { type DirEntry struct { CreateIndex uint64 ModifyIndex uint64 + LockIndex uint64 Key string Flags uint64 Value []byte + Session string `json:",omitempty"` } type DirEntries []*DirEntry @@ -287,7 +289,9 @@ const ( KVSSet KVSOp = "set" KVSDelete = "delete" KVSDeleteTree = "delete-tree" - KVSCAS = "cas" // Check-and-set + KVSCAS = "cas" // Check-and-set + KVSLock = "lock" // Lock a key + KVSUnlock = "unlock" // Unlock a key ) // KVSRequest is used to operate on the Key-Value store