From 0fa0c97d3e9ac5dfc2e7ae94834b0850cdef5df7 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Fri, 18 Aug 2023 11:21:45 -0700 Subject: [PATCH] Timeout in microsecond option in WaitForCompactOptions (#11711) Summary: While it's rare, we may run into a scenario where `WaitForCompact()` waits for background jobs indefinitely. For example, not enough space error will add the job back to the queue while WaitForCompact() waits for _all jobs_ including the jobs that are in the queue to be completed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11711 Test Plan: `DBCompactionWaitForCompactTest::WaitForCompactToTimeout` added `timeout` option added to the variables for all of the existing DBCompactionWaitForCompactTests Reviewed By: pdillinger, jowlyzhang Differential Revision: D48416390 Pulled By: jaykorean fbshipit-source-id: 7b6a12f705ab6c6dfaf8ad736a484ca654a86106 --- db/db_compaction_test.cc | 62 +++++++++++++++++-- db/db_impl/db_impl_compaction_flush.cc | 10 ++- include/rocksdb/db.h | 3 +- include/rocksdb/options.h | 9 ++- .../timeout_for_wait_for_compact_api.md | 1 + 5 files changed, 77 insertions(+), 8 deletions(-) create mode 100644 unreleased_history/new_features/timeout_for_wait_for_compact_api.md diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 0d23a76e5e..24445ecdb7 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -153,19 +153,23 @@ class DBCompactionDirectIOTest : public DBCompactionTest, DBCompactionDirectIOTest() : DBCompactionTest() {} }; +// Params: See WaitForCompactOptions for details class DBCompactionWaitForCompactTest : public DBTestBase, - public testing::WithParamInterface<std::tuple<bool, bool, bool>> { + public testing::WithParamInterface< + std::tuple<bool, bool, bool, std::chrono::microseconds>> { public: DBCompactionWaitForCompactTest() : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) { abort_on_pause_ = std::get<0>(GetParam()); flush_ = std::get<1>(GetParam()); close_db_ = std::get<2>(GetParam()); + timeout_ = std::get<3>(GetParam()); } bool 
abort_on_pause_; bool flush_; bool close_db_; + std::chrono::microseconds timeout_; Options options_; WaitForCompactOptions wait_for_compact_options_; @@ -182,6 +186,7 @@ class DBCompactionWaitForCompactTest wait_for_compact_options_.abort_on_pause = abort_on_pause_; wait_for_compact_options_.flush = flush_; wait_for_compact_options_.close_db = close_db_; + wait_for_compact_options_.timeout = timeout_; DestroyAndReopen(options_); @@ -3334,10 +3339,19 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -INSTANTIATE_TEST_CASE_P(DBCompactionWaitForCompactTest, - DBCompactionWaitForCompactTest, - ::testing::Combine(testing::Bool(), testing::Bool(), - testing::Bool())); +INSTANTIATE_TEST_CASE_P( + DBCompactionWaitForCompactTest, DBCompactionWaitForCompactTest, + ::testing::Combine( + testing::Bool() /* abort_on_pause */, testing::Bool() /* flush */, + testing::Bool() /* close_db */, + testing::Values( + std::chrono::microseconds::zero(), + std::chrono::microseconds{ + 60 * 60 * + 1000000ULL} /* timeout */))); // 1 hour (long enough to + // make sure that tests + // don't fail unexpectedly + // when running slow) TEST_P(DBCompactionWaitForCompactTest, WaitForCompactWaitsOnCompactionToFinish) { @@ -3583,6 +3597,44 @@ TEST_P(DBCompactionWaitForCompactTest, ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_P(DBCompactionWaitForCompactTest, WaitForCompactToTimeout) { + // When timeout is set, this test makes CompactionJob hang forever + // using a sync point. This test also sets the timeout to 1 ms so that + // WaitForCompact() times out early. WaitForCompact() is expected to return + // Status::TimedOut. + // When timeout is not set, we expect WaitForCompact() to wait indefinitely. + // We don't want the test to hang forever. 
When timeout = 0, this test is not + // much different from WaitForCompactWaitsOnCompactionToFinish + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBCompactionTest::WaitForCompactToTimeout", + "CompactionJob::Run():Start"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Now trigger L0 compaction by adding a file + Random rnd(123); + GenerateNewRandomFile(&rnd, /* nowait */ true); + ASSERT_OK(Flush()); + + if (wait_for_compact_options_.timeout.count()) { + // Make timeout shorter to finish test early + wait_for_compact_options_.timeout = std::chrono::microseconds{1000}; + } else { + // if timeout is not set, WaitForCompact() will wait forever. We don't + // want test to hang forever. Just let compaction go through + TEST_SYNC_POINT("DBCompactionTest::WaitForCompactToTimeout"); + } + Status s = dbfull()->WaitForCompact(wait_for_compact_options_); + if (wait_for_compact_options_.timeout.count()) { + ASSERT_NOK(s); + ASSERT_TRUE(s.IsTimedOut()); + } else { + ASSERT_OK(s); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + static std::string ShortKey(int i) { assert(i < 10000); char buf[100]; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 157db28a17..732b0667de 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -4114,6 +4114,8 @@ Status DBImpl::WaitForCompact( } } TEST_SYNC_POINT("DBImpl::WaitForCompact:StartWaiting"); + const auto deadline = immutable_db_options_.clock->NowMicros() + + wait_for_compact_options.timeout.count(); for (;;) { if (shutting_down_.load(std::memory_order_acquire)) { return Status::ShutdownInProgress(); @@ -4125,7 +4127,13 @@ Status DBImpl::WaitForCompact( bg_flush_scheduled_ || unscheduled_compactions_ || unscheduled_flushes_ || error_handler_.IsRecoveryInProgress()) && (error_handler_.GetBGError().ok())) { - bg_cv_.Wait(); + if 
(wait_for_compact_options.timeout.count()) { + if (bg_cv_.TimedWait(deadline)) { + return Status::TimedOut(); + } + } else { + bg_cv_.Wait(); + } } else if (wait_for_compact_options.close_db) { reject_new_background_jobs_ = true; mutex_.Unlock(); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2c86444099..4e764cb6a8 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1477,7 +1477,8 @@ class DB { // NOTE: This may also never return if there's sufficient ongoing writes that // keeps flush and compaction going without stopping. The user would have to // cease all the writes to DB to make this eventually return in a stable - // state. + // state. The user may also use timeout option in WaitForCompactOptions to + // make this stop waiting and return when timeout expires. virtual Status WaitForCompact( const WaitForCompactOptions& /* wait_for_compact_options */) = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index be42806f1b..ee6a1096e3 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2120,7 +2120,8 @@ struct WaitForCompactOptions { // called) If true, Status::Aborted will be returned immediately. If false, // ContinueBackgroundWork() must be called to resume the background jobs. // Otherwise, jobs that were queued, but not scheduled yet may never finish - // and WaitForCompact() may wait indefinitely. + // and WaitForCompact() may wait indefinitely (if timeout is set, it will + // expire and return Status::TimedOut). bool abort_on_pause = false; // A boolean to flush all column families before starting to wait. @@ -2132,6 +2133,12 @@ struct WaitForCompactOptions { // returned Aborted status due to unreleased snapshots in the system. See // comments in DB::Close() for details. bool close_db = false; + + // Timeout in microseconds for waiting for compaction to complete. + // Status::TimedOut will be returned if timeout expires. 
+ // When timeout == 0 (the default), WaitForCompact() waits as long as there is + // background work to finish. + std::chrono::microseconds timeout = std::chrono::microseconds::zero(); }; } // namespace ROCKSDB_NAMESPACE diff --git a/unreleased_history/new_features/timeout_for_wait_for_compact_api.md b/unreleased_history/new_features/timeout_for_wait_for_compact_api.md new file mode 100644 index 0000000000..fccc34f56c --- /dev/null +++ b/unreleased_history/new_features/timeout_for_wait_for_compact_api.md @@ -0,0 +1 @@ +Add a `timeout` option (in microseconds) to `WaitForCompactOptions` so that `WaitForCompact()` can stop waiting and return `Status::TimedOut` instead of waiting indefinitely, e.g. when recurring recoverable errors (such as out-of-space) keep re-queuing background jobs or when continuous writes sustain ongoing flushes and compactions