From e373685dab1b0716a8ed7532c80ad2e4d05e7590 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 29 Aug 2023 18:39:10 -0700 Subject: [PATCH] Add SystemClock::TimedWait() function (#11753) Summary: Having a synthetic implementation of `TimedWait()` in `SystemClock` will allow us to add `SyncPoint`s while mutex is released, which was previously impossible since the lock was released and reacquired all within `pthread_cond_timedwait()`. Additionally, integrating `TimedWait()` with `MockSystemClock` allows us to cleanup some workarounds in the test code. In this PR I only cleaned up the `GenericRateLimiter` test workaround. This is related to the intended follow-up mentioned in https://github.com/facebook/rocksdb/issues/7101's description. There are a couple differences: (1) This PR does not include removing the particular workaround that initially motivated it. Actually, the `Timer` class uses `InstrumentedCondVar`, so the interface introduced here is inadequate to remove that workaround. On the bright side, the interface introduced in this PR can be changed as needed since it can neither be used nor extended externally, due to using forward-declared `port::CondVar*` in the interface. (2) This PR only makes the change in `SystemClock` not `Env`. Older revisions of this PR included `Env::TimedWait()` and `SpecialEnv::TimedWait()`; however, since they were unused it probably makes sense to defer adding them until when they are needed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11753 Reviewed By: pdillinger Differential Revision: D48654995 Pulled By: ajkr fbshipit-source-id: 15e19f2454b64d4ec7f50e328691c66ca9911122 --- env/env.cc | 5 +++++ include/rocksdb/port_defs.h | 4 ++++ include/rocksdb/system_clock.h | 15 +++++++++++++++ port/port_posix.h | 3 +++ port/win/port_win.h | 3 +++ test_util/mock_time_env.h | 27 +++++++++++++++++++++++++++ util/rate_limiter.cc | 2 +- util/rate_limiter_test.cc | 25 ++++++------------------- 8 files changed, 64 insertions(+), 20 deletions(-) diff --git a/env/env.cc b/env/env.cc index 937be43c05..40493b478b 100644 --- a/env/env.cc +++ b/env/env.cc @@ -1229,4 +1229,9 @@ Status SystemClock::CreateFromString(const ConfigOptions& config_options, return LoadSharedObject(config_options, value, result); } } + +bool SystemClock::TimedWait(port::CondVar* cv, + std::chrono::microseconds deadline) { + return cv->TimedWait(deadline.count()); +} } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/port_defs.h b/include/rocksdb/port_defs.h index 9771aacb92..68f1d61d23 100644 --- a/include/rocksdb/port_defs.h +++ b/include/rocksdb/port_defs.h @@ -12,6 +12,10 @@ namespace ROCKSDB_NAMESPACE { +namespace port { +class CondVar; +} + enum class CpuPriority { kIdle = 0, kLow = 1, diff --git a/include/rocksdb/system_clock.h b/include/rocksdb/system_clock.h index 7ca92e54e3..c4cfcecb55 100644 --- a/include/rocksdb/system_clock.h +++ b/include/rocksdb/system_clock.h @@ -9,9 +9,11 @@ #pragma once #include +#include #include #include "rocksdb/customizable.h" +#include "rocksdb/port_defs.h" #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/status.h" @@ -68,6 +70,14 @@ class SystemClock : public Customizable { // Sleep/delay the thread for the prescribed number of micro-seconds. virtual void SleepForMicroseconds(int micros) = 0; + // For internal use/extension only. + // + // Issues a wait on `cv` that times out at `deadline`. May wakeup and return + // spuriously. + // + // Returns true if wait timed out, false otherwise + virtual bool TimedWait(port::CondVar* cv, std::chrono::microseconds deadline); + // Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC). // Only overwrites *unix_time on success. virtual Status GetCurrentTime(int64_t* unix_time) = 0; @@ -94,6 +104,11 @@ class SystemClockWrapper : public SystemClock { return target_->SleepForMicroseconds(micros); } + virtual bool TimedWait(port::CondVar* cv, + std::chrono::microseconds deadline) override { + return target_->TimedWait(cv, deadline); + } + Status GetCurrentTime(int64_t* unix_time) override { return target_->GetCurrentTime(unix_time); } diff --git a/port/port_posix.h b/port/port_posix.h index cdb256a6d6..e498186041 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -149,6 +149,9 @@ class CondVar { public: explicit CondVar(Mutex* mu); ~CondVar(); + + Mutex* GetMutex() const { return mu_; } + void Wait(); // Timed condition wait. Returns true if timeout occurred. bool TimedWait(uint64_t abs_time_us); diff --git a/port/win/port_win.h b/port/win/port_win.h index 4d9883b63a..621f053703 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -170,6 +170,9 @@ class CondVar { explicit CondVar(Mutex* mu) : mu_(mu) {} ~CondVar(); + + Mutex* GetMutex() const { return mu_; } + void Wait(); bool TimedWait(uint64_t expiration_time); void Signal(); diff --git a/test_util/mock_time_env.h b/test_util/mock_time_env.h index 7834368e03..e11bed0d1c 100644 --- a/test_util/mock_time_env.h +++ b/test_util/mock_time_env.h @@ -8,7 +8,10 @@ #include #include +#include "port/port.h" #include "rocksdb/system_clock.h" +#include "test_util/mock_time_env.h" +#include "util/random.h" namespace ROCKSDB_NAMESPACE { @@ -65,6 +68,30 @@ class MockSystemClock : public SystemClockWrapper { current_time_us_.fetch_add(micros); } + virtual bool TimedWait(port::CondVar* cv, + std::chrono::microseconds deadline) override { + uint64_t now_micros = NowMicros(); + uint64_t deadline_micros = static_cast(deadline.count()); + uint64_t delay_micros; + if (deadline_micros > now_micros) { + delay_micros = deadline_micros - now_micros; + } else { + delay_micros = 0; + } + // To prevent slowdown, this `TimedWait()` is completely synthetic. First, + // it yields to coerce other threads to run while the lock is released. + // Second, it randomly selects between mocking an immediate wakeup and a + // timeout. + cv->GetMutex()->Unlock(); + std::this_thread::yield(); + bool mock_timeout = Random::GetTLSInstance()->OneIn(2); + if (mock_timeout) { + current_time_us_.fetch_add(delay_micros); + } + cv->GetMutex()->Lock(); + return mock_timeout; + } + // TODO: this is a workaround for the different behavior on different platform // for timedwait timeout. Ideally timedwait API should be moved to env. // details: PR #7101. diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index ddb9bdbf02..12eef1311c 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -170,7 +170,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; wait_until_refill_pending_ = true; - r.cv.TimedWait(wait_until); + clock_->TimedWait(&r.cv, std::chrono::microseconds(wait_until)); TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait", &time_until_refill_us); wait_until_refill_pending_ = false; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 7df2bb04f1..dfaa3a2cd6 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,6 +15,7 @@ #include "db/db_test_util.h" #include "port/port.h" #include "rocksdb/system_clock.h" +#include "test_util/mock_time_env.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" @@ -464,31 +465,21 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { const std::chrono::seconds kTimePerRefill(1); const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc - SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + auto mock_clock = + std::make_shared(Env::Default()->GetSystemClock()); auto stats = CreateDBStatistics(); std::unique_ptr rate_limiter(new GenericRateLimiter( 1000 /* rate_bytes_per_sec */, std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, - RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), - true /* auto_tuned */)); - - // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the - // `Env` to advance its time according to the fake wait duration. The - // workaround is to install a callback that advance the `Env`'s mock time. - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { - int64_t time_waited_us = *static_cast(arg); - special_env.SleepForMicroseconds(static_cast(time_waited_us)); - }); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + RateLimiter::Mode::kWritesOnly, mock_clock, true /* auto_tuned */)); // verify rate limit increases after a sequence of periods where rate limiter // is always drained int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), RateLimiter::OpType::kWrite); - while (std::chrono::microseconds(special_env.NowMicros()) <= + while (std::chrono::microseconds(mock_clock->NowMicros()) <= kRefillsPerTune * kTimePerRefill) { rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), RateLimiter::OpType::kWrite); @@ -496,13 +487,9 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( - "GenericRateLimiter::Request:PostTimedWait"); - // decreases after a sequence of periods where rate limiter is not drained orig_bytes_per_sec = new_bytes_per_sec; - special_env.SleepForMicroseconds(static_cast( + mock_clock->SleepForMicroseconds(static_cast( kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count())); // make a request so tuner can be triggered rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),