rocksdb/utilities/transactions/lock/range/range_locking_test.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef OS_WIN
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/lock/point/point_lock_manager_test.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_test.h"
using std::string;
namespace ROCKSDB_NAMESPACE {
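// Test fixture: opens a TransactionDB whose TransactionDBOptions carry a
// range lock manager handle (NewRangeLockManager) instead of the default
// point lock manager.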
class RangeLockingTest : public ::testing::Test {
public:
TransactionDB* db;
std::string dbname;
Options options;
std::shared_ptr<RangeLockManagerHandle> range_lock_mgr;
TransactionDBOptions txn_db_options;
RangeLockingTest() : db(nullptr) {
options.create_if_missing = true;
dbname = test::PerThreadDBPath("range_locking_testdb");
EXPECT_OK(DestroyDB(dbname, options));
range_lock_mgr.reset(NewRangeLockManager(nullptr));
txn_db_options.lock_mgr_handle = range_lock_mgr;
auto s = TransactionDB::Open(options, txn_db_options, dbname, &db);
assert(s.ok());
}
~RangeLockingTest() {
delete db;
db = nullptr;
// This is to skip the assert statement in FaultInjectionTestEnv. There
// seems to be a bug in btrfs that makes readdir return recently
// unlinked files. By using the default fs we simply ignore errors resulting
// from attempting to delete such files in DestroyDB.
EXPECT_OK(DestroyDB(dbname, options));
}
PessimisticTransaction* NewTxn(
TransactionOptions txn_opt = TransactionOptions()) {
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opt);
return static_cast<PessimisticTransaction*>(txn);
}
};
// TODO: set a smaller lock wait timeout so that the test runs faster.
TEST_F(RangeLockingTest, BasicRangeLocking) {
WriteOptions write_options;
TransactionOptions txn_options;
std::string value;
ReadOptions read_options;
auto cf = db->DefaultColumnFamily();
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
// Check that the range lock inhibits an overlapping range lock
{
auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
ASSERT_TRUE(s.IsTimedOut());
}
// Check that the range lock inhibits an overlapping point lock
{
auto s = txn1->GetForUpdate(read_options, cf, Slice("b"), &value);
ASSERT_TRUE(s.IsTimedOut());
}
// Get a point lock, check that it inhibits range locks
ASSERT_OK(txn0->Put(cf, Slice("n"), Slice("value")));
{
auto s = txn1->GetRangeLock(cf, Endpoint("m"), Endpoint("p"));
ASSERT_TRUE(s.IsTimedOut());
}
ASSERT_OK(txn0->Commit());
txn1->Rollback();
delete txn0;
delete txn1;
}
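// Mimic MyRocks' update pattern: take a range lock covering the keys to be
// modified, then issue Put() calls with assume_tracked=true so that no
// per-key locking work is repeated.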
TEST_F(RangeLockingTest, MyRocksLikeUpdate) {
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto cf = db->DefaultColumnFamily();
Status s;
// Get a range lock for the range we are about to update
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
bool try_range_lock_called = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"RangeTreeLockManager::TryRangeLock:enter",
[&](void* /*arg*/) { try_range_lock_called = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// For performance reasons, the following must NOT call lock_mgr->TryLock():
// We verify that by checking the value of try_range_lock_called.
ASSERT_OK(txn0->Put(cf, Slice("b"), Slice("value"),
/*assume_tracked=*/true));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_FALSE(try_range_lock_called);
txn0->Rollback();
delete txn0;
}
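// Upgrading a shared lock to an exclusive one must conflict with (and here,
// time out against) another transaction's shared lock on the same key.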
TEST_F(RangeLockingTest, UpgradeLockAndGetConflict) {
WriteOptions write_options;
TransactionOptions txn_options;
auto cf = db->DefaultColumnFamily();
Status s;
std::string value;
txn_options.lock_timeout = 10;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Get the shared lock in txn0
s = txn0->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
false /*exclusive*/);
ASSERT_TRUE(s.IsNotFound());
// Get the shared lock on the same key in txn1
s = txn1->GetForUpdate(ReadOptions(), cf, Slice("a"), &value,
false /*exclusive*/);
ASSERT_TRUE(s.IsNotFound());
// Now, try getting an exclusive lock that overlaps with the above
s = txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("b"));
ASSERT_TRUE(s.IsTimedOut());
txn0->Rollback();
txn1->Rollback();
delete txn0;
delete txn1;
}
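// Snapshot validation: a Put() must fail with Status::Busy when the key was
// committed by another transaction after this transaction's snapshot was
// taken.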
TEST_F(RangeLockingTest, SnapshotValidation) {
Status s;
Slice key_slice = Slice("k");
ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
auto txn0 = NewTxn();
ASSERT_OK(txn0->Put(key_slice, Slice("initial")));
ASSERT_OK(txn0->Commit());
// txn1
auto txn1 = NewTxn();
txn1->SetSnapshot();
std::string val1;
ASSERT_OK(txn1->Get(ReadOptions(), cfh, key_slice, &val1));
ASSERT_EQ(val1, "initial");
val1 = val1 + std::string("-txn1");
ASSERT_OK(txn1->Put(cfh, key_slice, Slice(val1)));
// txn2
auto txn2 = NewTxn();
txn2->SetSnapshot();
std::string val2;
// This will see the original value as nothing is committed
// This is also a Get, so it doesn't acquire any locks.
ASSERT_OK(txn2->Get(ReadOptions(), cfh, key_slice, &val2));
ASSERT_EQ(val2, "initial");
// txn1
ASSERT_OK(txn1->Commit());
// txn2
val2 = val2 + std::string("-txn2");
// Now, this call should do Snapshot Validation and fail:
s = txn2->Put(cfh, key_slice, Slice(val2));
ASSERT_TRUE(s.IsBusy());
ASSERT_OK(txn2->Commit());
delete txn0;
delete txn1;
delete txn2;
}
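// GetRangeLockStatusData() must report every held lock, keyed by column
// family id, with its endpoints and the id of the owning transaction.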
TEST_F(RangeLockingTest, MultipleTrxLockStatusData) {
WriteOptions write_options;
TransactionOptions txn_options;
auto cf = db->DefaultColumnFamily();
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("z"), Endpoint("z")));
ASSERT_OK(txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("e")));
auto s = range_lock_mgr->GetRangeLockStatusData();
ASSERT_EQ(s.size(), 2);
for (auto it = s.begin(); it != s.end(); ++it) {
ASSERT_EQ(it->first, cf->GetID());
auto val = it->second;
ASSERT_FALSE(val.start.inf_suffix);
ASSERT_FALSE(val.end.inf_suffix);
ASSERT_TRUE(val.exclusive);
ASSERT_EQ(val.ids.size(), 1);
if (val.ids[0] == txn0->GetID()) {
ASSERT_EQ(val.start.slice, "z");
ASSERT_EQ(val.end.slice, "z");
} else if (val.ids[0] == txn1->GetID()) {
ASSERT_EQ(val.start.slice, "b");
ASSERT_EQ(val.end.slice, "e");
} else {
FAIL(); // Unknown transaction ID.
}
}
delete txn0;
delete txn1;
}
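// The lock escalation tests are skipped under ThreadSanitizer, and on
// compilers that do not support __has_feature.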
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define SKIP_LOCK_ESCALATION_TEST 1
#endif
#else
#define SKIP_LOCK_ESCALATION_TEST 1
#endif
#ifndef SKIP_LOCK_ESCALATION_TEST
TEST_F(RangeLockingTest, BasicLockEscalation) {
auto cf = db->DefaultColumnFamily();
auto counters = range_lock_mgr->GetStatus();
// Initially not using any lock memory
ASSERT_EQ(counters.current_lock_memory, 0);
ASSERT_EQ(counters.escalation_count, 0);
ASSERT_EQ(0, range_lock_mgr->SetMaxLockMemory(2000));
// Insert until we see lock escalations
auto txn = NewTxn();
// Get the locks until we hit an escalation
for (int i = 0; i < 2020; i++) {
std::ostringstream buf;
buf << std::setw(8) << std::setfill('0') << i;
std::string buf_str = buf.str();
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
}
counters = range_lock_mgr->GetStatus();
ASSERT_GT(counters.escalation_count, 0);
ASSERT_LE(counters.current_lock_memory, 2000);
delete txn;
}
// An escalation barrier function. Allow escalation iff the first two bytes are
// identical.
static bool escalation_barrier(const Endpoint& a, const Endpoint& b) {
assert(a.slice.size() > 2);
assert(b.slice.size() > 2);
if (memcmp(a.slice.data(), b.slice.data(), 2)) {
return true; // This is a barrier
} else {
return false; // No barrier
}
}
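// With the barrier above, escalation may merge point locks within one
// two-digit prefix but never across prefixes, so a key such as "0099-a"
// (which sorts between the "00" and "01" prefixes) is never swallowed by an
// escalated range and stays available to another transaction.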
TEST_F(RangeLockingTest, LockEscalationBarrier) {
auto cf = db->DefaultColumnFamily();
auto counters = range_lock_mgr->GetStatus();
// Initially, no lock escalations have happened
ASSERT_EQ(counters.escalation_count, 0);
range_lock_mgr->SetMaxLockMemory(8000);
range_lock_mgr->SetEscalationBarrierFunc(escalation_barrier);
// Insert enough locks to cause lock escalations to happen
auto txn = NewTxn();
const int N = 2000;
for (int i = 0; i < N; i++) {
std::ostringstream buf;
buf << std::setw(4) << std::setfill('0') << i;
std::string buf_str = buf.str();
ASSERT_OK(txn->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
}
counters = range_lock_mgr->GetStatus();
ASSERT_GT(counters.escalation_count, 0);
// Check that lock escalation was not performed across escalation barriers:
// Use another txn to acquire locks near the barriers.
auto txn2 = NewTxn();
range_lock_mgr->SetMaxLockMemory(500000);
for (int i = 100; i < N; i += 100) {
std::ostringstream buf;
buf << std::setw(4) << std::setfill('0') << i - 1 << "-a";
std::string buf_str = buf.str();
// Check that we CAN get a lock near the escalation barrier
ASSERT_OK(txn2->GetRangeLock(cf, Endpoint(buf_str), Endpoint(buf_str)));
}
txn->Rollback();
txn2->Rollback();
delete txn;
delete txn2;
}
#endif
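// A conflicting lock request that has to wait (even if it eventually times
// out) must increment the lock_wait_count counter exactly once.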
TEST_F(RangeLockingTest, LockWaitCount) {
TransactionOptions txn_options;
auto cf = db->DefaultColumnFamily();
txn_options.lock_timeout = 50;
Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
uint64_t lock_waits1 = range_lock_mgr->GetStatus().lock_wait_count;
// Attempt to get a conflicting lock
auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
ASSERT_TRUE(s.IsTimedOut());
// Check that the counter was incremented
uint64_t lock_waits2 = range_lock_mgr->GetStatus().lock_wait_count;
ASSERT_EQ(lock_waits1 + 1, lock_waits2);
txn0->Rollback();
txn1->Rollback();
delete txn0;
delete txn1;
}
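// Regression test for https://github.com/facebook/rocksdb/pull/9898: while
// txn1 is blocked waiting on txn0's range lock, the lock manager records
// txn0 as a waitee; releasing txn0's locks and deleting txn0 on another
// thread must not cause the lock manager to access the freed transaction
// object.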
TEST_F(RangeLockingTest, LockWaiteeAccess) {
TransactionOptions txn_options;
auto cf = db->DefaultColumnFamily();
txn_options.lock_timeout = 60;
Transaction* txn0 = db->BeginTransaction(WriteOptions(), txn_options);
Transaction* txn1 = db->BeginTransaction(WriteOptions(), txn_options);
// Get a range lock
ASSERT_OK(txn0->GetRangeLock(cf, Endpoint("a"), Endpoint("c")));
std::atomic<bool> reached(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"RangeTreeLockManager::TryRangeLock:EnterWaitingTxn", [&](void* /*arg*/) {
reached.store(true);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
port::Thread t([&]() {
// Attempt to get a conflicting lock
auto s = txn1->GetRangeLock(cf, Endpoint("b"), Endpoint("z"));
ASSERT_TRUE(s.ok());
txn1->Rollback();
});
while (!reached.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
// Release locks and free the transaction
txn0->Rollback();
delete txn0;
t.join();
delete txn1;
}
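// Setup hook that lets the generic AnyLockManagerTest suite (declared in
// point_lock_manager_test.h) run against the range lock manager.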
void PointLockManagerTestExternalSetup(PointLockManagerTest* self) {
self->env_ = Env::Default();
self->db_dir_ = test::PerThreadDBPath("point_lock_manager_test");
ASSERT_OK(self->env_->CreateDir(self->db_dir_));
Options opt;
opt.create_if_missing = true;
TransactionDBOptions txn_opt;
txn_opt.transaction_lock_timeout = 0;
auto mutex_factory = std::make_shared<TransactionDBMutexFactoryImpl>();
self->locker_.reset(NewRangeLockManager(mutex_factory)->getLockManager());
std::shared_ptr<RangeLockManagerHandle> range_lock_mgr =
std::dynamic_pointer_cast<RangeLockManagerHandle>(self->locker_);
txn_opt.lock_mgr_handle = range_lock_mgr;
ASSERT_OK(TransactionDB::Open(opt, txn_opt, self->db_dir_, &self->db_));
self->wait_sync_point_name_ = "RangeTreeLockManager::TryRangeLock:WaitingTxn";
}
INSTANTIATE_TEST_CASE_P(RangeLockManager, AnyLockManagerTest,
::testing::Values(PointLockManagerTestExternalSetup));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else // OS_WIN
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "skipped as Range Locking is not supported on Windows\n");
return 0;
}
#endif // OS_WIN