Add a unit test for the fix in #11763 (#11810)

Summary:
The unit test depended on https://github.com/facebook/rocksdb/issues/11753, which landed after the bug fix

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11810

Reviewed By: jaykorean

Differential Revision: D49128695

Pulled By: ajkr

fbshipit-source-id: e0a98bd65a292a7c7bd03913650f73c26d0864c7
This commit is contained in:
Andrew Kryczka 2023-09-11 12:54:50 -07:00 committed by Facebook GitHub Bot
parent 760ea373a8
commit 694e49cbb1
2 changed files with 59 additions and 0 deletions

View File

@ -11,6 +11,7 @@
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
@ -86,7 +87,10 @@ class MockSystemClock : public SystemClockWrapper {
std::this_thread::yield();
bool mock_timeout = Random::GetTLSInstance()->OneIn(2);
if (mock_timeout) {
TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPreSleep");
current_time_us_.fetch_add(delay_micros);
TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPostSleep1");
TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPostSleep2");
}
cv->GetMutex()->Lock();
return mock_timeout;

View File

@ -498,6 +498,61 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
}
TEST_F(RateLimiterTest, WaitHangingBug) {
// At t=0: Threads 0 and 1 request `kBytesPerRefill` bytes at low-pri. One
// will be granted immediately and the other will enter `TimedWait()`.
//
// At t=`kMicrosPerRefill`: Thread 2 requests `kBytesPerRefill` bytes at
// low-pri. Thread 2's request enters the queue. To expose the bug scenario,
// `SyncPoint`s ensure this happens while the lock is temporarily released in
// `TimedWait()`. Before the bug fix, Thread 2's request would then hang in
// `Wait()` interminably.
const int kBytesPerSecond = 100;
const int kMicrosPerSecond = 1000 * 1000;
const int kMicrosPerRefill = kMicrosPerSecond;
const int kBytesPerRefill =
kBytesPerSecond * kMicrosPerRefill / kMicrosPerSecond;
auto mock_clock =
std::make_shared<MockSystemClock>(Env::Default()->GetSystemClock());
std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(
kBytesPerSecond, kMicrosPerRefill, 10 /* fairness */,
RateLimiter::Mode::kWritesOnly, mock_clock, false /* auto_tuned */));
std::array<std::thread, 3> request_threads;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"RateLimiterTest::WaitHangingBug:InitialRequestsReady",
"MockSystemClock::TimedWait:UnlockedPreSleep"},
{"MockSystemClock::TimedWait:UnlockedPostSleep1",
"RateLimiterTest::WaitHangingBug:TestThreadRequestBegin"},
{"RateLimiterTest::WaitHangingBug:TestThreadRequestEnd",
"MockSystemClock::TimedWait:UnlockedPostSleep2"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 2; i++) {
request_threads[i] = std::thread([&]() {
limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW,
nullptr /* stats */, RateLimiter::OpType::kWrite);
});
}
while (limiter->GetTotalRequests() < 2) {
}
TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:InitialRequestsReady");
TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestBegin");
request_threads[2] = std::thread([&]() {
limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW,
nullptr /* stats */, RateLimiter::OpType::kWrite);
});
while (limiter->GetTotalRequests() < 3) {
}
TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestEnd");
for (int i = 0; i < 3; i++) {
request_threads[i].join();
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {