diff --git a/HISTORY.md b/HISTORY.md
index 86d1829bff..e20260798a 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -9,6 +9,7 @@
 * Fixed a possible race condition impacting users of `WriteBufferManager` who constructed it with `allow_stall == true`. The race condition led to undefined behavior (in our experience, typically a process crash).
 * Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting.
 * Make `DB::close()` thread-safe.
+* Fix a bug in atomic flush where one bg flush thread would wait forever for a preceding bg flush thread to commit its result to MANIFEST when that preceding thread encountered an error which was mapped to a soft error (DB not stopped).
 
 ### New Features
 * Print information about blob files when using "ldb list_live_files_metadata"
diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index a5bda71c81..fc97255160 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -2440,6 +2440,122 @@ TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+// In atomic flush, concurrent bg flush threads commit to the MANIFEST in
+// serial, in the order of their picked memtables for each column family.
+// Only when a bg flush thread finds out that its memtables are the earliest
+// unflushed ones for all the included column families will this bg flush
+// thread continue to commit to MANIFEST.
+// This unit test uses sync points to coordinate the execution of two bg
+// threads executing the same sequence of functions. The interleaving is as
+// follows.
+// time    bg1                              bg2
+//  |  pick memtables to flush
+//  |  flush memtables cf1_m1, cf2_m1
+//  |  join MANIFEST write queue
+//  |                                   pick memtables to flush
+//  |                                   flush memtables cf1_(m1+1)
+//  |                                   join MANIFEST write queue
+//  |                                   wait to write MANIFEST
+//  |  write MANIFEST
+//  |  IO error
+//  |                                   detect IO error and stop waiting
+//  V
+TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
+  bool atomic_flush = GetParam();
+  if (!atomic_flush) {
+    return;
+  }
+  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = true;
+  options.env = fault_injection_env.get();
+  // Set a larger value than default so that RocksDB can schedule concurrent
+  // background flush threads.
+  options.max_background_jobs = 8;
+  options.max_write_buffer_number = 8;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  assert(2 == handles_.size());
+
+  WriteOptions write_opts;
+  write_opts.disableWAL = true;
+
+  ASSERT_OK(Put(0, "a", "v_0_a", write_opts));
+  ASSERT_OK(Put(1, "a", "v_1_a", write_opts));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
+  });
+
+  std::thread::id bg_flush_thr1, bg_flush_thr2;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCallFlush:start", [&](void*) {
+        if (bg_flush_thr1 == std::thread::id()) {
+          bg_flush_thr1 = std::this_thread::get_id();
+        } else if (bg_flush_thr2 == std::thread::id()) {
+          bg_flush_thr2 = std::this_thread::get_id();
+        }
+      });
+
+  int called = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit",
+      [&](void* arg) {
+        if (std::this_thread::get_id() == bg_flush_thr2) {
+          const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
+          assert(ptr);
+          if (0 == called) {
+            // When bg flush thread 2 reaches here for the first time.
+            ASSERT_OK(ptr->first);
+            ASSERT_TRUE(ptr->second);
+          } else if (1 == called) {
+            // When bg flush thread 2 reaches here for the second time.
+            ASSERT_TRUE(ptr->first.IsIOError());
+            ASSERT_FALSE(ptr->second);
+          }
+          ++called;
+          TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
+        }
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
+      [&](void*) {
+        if (std::this_thread::get_id() == bg_flush_thr1) {
+          TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
+        }
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
+        if (std::this_thread::get_id() != bg_flush_thr1) {
+          return;
+        }
+        ASSERT_OK(db_->Put(write_opts, "b", "v_1_b"));
+
+        FlushOptions flush_opts;
+        flush_opts.wait = false;
+        std::vector<ColumnFamilyHandle*> cfhs(1, db_->DefaultColumnFamily());
+        ASSERT_OK(dbfull()->Flush(flush_opts, cfhs));
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
+        auto* ptr = reinterpret_cast<IOStatus*>(arg);
+        assert(ptr);
+        *ptr = IOStatus::IOError("Injected failure");
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());
+
+  Close();
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                         testing::Bool());
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 18226284b3..bb56c64a17 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -559,7 +559,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   }
 
   if (s.ok()) {
-    auto wait_to_install_func = [&]() {
+    const auto wait_to_install_func =
+        [&]() -> std::pair<Status, bool> {
+      if (!versions_->io_status().ok()) {
+        // Something went wrong elsewhere; we cannot count on waiting for our
+        // turn to write/sync to MANIFEST or CURRENT. Just return.
+        return std::make_pair(versions_->io_status(), false);
+      } else if (shutting_down_.load(std::memory_order_acquire)) {
+        return std::make_pair(Status::ShutdownInProgress(), false);
+      }
       bool ready = true;
       for (size_t i = 0; i != cfds.size(); ++i) {
         const auto& mems = jobs[i]->GetMemTables();
@@ -583,18 +591,40 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
           break;
         }
       }
-      return ready;
+      return std::make_pair(Status::OK(), !ready);
     };
 
     bool resuming_from_bg_err = error_handler_.IsDBStopped();
-    while ((!error_handler_.IsDBStopped() ||
-            error_handler_.GetRecoveryError().ok()) &&
-           !wait_to_install_func()) {
+    while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
+      std::pair<Status, bool> res = wait_to_install_func();
+
+      TEST_SYNC_POINT_CALLBACK(
+          "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", &res);
+
+      if (!res.first.ok()) {
+        s = res.first;
+        break;
+      } else if (!res.second) {
+        break;
+      }
       atomic_flush_install_cv_.Wait();
+
+      resuming_from_bg_err = error_handler_.IsDBStopped();
     }
 
-    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
-                             : error_handler_.GetBGError();
+    if (!resuming_from_bg_err) {
+      // If not resuming from bg err, then we determine future action based on
+      // whether we hit background error.
+      if (s.ok()) {
+        s = error_handler_.GetBGError();
+      }
+    } else if (s.ok()) {
+      // If resuming from bg err, we still rely on wait_to_install_func()'s
+      // result to determine future action. If wait_to_install_func() returns
+      // non-ok already, then we should not proceed to flush result
+      // installation.
+      s = error_handler_.GetRecoveryError();
+    }
   }
 
   if (s.ok()) {
@@ -2653,7 +2683,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
   bool made_progress = false;
   JobContext job_context(next_job_id_.fetch_add(1), true);
 
-  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
+  TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCallFlush:start", nullptr);
 
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                        immutable_db_options_.info_log.get());
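Illustrative note (not part of the patch): the core of the fix is that `wait_to_install_func` now returns a `(Status, should_wait)` pair instead of a bare `bool`, so a bg flush thread waiting on `atomic_flush_install_cv_` can stop waiting as soon as a preceding thread's MANIFEST write fails, even when that failure is only mapped to a soft error and the DB is not stopped. The sketch below models this pattern with a plain condition variable; `FlushState`, `WaitToInstall`, and `WaitForTurn` are hypothetical names, not RocksDB APIs.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <utility>

// Minimal stand-in for a status type; only what the sketch needs.
struct Status {
  bool ok_;
  std::string msg_;
  Status() : ok_(true) {}
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  static Status OK() { return Status(); }
  static Status IOError(std::string m) { return Status(false, std::move(m)); }
  bool ok() const { return ok_; }
};

// Shared state guarded by `mu`: how many flush batches have been committed to
// the manifest so far, and the first error hit while committing (if any).
struct FlushState {
  std::mutex mu;
  std::condition_variable cv;
  Status manifest_status = Status::OK();
  int committed_batches = 0;
};

// Returns {status, should_wait}. Wait only while no error has been recorded
// and it is not yet this thread's turn (all earlier batches must commit first).
std::pair<Status, bool> WaitToInstall(const FlushState& state, int my_turn) {
  if (!state.manifest_status.ok()) {
    // A preceding commit failed; do not keep waiting for our turn.
    return {state.manifest_status, false};
  }
  return {Status::OK(), state.committed_batches < my_turn};
}

// Blocks until it is this thread's turn to commit, or returns the error that
// made waiting pointless. The caller proceeds to commit only on OK.
Status WaitForTurn(FlushState& state, int my_turn) {
  std::unique_lock<std::mutex> lock(state.mu);
  for (;;) {
    std::pair<Status, bool> res = WaitToInstall(state, my_turn);
    if (!res.first.ok()) {
      return res.first;  // bail out instead of waiting forever
    }
    if (!res.second) {
      return Status::OK();  // our turn: go ahead and commit
    }
    state.cv.wait(lock);  // woken when a batch commits or an error is recorded
  }
}
```

In this sketch a committing thread would update `committed_batches` or record an error in `manifest_status` under `mu` and then call `cv.notify_all()`, which plays the role the patch relies on when `atomic_flush_install_cv_` is signaled.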