diff --git a/db/db_options_test.cc b/db/db_options_test.cc index d3a1f7a434..a7ecf12744 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -527,6 +527,17 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); int counter = 0; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; @@ -556,6 +567,17 @@ TEST_F(DBOptionsTest, StatsPersistScheduling) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG int counter = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); @@ -581,6 +603,17 @@ TEST_F(DBOptionsTest, PersistentStatsFreshInstall) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG int counter = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); @@ -616,6 +649,19 @@ TEST_F(DBOptionsTest, GetStatsHistory) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); @@ -658,6 +704,19 @@ TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) { mock_env.reset(new rocksdb::MockTimeEnv(env_)); mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + CreateColumnFamilies({"pikachu"}, options); ASSERT_OK(Put("foo", "bar")); ReopenWithColumnFamilies({"default", "pikachu"}, options); diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index cd6cde19a9..dcd5847eb2 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -356,6 +356,14 @@ TEST_F(DBSSTTest, RateLimitedDelete) { env_->time_elapse_only_sleep_ = true; Options options = CurrentOptions(); options.disable_auto_compactions = true; + // Need to disable stats dumping and persisting which also use + // RepeatableThread, one of whose member variables is of type + // InstrumentedCondVar. The callback for + // InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping + // and persisting threads and cause time_spent_deleting measurement to become + // incorrect. + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; options.env = env_; int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec diff --git a/util/mock_time_env.h b/util/mock_time_env.h index c6ab8a7483..feada47771 100644 --- a/util/mock_time_env.h +++ b/util/mock_time_env.h @@ -31,6 +31,8 @@ class MockTimeEnv : public EnvWrapper { return current_time_ * 1000000000; } + uint64_t RealNowMicros() { return target()->NowMicros(); } + void set_current_time(uint64_t time) { assert(time >= current_time_); current_time_ = time; diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 770ba57724..967cc49945 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -25,6 +25,7 @@ class RepeatableThread { env_(env), delay_us_(delay_us), initial_delay_us_(initial_delay_us), + mutex_(env), cond_var_(&mutex_), running_(true), #ifndef NDEBUG @@ -36,7 +37,7 @@ class RepeatableThread { void cancel() { { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (!running_) { return; } @@ -58,7 +59,7 @@ class RepeatableThread { // // Note: only support one caller of this method. void TEST_WaitForRun(std::function callback = nullptr) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); while (!waiting_) { cond_var_.Wait(); } @@ -75,7 +76,7 @@ class RepeatableThread { private: bool wait(uint64_t delay) { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); if (running_ && delay > 0) { uint64_t wait_until = env_->NowMicros() + delay; #ifndef NDEBUG @@ -83,17 +84,7 @@ class RepeatableThread { cond_var_.SignalAll(); #endif while (running_) { -#ifndef NDEBUG - if (dynamic_cast(env_) != nullptr) { - // MockTimeEnv is used. Since it is not easy to mock TimedWait, - // we wait without timeout to wait for TEST_WaitForRun to wake us up. - cond_var_.Wait(); - } else { - cond_var_.TimedWait(wait_until); - } -#else cond_var_.TimedWait(wait_until); -#endif if (env_->NowMicros() >= wait_until) { break; } @@ -124,7 +115,7 @@ class RepeatableThread { function_(); #ifndef NDEBUG { - MutexLock l(&mutex_); + InstrumentedMutexLock l(&mutex_); run_count_++; cond_var_.SignalAll(); } @@ -140,8 +131,8 @@ class RepeatableThread { // Mutex lock should be held when accessing running_, waiting_ // and run_count_. - port::Mutex mutex_; - port::CondVar cond_var_; + InstrumentedMutex mutex_; + InstrumentedCondVar cond_var_; bool running_; #ifndef NDEBUG // RepeatableThread waiting for timeout. diff --git a/util/repeatable_thread_test.cc b/util/repeatable_thread_test.cc index dec437da32..ee853c1056 100644 --- a/util/repeatable_thread_test.cc +++ b/util/repeatable_thread_test.cc @@ -8,6 +8,7 @@ #include "db/db_test_util.h" #include "util/repeatable_thread.h" +#include "util/sync_point.h" #include "util/testharness.h" class RepeatableThreadTest : public testing::Test { @@ -56,6 +57,35 @@ TEST_F(RepeatableThreadTest, MockEnvTest) { constexpr int kIteration = 3; mock_env_->set_current_time(0); // in seconds std::atomic count{0}; + +#if defined(OS_MACOSX) && !defined(NDEBUG) + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + // Obtain the current (real) time in seconds and add 1000 extra seconds + // to ensure that RepeatableThread::wait invokes TimedWait with a time + // greater than (real) current time. This is to prevent the TimedWait + // function from returning immediately without sleeping and releasing + // the mutex on certain platforms, e.g. OS X. If TimedWait returns + // immediately, the mutex will not be released, and + // RepeatableThread::TEST_WaitForRun never has a chance to execute the + // callback which, in this case, updates the result returned by + // mock_env->NowMicros. Consequently, RepeatableThread::wait cannot + // break out of the loop, causing test to hang. The extra 1000 seconds + // is a best-effort approach because there seems no reliable and + // deterministic way to provide the aforementioned guarantee. By the + // time RepeatableThread::wait is called, it is no guarantee that the + // delay + mock_env->NowMicros will be greater than the current real + // time. However, 1000 seconds should be sufficient in most cases. + uint64_t time_us = *reinterpret_cast(arg); + if (time_us < mock_env_->RealNowMicros()) { + *reinterpret_cast(arg) = mock_env_->RealNowMicros() + 1000; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + rocksdb::RepeatableThread thread([&] { count++; }, "rt_test", mock_env_.get(), 1 * kSecond, 1 * kSecond); for (int i = 1; i <= kIteration; i++) {