Add SystemClock::TimedWait() function (#11753)

Summary:
Having a synthetic implementation of `TimedWait()` in `SystemClock` will allow us to add `SyncPoint`s while mutex is released, which was previously impossible since the lock was released and reacquired all within `pthread_cond_timedwait()`. Additionally, integrating `TimedWait()` with `MockSystemClock` allows us to cleanup some workarounds in the test code. In this PR I only cleaned up the `GenericRateLimiter` test workaround.

This is related to the intended follow-up mentioned in https://github.com/facebook/rocksdb/issues/7101's description. There are a couple differences:

(1) This PR does not include removing the particular workaround that initially motivated it. Actually, the `Timer` class uses `InstrumentedCondVar`, so the interface introduced here is inadequate to remove that workaround. On the bright side, the interface introduced in this PR can be changed as needed since it can neither be used nor extended externally, due to using forward-declared `port::CondVar*` in the interface.
(2) This PR only makes the change in `SystemClock` not `Env`. Older revisions of this PR included `Env::TimedWait()` and `SpecialEnv::TimedWait()`; however, since they were unused it probably makes sense to defer adding them until when they are needed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11753

Reviewed By: pdillinger

Differential Revision: D48654995

Pulled By: ajkr

fbshipit-source-id: 15e19f2454b64d4ec7f50e328691c66ca9911122
This commit is contained in:
Andrew Kryczka 2023-08-29 18:39:10 -07:00 committed by Facebook GitHub Bot
parent 0b8b17a9d1
commit e373685dab
8 changed files with 64 additions and 20 deletions

5
env/env.cc vendored
View File

@ -1229,4 +1229,9 @@ Status SystemClock::CreateFromString(const ConfigOptions& config_options,
return LoadSharedObject<SystemClock>(config_options, value, result); return LoadSharedObject<SystemClock>(config_options, value, result);
} }
} }
bool SystemClock::TimedWait(port::CondVar* cv,
std::chrono::microseconds deadline) {
return cv->TimedWait(deadline.count());
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -12,6 +12,10 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace port {
class CondVar;
}
enum class CpuPriority { enum class CpuPriority {
kIdle = 0, kIdle = 0,
kLow = 1, kLow = 1,

View File

@ -9,9 +9,11 @@
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
#include <chrono>
#include <memory> #include <memory>
#include "rocksdb/customizable.h" #include "rocksdb/customizable.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -68,6 +70,14 @@ class SystemClock : public Customizable {
// Sleep/delay the thread for the prescribed number of micro-seconds. // Sleep/delay the thread for the prescribed number of micro-seconds.
virtual void SleepForMicroseconds(int micros) = 0; virtual void SleepForMicroseconds(int micros) = 0;
// For internal use/extension only.
//
// Issues a wait on `cv` that times out at `deadline`. May wakeup and return
// spuriously.
//
// Returns true if wait timed out, false otherwise
virtual bool TimedWait(port::CondVar* cv, std::chrono::microseconds deadline);
// Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC). // Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC).
// Only overwrites *unix_time on success. // Only overwrites *unix_time on success.
virtual Status GetCurrentTime(int64_t* unix_time) = 0; virtual Status GetCurrentTime(int64_t* unix_time) = 0;
@ -94,6 +104,11 @@ class SystemClockWrapper : public SystemClock {
return target_->SleepForMicroseconds(micros); return target_->SleepForMicroseconds(micros);
} }
virtual bool TimedWait(port::CondVar* cv,
std::chrono::microseconds deadline) override {
return target_->TimedWait(cv, deadline);
}
Status GetCurrentTime(int64_t* unix_time) override { Status GetCurrentTime(int64_t* unix_time) override {
return target_->GetCurrentTime(unix_time); return target_->GetCurrentTime(unix_time);
} }

View File

@ -149,6 +149,9 @@ class CondVar {
public: public:
explicit CondVar(Mutex* mu); explicit CondVar(Mutex* mu);
~CondVar(); ~CondVar();
Mutex* GetMutex() const { return mu_; }
void Wait(); void Wait();
// Timed condition wait. Returns true if timeout occurred. // Timed condition wait. Returns true if timeout occurred.
bool TimedWait(uint64_t abs_time_us); bool TimedWait(uint64_t abs_time_us);

View File

@ -170,6 +170,9 @@ class CondVar {
explicit CondVar(Mutex* mu) : mu_(mu) {} explicit CondVar(Mutex* mu) : mu_(mu) {}
~CondVar(); ~CondVar();
Mutex* GetMutex() const { return mu_; }
void Wait(); void Wait();
bool TimedWait(uint64_t expiration_time); bool TimedWait(uint64_t expiration_time);
void Signal(); void Signal();

View File

@ -8,7 +8,10 @@
#include <atomic> #include <atomic>
#include <limits> #include <limits>
#include "port/port.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "test_util/mock_time_env.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -65,6 +68,30 @@ class MockSystemClock : public SystemClockWrapper {
current_time_us_.fetch_add(micros); current_time_us_.fetch_add(micros);
} }
virtual bool TimedWait(port::CondVar* cv,
std::chrono::microseconds deadline) override {
uint64_t now_micros = NowMicros();
uint64_t deadline_micros = static_cast<uint64_t>(deadline.count());
uint64_t delay_micros;
if (deadline_micros > now_micros) {
delay_micros = deadline_micros - now_micros;
} else {
delay_micros = 0;
}
// To prevent slowdown, this `TimedWait()` is completely synthetic. First,
// it yields to coerce other threads to run while the lock is released.
// Second, it randomly selects between mocking an immediate wakeup and a
// timeout.
cv->GetMutex()->Unlock();
std::this_thread::yield();
bool mock_timeout = Random::GetTLSInstance()->OneIn(2);
if (mock_timeout) {
current_time_us_.fetch_add(delay_micros);
}
cv->GetMutex()->Lock();
return mock_timeout;
}
// TODO: this is a workaround for the different behavior on different platform // TODO: this is a workaround for the different behavior on different platform
// for timedwait timeout. Ideally timedwait API should be moved to env. // for timedwait timeout. Ideally timedwait API should be moved to env.
// details: PR #7101. // details: PR #7101.

View File

@ -170,7 +170,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_; ++num_drains_;
wait_until_refill_pending_ = true; wait_until_refill_pending_ = true;
r.cv.TimedWait(wait_until); clock_->TimedWait(&r.cv, std::chrono::microseconds(wait_until));
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait", TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
&time_until_refill_us); &time_until_refill_us);
wait_until_refill_pending_ = false; wait_until_refill_pending_ = false;

View File

@ -15,6 +15,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "util/random.h" #include "util/random.h"
@ -464,31 +465,21 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
const std::chrono::seconds kTimePerRefill(1); const std::chrono::seconds kTimePerRefill(1);
const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc
SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); auto mock_clock =
std::make_shared<MockSystemClock>(Env::Default()->GetSystemClock());
auto stats = CreateDBStatistics(); auto stats = CreateDBStatistics();
std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter( std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
1000 /* rate_bytes_per_sec */, 1000 /* rate_bytes_per_sec */,
std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), RateLimiter::Mode::kWritesOnly, mock_clock, true /* auto_tuned */));
true /* auto_tuned */));
// Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
// `Env` to advance its time according to the fake wait duration. The
// workaround is to install a callback that advance the `Env`'s mock time.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
int64_t time_waited_us = *static_cast<int64_t*>(arg);
special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// verify rate limit increases after a sequence of periods where rate limiter // verify rate limit increases after a sequence of periods where rate limiter
// is always drained // is always drained
int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
RateLimiter::OpType::kWrite); RateLimiter::OpType::kWrite);
while (std::chrono::microseconds(special_env.NowMicros()) <= while (std::chrono::microseconds(mock_clock->NowMicros()) <=
kRefillsPerTune * kTimePerRefill) { kRefillsPerTune * kTimePerRefill) {
rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
RateLimiter::OpType::kWrite); RateLimiter::OpType::kWrite);
@ -496,13 +487,9 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec); ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::Request:PostTimedWait");
// decreases after a sequence of periods where rate limiter is not drained // decreases after a sequence of periods where rate limiter is not drained
orig_bytes_per_sec = new_bytes_per_sec; orig_bytes_per_sec = new_bytes_per_sec;
special_env.SleepForMicroseconds(static_cast<int>( mock_clock->SleepForMicroseconds(static_cast<int>(
kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count())); kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
// make a request so tuner can be triggered // make a request so tuner can be triggered
rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(), rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),