From c9042db61962babbe0f2697a157feeeb3b8fb746 Mon Sep 17 00:00:00 2001 From: Sergei Petrunia Date: Fri, 14 Jan 2022 12:45:20 -0800 Subject: [PATCH] Range Locking: add support for escalation barriers (#9290) Summary: Range Locking supports Lock Escalation. Lock Escalation is invoked when lock memory is nearly exhausted and it reduces the amount of memory used by joining adjacent locks. Bridging the gap between certain locks has adverse effects. For example, in MyRocks it is not a good idea to bridge the gap between locks in different indexes, as that causes the lock to cover large portions of indexes, or even entire indexes. Resolve this by introducing Escalation Barrier. The escalation process will call the user-provided barrier callback function: bool(const Endpoint& a, const Endpoint& b) If the function returns true, there's a barrier between a and b and Lock Escalation will not try to bridge the gap between a and b. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9290 Reviewed By: akankshamahajan15 Differential Revision: D33486753 Pulled By: riversand963 fbshipit-source-id: f97910b67aba0579ea1d35f523ca6863d3dd018e --- include/rocksdb/utilities/transaction_db.h | 15 +++++ .../lock/range/range_locking_test.cc | 61 ++++++++++++++++++- .../range/range_tree/lib/locktree/locktree.cc | 19 +++++- .../range/range_tree/lib/locktree/locktree.h | 20 ++++++ .../range_tree/range_tree_lock_manager.cc | 27 ++++++-- .../range_tree/range_tree_lock_manager.h | 11 +++- 6 files changed, 142 insertions(+), 11 deletions(-) diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 04ed2d3345..6a54bfe519 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -97,6 +97,21 @@ class RangeLockManagerHandle : public LockManagerHandle { using RangeLockStatus = std::unordered_multimap; + // Lock Escalation barrier check function. 
+ // It is called for a couple of endpoints A and B, such that A < B. + // If escalation_barrier_check_func(A, B)==true, then there's a lock + // escalation barrier between A and B, and lock escalation is not allowed + // to bridge the gap between A and B. + // + // The function may be called from any thread that acquires or releases + // locks. It should not throw exceptions. There is currently no way to return + // an error. + using EscalationBarrierFunc = + std::function; + + // Set the user-provided barrier check function + virtual void SetEscalationBarrierFunc(EscalationBarrierFunc func) = 0; + virtual RangeLockStatus GetRangeLockStatusData() = 0; class Counters { diff --git a/utilities/transactions/lock/range/range_locking_test.cc b/utilities/transactions/lock/range/range_locking_test.cc index 2e8170837c..dd95dc436a 100644 --- a/utilities/transactions/lock/range/range_locking_test.cc +++ b/utilities/transactions/lock/range/range_locking_test.cc @@ -276,9 +276,10 @@ TEST_F(RangeLockingTest, BasicLockEscalation) { // Get the locks until we hit an escalation for (int i = 0; i < 2020; i++) { - char buf[32]; - snprintf(buf, sizeof(buf) - 1, "%08d", i); - ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf))); + std::ostringstream buf; + buf << std::setw(8) << std::setfill('0') << i; + std::string buf_str = buf.str(); + ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str))); } counters = range_lock_mgr->GetStatus(); ASSERT_GT(counters.escalation_count, 0); @@ -286,6 +287,60 @@ TEST_F(RangeLockingTest, BasicLockEscalation) { delete txn; } + +// An escalation barrier function. Allow escalation iff the first two bytes are +// identical. 
+static bool escalation_barrier(const Endpoint& a, const Endpoint& b) { + assert(a.slice.size() > 2); + assert(b.slice.size() > 2); + if (memcmp(a.slice.data(), b.slice.data(), 2)) { + return true; // This is a barrier + } else { + return false; // No barrier + } +} + +TEST_F(RangeLockingTest, LockEscalationBarrier) { + auto cf = db->DefaultColumnFamily(); + + auto counters = range_lock_mgr->GetStatus(); + + // Initially not using any lock memory + ASSERT_EQ(counters.escalation_count, 0); + + range_lock_mgr->SetMaxLockMemory(8000); + range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier); + + // Insert enough locks to cause lock escalations to happen + auto txn = NewTxn(); + const int N = 2000; + for (int i = 0; i < N; i++) { + std::ostringstream buf; + buf << std::setw(4) << std::setfill('0') << i; + std::string buf_str = buf.str(); + ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str))); + } + counters = range_lock_mgr->GetStatus(); + ASSERT_GT(counters.escalation_count, 0); + + // Check that lock escalation was not performed across escalation barriers: + // Use another txn to acquire locks near the barriers. 
+ auto txn2 = NewTxn(); + range_lock_mgr->SetMaxLockMemory(500000); + for (int i = 100; i < N; i += 100) { + std::ostringstream buf; + buf << std::setw(4) << std::setfill('0') << i - 1 << "-a"; + std::string buf_str = buf.str(); + // Check that we CAN get a lock near the escalation barrier + ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str))); + } + + txn->Rollback(); + txn2->Rollback(); + delete txn; + delete txn2; +} + #endif TEST_F(RangeLockingTest, LockWaitCount) { diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc index c238b02048..0d99130ae7 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.cc @@ -96,9 +96,19 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, m_sto_end_early_count = 0; m_sto_end_early_time = 0; + m_escalation_barrier = [](const DBT *, const DBT *, void *) -> bool { + return false; + }; + m_lock_request_info.init(mutex_factory); } +void locktree::set_escalation_barrier_func( + lt_escalation_barrier_check_func func, void *extra) { + m_escalation_barrier = func; + m_escalation_barrier_arg = extra; +} + void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) { pending_lock_requests.create(); pending_is_empty = true; @@ -863,14 +873,19 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback, // - belongs to a different txnid, or // - belongs to several txnids, or // - is a shared lock (we could potentially merge those but - // currently we don't) + // currently we don't), or + // - is across a lock escalation barrier. 
int next_txnid_index = current_index + 1; while (next_txnid_index < num_extracted && (extracted_buf[current_index].txnid == extracted_buf[next_txnid_index].txnid) && !extracted_buf[next_txnid_index].is_shared && - !extracted_buf[next_txnid_index].owners) { + !extracted_buf[next_txnid_index].owners && + !m_escalation_barrier( + extracted_buf[current_index].range.get_right_key(), + extracted_buf[next_txnid_index].range.get_left_key(), + m_escalation_barrier_arg)) { next_txnid_index++; } diff --git a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h index 3e438f5029..f0f4b042d3 100644 --- a/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h +++ b/utilities/transactions/lock/range/range_tree/lib/locktree/locktree.h @@ -85,6 +85,9 @@ typedef void (*lt_destroy_cb)(locktree *lt); typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt, const range_buffer &buffer, void *extra); +typedef bool (*lt_escalation_barrier_check_func)(const DBT *a, const DBT *b, + void *extra); + struct lt_counters { uint64_t wait_count, wait_time; uint64_t long_wait_count, long_wait_time; @@ -343,6 +346,20 @@ class locktree { void set_comparator(const comparator &cmp); + // Set the user-provided Lock Escalation Barrier check function and its + // argument + // + // Lock Escalation Barrier limits the scope of Lock Escalation. + // For two keys A and B (such that A < B), + // escalation_barrier_check_func(A, B)==true means that there's a lock + // escalation barrier between A and B, and lock escalation is not allowed to + // bridge the gap between A and B. + // + // This method sets the user-provided barrier check function and its + // parameter. + void set_escalation_barrier_func(lt_escalation_barrier_check_func func, + void *extra); + int compare(const locktree *lt) const; DICTIONARY_ID get_dict_id() const; @@ -373,6 +390,9 @@ class locktree { // userdata pointer below. 
see locktree_manager::get_lt w/ on_create_extra comparator m_cmp; + lt_escalation_barrier_check_func m_escalation_barrier; + void *m_escalation_barrier_arg; + concurrent_tree *m_rangetree; void *m_userdata; diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc index 054e24d3a4..ae99be5343 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.cc @@ -48,14 +48,15 @@ void serialize_endpoint(const Endpoint& endp, std::string* buf) { } // Decode the endpoint from the format it is stored in the locktree (DBT) to -// one used outside (EndpointWithString) -void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) { +// the one used outside: either Endpoint or EndpointWithString +template +void deserialize_endpoint(const DBT* dbt, EndpointStruct* endp) { assert(dbt->size >= 1); const char* dbt_data = (const char*)dbt->data; char suffix = dbt_data[0]; assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM); endp->inf_suffix = (suffix == SUFFIX_SUPREMUM); - endp->slice.assign(dbt_data + 1, dbt->size - 1); + endp->slice = decltype(EndpointStruct::slice)(dbt_data + 1, dbt->size - 1); } // Get a range lock on [start_key; end_key] range @@ -263,6 +264,21 @@ RangeTreeLockManager::RangeTreeLockManager( ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_); } +int RangeTreeLockManager::on_create(toku::locktree* lt, void* arg) { + // arg is a pointer to RangeTreeLockManager + lt->set_escalation_barrier_func(&OnEscalationBarrierCheck, arg); + return 0; +} + +bool RangeTreeLockManager::OnEscalationBarrierCheck(const DBT* a, const DBT* b, + void* extra) { + Endpoint a_endp, b_endp; + deserialize_endpoint(a, &a_endp); + deserialize_endpoint(b, &b_endp); + auto self = static_cast(extra); + return self->barrier_func_(a_endp, b_endp); +} + void 
RangeTreeLockManager::SetRangeDeadlockInfoBufferSize( uint32_t target_size) { dlock_buffer_.Resize(target_size); @@ -357,8 +373,9 @@ void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) { DICTIONARY_ID dict_id = {.dictid = column_family_id}; toku::comparator cmp; cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator()); - toku::locktree* ltree = ltm_.get_lt(dict_id, cmp, - /* on_create_extra*/ nullptr); + toku::locktree* ltree = + ltm_.get_lt(dict_id, cmp, + /* on_create_extra*/ static_cast(this)); // This is ok to because get_lt has copied the comparator: cmp.destroy(); diff --git a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h index e7c150281c..e4236d600a 100644 --- a/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h +++ b/utilities/transactions/lock/range/range_tree/range_tree_lock_manager.h @@ -91,9 +91,16 @@ class RangeTreeLockManager : public RangeLockManagerBase, // Get the locktree which stores locks for the Column Family with given cf_id std::shared_ptr GetLockTreeForCF(ColumnFamilyId cf_id); + void SetEscalationBarrierFunc(EscalationBarrierFunc func) override { + barrier_func_ = func; + } + private: toku::locktree_manager ltm_; + EscalationBarrierFunc barrier_func_ = + [](const Endpoint&, const Endpoint&) -> bool { return false; }; + std::shared_ptr mutex_factory_; // Map from cf_id to locktree*. 
Can only be accessed while holding the @@ -114,10 +121,12 @@ class RangeTreeLockManager : public RangeLockManagerBase, static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key); // Callbacks - static int on_create(toku::locktree*, void*) { return 0; /* no error */ } + static int on_create(toku::locktree*, void*); static void on_destroy(toku::locktree*) {} static void on_escalate(TXNID txnid, const toku::locktree* lt, const toku::range_buffer& buffer, void* extra); + + static bool OnEscalationBarrierCheck(const DBT* a, const DBT* b, void* extra); }; void serialize_endpoint(const Endpoint& endp, std::string* buf);