diff --git a/util/timer.h b/util/timer.h index 21ddf330a9..6a5022d2ba 100644 --- a/util/timer.h +++ b/util/timer.h @@ -15,6 +15,7 @@ #include "monitoring/instrumented_mutex.h" #include "rocksdb/env.h" +#include "test_util/sync_point.h" #include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { @@ -56,6 +57,7 @@ class Timer { InstrumentedMutexLock l(&mutex_); heap_.push(fn_info.get()); map_.emplace(std::make_pair(fn_name, std::move(fn_info))); + cond_var_.Signal(); } void Cancel(const std::string& fn_name) { @@ -112,6 +114,7 @@ class Timer { while (running_) { if (heap_.empty()) { // wait + TEST_SYNC_POINT("Timer::Run::Waiting"); cond_var_.Wait(); continue; } diff --git a/util/timer_test.cc b/util/timer_test.cc index 73fc71c42d..7f9012ebc4 100644 --- a/util/timer_test.cc +++ b/util/timer_test.cc @@ -237,6 +237,49 @@ TEST_F(TimerTest, MultipleScheduleRepeatedlyTest) { ASSERT_EQ(count2, 5); } +TEST_F(TimerTest, AddAfterStartTest) { + const int kIterations = 5; + InstrumentedMutex mutex; + InstrumentedCondVar test_cv(&mutex); + + // wait timer to run and then add a new job + SyncPoint::GetInstance()->LoadDependency( + {{"Timer::Run::Waiting", "TimerTest:AddAfterStartTest:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + mock_env_->set_current_time(0); + Timer timer(mock_env_.get()); + + ASSERT_TRUE(timer.Start()); + + TEST_SYNC_POINT("TimerTest:AddAfterStartTest:1"); + int count = 0; + timer.Add( + [&] { + InstrumentedMutexLock l(&mutex); + count++; + if (count >= kIterations) { + test_cv.SignalAll(); + } + }, + "fn_sch_test", 1 * kSecond, 1 * kSecond); + + // Wait for execution to finish + uint64_t time_counter = 0; + { + InstrumentedMutexLock l(&mutex); + while (count < kIterations) { + time_counter += kSecond; + mock_env_->set_current_time(time_counter); + test_cv.TimedWait(time_counter); + } + } + + ASSERT_TRUE(timer.Shutdown()); + + ASSERT_EQ(kIterations, count); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {