mirror of https://github.com/facebook/rocksdb.git
Range Locking: add support for escalation barriers (#9290)
Summary: Range Locking supports Lock Escalation. Lock Escalation is invoked when lock memory is nearly exhausted and it reduced the amount of memory used by joining adjacent locks. Bridging the gap between certain locks has adverse effects. For example, in MyRocks it is not a good idea to bridge the gap between locks in different indexes, as that get the lock to cover large portions of indexes, or even entire indexes. Resolve this by introducing Escalation Barrier. The escalation process will call the user-provided barrier callback function: bool(const Endpoint& a, const Endpoint& b) If the function returns true, there's a barrier between a and b and Lock Escalation will not try to bridge the gap between a and b. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9290 Reviewed By: akankshamahajan15 Differential Revision: D33486753 Pulled By: riversand963 fbshipit-source-id: f97910b67aba0579ea1d35f523ca6863d3dd018e
This commit is contained in:
parent
93b1de4f45
commit
c9042db619
|
@ -97,6 +97,21 @@ class RangeLockManagerHandle : public LockManagerHandle {
|
|||
using RangeLockStatus =
|
||||
std::unordered_multimap<ColumnFamilyId, RangeLockInfo>;
|
||||
|
||||
// Lock Escalation barrier check function.
|
||||
// It is called for a couple of endpoints A and B, such that A < B.
|
||||
// If escalation_barrier_check_func(A, B)==true, then there's a lock
|
||||
// escalation barrier between A and B, and lock escalation is not allowed
|
||||
// to bridge the gap between A and B.
|
||||
//
|
||||
// The function may be called from any thread that acquires or releases
|
||||
// locks. It should not throw exceptions. There is currently no way to return
|
||||
// an error.
|
||||
using EscalationBarrierFunc =
|
||||
std::function<bool(const Endpoint& a, const Endpoint& b)>;
|
||||
|
||||
// Set the user-provided barrier check function
|
||||
virtual void SetEscalationBarrierFunc(EscalationBarrierFunc func) = 0;
|
||||
|
||||
virtual RangeLockStatus GetRangeLockStatusData() = 0;
|
||||
|
||||
class Counters {
|
||||
|
|
|
@ -276,9 +276,10 @@ TEST_F(RangeLockingTest, BasicLockEscalation) {
|
|||
|
||||
// Get the locks until we hit an escalation
|
||||
for (int i = 0; i < 2020; i++) {
|
||||
char buf[32];
|
||||
snprintf(buf, sizeof(buf) - 1, "%08d", i);
|
||||
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf), Endpoint(buf)));
|
||||
std::ostringstream buf;
|
||||
buf << std::setw(8) << std::setfill('0') << i;
|
||||
std::string buf_str = buf.str();
|
||||
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
|
||||
}
|
||||
counters = range_lock_mgr->GetStatus();
|
||||
ASSERT_GT(counters.escalation_count, 0);
|
||||
|
@ -286,6 +287,60 @@ TEST_F(RangeLockingTest, BasicLockEscalation) {
|
|||
|
||||
delete txn;
|
||||
}
|
||||
|
||||
// An escalation barrier function. Allow escalation iff the first two bytes are
|
||||
// identical.
|
||||
static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
|
||||
assert(a.slice.size() > 2);
|
||||
assert(b.slice.size() > 2);
|
||||
if (memcmp(a.slice.data(), b.slice.data(), 2)) {
|
||||
return true; // This is a barrier
|
||||
} else {
|
||||
return false; // No barrier
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(RangeLockingTest, LockEscalationBarrier) {
|
||||
auto cf = db->DefaultColumnFamily();
|
||||
|
||||
auto counters = range_lock_mgr->GetStatus();
|
||||
|
||||
// Initially not using any lock memory
|
||||
ASSERT_EQ(counters.escalation_count, 0);
|
||||
|
||||
range_lock_mgr->SetMaxLockMemory(8000);
|
||||
range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
|
||||
|
||||
// Insert enough locks to cause lock escalations to happen
|
||||
auto txn = NewTxn();
|
||||
const int N = 2000;
|
||||
for (int i = 0; i < N; i++) {
|
||||
std::ostringstream buf;
|
||||
buf << std::setw(4) << std::setfill('0') << i;
|
||||
std::string buf_str = buf.str();
|
||||
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
|
||||
}
|
||||
counters = range_lock_mgr->GetStatus();
|
||||
ASSERT_GT(counters.escalation_count, 0);
|
||||
|
||||
// Check that lock escalation was not performed across escalation barriers:
|
||||
// Use another txn to acquire locks near the barriers.
|
||||
auto txn2 = NewTxn();
|
||||
range_lock_mgr->SetMaxLockMemory(500000);
|
||||
for (int i = 100; i < N; i += 100) {
|
||||
std::ostringstream buf;
|
||||
buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
|
||||
std::string buf_str = buf.str();
|
||||
// Check that we CAN get a lock near the escalation barrier
|
||||
ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
|
||||
}
|
||||
|
||||
txn->Rollback();
|
||||
txn2->Rollback();
|
||||
delete txn;
|
||||
delete txn2;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
TEST_F(RangeLockingTest, LockWaitCount) {
|
||||
|
|
|
@ -96,9 +96,19 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
|
|||
m_sto_end_early_count = 0;
|
||||
m_sto_end_early_time = 0;
|
||||
|
||||
m_escalation_barrier = [](const DBT *, const DBT *, void *) -> bool {
|
||||
return false;
|
||||
};
|
||||
|
||||
m_lock_request_info.init(mutex_factory);
|
||||
}
|
||||
|
||||
void locktree::set_escalation_barrier_func(
|
||||
lt_escalation_barrier_check_func func, void *extra) {
|
||||
m_escalation_barrier = func;
|
||||
m_escalation_barrier_arg = extra;
|
||||
}
|
||||
|
||||
void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
|
||||
pending_lock_requests.create();
|
||||
pending_is_empty = true;
|
||||
|
@ -863,14 +873,19 @@ void locktree::escalate(lt_escalate_cb after_escalate_callback,
|
|||
// - belongs to a different txnid, or
|
||||
// - belongs to several txnids, or
|
||||
// - is a shared lock (we could potentially merge those but
|
||||
// currently we don't)
|
||||
// currently we don't), or
|
||||
// - is across a lock escalation barrier.
|
||||
int next_txnid_index = current_index + 1;
|
||||
|
||||
while (next_txnid_index < num_extracted &&
|
||||
(extracted_buf[current_index].txnid ==
|
||||
extracted_buf[next_txnid_index].txnid) &&
|
||||
!extracted_buf[next_txnid_index].is_shared &&
|
||||
!extracted_buf[next_txnid_index].owners) {
|
||||
!extracted_buf[next_txnid_index].owners &&
|
||||
!m_escalation_barrier(
|
||||
extracted_buf[current_index].range.get_right_key(),
|
||||
extracted_buf[next_txnid_index].range.get_left_key(),
|
||||
m_escalation_barrier_arg)) {
|
||||
next_txnid_index++;
|
||||
}
|
||||
|
||||
|
|
|
@ -85,6 +85,9 @@ typedef void (*lt_destroy_cb)(locktree *lt);
|
|||
typedef void (*lt_escalate_cb)(TXNID txnid, const locktree *lt,
|
||||
const range_buffer &buffer, void *extra);
|
||||
|
||||
typedef bool (*lt_escalation_barrier_check_func)(const DBT *a, const DBT *b,
|
||||
void *extra);
|
||||
|
||||
struct lt_counters {
|
||||
uint64_t wait_count, wait_time;
|
||||
uint64_t long_wait_count, long_wait_time;
|
||||
|
@ -343,6 +346,20 @@ class locktree {
|
|||
|
||||
void set_comparator(const comparator &cmp);
|
||||
|
||||
// Set the user-provided Lock Escalation Barrier check function and its
|
||||
// argument
|
||||
//
|
||||
// Lock Escalation Barrier limits the scope of Lock Escalation.
|
||||
// For two keys A and B (such that A < B),
|
||||
// escalation_barrier_check_func(A, B)==true means that there's a lock
|
||||
// escalation barrier between A and B, and lock escalation is not allowed to
|
||||
// bridge the gap between A and B.
|
||||
//
|
||||
// This method sets the user-provided barrier check function and its
|
||||
// parameter.
|
||||
void set_escalation_barrier_func(lt_escalation_barrier_check_func func,
|
||||
void *extra);
|
||||
|
||||
int compare(const locktree *lt) const;
|
||||
|
||||
DICTIONARY_ID get_dict_id() const;
|
||||
|
@ -373,6 +390,9 @@ class locktree {
|
|||
// userdata pointer below. see locktree_manager::get_lt w/ on_create_extra
|
||||
comparator m_cmp;
|
||||
|
||||
lt_escalation_barrier_check_func m_escalation_barrier;
|
||||
void *m_escalation_barrier_arg;
|
||||
|
||||
concurrent_tree *m_rangetree;
|
||||
|
||||
void *m_userdata;
|
||||
|
|
|
@ -48,14 +48,15 @@ void serialize_endpoint(const Endpoint& endp, std::string* buf) {
|
|||
}
|
||||
|
||||
// Decode the endpoint from the format it is stored in the locktree (DBT) to
|
||||
// one used outside (EndpointWithString)
|
||||
void deserialize_endpoint(const DBT* dbt, EndpointWithString* endp) {
|
||||
// the one used outside: either Endpoint or EndpointWithString
|
||||
template <typename EndpointStruct>
|
||||
void deserialize_endpoint(const DBT* dbt, EndpointStruct* endp) {
|
||||
assert(dbt->size >= 1);
|
||||
const char* dbt_data = (const char*)dbt->data;
|
||||
char suffix = dbt_data[0];
|
||||
assert(suffix == SUFFIX_INFIMUM || suffix == SUFFIX_SUPREMUM);
|
||||
endp->inf_suffix = (suffix == SUFFIX_SUPREMUM);
|
||||
endp->slice.assign(dbt_data + 1, dbt->size - 1);
|
||||
endp->slice = decltype(EndpointStruct::slice)(dbt_data + 1, dbt->size - 1);
|
||||
}
|
||||
|
||||
// Get a range lock on [start_key; end_key] range
|
||||
|
@ -263,6 +264,21 @@ RangeTreeLockManager::RangeTreeLockManager(
|
|||
ltm_.create(on_create, on_destroy, on_escalate, nullptr, mutex_factory_);
|
||||
}
|
||||
|
||||
int RangeTreeLockManager::on_create(toku::locktree* lt, void* arg) {
|
||||
// arg is a pointer to RangeTreeLockManager
|
||||
lt->set_escalation_barrier_func(&OnEscalationBarrierCheck, arg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool RangeTreeLockManager::OnEscalationBarrierCheck(const DBT* a, const DBT* b,
|
||||
void* extra) {
|
||||
Endpoint a_endp, b_endp;
|
||||
deserialize_endpoint(a, &a_endp);
|
||||
deserialize_endpoint(b, &b_endp);
|
||||
auto self = static_cast<RangeTreeLockManager*>(extra);
|
||||
return self->barrier_func_(a_endp, b_endp);
|
||||
}
|
||||
|
||||
void RangeTreeLockManager::SetRangeDeadlockInfoBufferSize(
|
||||
uint32_t target_size) {
|
||||
dlock_buffer_.Resize(target_size);
|
||||
|
@ -357,8 +373,9 @@ void RangeTreeLockManager::AddColumnFamily(const ColumnFamilyHandle* cfh) {
|
|||
DICTIONARY_ID dict_id = {.dictid = column_family_id};
|
||||
toku::comparator cmp;
|
||||
cmp.create(CompareDbtEndpoints, (void*)cfh->GetComparator());
|
||||
toku::locktree* ltree = ltm_.get_lt(dict_id, cmp,
|
||||
/* on_create_extra*/ nullptr);
|
||||
toku::locktree* ltree =
|
||||
ltm_.get_lt(dict_id, cmp,
|
||||
/* on_create_extra*/ static_cast<void*>(this));
|
||||
// This is ok to because get_lt has copied the comparator:
|
||||
cmp.destroy();
|
||||
|
||||
|
|
|
@ -91,9 +91,16 @@ class RangeTreeLockManager : public RangeLockManagerBase,
|
|||
// Get the locktree which stores locks for the Column Family with given cf_id
|
||||
std::shared_ptr<toku::locktree> GetLockTreeForCF(ColumnFamilyId cf_id);
|
||||
|
||||
void SetEscalationBarrierFunc(EscalationBarrierFunc func) override {
|
||||
barrier_func_ = func;
|
||||
}
|
||||
|
||||
private:
|
||||
toku::locktree_manager ltm_;
|
||||
|
||||
EscalationBarrierFunc barrier_func_ =
|
||||
[](const Endpoint&, const Endpoint&) -> bool { return false; };
|
||||
|
||||
std::shared_ptr<TransactionDBMutexFactory> mutex_factory_;
|
||||
|
||||
// Map from cf_id to locktree*. Can only be accessed while holding the
|
||||
|
@ -114,10 +121,12 @@ class RangeTreeLockManager : public RangeLockManagerBase,
|
|||
static int CompareDbtEndpoints(void* arg, const DBT* a_key, const DBT* b_key);
|
||||
|
||||
// Callbacks
|
||||
static int on_create(toku::locktree*, void*) { return 0; /* no error */ }
|
||||
static int on_create(toku::locktree*, void*);
|
||||
static void on_destroy(toku::locktree*) {}
|
||||
static void on_escalate(TXNID txnid, const toku::locktree* lt,
|
||||
const toku::range_buffer& buffer, void* extra);
|
||||
|
||||
static bool OnEscalationBarrierCheck(const DBT* a, const DBT* b, void* extra);
|
||||
};
|
||||
|
||||
void serialize_endpoint(const Endpoint& endp, std::string* buf);
|
||||
|
|
Loading…
Reference in New Issue