From 4b124fb9d3e0e7adbb8522b1bfff586dcc02493e Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Mon, 5 Feb 2018 13:48:25 -0800 Subject: [PATCH] Handle error return from WriteBuffer() Summary: There are a couple of places where we swallow any error from WriteBuffer() - in SwitchMemtable() and DBImpl::CloseImpl(). Propagate the error up in those cases rather than ignoring it. Closes https://github.com/facebook/rocksdb/pull/3404 Differential Revision: D6879954 Pulled By: anand1976 fbshipit-source-id: 2ef88b554be5286b0a8bad7384ba17a105395bdb --- HISTORY.md | 1 + db/db_basic_test.cc | 23 +++++++++++++++++++++++ db/db_flush_test.cc | 20 ++++++++++++++++++++ db/db_impl.cc | 22 ++++++++++++++++++---- db/db_impl.h | 4 +++- db/db_impl_write.cc | 39 ++++++++++++++++++++++++--------------- include/rocksdb/db.h | 5 +++-- 7 files changed, 92 insertions(+), 22 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1d91a97064..8bac7fd1e1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ ### Bug Fixes * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete. * Fix DB::Flush() keep waiting after flush finish under certain condition. +* Fix Handle error return from WriteBuffer() during WAL file close and DB close ## 5.10.0 (12/11/2017) ### Public API Change diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 92de4d5d66..23e6215f6b 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" +#include "util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "util/sync_point.h" #endif @@ -898,6 +899,28 @@ TEST_F(DBBasicTest, DBClose) { delete options.env; } +TEST_F(DBBasicTest, DBCloseFlushError) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.manual_wal_flush = true; + options.write_buffer_size=100; + options.env = fault_injection_env.get(); + + Reopen(options); + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + ASSERT_OK(Put("key3", "value3")); + fault_injection_env->SetFilesystemActive(false); + Status s = dbfull()->Close(); + fault_injection_env->SetFilesystemActive(true); + ASSERT_NE(s, Status::OK()); + + Destroy(options); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 6c24b716ea..87f894a7b4 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -185,6 +185,26 @@ TEST_P(DBFlushDirectIOTest, DirectIO) { delete options.env; } +TEST_F(DBFlushTest, FlushError) { + Options options; + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_injection_env.get(); + Reopen(options); + + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + fault_injection_env->SetFilesystemActive(false); + Status s = dbfull()->TEST_SwitchMemtable(); + fault_injection_env->SetFilesystemActive(true); + Destroy(options); + ASSERT_NE(s, Status::OK()); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl.cc b/db/db_impl.cc index a66f228578..837eee6167 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -287,6 +287,7 @@ Status DBImpl::CloseImpl() { env_->UnSchedule(this, Env::Priority::BOTTOM); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); + Status ret; mutex_.Lock(); bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; bg_compaction_scheduled_ -= compactions_unscheduled; @@ -349,7 +350,18 @@ Status DBImpl::CloseImpl() { delete l; } for (auto& log : logs_) { - log.ClearWriter(); + uint64_t log_number = log.writer->get_log_number(); + Status s = log.ClearWriter(); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Unable to Sync WAL file %s with error -- %s", + LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), + s.ToString().c_str()); + // Retain the first error + if (ret.ok()) { + ret = s; + } + } } logs_.clear(); @@ -383,11 +395,13 @@ Status DBImpl::CloseImpl() { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); LogFlush(immutable_db_options_.info_log); - Status s = Status::OK(); if (immutable_db_options_.info_log && own_info_log_) { - s = immutable_db_options_.info_log->Close(); + Status s = immutable_db_options_.info_log->Close(); + if (ret.ok()) { + ret = s; + } } - return s; + return ret; } DBImpl::~DBImpl() { Close(); } diff --git a/db/db_impl.h b/db/db_impl.h index fe51847e52..bc251492b7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -993,9 +993,11 @@ class DBImpl : public DB { writer = nullptr; return w; } - void ClearWriter() { + Status ClearWriter() { + Status s = writer->WriteBuffer(); delete writer; writer = nullptr; + return s; } uint64_t number; diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index ac97934f09..d1a2daf66c 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1292,6 +1292,29 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ". Immutable memtables: %d.\n", cfd->GetName().c_str(), new_log_number, num_imm_unflushed); mutex_.Lock(); + if (s.ok() && creating_new_log) { + log_write_mutex_.Lock(); + logfile_number_ = new_log_number; + assert(new_log != nullptr); + log_empty_ = true; + log_dir_synced_ = false; + if (!logs_.empty()) { + // Alway flush the buffer of the last log before switching to a new one + log::Writer* cur_log_writer = logs_.back().writer; + s = cur_log_writer->WriteBuffer(); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 + " WAL file -- %s\n", + cfd->GetName().c_str(), cur_log_writer->get_log_number(), + new_log_number); + } + } + logs_.emplace_back(logfile_number_, new_log); + alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + log_write_mutex_.Unlock(); + } + if (!s.ok()) { // how do we fail if we're not creating new log? assert(creating_new_log); @@ -1302,21 +1325,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } return s; } - if (creating_new_log) { - log_write_mutex_.Lock(); - logfile_number_ = new_log_number; - assert(new_log != nullptr); - log_empty_ = true; - log_dir_synced_ = false; - if (!logs_.empty()) { - // Alway flush the buffer of the last log before switching to a new one - log::Writer* cur_log_writer = logs_.back().writer; - cur_log_writer->WriteBuffer(); - } - logs_.emplace_back(logfile_number_, new_log); - alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); - log_write_mutex_.Unlock(); - } + for (auto loop_cfd : *versions_->GetColumnFamilySet()) { // all this is just optimization to delete logs that // are no longer needed -- if CF is empty, that means it diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 14fe02217e..96761d8bda 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -173,8 +173,9 @@ class DB { // Close the DB by releasing resources, closing files etc. This should be // called before calling the desctructor so that the caller can get back a - // status in case there are any errors. Regardless of the return status, the - // DB must be freed + // status in case there are any errors. This will not fsync the WAL files. + // If syncing is required, the caller must first call SyncWAL. + // Regardless of the return status, the DB must be freed virtual Status Close() { return Status::OK(); } // ListColumnFamilies will open the DB specified by argument name