diff --git a/test_util/mock_time_env.h b/test_util/mock_time_env.h index e11bed0d1c..19bb9e76de 100644 --- a/test_util/mock_time_env.h +++ b/test_util/mock_time_env.h @@ -11,6 +11,7 @@ #include "port/port.h" #include "rocksdb/system_clock.h" #include "test_util/mock_time_env.h" +#include "test_util/sync_point.h" #include "util/random.h" namespace ROCKSDB_NAMESPACE { @@ -86,7 +87,10 @@ class MockSystemClock : public SystemClockWrapper { std::this_thread::yield(); bool mock_timeout = Random::GetTLSInstance()->OneIn(2); if (mock_timeout) { + TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPreSleep"); current_time_us_.fetch_add(delay_micros); + TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPostSleep1"); + TEST_SYNC_POINT("MockSystemClock::TimedWait:UnlockedPostSleep2"); } cv->GetMutex()->Lock(); return mock_timeout; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index dfaa3a2cd6..f31981a5c7 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -498,6 +498,61 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); } +TEST_F(RateLimiterTest, WaitHangingBug) { + // At t=0: Threads 0 and 1 request `kBytesPerRefill` bytes at low-pri. One + // will be granted immediately and the other will enter `TimedWait()`. + // + // At t=`kMicrosPerRefill`: Thread 2 requests `kBytesPerRefill` bytes at + // low-pri. Thread 2's request enters the queue. To expose the bug scenario, + // `SyncPoint`s ensure this happens while the lock is temporarily released in + // `TimedWait()`. Before the bug fix, Thread 2's request would then hang in + // `Wait()` interminably. + const int kBytesPerSecond = 100; + const int kMicrosPerSecond = 1000 * 1000; + const int kMicrosPerRefill = kMicrosPerSecond; + const int kBytesPerRefill = + kBytesPerSecond * kMicrosPerRefill / kMicrosPerSecond; + + auto mock_clock = + std::make_shared(Env::Default()->GetSystemClock()); + std::unique_ptr limiter(new GenericRateLimiter( + kBytesPerSecond, kMicrosPerRefill, 10 /* fairness */, + RateLimiter::Mode::kWritesOnly, mock_clock, false /* auto_tuned */)); + std::array request_threads; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RateLimiterTest::WaitHangingBug:InitialRequestsReady", + "MockSystemClock::TimedWait:UnlockedPreSleep"}, + {"MockSystemClock::TimedWait:UnlockedPostSleep1", + "RateLimiterTest::WaitHangingBug:TestThreadRequestBegin"}, + {"RateLimiterTest::WaitHangingBug:TestThreadRequestEnd", + "MockSystemClock::TimedWait:UnlockedPostSleep2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < 2; i++) { + request_threads[i] = std::thread([&]() { + limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW, + nullptr /* stats */, RateLimiter::OpType::kWrite); + }); + } + while (limiter->GetTotalRequests() < 2) { + } + TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:InitialRequestsReady"); + + TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestBegin"); + request_threads[2] = std::thread([&]() { + limiter->Request(kBytesPerRefill /* bytes */, Env::IOPriority::IO_LOW, + nullptr /* stats */, RateLimiter::OpType::kWrite); + }); + while (limiter->GetTotalRequests() < 3) { + } + TEST_SYNC_POINT("RateLimiterTest::WaitHangingBug:TestThreadRequestEnd"); + + for (int i = 0; i < 3; i++) { + request_threads[i].join(); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {