diff --git a/db/log_test.cc b/db/log_test.cc index 8289fdb7ec..2e993d8f90 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -9,7 +9,6 @@ #include "db/log_reader.h" #include "db/log_writer.h" -#include "env/composite_env_wrapper.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "rocksdb/env.h" @@ -50,7 +49,7 @@ static std::string RandomSkewedString(int i, Random* rnd) { // get<1>(tuple): true if allow retry after read EOF, false otherwise class LogTest : public ::testing::TestWithParam> { private: - class StringSource : public SequentialFile { + class StringSource : public FSSequentialFile { public: Slice& contents_; bool force_error_; @@ -68,7 +67,8 @@ class LogTest : public ::testing::TestWithParam> { returned_partial_(false), fail_after_read_partial_(fail_after_read_partial) {} - Status Read(size_t n, Slice* result, char* scratch) override { + IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { if (fail_after_read_partial_) { EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; } @@ -81,7 +81,7 @@ class LogTest : public ::testing::TestWithParam> { contents_.remove_prefix(force_error_position_); force_error_ = false; returned_partial_ = true; - return Status::Corruption("read error"); + return IOStatus::Corruption("read error"); } } @@ -106,28 +106,21 @@ class LogTest : public ::testing::TestWithParam> { *result = Slice(scratch, n); contents_.remove_prefix(n); - return Status::OK(); + return IOStatus::OK(); } - Status Skip(uint64_t n) override { + IOStatus Skip(uint64_t n) override { if (n > contents_.size()) { contents_.clear(); - return Status::NotFound("in-memory file skipepd past end"); + return IOStatus::NotFound("in-memory file skipepd past end"); } contents_.remove_prefix(n); - return Status::OK(); + return IOStatus::OK(); } }; - inline StringSource* GetStringSourceFromLegacyReader( - SequentialFileReader* reader) { - LegacySequentialFileWrapper* file = - static_cast(reader->file()); - return static_cast(file->target()); - } - class ReportCollector : public Reader::Reporter { public: size_t dropped_bytes_; @@ -140,29 +133,17 @@ class LogTest : public ::testing::TestWithParam> { } }; - std::string& dest_contents() { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - return dest->contents_; - } + std::string& dest_contents() { return sink_->contents_; } - const std::string& dest_contents() const { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - return dest->contents_; - } + const std::string& dest_contents() const { return sink_->contents_; } - void reset_source_contents() { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - assert(src); - src->contents_ = dest_contents(); - } + void reset_source_contents() { source_->contents_ = dest_contents(); } Slice reader_contents_; - std::unique_ptr dest_holder_; - std::unique_ptr source_holder_; + test::StringSink* sink_; + StringSource* source_; ReportCollector report_; - Writer writer_; + std::unique_ptr writer_; std::unique_ptr reader_; protected: @@ -171,19 +152,23 @@ class LogTest : public ::testing::TestWithParam> { public: LogTest() : reader_contents_(), - dest_holder_(test::GetWritableFileWriter( - new test::StringSink(&reader_contents_), "" /* don't care */)), - source_holder_(test::GetSequentialFileReader( - new StringSource(reader_contents_, !std::get<1>(GetParam())), - "" /* file name */)), - writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), + sink_(new test::StringSink(&reader_contents_)), + source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))), allow_retry_read_(std::get<1>(GetParam())) { + std::unique_ptr sink_holder(sink_); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(sink_holder), "" /* don't care */, FileOptions())); + writer_.reset( + new Writer(std::move(file_writer), 123, std::get<0>(GetParam()))); + std::unique_ptr source_holder(source_); + std::unique_ptr file_reader( + new SequentialFileReader(std::move(source_holder), "" /* file name */)); if (allow_retry_read_) { - reader_.reset(new FragmentBufferedReader( - nullptr, std::move(source_holder_), &report_, true /* checksum */, - 123 /* log_number */)); + reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader), + &report_, true /* checksum */, + 123 /* log_number */)); } else { - reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_, + reader_.reset(new Reader(nullptr, std::move(file_reader), &report_, true /* checksum */, 123 /* log_number */)); } } @@ -191,7 +176,7 @@ class LogTest : public ::testing::TestWithParam> { Slice* get_reader_contents() { return &reader_contents_; } void Write(const std::string& msg) { - ASSERT_OK(writer_.AddRecord(Slice(msg))); + ASSERT_OK(writer_->AddRecord(Slice(msg))); } size_t WrittenBytes() const { @@ -219,11 +204,7 @@ class LogTest : public ::testing::TestWithParam> { dest_contents()[offset] = new_byte; } - void ShrinkSize(int bytes) { - auto dest = test::GetStringSinkFromLegacyWriter(writer_.file()); - assert(dest); - dest->Drop(bytes); - } + void ShrinkSize(int bytes) { sink_->Drop(bytes); } void FixChecksum(int header_offset, int len, bool recyclable) { // Compute crc of type/len/data @@ -235,9 +216,8 @@ class LogTest : public ::testing::TestWithParam> { } void ForceError(size_t position = 0) { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->force_error_ = true; - src->force_error_position_ = position; + source_->force_error_ = true; + source_->force_error_position_ = position; } size_t DroppedBytes() const { @@ -249,14 +229,12 @@ class LogTest : public ::testing::TestWithParam> { } void ForceEOF(size_t position = 0) { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->force_eof_ = true; - src->force_eof_position_ = position; + source_->force_eof_ = true; + source_->force_eof_position_ = position; } void UnmarkEOF() { - auto src = GetStringSourceFromLegacyReader(reader_->file()); - src->returned_partial_ = false; + source_->returned_partial_ = false; reader_->UnmarkEOF(); } @@ -685,9 +663,10 @@ TEST_P(LogTest, Recycle) { while (get_reader_contents()->size() < log::kBlockSize * 2) { Write("xxxxxxxxxxxxxxxx"); } - std::unique_ptr dest_holder(test::GetWritableFileWriter( - new test::OverwritingStringSink(get_reader_contents()), - "" /* don't care */)); + std::unique_ptr sink( + new test::OverwritingStringSink(get_reader_contents())); + std::unique_ptr dest_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Writer recycle_writer(std::move(dest_holder), 123, true); ASSERT_OK(recycle_writer.AddRecord(Slice("foooo"))); ASSERT_OK(recycle_writer.AddRecord(Slice("bar"))); @@ -718,10 +697,9 @@ class RetriableLogTest : public ::testing::TestWithParam { }; Slice contents_; - std::unique_ptr dest_holder_; + test::StringSink* sink_; std::unique_ptr log_writer_; Env* env_; - EnvOptions env_options_; const std::string test_dir_; const std::string log_file_; std::unique_ptr writer_; @@ -732,55 +710,50 @@ class RetriableLogTest : public ::testing::TestWithParam { public: RetriableLogTest() : contents_(), - dest_holder_(nullptr), + sink_(new test::StringSink(&contents_)), log_writer_(nullptr), env_(Env::Default()), test_dir_(test::PerThreadDBPath("retriable_log_test")), log_file_(test_dir_ + "/log"), writer_(nullptr), reader_(nullptr), - log_reader_(nullptr) {} + log_reader_(nullptr) { + std::unique_ptr sink_holder(sink_); + std::unique_ptr wfw(new WritableFileWriter( + std::move(sink_holder), "" /* file name */, FileOptions())); + log_writer_.reset(new Writer(std::move(wfw), 123, GetParam())); + } Status SetupTestEnv() { - dest_holder_.reset(test::GetWritableFileWriter( - new test::StringSink(&contents_), "" /* file name */)); - assert(dest_holder_ != nullptr); - log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam())); - assert(log_writer_ != nullptr); - Status s; - s = env_->CreateDirIfMissing(test_dir_); - std::unique_ptr writable_file; + FileOptions fopts; + auto fs = env_->GetFileSystem(); + s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr); + std::unique_ptr writable_file; if (s.ok()) { - s = env_->NewWritableFile(log_file_, &writable_file, env_options_); + s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr); } if (s.ok()) { - writer_.reset(new WritableFileWriter( - NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_, - env_options_)); - assert(writer_ != nullptr); + writer_.reset( + new WritableFileWriter(std::move(writable_file), log_file_, fopts)); + EXPECT_NE(writer_, nullptr); } - std::unique_ptr seq_file; + std::unique_ptr seq_file; if (s.ok()) { - s = env_->NewSequentialFile(log_file_, &seq_file, env_options_); + s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr); } if (s.ok()) { - reader_.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(seq_file), log_file_)); - assert(reader_ != nullptr); + reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); + EXPECT_NE(reader_, nullptr); log_reader_.reset(new FragmentBufferedReader( nullptr, std::move(reader_), &report_, true /* checksum */, 123 /* log_number */)); - assert(log_reader_ != nullptr); + EXPECT_NE(log_reader_, nullptr); } return s; } - std::string contents() { - auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file()); - assert(file != nullptr); - return file->contents_; - } + std::string contents() { return sink_->contents_; } void Encode(const std::string& msg) { ASSERT_OK(log_writer_->AddRecord(Slice(msg))); diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 0ea7e99004..379e6b6b1a 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -49,9 +49,9 @@ TEST_F(PlainTableKeyDecoderTest, ReadNonMmap) { Slice contents(tmp); test::StringSource* string_source = new test::StringSource(contents, 0, false); - + std::unique_ptr holder(string_source); std::unique_ptr file_reader( - test::GetRandomAccessFileReader(string_source)); + new RandomAccessFileReader(std::move(holder), "test")); std::unique_ptr file_info( new PlainTableReaderFileInfo(std::move(file_reader), EnvOptions(), kLength)); diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 56d7edefe9..4777bdd5e9 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -13,7 +13,6 @@ #include "db/db_impl/db_impl.h" #include "db/dbformat.h" -#include "env/composite_env_wrapper.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" @@ -49,10 +48,9 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions, int_tbl_prop_collector_factories, std::unique_ptr* writable, std::unique_ptr* builder) { - std::unique_ptr wf(new test::StringSink); + std::unique_ptr wf(new test::StringSink); writable->reset( - new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), - "" /* don't care */, EnvOptions())); + new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions())); int unknown_level = -1; builder->reset(NewTableBuilder( ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories, @@ -286,12 +284,13 @@ void TestCustomizedTablePropertiesCollector( writer->Flush(); // -- Step 2: Read properties - LegacyWritableFileWrapper* file = - static_cast(writer->writable_file()); - test::StringSink* fwf = static_cast(file->target()); + test::StringSink* fwf = + static_cast(writer->writable_file()); + std::unique_ptr source( + new test::StringSource(fwf->contents())); std::unique_ptr fake_file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(fwf->contents()))); + new RandomAccessFileReader(std::move(source), "test")); + TableProperties* props; Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(), magic_number, ioptions, &props, @@ -427,12 +426,13 @@ void TestInternalKeyPropertiesCollector( ASSERT_OK(builder->Finish()); writable->Flush(); - LegacyWritableFileWrapper* file = - static_cast(writable->writable_file()); - test::StringSink* fwf = static_cast(file->target()); + test::StringSink* fwf = + static_cast(writable->writable_file()); + std::unique_ptr source( + new test::StringSource(fwf->contents())); std::unique_ptr reader( - test::GetRandomAccessFileReader( - new test::StringSource(fwf->contents()))); + new RandomAccessFileReader(std::move(source), "test")); + TableProperties* props; Status s = ReadTableProperties(reader.get(), fwf->contents().size(), magic_number, diff --git a/file/readahead_raf.cc b/file/readahead_raf.cc index 493f9d9e89..6d346432e2 100644 --- a/file/readahead_raf.cc +++ b/file/readahead_raf.cc @@ -11,15 +11,17 @@ #include #include + #include "file/read_write_util.h" +#include "rocksdb/file_system.h" #include "util/aligned_buffer.h" #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { namespace { -class ReadaheadRandomAccessFile : public RandomAccessFile { +class ReadaheadRandomAccessFile : public FSRandomAccessFile { public: - ReadaheadRandomAccessFile(std::unique_ptr&& file, + ReadaheadRandomAccessFile(std::unique_ptr&& file, size_t readahead_size) : file_(std::move(file)), alignment_(file_->GetRequiredBufferAlignment()), @@ -35,11 +37,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; - Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override { // Read-ahead only make sense if we have some slack left after reading if (n + alignment_ >= readahead_size_) { - return file_->Read(offset, n, result, scratch); + return file_->Read(offset, n, options, result, scratch, dbg); } std::unique_lock lk(lock_); @@ -53,14 +56,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { // We read exactly what we needed, or we hit end of file - return. *result = Slice(scratch, cached_len); - return Status::OK(); + return IOStatus::OK(); } size_t advanced_offset = static_cast(offset + cached_len); // In the case of cache hit advanced_offset is already aligned, means that // chunk_offset equals to advanced_offset size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); - Status s = ReadIntoBuffer(chunk_offset, readahead_size_); + IOStatus s = ReadIntoBuffer(chunk_offset, readahead_size_, options, dbg); if (s.ok()) { // The data we need is now in cache, so we can safely read it size_t remaining_len; @@ -71,11 +74,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { return s; } - Status Prefetch(uint64_t offset, size_t n) override { + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override { if (n < readahead_size_) { // Don't allow smaller prefetches than the configured `readahead_size_`. // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. - return Status::OK(); + return IOStatus::OK(); } std::unique_lock lk(lock_); @@ -83,10 +87,11 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { size_t offset_ = static_cast(offset); size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); if (prefetch_offset == buffer_offset_) { - return Status::OK(); + return IOStatus::OK(); } return ReadIntoBuffer(prefetch_offset, - Roundup(offset_ + n, alignment_) - prefetch_offset); + Roundup(offset_ + n, alignment_) - prefetch_offset, + options, dbg); } size_t GetUniqueId(char* id, size_t max_size) const override { @@ -95,7 +100,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { void Hint(AccessPattern pattern) override { file_->Hint(pattern); } - Status InvalidateCache(size_t offset, size_t length) override { + IOStatus InvalidateCache(size_t offset, size_t length) override { std::unique_lock lk(lock_); buffer_.Clear(); return file_->InvalidateCache(offset, length); @@ -125,14 +130,16 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // Reads into buffer_ the next n bytes from file_ starting at offset. // Can actually read less if EOF was reached. // Returns the status of the read operastion on the file. - Status ReadIntoBuffer(uint64_t offset, size_t n) const { + IOStatus ReadIntoBuffer(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) const { if (n > buffer_.Capacity()) { n = buffer_.Capacity(); } assert(IsFileSectorAligned(offset, alignment_)); assert(IsFileSectorAligned(n, alignment_)); Slice result; - Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); + IOStatus s = + file_->Read(offset, n, options, &result, buffer_.BufferStart(), dbg); if (s.ok()) { buffer_offset_ = offset; buffer_.Size(result.size()); @@ -141,7 +148,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { return s; } - const std::unique_ptr file_; + const std::unique_ptr file_; const size_t alignment_; const size_t readahead_size_; @@ -153,9 +160,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { }; } // namespace -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size) { - std::unique_ptr result( +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size) { + std::unique_ptr result( new ReadaheadRandomAccessFile(std::move(file), readahead_size)); return result; } diff --git a/file/readahead_raf.h b/file/readahead_raf.h index cbdcb124fd..dfaf2b4fa9 100644 --- a/file/readahead_raf.h +++ b/file/readahead_raf.h @@ -8,10 +8,12 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include -#include "rocksdb/env.h" +#include + +#include "rocksdb/rocksdb_namespace.h" namespace ROCKSDB_NAMESPACE { +class FSRandomAccessFile; // This file provides the following main abstractions: // SequentialFileReader : wrapper over Env::SequentialFile // RandomAccessFileReader : wrapper over Env::RandomAccessFile @@ -22,6 +24,6 @@ namespace ROCKSDB_NAMESPACE { // NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to // always prefetch additional data with every read. This is mainly used in // Compaction Table Readers. -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size); +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size); } // namespace ROCKSDB_NAMESPACE diff --git a/options/options_test.cc b/options/options_test.cc index a916e85684..b15be02066 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -2414,14 +2414,10 @@ TEST_F(OptionsOldApiTest, ColumnFamilyOptionsSerialization) { #ifndef ROCKSDB_LITE class OptionsParserTest : public testing::Test { public: - OptionsParserTest() { - env_.reset(new test::StringEnv(Env::Default())); - fs_.reset(new LegacyFileSystemWrapper(env_.get())); - } + OptionsParserTest() { fs_.reset(new test::StringFS(FileSystem::Default())); } protected: - std::unique_ptr env_; - std::unique_ptr fs_; + std::shared_ptr fs_; }; TEST_F(OptionsParserTest, Comment) { @@ -2450,7 +2446,7 @@ TEST_F(OptionsParserTest, Comment) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_OK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2481,7 +2477,7 @@ TEST_F(OptionsParserTest, ExtraSpace) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_OK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2499,7 +2495,7 @@ TEST_F(OptionsParserTest, MissingDBOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2529,7 +2525,7 @@ TEST_F(OptionsParserTest, DoubleDBOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2557,7 +2553,7 @@ TEST_F(OptionsParserTest, NoDefaultCFOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2587,7 +2583,7 @@ TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2616,7 +2612,7 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) { "[CFOptions \"something_else\"]\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK( parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2684,12 +2680,12 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) { " # if a section is blank, we will use the default\n"; const std::string kTestFileName = "test-rocksdb-options.ini"; - auto s = env_->FileExists(kTestFileName); + auto s = fs_->FileExists(kTestFileName, IOOptions(), nullptr); ASSERT_TRUE(s.ok() || s.IsNotFound()); if (s.ok()) { - ASSERT_OK(env_->DeleteFile(kTestFileName)); + ASSERT_OK(fs_->DeleteFile(kTestFileName, IOOptions(), nullptr)); } - ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content)); + ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content)); RocksDBOptionsParser parser; ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); @@ -2737,7 +2733,7 @@ TEST_F(OptionsParserTest, ParseVersion) { snprintf(buffer, kLength - 1, file_template.c_str(), iv.c_str()); parser.Reset(); - ASSERT_OK(env_->WriteToNewFile(iv, buffer)); + ASSERT_OK(fs_->WriteToNewFile(iv, buffer)); ASSERT_NOK(parser.Parse(iv, fs_.get(), false, 0 /* readahead_size */)); } @@ -2746,7 +2742,7 @@ TEST_F(OptionsParserTest, ParseVersion) { for (auto vv : valid_versions) { snprintf(buffer, kLength - 1, file_template.c_str(), vv.c_str()); parser.Reset(); - ASSERT_OK(env_->WriteToNewFile(vv, buffer)); + ASSERT_OK(fs_->WriteToNewFile(vv, buffer)); ASSERT_OK(parser.Parse(vv, fs_.get(), false, 0 /* readahead_size */)); } } @@ -2855,37 +2851,37 @@ TEST_F(OptionsParserTest, Readahead) { kOptionsFileName, fs_.get())); uint64_t file_size = 0; - ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size)); + ASSERT_OK( + fs_->GetFileSize(kOptionsFileName, IOOptions(), &file_size, nullptr)); assert(file_size > 0); RocksDBOptionsParser parser; - env_->num_seq_file_read_ = 0; + fs_->num_seq_file_read_ = 0; size_t readahead_size = 128 * 1024; ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); - ASSERT_EQ(env_->num_seq_file_read_.load(), + ASSERT_EQ(fs_->num_seq_file_read_.load(), (file_size - 1) / readahead_size + 1); - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); readahead_size = 1024 * 1024; ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size)); - ASSERT_EQ(env_->num_seq_file_read_.load(), + ASSERT_EQ(fs_->num_seq_file_read_.load(), (file_size - 1) / readahead_size + 1); // Tiny readahead. 8 KB is read each time. - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); ASSERT_OK( parser.Parse(kOptionsFileName, fs_.get(), false, 1 /* readahead_size */)); - ASSERT_GE(env_->num_seq_file_read_.load(), file_size / (8 * 1024)); - ASSERT_LT(env_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2); + ASSERT_GE(fs_->num_seq_file_read_.load(), file_size / (8 * 1024)); + ASSERT_LT(fs_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2); // Disable readahead means 512KB readahead. - env_->num_seq_file_read_.store(0); + fs_->num_seq_file_read_.store(0); ASSERT_OK( parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */)); - ASSERT_GE(env_->num_seq_file_read_.load(), - (file_size - 1) / (512 * 1024) + 1); + ASSERT_GE(fs_->num_seq_file_read_.load(), (file_size - 1) / (512 * 1024) + 1); } TEST_F(OptionsParserTest, DumpAndParse) { @@ -3083,7 +3079,7 @@ class OptionsSanityCheckTest : public OptionsParserTest { } Status PersistCFOptions(const ColumnFamilyOptions& cf_opts) { - Status s = env_->DeleteFile(kOptionsFileName); + Status s = fs_->DeleteFile(kOptionsFileName, IOOptions(), nullptr); if (!s.ok()) { return s; } diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 7ce296318e..0a4276fd3f 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -546,8 +546,10 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, EnvOptions soptions; soptions.use_mmap_reads = ioptions.allow_mmap_reads; + test::StringSink* sink = new test::StringSink(); + std::unique_ptr f(sink); file_writer.reset( - test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */)); + new WritableFileWriter(std::move(f), "" /* don't care */, FileOptions())); std::unique_ptr builder; std::vector> int_tbl_prop_collector_factories; @@ -569,23 +571,20 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, file_writer->Flush(); EXPECT_TRUE(s.ok()) << s.ToString(); - EXPECT_EQ( - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(), - builder->FileSize()); + EXPECT_EQ(sink->contents().size(), builder->FileSize()); // Open the table - file_reader.reset(test::GetRandomAccessFileReader(new test::StringSource( - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents(), - 0 /*uniq_id*/, ioptions.allow_mmap_reads))); + test::StringSource* source = new test::StringSource( + sink->contents(), 0 /*uniq_id*/, ioptions.allow_mmap_reads); + std::unique_ptr file(source); + file_reader.reset(new RandomAccessFileReader(std::move(file), "test")); const bool kSkipFilters = true; const bool kImmortal = true; ASSERT_OK(ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, internal_comparator, !kSkipFilters, !kImmortal, level_), - std::move(file_reader), - test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(), - &table_reader)); + std::move(file_reader), sink->contents().size(), &table_reader)); // Search using Get() ReadOptions ro; diff --git a/table/table_test.cc b/table/table_test.cc index db765d4837..84a942dd4f 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -345,8 +345,9 @@ class TableConstructor : public Constructor { const stl_wrappers::KVMap& kv_map) override { Reset(); soptions.use_mmap_reads = ioptions.allow_mmap_reads; - file_writer_.reset(test::GetWritableFileWriter(new test::StringSink(), - "" /* don't care */)); + std::unique_ptr sink(new test::StringSink()); + file_writer_.reset(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); std::unique_ptr builder; std::vector> int_tbl_prop_collector_factories; @@ -387,8 +388,10 @@ class TableConstructor : public Constructor { // Open the table uniq_id_ = cur_uniq_id_++; - file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource( - TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads))); + std::unique_ptr source(new test::StringSource( + TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)); + + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); const bool kSkipFilters = true; const bool kImmortal = true; return ioptions.table_factory->NewTableReader( @@ -425,8 +428,10 @@ class TableConstructor : public Constructor { virtual Status Reopen(const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions) { - file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource( - TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads))); + std::unique_ptr source(new test::StringSource( + TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)); + + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); return ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions, *last_internal_key_), @@ -445,8 +450,7 @@ class TableConstructor : public Constructor { bool ConvertToInternalKey() { return convert_to_internal_key_; } test::StringSink* TEST_GetSink() { - return ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter( - file_writer_.get()); + return static_cast(file_writer_->writable_file()); } BlockCacheTracer block_cache_tracer_; @@ -1230,7 +1234,9 @@ class FileChecksumTestHelper { void CreateWriteableFile() { sink_ = new test::StringSink(); - file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */)); + std::unique_ptr holder(sink_); + file_writer_.reset(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); } void SetFileChecksumGenerator(FileChecksumGenerator* checksum_generator) { @@ -1291,10 +1297,11 @@ class FileChecksumTestHelper { assert(file_checksum_generator != nullptr); cur_uniq_id_ = checksum_uniq_id_++; test::StringSink* ss_rw = - ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter( - file_writer_.get()); - file_reader_.reset(test::GetRandomAccessFileReader( - new test::StringSource(ss_rw->contents()))); + static_cast(file_writer_->writable_file()); + std::unique_ptr source( + new test::StringSource(ss_rw->contents())); + file_reader_.reset(new RandomAccessFileReader(std::move(source), "test")); + std::unique_ptr scratch(new char[2048]); Slice result; uint64_t offset = 0; @@ -3392,9 +3399,9 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { plain_table_options.hash_table_ratio = 0; PlainTableFactory factory(plain_table_options); - test::StringSink sink; - std::unique_ptr file_writer( - test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */)); + std::unique_ptr sink(new test::StringSink()); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Options options; const ImmutableCFOptions ioptions(options); const MutableCFOptions moptions(options); @@ -3421,10 +3428,11 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { ASSERT_OK(file_writer->Flush()); test::StringSink* ss = - ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(file_writer.get()); + static_cast(file_writer->writable_file()); + std::unique_ptr source( + new test::StringSource(ss->contents(), 72242, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss->contents(), 72242, true))); + new RandomAccessFileReader(std::move(source), "test")); TableProperties* props = nullptr; auto s = ReadTableProperties(file_reader.get(), ss->contents().size(), @@ -4052,8 +4060,9 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) { TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const ImmutableCFOptions ioptions(options); @@ -4090,9 +4099,10 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { // Helper function to get version, global_seqno, global_seqno_offset std::function GetVersionAndGlobalSeqno = [&]() { + std::unique_ptr source( + new test::StringSource(ss_rw.contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "")); TableProperties* props = nullptr; ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(), @@ -4115,16 +4125,18 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { std::string new_global_seqno; PutFixed64(&new_global_seqno, val); - ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno)); + ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno, IOOptions(), + nullptr)); }; // Helper function to get the contents of the table InternalIterator std::unique_ptr table_reader; const ReadOptions read_options; std::function GetTableInternalIter = [&]() { + std::unique_ptr source( + new test::StringSource(ss_rw.contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "")); options.table_factory->NewTableReader( TableReaderOptions(ioptions, moptions.prefix_extractor.get(), @@ -4236,8 +4248,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_align = true; test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.compression = kNoCompression; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); @@ -4267,17 +4280,16 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { ASSERT_OK(builder->Finish()); ASSERT_OK(file_writer->Flush()); - test::RandomRWStringSink ss_rw(sink); + std::unique_ptr source( + new test::StringSource(sink->contents(), 73342, false)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); - + new RandomAccessFileReader(std::move(source), "test")); // Helper function to get version, global_seqno, global_seqno_offset std::function VerifyBlockAlignment = [&]() { TableProperties* props = nullptr; - ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(), - kBlockBasedTableMagicNumber, ioptions, - &props, true /* compression_type_missing */)); + ASSERT_OK(ReadTableProperties(file_reader.get(), sink->contents().size(), + kBlockBasedTableMagicNumber, ioptions, &props, + true /* compression_type_missing */)); uint64_t data_block_size = props->data_size / props->num_data_blocks; ASSERT_EQ(data_block_size, 4096); @@ -4301,7 +4313,7 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { TableReaderOptions(ioptions2, moptions2.prefix_extractor.get(), EnvOptions(), GetPlainInternalComparator(options2.comparator)), - std::move(file_reader), ss_rw.contents().size(), &table_reader)); + std::move(file_reader), sink->contents().size(), &table_reader)); ReadOptions read_options; std::unique_ptr db_iter(table_reader->NewIterator( @@ -4328,8 +4340,9 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_align = true; test::StringSink* sink = new test::StringSink(); - std::unique_ptr file_writer( - test::GetWritableFileWriter(sink, "" /* don't care */)); + std::unique_ptr holder(sink); + std::unique_ptr file_writer(new WritableFileWriter( + std::move(holder), "" /* don't care */, FileOptions())); Options options; options.compression = kNoCompression; @@ -4362,14 +4375,14 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { ASSERT_OK(builder->Finish()); ASSERT_OK(file_writer->Flush()); - test::RandomRWStringSink ss_rw(sink); + std::unique_ptr source( + new test::StringSource(sink->contents(), 73342, true)); std::unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); + new RandomAccessFileReader(std::move(source), "test")); { RandomAccessFileReader* file = file_reader.get(); - uint64_t file_size = ss_rw.contents().size(); + uint64_t file_size = sink->contents().size(); Footer footer; IOOptions opts; @@ -4452,10 +4465,11 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { // get file reader test::StringSink* table_sink = c.TEST_GetSink(); - std::unique_ptr table_reader{ - test::GetRandomAccessFileReader( - new test::StringSource(table_sink->contents(), 0 /* unique_id */, - false /* allow_mmap_reads */))}; + std::unique_ptr source(new test::StringSource( + table_sink->contents(), 0 /* unique_id */, false /* allow_mmap_reads */)); + + std::unique_ptr table_reader( + new RandomAccessFileReader(std::move(source), "test")); size_t table_size = table_sink->contents().size(); // read footer diff --git a/test_util/testutil.cc b/test_util/testutil.cc index f85ee224a8..f0df928f1f 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -171,25 +171,6 @@ const Comparator* ComparatorWithU64Ts() { return &comp_with_u64_ts; } -WritableFileWriter* GetWritableFileWriter(WritableFile* wf, - const std::string& fname) { - std::unique_ptr file(wf); - return new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), - fname, EnvOptions()); -} - -RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) { - std::unique_ptr file(raf); - return new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), - "[test RandomAccessFileReader]"); -} - -SequentialFileReader* GetSequentialFileReader(SequentialFile* se, - const std::string& fname) { - std::unique_ptr file(se); - return new SequentialFileReader(NewLegacySequentialFileWrapper(file), fname); -} - void CorruptKeyType(InternalKey* ikey) { std::string keystr = ikey->Encode().ToString(); keystr[keystr.size() - 8] = kTypeLogData; diff --git a/test_util/testutil.h b/test_util/testutil.h index 9098200d02..00a768d50f 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -178,23 +178,16 @@ class VectorIterator : public InternalIterator { std::vector values_; size_t current_; }; -extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf, - const std::string& fname); -extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf); - -extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se, - const std::string& fname); - -class StringSink: public WritableFile { +class StringSink : public FSWritableFile { public: std::string contents_; - explicit StringSink(Slice* reader_contents = nullptr) : - WritableFile(), - contents_(""), - reader_contents_(reader_contents), - last_flush_(0) { + explicit StringSink(Slice* reader_contents = nullptr) + : FSWritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { if (reader_contents_ != nullptr) { *reader_contents_ = Slice(contents_.data(), 0); } @@ -202,12 +195,15 @@ class StringSink: public WritableFile { const std::string& contents() const { return contents_; } - virtual Status Truncate(uint64_t size) override { + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.resize(static_cast(size)); - return Status::OK(); + return IOStatus::OK(); } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { if (reader_contents_ != nullptr) { assert(reader_contents_->size() <= last_flush_); size_t offset = last_flush_ - reader_contents_->size(); @@ -217,12 +213,17 @@ class StringSink: public WritableFile { last_flush_ = contents_.size(); } - return Status::OK(); + return IOStatus::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.append(slice.data(), slice.size()); - return Status::OK(); + return IOStatus::OK(); } void Drop(size_t bytes) { if (reader_contents_ != nullptr) { @@ -239,36 +240,44 @@ class StringSink: public WritableFile { }; // A wrapper around a StringSink to give it a RandomRWFile interface -class RandomRWStringSink : public RandomRWFile { +class RandomRWStringSink : public FSRandomRWFile { public: explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {} - Status Write(uint64_t offset, const Slice& data) override { + IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { if (offset + data.size() > ss_->contents_.size()) { ss_->contents_.resize(static_cast(offset) + data.size(), '\0'); } char* pos = const_cast(ss_->contents_.data() + offset); memcpy(pos, data.data(), data.size()); - return Status::OK(); + return IOStatus::OK(); } - Status Read(uint64_t offset, size_t n, Slice* result, - char* /*scratch*/) const override { + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* /*scratch*/, + IODebugContext* /*dbg*/) const override { *result = Slice(nullptr, 0); if (offset < ss_->contents_.size()) { size_t str_res_sz = std::min(static_cast(ss_->contents_.size() - offset), n); *result = Slice(ss_->contents_.data() + offset, str_res_sz); } - return Status::OK(); + return IOStatus::OK(); } - Status Flush() override { return Status::OK(); } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } - Status Sync() override { return Status::OK(); } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } - Status Close() override { return Status::OK(); } + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } const std::string& contents() const { return ss_->contents(); } @@ -279,34 +288,42 @@ class RandomRWStringSink : public RandomRWFile { // Like StringSink, this writes into a string. Unlink StringSink, it // has some initial content and overwrites it, just like a recycled // log file. -class OverwritingStringSink : public WritableFile { +class OverwritingStringSink : public FSWritableFile { public: explicit OverwritingStringSink(Slice* reader_contents) - : WritableFile(), + : FSWritableFile(), contents_(""), reader_contents_(reader_contents), last_flush_(0) {} const std::string& contents() const { return contents_; } - virtual Status Truncate(uint64_t size) override { + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.resize(static_cast(size)); - return Status::OK(); + return IOStatus::OK(); } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { + IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { if (last_flush_ < contents_.size()) { assert(reader_contents_->size() >= contents_.size()); memcpy((char*)reader_contents_->data() + last_flush_, contents_.data() + last_flush_, contents_.size() - last_flush_); last_flush_ = contents_.size(); } - return Status::OK(); + return IOStatus::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { contents_.append(slice.data(), slice.size()); - return Status::OK(); + return IOStatus::OK(); } void Drop(size_t bytes) { contents_.resize(contents_.size() - bytes); @@ -319,7 +336,7 @@ class OverwritingStringSink : public WritableFile { size_t last_flush_; }; -class StringSource: public RandomAccessFile { +class StringSource : public FSRandomAccessFile { public: explicit StringSource(const Slice& contents, uint64_t uniq_id = 0, bool mmap = false) @@ -332,11 +349,23 @@ class StringSource: public RandomAccessFile { uint64_t Size() const { return contents_.size(); } - virtual Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { + IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + // If we are using mmap_, it is equivalent to performing a prefetch + if (mmap_) { + return IOStatus::OK(); + } else { + return IOStatus::NotSupported("Prefetch not supported"); + } + } + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { total_reads_++; if (offset > contents_.size()) { - return Status::InvalidArgument("invalid Read offset"); + return IOStatus::InvalidArgument("invalid Read offset"); } if (offset + n > contents_.size()) { n = contents_.size() - static_cast(offset); @@ -347,10 +376,10 @@ class StringSource: public RandomAccessFile { } else { *result = Slice(&contents_[static_cast(offset)], n); } - return Status::OK(); + return IOStatus::OK(); } - virtual size_t GetUniqueId(char* id, size_t max_size) const override { + size_t GetUniqueId(char* id, size_t max_size) const override { if (max_size < 20) { return 0; } @@ -372,13 +401,6 @@ class StringSource: public RandomAccessFile { mutable int total_reads_; }; -inline StringSink* GetStringSinkFromLegacyWriter( - const WritableFileWriter* writer) { - LegacyWritableFileWrapper* file = - static_cast(writer->writable_file()); - return static_cast(file->target()); -} - class NullLogger : public Logger { public: using Logger::Logv; @@ -525,176 +547,220 @@ inline std::string EncodeInt(uint64_t x) { return result; } - class SeqStringSource : public SequentialFile { - public: - SeqStringSource(const std::string& data, std::atomic* read_count) - : data_(data), offset_(0), read_count_(read_count) {} - ~SeqStringSource() override {} - Status Read(size_t n, Slice* result, char* scratch) override { - std::string output; - if (offset_ < data_.size()) { - n = std::min(data_.size() - offset_, n); - memcpy(scratch, data_.data() + offset_, n); - offset_ += n; - *result = Slice(scratch, n); - } else { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - (*read_count_)++; - return Status::OK(); +class SeqStringSource : public FSSequentialFile { + public: + SeqStringSource(const std::string& data, std::atomic* read_count) + : data_(data), offset_(0), read_count_(read_count) {} + ~SeqStringSource() override {} + IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + std::string output; + if (offset_ < data_.size()) { + n = std::min(data_.size() - offset_, n); + memcpy(scratch, data_.data() + offset_, n); + offset_ += n; + *result = Slice(scratch, n); + } else { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); } - Status Skip(uint64_t n) override { - if (offset_ >= data_.size()) { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - // TODO(yhchiang): Currently doesn't handle the overflow case. - offset_ += static_cast(n); - return Status::OK(); + (*read_count_)++; + return IOStatus::OK(); + } + + IOStatus Skip(uint64_t n) override { + if (offset_ >= data_.size()) { + return IOStatus::InvalidArgument( + "Attempt to read when it already reached eof."); + } + // TODO(yhchiang): Currently doesn't handle the overflow case. + offset_ += static_cast(n); + return IOStatus::OK(); + } + + private: + std::string data_; + size_t offset_; + std::atomic* read_count_; +}; + +class StringFS : public FileSystemWrapper { + public: + class StringSink : public FSWritableFile { + public: + explicit StringSink(std::string* contents) + : FSWritableFile(), contents_(contents) {} + IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->resize(static_cast(size)); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& slice, const IOOptions& /*opts*/, + IODebugContext* /*dbg*/) override { + contents_->append(slice.data(), slice.size()); + return IOStatus::OK(); } private: - std::string data_; - size_t offset_; - std::atomic* read_count_; + std::string* contents_; }; - class StringEnv : public EnvWrapper { - public: - class StringSink : public WritableFile { - public: - explicit StringSink(std::string* contents) - : WritableFile(), contents_(contents) {} - virtual Status Truncate(uint64_t size) override { - contents_->resize(static_cast(size)); - return Status::OK(); - } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { return Status::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { - contents_->append(slice.data(), slice.size()); - return Status::OK(); - } + explicit StringFS(const std::shared_ptr& t) + : FileSystemWrapper(t) {} + ~StringFS() override {} - private: - std::string* contents_; - }; + const std::string& GetContent(const std::string& f) { return files_[f]; } - explicit StringEnv(Env* t) : EnvWrapper(t) {} - ~StringEnv() override {} - - const std::string& GetContent(const std::string& f) { return files_[f]; } - - const Status WriteToNewFile(const std::string& file_name, + const IOStatus WriteToNewFile(const std::string& file_name, const std::string& content) { - std::unique_ptr r; - auto s = NewWritableFile(file_name, &r, EnvOptions()); - if (s.ok()) { - s = r->Append(content); - } - if (s.ok()) { - s = r->Flush(); - } - if (s.ok()) { - s = r->Close(); - } - assert(!s.ok() || files_[file_name] == content); - return s; - } + std::unique_ptr r; + FileOptions file_opts; + IOOptions io_opts; - // The following text is boilerplate that forwards all methods to target() - Status NewSequentialFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist", f); - } - r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); - return Status::OK(); + auto s = NewWritableFile(file_name, file_opts, &r, nullptr); + if (s.ok()) { + s = r->Append(content, io_opts, nullptr); } - Status NewRandomAccessFile(const std::string& /*f*/, - std::unique_ptr* /*r*/, - const EnvOptions& /*options*/) override { - return Status::NotSupported(); + if (s.ok()) { + s = r->Flush(io_opts, nullptr); } - Status NewWritableFile(const std::string& f, - std::unique_ptr* r, - const EnvOptions& /*options*/) override { - auto iter = files_.find(f); - if (iter != files_.end()) { - return Status::IOError("The specified file already exists", f); - } - r->reset(new StringSink(&files_[f])); - return Status::OK(); - } - virtual Status NewDirectory( - const std::string& /*name*/, - std::unique_ptr* /*result*/) override { - return Status::NotSupported(); - } - Status FileExists(const std::string& f) override { - if (files_.find(f) == files_.end()) { - return Status::NotFound(); - } - return Status::OK(); - } - Status GetChildren(const std::string& /*dir*/, - std::vector* /*r*/) override { - return Status::NotSupported(); - } - Status DeleteFile(const std::string& f) override { - files_.erase(f); - return Status::OK(); - } - Status CreateDir(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status CreateDirIfMissing(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status DeleteDir(const std::string& /*d*/) override { - return Status::NotSupported(); - } - Status GetFileSize(const std::string& f, uint64_t* s) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist:", f); - } - *s = iter->second.size(); - return Status::OK(); + if (s.ok()) { + s = r->Close(io_opts, nullptr); } + assert(!s.ok() || files_[file_name] == content); + return s; + } - Status GetFileModificationTime(const std::string& /*fname*/, - uint64_t* /*file_mtime*/) override { - return Status::NotSupported(); + // The following text is boilerplate that forwards all methods to target() + IOStatus NewSequentialFile(const std::string& f, + const FileOptions& /*options*/, + std::unique_ptr* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist", f); } + r->reset(new SeqStringSource(iter->second, &num_seq_file_read_)); + return IOStatus::OK(); + } - Status RenameFile(const std::string& /*s*/, - const std::string& /*t*/) override { - return Status::NotSupported(); + IOStatus NewRandomAccessFile(const std::string& /*f*/, + const FileOptions& /*options*/, + std::unique_ptr* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus NewWritableFile(const std::string& f, const FileOptions& /*options*/, + std::unique_ptr* r, + IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return IOStatus::IOError("The specified file already exists", f); } + r->reset(new StringSink(&files_[f])); + return IOStatus::OK(); + } + IOStatus NewDirectory(const std::string& /*name*/, + const IOOptions& /*options*/, + std::unique_ptr* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - Status LinkFile(const std::string& /*s*/, - const std::string& /*t*/) override { - return Status::NotSupported(); + IOStatus FileExists(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (files_.find(f) == files_.end()) { + return IOStatus::NotFound(); } + return IOStatus::OK(); + } - Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override { - return Status::NotSupported(); + IOStatus GetChildren(const std::string& /*dir*/, const IOOptions& /*options*/, + std::vector* /*r*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + files_.erase(f); + return IOStatus::OK(); + } + + IOStatus CreateDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus CreateDirIfMissing(const std::string& /*d*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus DeleteDir(const std::string& /*d*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/, + uint64_t* s, IODebugContext* /*dbg*/) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return IOStatus::NotFound("The specified file does not exist:", f); } + *s = iter->second.size(); + return IOStatus::OK(); + } - Status UnlockFile(FileLock* /*l*/) override { - return Status::NotSupported(); - } + IOStatus GetFileModificationTime(const std::string& /*fname*/, + const IOOptions& /*options*/, + uint64_t* /*file_mtime*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - std::atomic num_seq_file_read_; + IOStatus RenameFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } - protected: - std::unordered_map files_; - }; + IOStatus LinkFile(const std::string& /*s*/, const std::string& /*t*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus LockFile(const std::string& /*f*/, const IOOptions& /*options*/, + FileLock** /*l*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + IOStatus UnlockFile(FileLock* /*l*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported(); + } + + std::atomic num_seq_file_read_; + + protected: + std::unordered_map files_; +}; // Randomly initialize the given DBOptions void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 2abd50b148..c883e90166 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -246,19 +246,21 @@ class ReadaheadRandomAccessFileTest ReadaheadRandomAccessFileTest() : control_contents_() {} std::string Read(uint64_t offset, size_t n) { Slice result; - Status s = test_read_holder_->Read(offset, n, &result, scratch_.get()); + Status s = test_read_holder_->Read(offset, n, IOOptions(), &result, + scratch_.get(), nullptr); EXPECT_TRUE(s.ok() || s.IsInvalidArgument()); return std::string(result.data(), result.size()); } void ResetSourceStr(const std::string& str = "") { - auto write_holder = - std::unique_ptr(test::GetWritableFileWriter( - new test::StringSink(&control_contents_), "" /* don't care */)); + std::unique_ptr sink( + new test::StringSink(&control_contents_)); + std::unique_ptr write_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); Status s = write_holder->Append(Slice(str)); EXPECT_OK(s); s = write_holder->Flush(); EXPECT_OK(s); - auto read_holder = std::unique_ptr( + std::unique_ptr read_holder( new test::StringSource(control_contents_)); test_read_holder_ = NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_); @@ -268,7 +270,7 @@ class ReadaheadRandomAccessFileTest private: size_t readahead_size_; Slice control_contents_; - std::unique_ptr test_read_holder_; + std::unique_ptr test_read_holder_; std::unique_ptr scratch_; }; @@ -353,10 +355,10 @@ class ReadaheadSequentialFileTest : public testing::Test, } void Skip(size_t n) { test_read_holder_->Skip(n); } void ResetSourceStr(const std::string& str = "") { - auto read_holder = std::unique_ptr( + auto read_holder = std::unique_ptr( new test::SeqStringSource(str, &seq_read_count_)); - test_read_holder_.reset(new SequentialFileReader( - NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_)); + test_read_holder_.reset(new SequentialFileReader(std::move(read_holder), + "test", readahead_size_)); } size_t GetReadaheadSize() const { return readahead_size_; } diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index 8f425f730a..7fb3d6b3b8 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -5,17 +5,19 @@ #ifndef ROCKSDB_LITE #include "utilities/blob_db/blob_dump_tool.h" + #include + #include #include #include #include -#include "env/composite_env_wrapper.h" + #include "file/random_access_file_reader.h" #include "file/readahead_raf.h" #include "port/port.h" #include "rocksdb/convenience.h" -#include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "table/format.h" #include "util/coding.h" #include "util/string_util.h" @@ -32,18 +34,19 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, bool show_summary) { constexpr size_t kReadaheadSize = 2 * 1024 * 1024; Status s; - Env* env = Env::Default(); - s = env->FileExists(filename); + const auto fs = FileSystem::Default(); + IOOptions io_opts; + s = fs->FileExists(filename, io_opts, nullptr); if (!s.ok()) { return s; } uint64_t file_size = 0; - s = env->GetFileSize(filename, &file_size); + s = fs->GetFileSize(filename, io_opts, &file_size, nullptr); if (!s.ok()) { return s; } - std::unique_ptr file; - s = env->NewRandomAccessFile(filename, &file, EnvOptions()); + std::unique_ptr file; + s = fs->NewRandomAccessFile(filename, FileOptions(), &file, nullptr); if (!s.ok()) { return s; } @@ -51,8 +54,7 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, if (file_size == 0) { return Status::Corruption("File is empty."); } - reader_.reset(new RandomAccessFileReader( - NewLegacyRandomAccessFileWrapper(file), filename)); + reader_.reset(new RandomAccessFileReader(std::move(file), filename)); uint64_t offset = 0; uint64_t footer_offset = 0; CompressionType compression = kNoCompression;