Timeout in microsecond option in WaitForCompactOptions (#11711)

Summary:
While it's rare, we may run into a scenario where `WaitForCompact()` waits for background jobs indefinitely. For example, not enough space error will add the job back to the queue while WaitForCompact() waits for _all jobs_ including the jobs that are in the queue to be completed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11711

Test Plan:
`DBCompactionWaitForCompactTest::WaitForCompactToTimeout` added
`timeout` option added to the variables for all of the existing DBCompactionWaitForCompactTests

Reviewed By: pdillinger, jowlyzhang

Differential Revision: D48416390

Pulled By: jaykorean

fbshipit-source-id: 7b6a12f705ab6c6dfaf8ad736a484ca654a86106
This commit is contained in:
Jay Huh 2023-08-18 11:21:45 -07:00 committed by Facebook GitHub Bot
parent a1743e85be
commit 0fa0c97d3e
5 changed files with 77 additions and 8 deletions

View File

@ -153,19 +153,23 @@ class DBCompactionDirectIOTest : public DBCompactionTest,
DBCompactionDirectIOTest() : DBCompactionTest() {}
};
// Params: See WaitForCompactOptions for details
class DBCompactionWaitForCompactTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public testing::WithParamInterface<
std::tuple<bool, bool, bool, std::chrono::microseconds>> {
public:
DBCompactionWaitForCompactTest()
: DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
abort_on_pause_ = std::get<0>(GetParam());
flush_ = std::get<1>(GetParam());
close_db_ = std::get<2>(GetParam());
timeout_ = std::get<3>(GetParam());
}
bool abort_on_pause_;
bool flush_;
bool close_db_;
std::chrono::microseconds timeout_;
Options options_;
WaitForCompactOptions wait_for_compact_options_;
@ -182,6 +186,7 @@ class DBCompactionWaitForCompactTest
wait_for_compact_options_.abort_on_pause = abort_on_pause_;
wait_for_compact_options_.flush = flush_;
wait_for_compact_options_.close_db = close_db_;
wait_for_compact_options_.timeout = timeout_;
DestroyAndReopen(options_);
@ -3334,10 +3339,19 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBCompactionWaitForCompactTest,
DBCompactionWaitForCompactTest,
::testing::Combine(testing::Bool(), testing::Bool(),
testing::Bool()));
INSTANTIATE_TEST_CASE_P(
DBCompactionWaitForCompactTest, DBCompactionWaitForCompactTest,
::testing::Combine(
testing::Bool() /* abort_on_pause */, testing::Bool() /* flush */,
testing::Bool() /* close_db */,
testing::Values(
std::chrono::microseconds::zero(),
std::chrono::microseconds{
60 * 60 *
1000000ULL} /* timeout */))); // 1 hour (long enough to
// make sure that tests
// don't fail unexpectedly
// when running slow)
TEST_P(DBCompactionWaitForCompactTest,
WaitForCompactWaitsOnCompactionToFinish) {
@ -3583,6 +3597,44 @@ TEST_P(DBCompactionWaitForCompactTest,
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBCompactionWaitForCompactTest, WaitForCompactToTimeout) {
// When timeout is set, this test makes CompactionJob hangs forever
// using sync point. This test also sets the timeout to be 1 ms for
// WaitForCompact to time out early. WaitForCompact() is expected to return
// Status::TimedOut.
// When timeout is not set, we expect WaitForCompact() to wait indefinitely.
// We don't want the test to hang forever. When timeout = 0, this test is not
// much different from WaitForCompactWaitsOnCompactionToFinish
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBCompactionTest::WaitForCompactToTimeout",
"CompactionJob::Run():Start"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Now trigger L0 compaction by adding a file
Random rnd(123);
GenerateNewRandomFile(&rnd, /* nowait */ true);
ASSERT_OK(Flush());
if (wait_for_compact_options_.timeout.count()) {
// Make timeout shorter to finish test early
wait_for_compact_options_.timeout = std::chrono::microseconds{1000};
} else {
// if timeout is not set, WaitForCompact() will wait forever. We don't
// want test to hang forever. Just let compaction go through
TEST_SYNC_POINT("DBCompactionTest::WaitForCompactToTimeout");
}
Status s = dbfull()->WaitForCompact(wait_for_compact_options_);
if (wait_for_compact_options_.timeout.count()) {
ASSERT_NOK(s);
ASSERT_TRUE(s.IsTimedOut());
} else {
ASSERT_OK(s);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
static std::string ShortKey(int i) {
assert(i < 10000);
char buf[100];

View File

@ -4114,6 +4114,8 @@ Status DBImpl::WaitForCompact(
}
}
TEST_SYNC_POINT("DBImpl::WaitForCompact:StartWaiting");
const auto deadline = immutable_db_options_.clock->NowMicros() +
wait_for_compact_options.timeout.count();
for (;;) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
@ -4125,7 +4127,13 @@ Status DBImpl::WaitForCompact(
bg_flush_scheduled_ || unscheduled_compactions_ ||
unscheduled_flushes_ || error_handler_.IsRecoveryInProgress()) &&
(error_handler_.GetBGError().ok())) {
bg_cv_.Wait();
if (wait_for_compact_options.timeout.count()) {
if (bg_cv_.TimedWait(deadline)) {
return Status::TimedOut();
}
} else {
bg_cv_.Wait();
}
} else if (wait_for_compact_options.close_db) {
reject_new_background_jobs_ = true;
mutex_.Unlock();

View File

@ -1477,7 +1477,8 @@ class DB {
// NOTE: This may also never return if there's sufficient ongoing writes that
// keeps flush and compaction going without stopping. The user would have to
// cease all the writes to DB to make this eventually return in a stable
// state.
// state. The user may also use timeout option in WaitForCompactOptions to
// make this stop waiting and return when timeout expires.
virtual Status WaitForCompact(
const WaitForCompactOptions& /* wait_for_compact_options */) = 0;

View File

@ -2120,7 +2120,8 @@ struct WaitForCompactOptions {
// called) If true, Status::Aborted will be returned immediately. If false,
// ContinueBackgroundWork() must be called to resume the background jobs.
// Otherwise, jobs that were queued, but not scheduled yet may never finish
// and WaitForCompact() may wait indefinitely.
// and WaitForCompact() may wait indefinitely (if timeout is set, it will
// expire and return Status::TimedOut).
bool abort_on_pause = false;
// A boolean to flush all column families before starting to wait.
@ -2132,6 +2133,12 @@ struct WaitForCompactOptions {
// returned Aborted status due to unreleased snapshots in the system. See
// comments in DB::Close() for details.
bool close_db = false;
// Timeout in microseconds for waiting for compaction to complete.
// Status::TimedOut will be returned if timeout expires.
// when timeout == 0, WaitForCompact() will wait as long as there's background
// work to finish.
std::chrono::microseconds timeout = std::chrono::microseconds::zero();
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1 @@
Add `timeout` in microsecond option to `WaitForCompactOptions` to allow timely termination of prolonged waiting in scenarios like recurring recoverable errors, such as out-of-space situations and continuous write streams that sustain ongoing flush and compactions