From 52d4c9b7f67a4eacba100c94ac02522d5b005b60 Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Thu, 28 Jun 2018 12:23:57 -0700 Subject: [PATCH] Allow DB resume after background errors (#3997) Summary: Currently, if RocksDB encounters errors during a write operation (user requested or BG operations), it sets DBImpl::bg_error_ and fails subsequent writes. This PR allows the DB to be resumed for certain classes of errors. It consists of 3 parts - 1. Introduce Status::Severity in rocksdb::Status to indicate whether a given error can be recovered from or not 2. Refactor the error handling code so that setting bg_error_ and deciding on severity is in one place 3. Provide an API for the user to clear the error and resume the DB instance This whole change is broken up into multiple PRs. Initially, we only allow clearing the error for Status::NoSpace() errors during background flush/compaction. Subsequent PRs will expand this to include more errors and foreground operations such as Put(), and implement a polling mechanism for out-of-space errors. Closes https://github.com/facebook/rocksdb/pull/3997 Differential Revision: D8653831 Pulled By: anand1976 fbshipit-source-id: 6dc835c76122443a7668497c0226b4f072bc6afd --- CMakeLists.txt | 2 + Makefile | 4 + TARGETS | 1 + db/compaction_job.cc | 19 ++-- db/compaction_job.h | 5 +- db/compaction_job_test.cc | 19 ++-- db/db_impl.cc | 45 +++++++- db/db_impl.h | 10 +- db/db_impl_compaction_flush.cc | 88 ++++++---------- db/db_impl_debug.cc | 5 +- db/db_impl_open.cc | 1 + db/db_impl_write.cc | 31 ++---- db/error_handler.cc | 170 +++++++++++++++++++++++++++++++ db/error_handler.h | 52 ++++++++++ db/error_handler_test.cc | 138 +++++++++++++++++++++++++ db/flush_job.cc | 1 + include/rocksdb/db.h | 2 + include/rocksdb/status.h | 36 ++++++- src.mk | 2 + util/fault_injection_test_env.cc | 8 +- util/fault_injection_test_env.h | 15 ++- util/status.cc | 2 +- util/status_message.cc | 3 +- 23 files changed, 531 insertions(+), 128 deletions(-) create mode 100644 db/error_handler.cc create mode 100644 db/error_handler.h create mode 100644 db/error_handler_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 6fedf455a4..ad7c7a0f8f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,6 +481,7 @@ set(SOURCES db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc + db/error_handler.cc db/event_helpers.cc db/experimental.cc db/external_sst_file_ingestion_job.cc @@ -877,6 +878,7 @@ if(WITH_TESTS) db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc + db/error_handler_test.cc db/obsolete_files_test.cc db/external_sst_file_basic_test.cc db/external_sst_file_test.cc diff --git a/Makefile b/Makefile index d9b5e11cd6..cf0a160f29 100644 --- a/Makefile +++ b/Makefile @@ -429,6 +429,7 @@ TESTS = \ db_table_properties_test \ db_statistics_test \ db_write_test \ + error_handler_test \ autovector_test \ blob_db_test \ cleanable_test \ @@ -1194,6 +1195,9 @@ db_statistics_test: db/db_statistics_test.o db/db_test_util.o $(LIBOBJECTS) $(TE db_write_test: db/db_write_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +error_handler_test: db/error_handler_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index b01eae323e..6849ff95ea 100644 --- a/TARGETS +++ b/TARGETS @@ -93,6 +93,7 @@ cpp_library( "db/db_info_dumper.cc", "db/db_iter.cc", "db/dbformat.cc", + "db/error_handler.cc", "db/event_helpers.cc", "db/experimental.cc", "db/external_sst_file_ingestion_job.cc", diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 48759d05fc..8f9fd1c505 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -25,8 +25,10 @@ #include #include "db/builder.h" +#include "db/db_impl.h" #include "db/db_iter.h" #include "db/dbformat.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -307,7 +309,7 @@ CompactionJob::CompactionJob( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, - InstrumentedMutex* db_mutex, Status* db_bg_error, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, @@ -331,7 +333,7 @@ CompactionJob::CompactionJob( output_directory_(output_directory), stats_(stats), db_mutex_(db_mutex), - db_bg_error_(db_bg_error), + db_error_handler_(db_error_handler), existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), @@ -1282,21 +1284,12 @@ Status CompactionJob::FinishCompactionOutputFile( if (sfm->IsMaxAllowedSpaceReached()) { // TODO(ajkr): should we return OK() if max space was reached by the final // compaction output file (similarly to how flush works when full)? - s = Status::NoSpace("Max allowed space was reached"); + s = Status::SpaceLimit("Max allowed space was reached"); TEST_SYNC_POINT( "CompactionJob::FinishCompactionOutputFile:" "MaxAllowedSpaceReached"); InstrumentedMutexLock l(db_mutex_); - if (db_bg_error_->ok()) { - Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError( - cfd->ioptions()->listeners, BackgroundErrorReason::kCompaction, - &new_bg_error, db_mutex_); - if (!new_bg_error.ok()) { - *db_bg_error_ = new_bg_error; - } - } + db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction); } } #endif diff --git a/db/compaction_job.h b/db/compaction_job.h index 05a4ffb1a7..a31e8c1423 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -47,6 +47,7 @@ namespace rocksdb { class Arena; +class ErrorHandler; class MemTable; class SnapshotChecker; class TableCache; @@ -64,7 +65,7 @@ class CompactionJob { LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, - Status* db_bg_error, + ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, @@ -145,7 +146,7 @@ class CompactionJob { Directory* output_directory_; Statistics* stats_; InstrumentedMutex* db_mutex_; - Status* db_bg_error_; + ErrorHandler* db_error_handler_; // If there were two snapshots with seq numbers s1 and // s2 and s1 < s2, and if we find two instances of a key k1 then lies // entirely within s1 and s2, then the earlier version of k1 can be safely diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 7987ffac00..2246d489f3 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -12,6 +12,7 @@ #include "db/column_family.h" #include "db/compaction_job.h" +#include "db/error_handler.h" #include "db/version_set.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" @@ -77,7 +78,8 @@ class CompactionJobTest : public testing::Test { &write_controller_)), shutting_down_(false), preserve_deletes_seqnum_(0), - mock_table_factory_(new mock::MockTableFactory()) { + mock_table_factory_(new mock::MockTableFactory()), + error_handler_(db_options_, &mutex_) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); @@ -255,13 +257,12 @@ class CompactionJobTest : public testing::Test { EventLogger event_logger(db_options_.info_log.get()); // TODO(yiwu) add a mock snapshot checker and add test for it. SnapshotChecker* snapshot_checker = nullptr; - CompactionJob compaction_job(0, &compaction, db_options_, env_options_, - versions_.get(), &shutting_down_, - preserve_deletes_seqnum_, &log_buffer, - nullptr, nullptr, nullptr, &mutex_, &bg_error_, - snapshots, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger, - false, false, dbname_, &compaction_job_stats_); + CompactionJob compaction_job( + 0, &compaction, db_options_, env_options_, versions_.get(), + &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, + nullptr, nullptr, &mutex_, &error_handler_, snapshots, + earliest_write_conflict_snapshot, snapshot_checker, table_cache_, + &event_logger, false, false, dbname_, &compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); @@ -303,7 +304,7 @@ class CompactionJobTest : public testing::Test { ColumnFamilyData* cfd_; std::unique_ptr compaction_filter_; std::shared_ptr merge_op_; - Status bg_error_; + ErrorHandler error_handler_; }; TEST_F(CompactionJobTest, Simple) { diff --git a/db/db_impl.cc b/db/db_impl.cc index adb0652ad7..80dc3ff76f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -32,6 +32,7 @@ #include "db/db_info_dumper.h" #include "db/db_iter.h" #include "db/dbformat.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" @@ -215,7 +216,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // as well. use_custom_gc_(seq_per_batch), preserve_deletes_(options.preserve_deletes), - closed_(false) { + closed_(false), + error_handler_(immutable_db_options_, &mutex_) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -244,6 +246,43 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_seqnum_.store(0); } +Status DBImpl::Resume() { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB"); + + InstrumentedMutexLock db_mutex(&mutex_); + + if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) { + // Nothing to do + return Status::OK(); + } + + Status s = error_handler_.GetBGError(); + if (s.severity() > Status::Severity::kHardError) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "DB resume requested but failed due to Fatal/Unrecoverable error"); + return s; + } + + JobContext job_context(0); + FindObsoleteFiles(&job_context, true); + error_handler_.ClearBGError(); + mutex_.Unlock(); + + job_context.manifest_file_number = 1; + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + mutex_.Lock(); + MaybeScheduleFlushOrCompaction(); + + // No need to check BGError again. If something happened, event listener would be + // notified and the operation causing it would have failed + return Status::OK(); +} + // Will lock the mutex_, will wait for completion if wait is true void DBImpl::CancelAllBackgroundWork(bool wait) { InstrumentedMutexLock l(&mutex_); @@ -2879,9 +2918,9 @@ Status DBImpl::IngestExternalFile( std::list::iterator pending_output_elem; { InstrumentedMutexLock l(&mutex_); - if (!bg_error_.ok()) { + if (error_handler_.IsDBStopped()) { // Don't ingest files when there is a bg_error - return bg_error_; + return error_handler_.GetBGError(); } // Make sure that bg cleanup wont delete the files that we are ingesting diff --git a/db/db_impl.h b/db/db_impl.h index f6bfd576c2..8ff6db6372 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -23,6 +23,8 @@ #include "db/compaction_job.h" #include "db/dbformat.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/error_handler.h" +#include "db/event_helpers.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" #include "db/internal_stats.h" @@ -73,6 +75,9 @@ class DBImpl : public DB { const bool seq_per_batch = false); virtual ~DBImpl(); + using DB::Resume; + virtual Status Resume() override; + // Implementations of the DB interface using DB::Put; virtual Status Put(const WriteOptions& options, @@ -1265,9 +1270,6 @@ class DBImpl : public DB { PrepickedCompaction* prepicked_compaction; }; - // Have we encountered a background error in paranoid mode? - Status bg_error_; - // shall we disable deletion of obsolete files // if 0 the deletion is enabled. // if non-zero, files will not be getting deleted @@ -1425,6 +1427,8 @@ class DBImpl : public DB { // Flag to check whether Close() has been called on this DB bool closed_; + + ErrorHandler error_handler_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 34d870e8d6..f4f1680f9e 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -14,6 +14,7 @@ #include #include "db/builder.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" @@ -93,14 +94,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { // "number < current_log_number". MarkLogsSynced(current_log_number - 1, true, s); if (!s.ok()) { - Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); return s; } @@ -177,18 +171,9 @@ Status DBImpl::FlushMemTableToOutputFile( cfd->current()->storage_info()->LevelSummary(&tmp)); } - if (!s.ok() && !s.IsShutdownInProgress() && - immutable_db_options_.paranoid_checks && bg_error_.ok()) { + if (!s.ok() && !s.IsShutdownInProgress()) { Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - // if a bad error happened (not ShutdownInProgress), paranoid_checks is - // true, and the error isn't handled by callback, mark DB read-only - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } if (s.ok()) { #ifndef ROCKSDB_LITE @@ -202,18 +187,12 @@ Status DBImpl::FlushMemTableToOutputFile( std::string file_path = MakeTableFileName( cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber()); sfm->OnAddFile(file_path); - if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) { - Status new_bg_error = Status::NoSpace("Max allowed space was reached"); + if (sfm->IsMaxAllowedSpaceReached()) { + Status new_bg_error = Status::SpaceLimit("Max allowed space was reached"); TEST_SYNC_POINT_CALLBACK( "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", &new_bg_error); - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } } #endif // ROCKSDB_LITE @@ -674,7 +653,7 @@ Status DBImpl::CompactFilesImpl( env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_, - &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, @@ -736,16 +715,7 @@ Status DBImpl::CompactFilesImpl( "[%s] [JOB %d] Compaction error: %s", c->column_family_data()->GetName().c_str(), job_context->job_id, status.ToString().c_str()); - if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kCompaction, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); } if (output_file_names != nullptr) { @@ -1141,6 +1111,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // Wait until the compaction completes s = WaitForFlushMemTable(cfd, &flush_memtable_id); } + TEST_SYNC_POINT("FlushMemTableFinished"); return s; } @@ -1149,7 +1120,7 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, Status s; // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() && + while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() && (flush_memtable_id == nullptr || cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { if (shutting_down_.load(std::memory_order_acquire)) { @@ -1163,8 +1134,8 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, } bg_cv_.Wait(); } - if (!bg_error_.ok()) { - s = bg_error_; + if (error_handler_.IsDBStopped()) { + s = error_handler_.GetBGError(); } return s; } @@ -1383,9 +1354,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); - Status status = bg_error_; - if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { - status = Status::ShutdownInProgress(); + Status status; + if (!error_handler_.IsBGWorkStopped()) { + if (shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + } else { + status = error_handler_.GetBGError(); } if (!status.ok()) { @@ -1631,9 +1606,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, is_manual && manual_compaction->disallow_trivial_move; CompactionJobStats compaction_job_stats; - Status status = bg_error_; - if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { - status = Status::ShutdownInProgress(); + Status status; + if (!error_handler_.IsBGWorkStopped()) { + if (shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + } else { + status = error_handler_.GetBGError(); } if (!status.ok()) { @@ -1905,7 +1884,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), stats_, - &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, @@ -1951,16 +1930,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kCompaction, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); } if (is_manual) { diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 8032f83336..2b7720e580 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -10,6 +10,7 @@ #ifndef NDEBUG #include "db/db_impl.h" +#include "db/error_handler.h" #include "monitoring/thread_status_updater.h" namespace rocksdb { @@ -134,10 +135,10 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || (wait_unscheduled && unscheduled_compactions_)) && - bg_error_.ok()) { + !error_handler_.IsDBStopped()) { bg_cv_.Wait(); } - return bg_error_; + return error_handler_.GetBGError(); } void DBImpl::TEST_LockMutex() { diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 48753723fc..31d0145684 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -14,6 +14,7 @@ #include #include "db/builder.h" +#include "db/error_handler.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" #include "table/block_based_table_factory.h" diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index e05023ba5a..788ffb9c5b 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -12,6 +12,7 @@ #define __STDC_FORMAT_MACROS #endif #include +#include "db/error_handler.h" #include "db/event_helpers.h" #include "monitoring/perf_context_imp.h" #include "options/options_helper.h" @@ -676,16 +677,7 @@ void DBImpl::WriteStatusCheck(const Status& status) { if (immutable_db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); - if (bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kWriteCallback, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; // stop compaction & fail any further writes - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); } } @@ -698,15 +690,8 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { // ignore_missing_column_families. if (!status.ok()) { mutex_.Lock(); - assert(bg_error_.ok()); - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kMemTable, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; // stop compaction & fail any further writes - } + assert(!error_handler_.IsBGWorkStopped()); + error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable); mutex_.Unlock(); } } @@ -736,8 +721,8 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } - if (UNLIKELY(status.ok() && !bg_error_.ok())) { - status = bg_error_; + if (UNLIKELY(status.ok())) { + status = error_handler_.GetBGError(); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { @@ -1176,7 +1161,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, mutex_.Lock(); } - while (bg_error_.ok() && write_controller_.IsStopped()) { + while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) { if (write_options.no_slowdown) { return Status::Incomplete(); } @@ -1192,7 +1177,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, RecordTick(stats_, STALL_MICROS, time_delayed); } - return bg_error_; + return error_handler_.GetBGError(); } Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, diff --git a/db/error_handler.cc b/db/error_handler.cc new file mode 100644 index 0000000000..a5b23a4b81 --- /dev/null +++ b/db/error_handler.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/error_handler.h" +#include "db/event_helpers.h" + +namespace rocksdb { + +// Maps to help decide the severity of an error based on the +// BackgroundErrorReason, Code, SubCode and whether db_options.paranoid_checks +// is set or not. There are 3 maps, going from most specific to least specific +// (i.e from all 4 fields in a tuple to only the BackgroundErrorReason and +// paranoid_checks). The less specific map serves as a catch all in case we miss +// a specific error code or subcode. +std::map, + Status::Severity> + ErrorSeverityMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kNoSpace, + true), + Status::Severity::kSoftError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kNoSpace, + false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kSpaceLimit, + true), + Status::Severity::kHardError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kNoSpace, true), + Status::Severity::kSoftError}, + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kNoSpace, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kSpaceLimit, true), + Status::Severity::kHardError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, Status::SubCode::kNoSpace, + true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, Status::SubCode::kNoSpace, + false), + Status::Severity::kFatalError}, +}; + +std::map, Status::Severity> + DefaultErrorSeverityMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, false), + Status::Severity::kNoError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kIOError, false), + Status::Severity::kNoError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, false), + Status::Severity::kNoError}, +}; + +std::map, Status::Severity> + DefaultReasonMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, false), + Status::Severity::kNoError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlush, false), + Status::Severity::kNoError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, false), + Status::Severity::kFatalError}, + // Errors during Memtable update + {std::make_tuple(BackgroundErrorReason::kMemTable, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kMemTable, false), + Status::Severity::kFatalError}, +}; + +Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) { + db_mutex_->AssertHeld(); + + if (bg_err.ok()) { + return Status::OK(); + } + + bool paranoid = db_options_.paranoid_checks; + Status::Severity sev = Status::Severity::kFatalError; + Status new_bg_err; + bool found = false; + + { + auto entry = ErrorSeverityMap.find(std::make_tuple(reason, bg_err.code(), + bg_err.subcode(), paranoid)); + if (entry != ErrorSeverityMap.end()) { + sev = entry->second; + found = true; + } + } + + if (!found) { + auto entry = DefaultErrorSeverityMap.find(std::make_tuple(reason, + bg_err.code(), paranoid)); + if (entry != DefaultErrorSeverityMap.end()) { + sev = entry->second; + found = true; + } + } + + if (!found) { + auto entry = DefaultReasonMap.find(std::make_tuple(reason, paranoid)); + if (entry != DefaultReasonMap.end()) { + sev = entry->second; + } + } + + new_bg_err = Status(bg_err, sev); + if (!new_bg_err.ok()) { + Status s = new_bg_err; + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_); + if (!s.ok() && (s.severity() > bg_error_.severity())) { + bg_error_ = s; + } + } + + return bg_error_; +} + +} diff --git a/db/error_handler.h b/db/error_handler.h new file mode 100644 index 0000000000..8a1a411fc8 --- /dev/null +++ b/db/error_handler.h @@ -0,0 +1,52 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include "monitoring/instrumented_mutex.h" +#include "options/db_options.h" +#include "rocksdb/listener.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class ErrorHandler { + public: + ErrorHandler(const ImmutableDBOptions& db_options, + InstrumentedMutex* db_mutex) + : db_options_(db_options), + bg_error_(Status::OK()), + db_mutex_(db_mutex) + {} + ~ErrorHandler() {} + + Status::Severity GetErrorSeverity(BackgroundErrorReason reason, + Status::Code code, Status::SubCode subcode); + + Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); + + Status GetBGError() + { + return bg_error_; + } + + void ClearBGError() { + bg_error_ = Status::OK(); + } + + bool IsDBStopped() { + return !bg_error_.ok(); + } + + bool IsBGWorkStopped() { + return !bg_error_.ok(); + } + + private: + const ImmutableDBOptions& db_options_; + Status bg_error_; + InstrumentedMutex* db_mutex_; +}; + +} diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc new file mode 100644 index 0000000000..abd5663d8d --- /dev/null +++ b/db/error_handler_test.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/perf_context.h" +#include "util/fault_injection_test_env.h" +#if !defined(ROCKSDB_LITE) +#include "util/sync_point.h" +#endif + +namespace rocksdb { + +class DBErrorHandlingTest : public DBTestBase { + public: + DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {} +}; + +class DBErrorHandlingEnv : public EnvWrapper { + public: + DBErrorHandlingEnv() : EnvWrapper(Env::Default()), + trig_no_space(false), trig_io_error(false) {} + + void SetTrigNoSpace() {trig_no_space = true;} + void SetTrigIoError() {trig_io_error = true;} + private: + bool trig_no_space; + bool trig_io_error; +}; + +TEST_F(DBErrorHandlingTest, FLushWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::Start", [&](void *) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, CompactionWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void *) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, CorruptionError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void *) { + fault_env->SetFilesystemActive(false, Status::Corruption("Corruption")); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kUnrecoverableError); + + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_NE(s, Status::OK()); + Destroy(options); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/flush_job.cc b/db/flush_job.cc index 0e46579846..06132fb868 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -187,6 +187,7 @@ void FlushJob::PickMemTable() { Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta) { + TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); AutoThreadOperationStageUpdater stage_run( diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 80e9eb2061..f3aa39775d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -170,6 +170,8 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + virtual Status Resume() { return Status::NotSupported(); } + // Close the DB by releasing resources, closing files etc. This should be // called before calling the destructor so that the caller can get back a // status in case there are any errors. This will not fsync the WAL files. diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index c803f89bb0..e3576b2aa9 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -25,7 +25,7 @@ namespace rocksdb { class Status { public: // Create a success status. - Status() : code_(kOk), subcode_(kNone), state_(nullptr) {} + Status() : code_(kOk), subcode_(kNone), sev_(kNoError), state_(nullptr) {} ~Status() { free((void *) state_); } // Copy the specified status. @@ -44,7 +44,7 @@ class Status { bool operator==(const Status& rhs) const; bool operator!=(const Status& rhs) const; - enum Code { + enum Code : unsigned char { kOk = 0, kNotFound = 1, kCorruption = 2, @@ -64,7 +64,7 @@ class Status { Code code() const { return code_; } - enum SubCode { + enum SubCode : unsigned char { kNone = 0, kMutexTimeout = 1, kLockTimeout = 2, @@ -73,11 +73,24 @@ class Status { kDeadlock = 5, kStaleFile = 6, kMemoryLimit = 7, + kSpaceLimit = 8, kMaxSubCode }; SubCode subcode() const { return subcode_; } + enum Severity : unsigned char { + kNoError = 0, + kSoftError = 1, + kHardError = 2, + kFatalError = 3, + kUnrecoverableError = 4, + kMaxSeverity + }; + + Status(const Status& s, Severity sev); + Severity severity() const { return sev_; } + // Returns a C style string indicating the message of the Status const char* getState() const { return state_; } @@ -181,6 +194,11 @@ class Status { return Status(kAborted, kMemoryLimit, msg, msg2); } + static Status SpaceLimit() { return Status(kIOError, kSpaceLimit); } + static Status SpaceLimit(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIOError, kSpaceLimit, msg, msg2); + } + // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -261,12 +279,13 @@ class Status { // state_[4..] == message Code code_; SubCode subcode_; + Severity sev_; const char* state_; static const char* msgs[static_cast(kMaxSubCode)]; explicit Status(Code _code, SubCode _subcode = kNone) - : code_(_code), subcode_(_subcode), state_(nullptr) {} + : code_(_code), subcode_(_subcode), sev_(kNoError), state_(nullptr) {} Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2); Status(Code _code, const Slice& msg, const Slice& msg2) @@ -275,7 +294,11 @@ class Status { static const char* CopyState(const char* s); }; -inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_) { +inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_), sev_(s.sev_) { + state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); +} +inline Status::Status(const Status& s, Severity sev) + : code_(s.code_), subcode_(s.subcode_), sev_(sev) { state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } inline Status& Status::operator=(const Status& s) { @@ -284,6 +307,7 @@ inline Status& Status::operator=(const Status& s) { if (this != &s) { code_ = s.code_; subcode_ = s.subcode_; + sev_ = s.sev_; free((void *) state_); state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } @@ -308,6 +332,8 @@ inline Status& Status::operator=(Status&& s) s.code_ = kOk; subcode_ = std::move(s.subcode_); s.subcode_ = kNone; + sev_ = std::move(s.sev_); + s.sev_ = kNoError; free((void *)state_); state_ = nullptr; std::swap(state_, s.state_); diff --git a/src.mk b/src.mk index 047eaa3f27..44bb1de13a 100644 --- a/src.mk +++ b/src.mk @@ -25,6 +25,7 @@ LIB_SOURCES = \ db/db_info_dumper.cc \ db/db_iter.cc \ db/dbformat.cc \ + db/error_handler.cc \ db/event_helpers.cc \ db/experimental.cc \ db/external_sst_file_ingestion_job.cc \ @@ -293,6 +294,7 @@ MAIN_SOURCES = \ db/dbformat_test.cc \ db/deletefile_test.cc \ db/env_timed_test.cc \ + db/error_handler_test.cc \ db/external_sst_file_basic_test.cc \ db/external_sst_file_test.cc \ db/fault_injection_test.cc \ diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc index 3b3a8b9359..46e1e0d773 100644 --- a/util/fault_injection_test_env.cc +++ b/util/fault_injection_test_env.cc @@ -121,7 +121,7 @@ TestWritableFile::~TestWritableFile() { Status TestWritableFile::Append(const Slice& data) { if (!env_->IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return env_->GetError(); } Status s = target_->Append(data); if (s.ok()) { @@ -172,7 +172,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } // Not allow overwriting files Status s = target()->FileExists(fname); @@ -199,7 +199,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } Status s = EnvWrapper::DeleteFile(f); if (!s.ok()) { @@ -216,7 +216,7 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { Status FaultInjectionTestEnv::RenameFile(const std::string& s, const std::string& t) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } Status ret = EnvWrapper::RenameFile(s, t); diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index 1992ab52ea..1a62c619e8 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -145,12 +145,20 @@ class FaultInjectionTestEnv : public EnvWrapper { MutexLock l(&mutex_); return filesystem_active_; } - void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; } - void SetFilesystemActive(bool active) { + void SetFilesystemActiveNoLock(bool active, + Status error = Status::Corruption("Not active")) { + filesystem_active_ = active; + if (!active) { + error_ = error; + } + } + void SetFilesystemActive(bool active, + Status error = Status::Corruption("Not active")) { MutexLock l(&mutex_); - SetFilesystemActiveNoLock(active); + SetFilesystemActiveNoLock(active, error); } void AssertNoOpenFile() { assert(open_files_.empty()); } + Status GetError() { return error_; } private: port::Mutex mutex_; @@ -159,6 +167,7 @@ class FaultInjectionTestEnv : public EnvWrapper { std::unordered_map> dir_to_new_files_since_last_sync_; bool filesystem_active_; // Record flushes, syncs, writes + Status error_; }; } // namespace rocksdb diff --git a/util/status.cc b/util/status.cc index f302d2eada..aaad673675 100644 --- a/util/status.cc +++ b/util/status.cc @@ -24,7 +24,7 @@ const char* Status::CopyState(const char* state) { } Status::Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2) - : code_(_code), subcode_(_subcode) { + : code_(_code), subcode_(_subcode), sev_(kNoError) { assert(code_ != kOk); assert(subcode_ != kMaxSubCode); const size_t len1 = msg.size(); diff --git a/util/status_message.cc b/util/status_message.cc index 6e9d4e4f7c..7150e817e3 100644 --- a/util/status_message.cc +++ b/util/status_message.cc @@ -15,7 +15,8 @@ const char* Status::msgs[] = { "No space left on device", // kNoSpace "Deadlock", // kDeadlock "Stale file handle", // kStaleFile - "Memory limit reached" // kMemoryLimit + "Memory limit reached", // kMemoryLimit + "Space limit reached" // kSpaceLimit }; } // namespace rocksdb