From 4a7c1dc375a69d52bd1f96f0697ab3946d79e168 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Thu, 18 Nov 2021 17:09:54 -0800 Subject: [PATCH] Add listener API that notifies on IOError (#9177) Summary: Add a new API in listener.h that notifies about IOErrors on Read/Write/Append/Flush etc. The API reports about IOStatus, filename, Operation name, offset and length. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9177 Test Plan: Added new unit tests Reviewed By: anand1976 Differential Revision: D32470627 Pulled By: akankshamahajan15 fbshipit-source-id: 189a717033590ae227b3beae8b1e7e185e4cdc12 --- HISTORY.md | 1 + file/random_access_file_reader.cc | 16 ++++- file/random_access_file_reader.h | 16 +++++ file/sequence_file_reader.h | 1 + file/writable_file_writer.cc | 38 ++++++++++ file/writable_file_writer.h | 13 ++++ include/rocksdb/listener.h | 27 ++++++- util/file_reader_writer_test.cc | 115 ++++++++++++++++++++++++++++++ 8 files changed, 225 insertions(+), 2 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 75c45d183f..26f3c5919e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ * Added new option "adaptive_readahead" in ReadOptions. For iterators, RocksDB does auto-readahead on noticing sequential reads and by enabling this option, readahead_size of current file (if reads are sequential) will be carried forward to next file instead of starting from the scratch at each level (except L0 level files). If reads are not sequential it will fall back to 8KB. This option is applicable only for RocksDB internal prefetch buffer and isn't supported with underlying file system prefetching. * Added the read count and read bytes related stats to Statistics for tiered storage hot, warm, and cold file reads. * Added an option to dynamically charge an updating estimated memory usage of block-based table building to block cache if block cache available. It currently only includes charging memory usage of constructing (new) Bloom Filter and Ribbon Filter to block cache. To enable this feature, set `BlockBasedTableOptions::reserve_table_builder_memory = true`. +* Add a new API OnIOError in listener.h that notifies listeners when an IO error occurs during FileSystem operation along with filename, status etc. ### Bug Fixes * Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption. diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 4af420c621..6857f253ad 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -184,6 +184,10 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, io_s); + if (!io_s.ok()) { + NotifyOnIOError(io_s, FileOperationType::kRead, file_name(), + tmp.size(), orig_offset); + } } buf.Size(buf.CurrentSize() + tmp.size()); @@ -245,9 +249,13 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, finish_ts, io_s); + + if (!io_s.ok()) { + NotifyOnIOError(io_s, FileOperationType::kRead, file_name(), + tmp_result.size(), offset + pos); + } } #endif - if (res_scratch == nullptr) { // we can't simply use `scratch` because reads of mmap'd files return // data in a different buffer. @@ -431,6 +439,12 @@ IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts, NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(), start_ts, finish_ts, read_reqs[i].status); } + if (!read_reqs[i].status.ok()) { + NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead, + file_name(), read_reqs[i].result.size(), + read_reqs[i].offset); + } + #endif // ROCKSDB_LITE IOSTATS_ADD(bytes_read, read_reqs[i].result.size()); IOStatsAddBytesByTemperature(file_temperature_, diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 7706ca4aff..8b4ef4b943 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -60,7 +60,23 @@ class RandomAccessFileReader { for (auto& listener : listeners_) { listener->OnFileReadFinish(info); } + info.status.PermitUncheckedError(); } + + void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation, + const std::string& file_path, size_t length, + uint64_t offset) const { + if (listeners_.empty()) { + return; + } + IOErrorInfo io_error_info(io_status, operation, file_path, length, offset); + + for (auto& listener : listeners_) { + listener->OnIOError(io_error_info); + } + io_status.PermitUncheckedError(); + } + #endif // ROCKSDB_LITE bool ShouldNotifyListeners() const { return !listeners_.empty(); } diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index c4c6e5b5d2..00051c080d 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -37,6 +37,7 @@ class SequentialFileReader { for (auto& listener : listeners_) { listener->OnFileReadFinish(info); } + info.status.PermitUncheckedError(); } void AddFileIOListeners( diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index b8abd9da90..91d0067b8b 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -220,6 +220,10 @@ IOStatus WritableFileWriter::Close() { if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileTruncateFinish(start_ts, finish_ts, s); + if (!interim.ok()) { + NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(), + filesize_); + } } #endif } @@ -237,6 +241,9 @@ IOStatus WritableFileWriter::Close() { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileSyncFinish(start_ts, finish_ts, s, FileOperationType::kFsync); + if (!interim.ok()) { + NotifyOnIOError(interim, FileOperationType::kFsync, file_name()); + } } #endif } @@ -259,6 +266,9 @@ IOStatus WritableFileWriter::Close() { if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileCloseFinish(start_ts, finish_ts, s); + if (!interim.ok()) { + NotifyOnIOError(interim, FileOperationType::kClose, file_name()); + } } #endif } @@ -318,6 +328,9 @@ IOStatus WritableFileWriter::Flush() { if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileFlushFinish(start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kFlush, file_name()); + } } #endif } @@ -425,6 +438,11 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { NotifyOnFileSyncFinish( start_ts, finish_ts, s, use_fsync ? FileOperationType::kFsync : FileOperationType::kSync); + if (!s.ok()) { + NotifyOnIOError( + s, (use_fsync ? FileOperationType::kFsync : FileOperationType::kSync), + file_name()); + } } #endif SetPerfLevel(prev_perf_level); @@ -445,6 +463,10 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kRangeSync, file_name(), nbytes, + offset); + } } #endif return s; @@ -500,6 +522,10 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed, + old_size); + } } #endif if (!s.ok()) { @@ -570,6 +596,10 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left, + old_size); + } } #endif if (!s.ok()) { @@ -671,6 +701,10 @@ IOStatus WritableFileWriter::WriteDirect() { if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(), + size, write_offset); + } } if (!s.ok()) { buf_.Size(file_advance + leftover_tail); @@ -761,6 +795,10 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() { if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s); + if (!s.ok()) { + NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(), + left, write_offset); + } } if (!s.ok()) { // In this case, we do not change buffered_data_crc32c_checksum_ because diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 84a6592675..0ff76e0a69 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -114,6 +114,19 @@ class WritableFileWriter { } info.status.PermitUncheckedError(); } + + void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation, + const std::string& file_path, size_t length = 0, + uint64_t offset = 0) { + if (listeners_.empty()) { + return; + } + IOErrorInfo io_error_info(io_status, operation, file_path, length, offset); + for (auto& listener : listeners_) { + listener->OnIOError(io_error_info); + } + io_error_info.io_status.PermitUncheckedError(); + } #endif // ROCKSDB_LITE bool ShouldNotifyListeners() const { return !listeners_.empty(); } diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index b361f88cf0..6e2971e5df 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -15,6 +15,7 @@ #include "rocksdb/compaction_job_stats.h" #include "rocksdb/compression_type.h" #include "rocksdb/customizable.h" +#include "rocksdb/io_status.h" #include "rocksdb/status.h" #include "rocksdb/table_properties.h" #include "rocksdb/types.h" @@ -237,7 +238,10 @@ enum class FileOperationType { kFlush, kSync, kFsync, - kRangeSync + kRangeSync, + kAppend, + kPositionedAppend, + kOpen }; struct FileOperationInfo { @@ -449,6 +453,23 @@ struct ExternalFileIngestionInfo { TableProperties table_properties; }; +struct IOErrorInfo { + IOErrorInfo(const IOStatus& _io_status, FileOperationType _operation, + const std::string& _file_path, size_t _length, uint64_t _offset) + : io_status(_io_status), + operation(_operation), + file_path(_file_path), + length(_length), + offset(_offset) {} + + IOStatus io_status; + FileOperationType operation; + std::string file_path; + size_t length; + uint64_t offset; + ; +}; + // EventListener class contains a set of callback functions that will // be called when specific RocksDB event happens such as flush. It can // be used as a building block for developing custom features such as @@ -705,6 +726,10 @@ class EventListener : public Customizable { // returned value. virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {} + // A callback function for RocksDB which will be called whenever an IO error + // happens. ShouldBeNotifiedOnFileIO should be set to true to get a callback. + virtual void OnIOError(const IOErrorInfo& /*info*/) {} + virtual ~EventListener() {} }; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 1ab0246c23..edd71b8994 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -772,6 +772,121 @@ TEST(LineFileReaderTest, LineFileReaderTest) { } } +#ifndef ROCKSDB_LITE +class IOErrorEventListener : public EventListener { + public: + IOErrorEventListener() { notify_error_.store(0); } + + void OnIOError(const IOErrorInfo& io_error_info) override { + notify_error_++; + EXPECT_FALSE(io_error_info.file_path.empty()); + EXPECT_FALSE(io_error_info.io_status.ok()); + } + + size_t NotifyErrorCount() { return notify_error_; } + + bool ShouldBeNotifiedOnFileIO() override { return true; } + + private: + std::atomic notify_error_; +}; + +TEST_F(DBWritableFileWriterTest, IOErrorNotification) { + class FakeWF : public FSWritableFile { + public: + explicit FakeWF() : io_error_(false) { + file_append_errors_.store(0); + file_flush_errors_.store(0); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + file_append_errors_++; + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + file_flush_errors_++; + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + void SetIOError(bool val) { io_error_ = val; } + + void CheckCounters(int file_append_errors, int file_flush_errors) { + ASSERT_EQ(file_append_errors, file_append_errors_); + ASSERT_EQ(file_flush_errors_, file_flush_errors); + } + + protected: + bool io_error_; + std::atomic file_append_errors_; + std::atomic file_flush_errors_; + }; + + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + IOErrorEventListener* listener = new IOErrorEventListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + ImmutableOptions ioptions(options); + + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr(new FakeWF); + + std::unique_ptr file_writer; + writable_file_ptr->SetIOError(true); + + file_writer.reset(new WritableFileWriter( + std::move(writable_file_ptr), fname, file_options, + SystemClock::Default().get(), nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + FakeWF* fwf = static_cast(file_writer->writable_file()); + + fwf->SetIOError(true); + ASSERT_NOK(file_writer->Append(std::string(2 * kMb, 'a'))); + fwf->CheckCounters(1, 0); + ASSERT_EQ(listener->NotifyErrorCount(), 1); + + fwf->SetIOError(true); + ASSERT_NOK(file_writer->Flush()); + fwf->CheckCounters(1, 1); + ASSERT_EQ(listener->NotifyErrorCount(), 2); + + /* No error generation */ + fwf->SetIOError(false); + ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b'))); + ASSERT_EQ(listener->NotifyErrorCount(), 2); + fwf->CheckCounters(1, 1); +} +#endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {