Make StringEnv, StringSink, StringSource use FS classes (#7786)

Summary:
Change the StringEnv and related classes to be based on FileSystem APIs rather than the corresponding Env ones.  The StringSink and StringSource classes were changed to be based on the corresponding FS file classes.

Part of a cleanup to use the newer interfaces.  This change also eliminates some of the casts/wrappers to LegacyFile classes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7786

Reviewed By: jay-zhuang

Differential Revision: D25761460

Pulled By: anand1976

fbshipit-source-id: 428ae8e32b3db97dbeeca08c9d3bb0d9d4d3a38f
This commit is contained in:
mrambacher 2021-01-04 15:59:52 -08:00 committed by Facebook GitHub Bot
parent 58660bf21a
commit c1a65a4de4
12 changed files with 490 additions and 448 deletions

View File

@ -9,7 +9,6 @@
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "env/composite_env_wrapper.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/env.h"
@ -50,7 +49,7 @@ static std::string RandomSkewedString(int i, Random* rnd) {
// get<1>(tuple): true if allow retry after read EOF, false otherwise
class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
private:
class StringSource : public SequentialFile {
class StringSource : public FSSequentialFile {
public:
Slice& contents_;
bool force_error_;
@ -68,7 +67,8 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
returned_partial_(false),
fail_after_read_partial_(fail_after_read_partial) {}
Status Read(size_t n, Slice* result, char* scratch) override {
IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
if (fail_after_read_partial_) {
EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
}
@ -81,7 +81,7 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
contents_.remove_prefix(force_error_position_);
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
return IOStatus::Corruption("read error");
}
}
@ -106,28 +106,21 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
*result = Slice(scratch, n);
contents_.remove_prefix(n);
return Status::OK();
return IOStatus::OK();
}
Status Skip(uint64_t n) override {
IOStatus Skip(uint64_t n) override {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
return IOStatus::NotFound("in-memory file skipepd past end");
}
contents_.remove_prefix(n);
return Status::OK();
return IOStatus::OK();
}
};
inline StringSource* GetStringSourceFromLegacyReader(
SequentialFileReader* reader) {
LegacySequentialFileWrapper* file =
static_cast<LegacySequentialFileWrapper*>(reader->file());
return static_cast<StringSource*>(file->target());
}
class ReportCollector : public Reader::Reporter {
public:
size_t dropped_bytes_;
@ -140,29 +133,17 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
}
};
std::string& dest_contents() {
auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
assert(dest);
return dest->contents_;
}
std::string& dest_contents() { return sink_->contents_; }
const std::string& dest_contents() const {
auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
assert(dest);
return dest->contents_;
}
const std::string& dest_contents() const { return sink_->contents_; }
void reset_source_contents() {
auto src = GetStringSourceFromLegacyReader(reader_->file());
assert(src);
src->contents_ = dest_contents();
}
void reset_source_contents() { source_->contents_ = dest_contents(); }
Slice reader_contents_;
std::unique_ptr<WritableFileWriter> dest_holder_;
std::unique_ptr<SequentialFileReader> source_holder_;
test::StringSink* sink_;
StringSource* source_;
ReportCollector report_;
Writer writer_;
std::unique_ptr<Writer> writer_;
std::unique_ptr<Reader> reader_;
protected:
@ -171,19 +152,23 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
public:
LogTest()
: reader_contents_(),
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_), "" /* don't care */)),
source_holder_(test::GetSequentialFileReader(
new StringSource(reader_contents_, !std::get<1>(GetParam())),
"" /* file name */)),
writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
sink_(new test::StringSink(&reader_contents_)),
source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))),
allow_retry_read_(std::get<1>(GetParam())) {
std::unique_ptr<FSWritableFile> sink_holder(sink_);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(sink_holder), "" /* don't care */, FileOptions()));
writer_.reset(
new Writer(std::move(file_writer), 123, std::get<0>(GetParam())));
std::unique_ptr<FSSequentialFile> source_holder(source_);
std::unique_ptr<SequentialFileReader> file_reader(
new SequentialFileReader(std::move(source_holder), "" /* file name */));
if (allow_retry_read_) {
reader_.reset(new FragmentBufferedReader(
nullptr, std::move(source_holder_), &report_, true /* checksum */,
123 /* log_number */));
reader_.reset(new FragmentBufferedReader(nullptr, std::move(file_reader),
&report_, true /* checksum */,
123 /* log_number */));
} else {
reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
reader_.reset(new Reader(nullptr, std::move(file_reader), &report_,
true /* checksum */, 123 /* log_number */));
}
}
@ -191,7 +176,7 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
Slice* get_reader_contents() { return &reader_contents_; }
void Write(const std::string& msg) {
ASSERT_OK(writer_.AddRecord(Slice(msg)));
ASSERT_OK(writer_->AddRecord(Slice(msg)));
}
size_t WrittenBytes() const {
@ -219,11 +204,7 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
dest_contents()[offset] = new_byte;
}
void ShrinkSize(int bytes) {
auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
assert(dest);
dest->Drop(bytes);
}
void ShrinkSize(int bytes) { sink_->Drop(bytes); }
void FixChecksum(int header_offset, int len, bool recyclable) {
// Compute crc of type/len/data
@ -235,9 +216,8 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
}
void ForceError(size_t position = 0) {
auto src = GetStringSourceFromLegacyReader(reader_->file());
src->force_error_ = true;
src->force_error_position_ = position;
source_->force_error_ = true;
source_->force_error_position_ = position;
}
size_t DroppedBytes() const {
@ -249,14 +229,12 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
}
void ForceEOF(size_t position = 0) {
auto src = GetStringSourceFromLegacyReader(reader_->file());
src->force_eof_ = true;
src->force_eof_position_ = position;
source_->force_eof_ = true;
source_->force_eof_position_ = position;
}
void UnmarkEOF() {
auto src = GetStringSourceFromLegacyReader(reader_->file());
src->returned_partial_ = false;
source_->returned_partial_ = false;
reader_->UnmarkEOF();
}
@ -685,9 +663,10 @@ TEST_P(LogTest, Recycle) {
while (get_reader_contents()->size() < log::kBlockSize * 2) {
Write("xxxxxxxxxxxxxxxx");
}
std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
new test::OverwritingStringSink(get_reader_contents()),
"" /* don't care */));
std::unique_ptr<FSWritableFile> sink(
new test::OverwritingStringSink(get_reader_contents()));
std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter(
std::move(sink), "" /* don't care */, FileOptions()));
Writer recycle_writer(std::move(dest_holder), 123, true);
ASSERT_OK(recycle_writer.AddRecord(Slice("foooo")));
ASSERT_OK(recycle_writer.AddRecord(Slice("bar")));
@ -718,10 +697,9 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
};
Slice contents_;
std::unique_ptr<WritableFileWriter> dest_holder_;
test::StringSink* sink_;
std::unique_ptr<Writer> log_writer_;
Env* env_;
EnvOptions env_options_;
const std::string test_dir_;
const std::string log_file_;
std::unique_ptr<WritableFileWriter> writer_;
@ -732,55 +710,50 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
public:
RetriableLogTest()
: contents_(),
dest_holder_(nullptr),
sink_(new test::StringSink(&contents_)),
log_writer_(nullptr),
env_(Env::Default()),
test_dir_(test::PerThreadDBPath("retriable_log_test")),
log_file_(test_dir_ + "/log"),
writer_(nullptr),
reader_(nullptr),
log_reader_(nullptr) {}
log_reader_(nullptr) {
std::unique_ptr<FSWritableFile> sink_holder(sink_);
std::unique_ptr<WritableFileWriter> wfw(new WritableFileWriter(
std::move(sink_holder), "" /* file name */, FileOptions()));
log_writer_.reset(new Writer(std::move(wfw), 123, GetParam()));
}
Status SetupTestEnv() {
dest_holder_.reset(test::GetWritableFileWriter(
new test::StringSink(&contents_), "" /* file name */));
assert(dest_holder_ != nullptr);
log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
assert(log_writer_ != nullptr);
Status s;
s = env_->CreateDirIfMissing(test_dir_);
std::unique_ptr<WritableFile> writable_file;
FileOptions fopts;
auto fs = env_->GetFileSystem();
s = fs->CreateDirIfMissing(test_dir_, IOOptions(), nullptr);
std::unique_ptr<FSWritableFile> writable_file;
if (s.ok()) {
s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
s = fs->NewWritableFile(log_file_, fopts, &writable_file, nullptr);
}
if (s.ok()) {
writer_.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_,
env_options_));
assert(writer_ != nullptr);
writer_.reset(
new WritableFileWriter(std::move(writable_file), log_file_, fopts));
EXPECT_NE(writer_, nullptr);
}
std::unique_ptr<SequentialFile> seq_file;
std::unique_ptr<FSSequentialFile> seq_file;
if (s.ok()) {
s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
s = fs->NewSequentialFile(log_file_, fopts, &seq_file, nullptr);
}
if (s.ok()) {
reader_.reset(new SequentialFileReader(
NewLegacySequentialFileWrapper(seq_file), log_file_));
assert(reader_ != nullptr);
reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
EXPECT_NE(reader_, nullptr);
log_reader_.reset(new FragmentBufferedReader(
nullptr, std::move(reader_), &report_, true /* checksum */,
123 /* log_number */));
assert(log_reader_ != nullptr);
EXPECT_NE(log_reader_, nullptr);
}
return s;
}
std::string contents() {
auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file());
assert(file != nullptr);
return file->contents_;
}
std::string contents() { return sink_->contents_; }
void Encode(const std::string& msg) {
ASSERT_OK(log_writer_->AddRecord(Slice(msg)));

View File

@ -49,9 +49,9 @@ TEST_F(PlainTableKeyDecoderTest, ReadNonMmap) {
Slice contents(tmp);
test::StringSource* string_source =
new test::StringSource(contents, 0, false);
std::unique_ptr<FSRandomAccessFile> holder(string_source);
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(string_source));
new RandomAccessFileReader(std::move(holder), "test"));
std::unique_ptr<PlainTableReaderFileInfo> file_info(
new PlainTableReaderFileInfo(std::move(file_reader), EnvOptions(),
kLength));

View File

@ -13,7 +13,6 @@
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "env/composite_env_wrapper.h"
#include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
@ -49,10 +48,9 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions,
int_tbl_prop_collector_factories,
std::unique_ptr<WritableFileWriter>* writable,
std::unique_ptr<TableBuilder>* builder) {
std::unique_ptr<WritableFile> wf(new test::StringSink);
std::unique_ptr<FSWritableFile> wf(new test::StringSink);
writable->reset(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
"" /* don't care */, EnvOptions()));
new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
int unknown_level = -1;
builder->reset(NewTableBuilder(
ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories,
@ -286,12 +284,13 @@ void TestCustomizedTablePropertiesCollector(
writer->Flush();
// -- Step 2: Read properties
LegacyWritableFileWrapper* file =
static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
test::StringSink* fwf = static_cast<test::StringSink*>(file->target());
test::StringSink* fwf =
static_cast<test::StringSink*>(writer->writable_file());
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(fwf->contents()));
std::unique_ptr<RandomAccessFileReader> fake_file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(fwf->contents())));
new RandomAccessFileReader(std::move(source), "test"));
TableProperties* props;
Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(),
magic_number, ioptions, &props,
@ -427,12 +426,13 @@ void TestInternalKeyPropertiesCollector(
ASSERT_OK(builder->Finish());
writable->Flush();
LegacyWritableFileWrapper* file =
static_cast<LegacyWritableFileWrapper*>(writable->writable_file());
test::StringSink* fwf = static_cast<test::StringSink*>(file->target());
test::StringSink* fwf =
static_cast<test::StringSink*>(writable->writable_file());
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(fwf->contents()));
std::unique_ptr<RandomAccessFileReader> reader(
test::GetRandomAccessFileReader(
new test::StringSource(fwf->contents())));
new RandomAccessFileReader(std::move(source), "test"));
TableProperties* props;
Status s =
ReadTableProperties(reader.get(), fwf->contents().size(), magic_number,

View File

@ -11,15 +11,17 @@
#include <algorithm>
#include <mutex>
#include "file/read_write_util.h"
#include "rocksdb/file_system.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
namespace {
class ReadaheadRandomAccessFile : public RandomAccessFile {
class ReadaheadRandomAccessFile : public FSRandomAccessFile {
public:
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
ReadaheadRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
alignment_(file_->GetRequiredBufferAlignment()),
@ -35,11 +37,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
delete;
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
return file_->Read(offset, n, result, scratch);
return file_->Read(offset, n, options, result, scratch, dbg);
}
std::unique_lock<std::mutex> lk(lock_);
@ -53,14 +56,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return Status::OK();
return IOStatus::OK();
}
size_t advanced_offset = static_cast<size_t>(offset + cached_len);
// In the case of cache hit advanced_offset is already aligned, means that
// chunk_offset equals to advanced_offset
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
IOStatus s = ReadIntoBuffer(chunk_offset, readahead_size_, options, dbg);
if (s.ok()) {
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
@ -71,11 +74,12 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
return s;
}
Status Prefetch(uint64_t offset, size_t n) override {
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
if (n < readahead_size_) {
// Don't allow smaller prefetches than the configured `readahead_size_`.
// `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
return Status::OK();
return IOStatus::OK();
}
std::unique_lock<std::mutex> lk(lock_);
@ -83,10 +87,11 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
size_t offset_ = static_cast<size_t>(offset);
size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
if (prefetch_offset == buffer_offset_) {
return Status::OK();
return IOStatus::OK();
}
return ReadIntoBuffer(prefetch_offset,
Roundup(offset_ + n, alignment_) - prefetch_offset);
Roundup(offset_ + n, alignment_) - prefetch_offset,
options, dbg);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
@ -95,7 +100,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
Status InvalidateCache(size_t offset, size_t length) override {
IOStatus InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
@ -125,14 +130,16 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
// Reads into buffer_ the next n bytes from file_ starting at offset.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
Status ReadIntoBuffer(uint64_t offset, size_t n) const {
IOStatus ReadIntoBuffer(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) const {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsFileSectorAligned(offset, alignment_));
assert(IsFileSectorAligned(n, alignment_));
Slice result;
Status s = file_->Read(offset, n, &result, buffer_.BufferStart());
IOStatus s =
file_->Read(offset, n, options, &result, buffer_.BufferStart(), dbg);
if (s.ok()) {
buffer_offset_ = offset;
buffer_.Size(result.size());
@ -141,7 +148,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
return s;
}
const std::unique_ptr<RandomAccessFile> file_;
const std::unique_ptr<FSRandomAccessFile> file_;
const size_t alignment_;
const size_t readahead_size_;
@ -153,9 +160,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
};
} // namespace
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
std::unique_ptr<RandomAccessFile> result(
std::unique_ptr<FSRandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<FSRandomAccessFile>&& file, size_t readahead_size) {
std::unique_ptr<FSRandomAccessFile> result(
new ReadaheadRandomAccessFile(std::move(file), readahead_size));
return result;
}

View File

@ -8,10 +8,12 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include "rocksdb/env.h"
#include <memory>
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class FSRandomAccessFile;
// This file provides the following main abstractions:
// SequentialFileReader : wrapper over Env::SequentialFile
// RandomAccessFileReader : wrapper over Env::RandomAccessFile
@ -22,6 +24,6 @@ namespace ROCKSDB_NAMESPACE {
// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to
// always prefetch additional data with every read. This is mainly used in
// Compaction Table Readers.
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
std::unique_ptr<FSRandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<FSRandomAccessFile>&& file, size_t readahead_size);
} // namespace ROCKSDB_NAMESPACE

View File

@ -2414,14 +2414,10 @@ TEST_F(OptionsOldApiTest, ColumnFamilyOptionsSerialization) {
#ifndef ROCKSDB_LITE
class OptionsParserTest : public testing::Test {
public:
OptionsParserTest() {
env_.reset(new test::StringEnv(Env::Default()));
fs_.reset(new LegacyFileSystemWrapper(env_.get()));
}
OptionsParserTest() { fs_.reset(new test::StringFS(FileSystem::Default())); }
protected:
std::unique_ptr<test::StringEnv> env_;
std::unique_ptr<LegacyFileSystemWrapper> fs_;
std::shared_ptr<test::StringFS> fs_;
};
TEST_F(OptionsParserTest, Comment) {
@ -2450,7 +2446,7 @@ TEST_F(OptionsParserTest, Comment) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_OK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2481,7 +2477,7 @@ TEST_F(OptionsParserTest, ExtraSpace) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_OK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2499,7 +2495,7 @@ TEST_F(OptionsParserTest, MissingDBOptions) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2529,7 +2525,7 @@ TEST_F(OptionsParserTest, DoubleDBOptions) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2557,7 +2553,7 @@ TEST_F(OptionsParserTest, NoDefaultCFOptions) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2587,7 +2583,7 @@ TEST_F(OptionsParserTest, DefaultCFOptionsMustBeTheFirst) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2616,7 +2612,7 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) {
"[CFOptions \"something_else\"]\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(
parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */));
@ -2684,12 +2680,12 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) {
" # if a section is blank, we will use the default\n";
const std::string kTestFileName = "test-rocksdb-options.ini";
auto s = env_->FileExists(kTestFileName);
auto s = fs_->FileExists(kTestFileName, IOOptions(), nullptr);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_OK(env_->DeleteFile(kTestFileName));
ASSERT_OK(fs_->DeleteFile(kTestFileName, IOOptions(), nullptr));
}
ASSERT_OK(env_->WriteToNewFile(kTestFileName, options_file_content));
ASSERT_OK(fs_->WriteToNewFile(kTestFileName, options_file_content));
RocksDBOptionsParser parser;
ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false,
4096 /* readahead_size */));
@ -2737,7 +2733,7 @@ TEST_F(OptionsParserTest, ParseVersion) {
snprintf(buffer, kLength - 1, file_template.c_str(), iv.c_str());
parser.Reset();
ASSERT_OK(env_->WriteToNewFile(iv, buffer));
ASSERT_OK(fs_->WriteToNewFile(iv, buffer));
ASSERT_NOK(parser.Parse(iv, fs_.get(), false, 0 /* readahead_size */));
}
@ -2746,7 +2742,7 @@ TEST_F(OptionsParserTest, ParseVersion) {
for (auto vv : valid_versions) {
snprintf(buffer, kLength - 1, file_template.c_str(), vv.c_str());
parser.Reset();
ASSERT_OK(env_->WriteToNewFile(vv, buffer));
ASSERT_OK(fs_->WriteToNewFile(vv, buffer));
ASSERT_OK(parser.Parse(vv, fs_.get(), false, 0 /* readahead_size */));
}
}
@ -2855,37 +2851,37 @@ TEST_F(OptionsParserTest, Readahead) {
kOptionsFileName, fs_.get()));
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(kOptionsFileName, &file_size));
ASSERT_OK(
fs_->GetFileSize(kOptionsFileName, IOOptions(), &file_size, nullptr));
assert(file_size > 0);
RocksDBOptionsParser parser;
env_->num_seq_file_read_ = 0;
fs_->num_seq_file_read_ = 0;
size_t readahead_size = 128 * 1024;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size));
ASSERT_EQ(env_->num_seq_file_read_.load(),
ASSERT_EQ(fs_->num_seq_file_read_.load(),
(file_size - 1) / readahead_size + 1);
env_->num_seq_file_read_.store(0);
fs_->num_seq_file_read_.store(0);
readahead_size = 1024 * 1024;
ASSERT_OK(parser.Parse(kOptionsFileName, fs_.get(), false, readahead_size));
ASSERT_EQ(env_->num_seq_file_read_.load(),
ASSERT_EQ(fs_->num_seq_file_read_.load(),
(file_size - 1) / readahead_size + 1);
// Tiny readahead. 8 KB is read each time.
env_->num_seq_file_read_.store(0);
fs_->num_seq_file_read_.store(0);
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 1 /* readahead_size */));
ASSERT_GE(env_->num_seq_file_read_.load(), file_size / (8 * 1024));
ASSERT_LT(env_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2);
ASSERT_GE(fs_->num_seq_file_read_.load(), file_size / (8 * 1024));
ASSERT_LT(fs_->num_seq_file_read_.load(), file_size / (8 * 1024) * 2);
// Disable readahead means 512KB readahead.
env_->num_seq_file_read_.store(0);
fs_->num_seq_file_read_.store(0);
ASSERT_OK(
parser.Parse(kOptionsFileName, fs_.get(), false, 0 /* readahead_size */));
ASSERT_GE(env_->num_seq_file_read_.load(),
(file_size - 1) / (512 * 1024) + 1);
ASSERT_GE(fs_->num_seq_file_read_.load(), (file_size - 1) / (512 * 1024) + 1);
}
TEST_F(OptionsParserTest, DumpAndParse) {
@ -3083,7 +3079,7 @@ class OptionsSanityCheckTest : public OptionsParserTest {
}
Status PersistCFOptions(const ColumnFamilyOptions& cf_opts) {
Status s = env_->DeleteFile(kOptionsFileName);
Status s = fs_->DeleteFile(kOptionsFileName, IOOptions(), nullptr);
if (!s.ok()) {
return s;
}

View File

@ -546,8 +546,10 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
EnvOptions soptions;
soptions.use_mmap_reads = ioptions.allow_mmap_reads;
test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> f(sink);
file_writer.reset(
test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */));
new WritableFileWriter(std::move(f), "" /* don't care */, FileOptions()));
std::unique_ptr<TableBuilder> builder;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
@ -569,23 +571,20 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
file_writer->Flush();
EXPECT_TRUE(s.ok()) << s.ToString();
EXPECT_EQ(
test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(),
builder->FileSize());
EXPECT_EQ(sink->contents().size(), builder->FileSize());
// Open the table
file_reader.reset(test::GetRandomAccessFileReader(new test::StringSource(
test::GetStringSinkFromLegacyWriter(file_writer.get())->contents(),
0 /*uniq_id*/, ioptions.allow_mmap_reads)));
test::StringSource* source = new test::StringSource(
sink->contents(), 0 /*uniq_id*/, ioptions.allow_mmap_reads);
std::unique_ptr<FSRandomAccessFile> file(source);
file_reader.reset(new RandomAccessFileReader(std::move(file), "test"));
const bool kSkipFilters = true;
const bool kImmortal = true;
ASSERT_OK(ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
internal_comparator, !kSkipFilters, !kImmortal,
level_),
std::move(file_reader),
test::GetStringSinkFromLegacyWriter(file_writer.get())->contents().size(),
&table_reader));
std::move(file_reader), sink->contents().size(), &table_reader));
// Search using Get()
ReadOptions ro;

View File

@ -345,8 +345,9 @@ class TableConstructor : public Constructor {
const stl_wrappers::KVMap& kv_map) override {
Reset();
soptions.use_mmap_reads = ioptions.allow_mmap_reads;
file_writer_.reset(test::GetWritableFileWriter(new test::StringSink(),
"" /* don't care */));
std::unique_ptr<FSWritableFile> sink(new test::StringSink());
file_writer_.reset(new WritableFileWriter(
std::move(sink), "" /* don't care */, FileOptions()));
std::unique_ptr<TableBuilder> builder;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
@ -387,8 +388,10 @@ class TableConstructor : public Constructor {
// Open the table
uniq_id_ = cur_uniq_id_++;
file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource(
TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)));
std::unique_ptr<FSRandomAccessFile> source(new test::StringSource(
TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads));
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
const bool kSkipFilters = true;
const bool kImmortal = true;
return ioptions.table_factory->NewTableReader(
@ -425,8 +428,10 @@ class TableConstructor : public Constructor {
virtual Status Reopen(const ImmutableCFOptions& ioptions,
const MutableCFOptions& moptions) {
file_reader_.reset(test::GetRandomAccessFileReader(new test::StringSource(
TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads)));
std::unique_ptr<FSRandomAccessFile> source(new test::StringSource(
TEST_GetSink()->contents(), uniq_id_, ioptions.allow_mmap_reads));
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
return ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
*last_internal_key_),
@ -445,8 +450,7 @@ class TableConstructor : public Constructor {
bool ConvertToInternalKey() { return convert_to_internal_key_; }
test::StringSink* TEST_GetSink() {
return ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(
file_writer_.get());
return static_cast<test::StringSink*>(file_writer_->writable_file());
}
BlockCacheTracer block_cache_tracer_;
@ -1230,7 +1234,9 @@ class FileChecksumTestHelper {
void CreateWriteableFile() {
sink_ = new test::StringSink();
file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */));
std::unique_ptr<FSWritableFile> holder(sink_);
file_writer_.reset(new WritableFileWriter(
std::move(holder), "" /* don't care */, FileOptions()));
}
void SetFileChecksumGenerator(FileChecksumGenerator* checksum_generator) {
@ -1291,10 +1297,11 @@ class FileChecksumTestHelper {
assert(file_checksum_generator != nullptr);
cur_uniq_id_ = checksum_uniq_id_++;
test::StringSink* ss_rw =
ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(
file_writer_.get());
file_reader_.reset(test::GetRandomAccessFileReader(
new test::StringSource(ss_rw->contents())));
static_cast<test::StringSink*>(file_writer_->writable_file());
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(ss_rw->contents()));
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
std::unique_ptr<char[]> scratch(new char[2048]);
Slice result;
uint64_t offset = 0;
@ -3392,9 +3399,9 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
plain_table_options.hash_table_ratio = 0;
PlainTableFactory factory(plain_table_options);
test::StringSink sink;
std::unique_ptr<WritableFileWriter> file_writer(
test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */));
std::unique_ptr<FSWritableFile> sink(new test::StringSink());
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(sink), "" /* don't care */, FileOptions()));
Options options;
const ImmutableCFOptions ioptions(options);
const MutableCFOptions moptions(options);
@ -3421,10 +3428,11 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
ASSERT_OK(file_writer->Flush());
test::StringSink* ss =
ROCKSDB_NAMESPACE::test::GetStringSinkFromLegacyWriter(file_writer.get());
static_cast<test::StringSink*>(file_writer->writable_file());
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(ss->contents(), 72242, true));
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss->contents(), 72242, true)));
new RandomAccessFileReader(std::move(source), "test"));
TableProperties* props = nullptr;
auto s = ReadTableProperties(file_reader.get(), ss->contents().size(),
@ -4052,8 +4060,9 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
test::StringSink* sink = new test::StringSink();
std::unique_ptr<WritableFileWriter> file_writer(
test::GetWritableFileWriter(sink, "" /* don't care */));
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "" /* don't care */, FileOptions()));
Options options;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const ImmutableCFOptions ioptions(options);
@ -4090,9 +4099,10 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
// Helper function to get version, global_seqno, global_seqno_offset
std::function<void()> GetVersionAndGlobalSeqno = [&]() {
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(ss_rw.contents(), 73342, true));
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
new RandomAccessFileReader(std::move(source), ""));
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
@ -4115,16 +4125,18 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
std::string new_global_seqno;
PutFixed64(&new_global_seqno, val);
ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno));
ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno, IOOptions(),
nullptr));
};
// Helper function to get the contents of the table InternalIterator
std::unique_ptr<TableReader> table_reader;
const ReadOptions read_options;
std::function<InternalIterator*()> GetTableInternalIter = [&]() {
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(ss_rw.contents(), 73342, true));
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
new RandomAccessFileReader(std::move(source), ""));
options.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(),
@ -4236,8 +4248,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
bbto.block_align = true;
test::StringSink* sink = new test::StringSink();
std::unique_ptr<WritableFileWriter> file_writer(
test::GetWritableFileWriter(sink, "" /* don't care */));
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "" /* don't care */, FileOptions()));
Options options;
options.compression = kNoCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
@ -4267,17 +4280,16 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
ASSERT_OK(builder->Finish());
ASSERT_OK(file_writer->Flush());
test::RandomRWStringSink ss_rw(sink);
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(sink->contents(), 73342, false));
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
new RandomAccessFileReader(std::move(source), "test"));
// Helper function to get version, global_seqno, global_seqno_offset
std::function<void()> VerifyBlockAlignment = [&]() {
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
kBlockBasedTableMagicNumber, ioptions,
&props, true /* compression_type_missing */));
ASSERT_OK(ReadTableProperties(file_reader.get(), sink->contents().size(),
kBlockBasedTableMagicNumber, ioptions, &props,
true /* compression_type_missing */));
uint64_t data_block_size = props->data_size / props->num_data_blocks;
ASSERT_EQ(data_block_size, 4096);
@ -4301,7 +4313,7 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
TableReaderOptions(ioptions2, moptions2.prefix_extractor.get(),
EnvOptions(),
GetPlainInternalComparator(options2.comparator)),
std::move(file_reader), ss_rw.contents().size(), &table_reader));
std::move(file_reader), sink->contents().size(), &table_reader));
ReadOptions read_options;
std::unique_ptr<InternalIterator> db_iter(table_reader->NewIterator(
@ -4328,8 +4340,9 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
bbto.block_align = true;
test::StringSink* sink = new test::StringSink();
std::unique_ptr<WritableFileWriter> file_writer(
test::GetWritableFileWriter(sink, "" /* don't care */));
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "" /* don't care */, FileOptions()));
Options options;
options.compression = kNoCompression;
@ -4362,14 +4375,14 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
ASSERT_OK(builder->Finish());
ASSERT_OK(file_writer->Flush());
test::RandomRWStringSink ss_rw(sink);
std::unique_ptr<FSRandomAccessFile> source(
new test::StringSource(sink->contents(), 73342, true));
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
new RandomAccessFileReader(std::move(source), "test"));
{
RandomAccessFileReader* file = file_reader.get();
uint64_t file_size = ss_rw.contents().size();
uint64_t file_size = sink->contents().size();
Footer footer;
IOOptions opts;
@ -4452,10 +4465,11 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
// get file reader
test::StringSink* table_sink = c.TEST_GetSink();
std::unique_ptr<RandomAccessFileReader> table_reader{
test::GetRandomAccessFileReader(
new test::StringSource(table_sink->contents(), 0 /* unique_id */,
false /* allow_mmap_reads */))};
std::unique_ptr<FSRandomAccessFile> source(new test::StringSource(
table_sink->contents(), 0 /* unique_id */, false /* allow_mmap_reads */));
std::unique_ptr<RandomAccessFileReader> table_reader(
new RandomAccessFileReader(std::move(source), "test"));
size_t table_size = table_sink->contents().size();
// read footer

View File

@ -171,25 +171,6 @@ const Comparator* ComparatorWithU64Ts() {
return &comp_with_u64_ts;
}
WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
const std::string& fname) {
std::unique_ptr<WritableFile> file(wf);
return new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
fname, EnvOptions());
}
RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf) {
std::unique_ptr<RandomAccessFile> file(raf);
return new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
"[test RandomAccessFileReader]");
}
SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
const std::string& fname) {
std::unique_ptr<SequentialFile> file(se);
return new SequentialFileReader(NewLegacySequentialFileWrapper(file), fname);
}
void CorruptKeyType(InternalKey* ikey) {
std::string keystr = ikey->Encode().ToString();
keystr[keystr.size() - 8] = kTypeLogData;

View File

@ -178,23 +178,16 @@ class VectorIterator : public InternalIterator {
std::vector<std::string> values_;
size_t current_;
};
extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf,
const std::string& fname);
extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf);
extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se,
const std::string& fname);
class StringSink: public WritableFile {
class StringSink : public FSWritableFile {
public:
std::string contents_;
explicit StringSink(Slice* reader_contents = nullptr) :
WritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {
explicit StringSink(Slice* reader_contents = nullptr)
: FSWritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {
if (reader_contents_ != nullptr) {
*reader_contents_ = Slice(contents_.data(), 0);
}
@ -202,12 +195,15 @@ class StringSink: public WritableFile {
const std::string& contents() const { return contents_; }
virtual Status Truncate(uint64_t size) override {
IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_.resize(static_cast<size_t>(size));
return Status::OK();
return IOStatus::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override {
IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
if (reader_contents_ != nullptr) {
assert(reader_contents_->size() <= last_flush_);
size_t offset = last_flush_ - reader_contents_->size();
@ -217,12 +213,17 @@ class StringSink: public WritableFile {
last_flush_ = contents_.size();
}
return Status::OK();
return IOStatus::OK();
}
virtual Status Sync() override { return Status::OK(); }
virtual Status Append(const Slice& slice) override {
IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
using FSWritableFile::Append;
IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_.append(slice.data(), slice.size());
return Status::OK();
return IOStatus::OK();
}
void Drop(size_t bytes) {
if (reader_contents_ != nullptr) {
@ -239,36 +240,44 @@ class StringSink: public WritableFile {
};
// A wrapper around a StringSink to give it a RandomRWFile interface
class RandomRWStringSink : public RandomRWFile {
class RandomRWStringSink : public FSRandomRWFile {
public:
explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
Status Write(uint64_t offset, const Slice& data) override {
IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (offset + data.size() > ss_->contents_.size()) {
ss_->contents_.resize(static_cast<size_t>(offset) + data.size(), '\0');
}
char* pos = const_cast<char*>(ss_->contents_.data() + offset);
memcpy(pos, data.data(), data.size());
return Status::OK();
return IOStatus::OK();
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* /*scratch*/) const override {
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/,
Slice* result, char* /*scratch*/,
IODebugContext* /*dbg*/) const override {
*result = Slice(nullptr, 0);
if (offset < ss_->contents_.size()) {
size_t str_res_sz =
std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
*result = Slice(ss_->contents_.data() + offset, str_res_sz);
}
return Status::OK();
return IOStatus::OK();
}
Status Flush() override { return Status::OK(); }
IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
Status Sync() override { return Status::OK(); }
IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
Status Close() override { return Status::OK(); }
IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
const std::string& contents() const { return ss_->contents(); }
@ -279,34 +288,42 @@ class RandomRWStringSink : public RandomRWFile {
// Like StringSink, this writes into a string. Unlink StringSink, it
// has some initial content and overwrites it, just like a recycled
// log file.
class OverwritingStringSink : public WritableFile {
class OverwritingStringSink : public FSWritableFile {
public:
explicit OverwritingStringSink(Slice* reader_contents)
: WritableFile(),
: FSWritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {}
const std::string& contents() const { return contents_; }
virtual Status Truncate(uint64_t size) override {
IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_.resize(static_cast<size_t>(size));
return Status::OK();
return IOStatus::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override {
IOStatus Close(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
if (last_flush_ < contents_.size()) {
assert(reader_contents_->size() >= contents_.size());
memcpy((char*)reader_contents_->data() + last_flush_,
contents_.data() + last_flush_, contents_.size() - last_flush_);
last_flush_ = contents_.size();
}
return Status::OK();
return IOStatus::OK();
}
virtual Status Sync() override { return Status::OK(); }
virtual Status Append(const Slice& slice) override {
IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
using FSWritableFile::Append;
IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_.append(slice.data(), slice.size());
return Status::OK();
return IOStatus::OK();
}
void Drop(size_t bytes) {
contents_.resize(contents_.size() - bytes);
@ -319,7 +336,7 @@ class OverwritingStringSink : public WritableFile {
size_t last_flush_;
};
class StringSource: public RandomAccessFile {
class StringSource : public FSRandomAccessFile {
public:
explicit StringSource(const Slice& contents, uint64_t uniq_id = 0,
bool mmap = false)
@ -332,11 +349,23 @@ class StringSource: public RandomAccessFile {
uint64_t Size() const { return contents_.size(); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
// If we are using mmap_, it is equivalent to performing a prefetch
if (mmap_) {
return IOStatus::OK();
} else {
return IOStatus::NotSupported("Prefetch not supported");
}
}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*opts*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
total_reads_++;
if (offset > contents_.size()) {
return Status::InvalidArgument("invalid Read offset");
return IOStatus::InvalidArgument("invalid Read offset");
}
if (offset + n > contents_.size()) {
n = contents_.size() - static_cast<size_t>(offset);
@ -347,10 +376,10 @@ class StringSource: public RandomAccessFile {
} else {
*result = Slice(&contents_[static_cast<size_t>(offset)], n);
}
return Status::OK();
return IOStatus::OK();
}
virtual size_t GetUniqueId(char* id, size_t max_size) const override {
size_t GetUniqueId(char* id, size_t max_size) const override {
if (max_size < 20) {
return 0;
}
@ -372,13 +401,6 @@ class StringSource: public RandomAccessFile {
mutable int total_reads_;
};
inline StringSink* GetStringSinkFromLegacyWriter(
const WritableFileWriter* writer) {
LegacyWritableFileWrapper* file =
static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
return static_cast<StringSink*>(file->target());
}
class NullLogger : public Logger {
public:
using Logger::Logv;
@ -525,176 +547,220 @@ inline std::string EncodeInt(uint64_t x) {
return result;
}
class SeqStringSource : public SequentialFile {
public:
SeqStringSource(const std::string& data, std::atomic<int>* read_count)
: data_(data), offset_(0), read_count_(read_count) {}
~SeqStringSource() override {}
Status Read(size_t n, Slice* result, char* scratch) override {
std::string output;
if (offset_ < data_.size()) {
n = std::min(data_.size() - offset_, n);
memcpy(scratch, data_.data() + offset_, n);
offset_ += n;
*result = Slice(scratch, n);
} else {
return Status::InvalidArgument(
"Attemp to read when it already reached eof.");
}
(*read_count_)++;
return Status::OK();
class SeqStringSource : public FSSequentialFile {
public:
SeqStringSource(const std::string& data, std::atomic<int>* read_count)
: data_(data), offset_(0), read_count_(read_count) {}
~SeqStringSource() override {}
IOStatus Read(size_t n, const IOOptions& /*opts*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
std::string output;
if (offset_ < data_.size()) {
n = std::min(data_.size() - offset_, n);
memcpy(scratch, data_.data() + offset_, n);
offset_ += n;
*result = Slice(scratch, n);
} else {
return IOStatus::InvalidArgument(
"Attempt to read when it already reached eof.");
}
Status Skip(uint64_t n) override {
if (offset_ >= data_.size()) {
return Status::InvalidArgument(
"Attemp to read when it already reached eof.");
}
// TODO(yhchiang): Currently doesn't handle the overflow case.
offset_ += static_cast<size_t>(n);
return Status::OK();
(*read_count_)++;
return IOStatus::OK();
}
IOStatus Skip(uint64_t n) override {
if (offset_ >= data_.size()) {
return IOStatus::InvalidArgument(
"Attempt to read when it already reached eof.");
}
// TODO(yhchiang): Currently doesn't handle the overflow case.
offset_ += static_cast<size_t>(n);
return IOStatus::OK();
}
private:
std::string data_;
size_t offset_;
std::atomic<int>* read_count_;
};
class StringFS : public FileSystemWrapper {
public:
class StringSink : public FSWritableFile {
public:
explicit StringSink(std::string* contents)
: FSWritableFile(), contents_(contents) {}
IOStatus Truncate(uint64_t size, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_->resize(static_cast<size_t>(size));
return IOStatus::OK();
}
IOStatus Close(const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Sync(const IOOptions& /*opts*/, IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
using FSWritableFile::Append;
IOStatus Append(const Slice& slice, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
contents_->append(slice.data(), slice.size());
return IOStatus::OK();
}
private:
std::string data_;
size_t offset_;
std::atomic<int>* read_count_;
std::string* contents_;
};
class StringEnv : public EnvWrapper {
public:
class StringSink : public WritableFile {
public:
explicit StringSink(std::string* contents)
: WritableFile(), contents_(contents) {}
virtual Status Truncate(uint64_t size) override {
contents_->resize(static_cast<size_t>(size));
return Status::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override { return Status::OK(); }
virtual Status Sync() override { return Status::OK(); }
virtual Status Append(const Slice& slice) override {
contents_->append(slice.data(), slice.size());
return Status::OK();
}
explicit StringFS(const std::shared_ptr<FileSystem>& t)
: FileSystemWrapper(t) {}
~StringFS() override {}
private:
std::string* contents_;
};
const std::string& GetContent(const std::string& f) { return files_[f]; }
explicit StringEnv(Env* t) : EnvWrapper(t) {}
~StringEnv() override {}
const std::string& GetContent(const std::string& f) { return files_[f]; }
const Status WriteToNewFile(const std::string& file_name,
const IOStatus WriteToNewFile(const std::string& file_name,
const std::string& content) {
std::unique_ptr<WritableFile> r;
auto s = NewWritableFile(file_name, &r, EnvOptions());
if (s.ok()) {
s = r->Append(content);
}
if (s.ok()) {
s = r->Flush();
}
if (s.ok()) {
s = r->Close();
}
assert(!s.ok() || files_[file_name] == content);
return s;
}
std::unique_ptr<FSWritableFile> r;
FileOptions file_opts;
IOOptions io_opts;
// The following text is boilerplate that forwards all methods to target()
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& /*options*/) override {
auto iter = files_.find(f);
if (iter == files_.end()) {
return Status::NotFound("The specified file does not exist", f);
}
r->reset(new SeqStringSource(iter->second, &num_seq_file_read_));
return Status::OK();
auto s = NewWritableFile(file_name, file_opts, &r, nullptr);
if (s.ok()) {
s = r->Append(content, io_opts, nullptr);
}
Status NewRandomAccessFile(const std::string& /*f*/,
std::unique_ptr<RandomAccessFile>* /*r*/,
const EnvOptions& /*options*/) override {
return Status::NotSupported();
if (s.ok()) {
s = r->Flush(io_opts, nullptr);
}
Status NewWritableFile(const std::string& f,
std::unique_ptr<WritableFile>* r,
const EnvOptions& /*options*/) override {
auto iter = files_.find(f);
if (iter != files_.end()) {
return Status::IOError("The specified file already exists", f);
}
r->reset(new StringSink(&files_[f]));
return Status::OK();
}
virtual Status NewDirectory(
const std::string& /*name*/,
std::unique_ptr<Directory>* /*result*/) override {
return Status::NotSupported();
}
Status FileExists(const std::string& f) override {
if (files_.find(f) == files_.end()) {
return Status::NotFound();
}
return Status::OK();
}
Status GetChildren(const std::string& /*dir*/,
std::vector<std::string>* /*r*/) override {
return Status::NotSupported();
}
Status DeleteFile(const std::string& f) override {
files_.erase(f);
return Status::OK();
}
Status CreateDir(const std::string& /*d*/) override {
return Status::NotSupported();
}
Status CreateDirIfMissing(const std::string& /*d*/) override {
return Status::NotSupported();
}
Status DeleteDir(const std::string& /*d*/) override {
return Status::NotSupported();
}
Status GetFileSize(const std::string& f, uint64_t* s) override {
auto iter = files_.find(f);
if (iter == files_.end()) {
return Status::NotFound("The specified file does not exist:", f);
}
*s = iter->second.size();
return Status::OK();
if (s.ok()) {
s = r->Close(io_opts, nullptr);
}
assert(!s.ok() || files_[file_name] == content);
return s;
}
Status GetFileModificationTime(const std::string& /*fname*/,
uint64_t* /*file_mtime*/) override {
return Status::NotSupported();
// The following text is boilerplate that forwards all methods to target()
IOStatus NewSequentialFile(const std::string& f,
const FileOptions& /*options*/,
std::unique_ptr<FSSequentialFile>* r,
IODebugContext* /*dbg*/) override {
auto iter = files_.find(f);
if (iter == files_.end()) {
return IOStatus::NotFound("The specified file does not exist", f);
}
r->reset(new SeqStringSource(iter->second, &num_seq_file_read_));
return IOStatus::OK();
}
Status RenameFile(const std::string& /*s*/,
const std::string& /*t*/) override {
return Status::NotSupported();
IOStatus NewRandomAccessFile(const std::string& /*f*/,
const FileOptions& /*options*/,
std::unique_ptr<FSRandomAccessFile>* /*r*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus NewWritableFile(const std::string& f, const FileOptions& /*options*/,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* /*dbg*/) override {
auto iter = files_.find(f);
if (iter != files_.end()) {
return IOStatus::IOError("The specified file already exists", f);
}
r->reset(new StringSink(&files_[f]));
return IOStatus::OK();
}
IOStatus NewDirectory(const std::string& /*name*/,
const IOOptions& /*options*/,
std::unique_ptr<FSDirectory>* /*result*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
Status LinkFile(const std::string& /*s*/,
const std::string& /*t*/) override {
return Status::NotSupported();
IOStatus FileExists(const std::string& f, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (files_.find(f) == files_.end()) {
return IOStatus::NotFound();
}
return IOStatus::OK();
}
Status LockFile(const std::string& /*f*/, FileLock** /*l*/) override {
return Status::NotSupported();
IOStatus GetChildren(const std::string& /*dir*/, const IOOptions& /*options*/,
std::vector<std::string>* /*r*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
files_.erase(f);
return IOStatus::OK();
}
IOStatus CreateDir(const std::string& /*d*/, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus CreateDirIfMissing(const std::string& /*d*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus DeleteDir(const std::string& /*d*/, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
uint64_t* s, IODebugContext* /*dbg*/) override {
auto iter = files_.find(f);
if (iter == files_.end()) {
return IOStatus::NotFound("The specified file does not exist:", f);
}
*s = iter->second.size();
return IOStatus::OK();
}
Status UnlockFile(FileLock* /*l*/) override {
return Status::NotSupported();
}
IOStatus GetFileModificationTime(const std::string& /*fname*/,
const IOOptions& /*options*/,
uint64_t* /*file_mtime*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
std::atomic<int> num_seq_file_read_;
IOStatus RenameFile(const std::string& /*s*/, const std::string& /*t*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
protected:
std::unordered_map<std::string, std::string> files_;
};
IOStatus LinkFile(const std::string& /*s*/, const std::string& /*t*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus LockFile(const std::string& /*f*/, const IOOptions& /*options*/,
FileLock** /*l*/, IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
IOStatus UnlockFile(FileLock* /*l*/, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported();
}
std::atomic<int> num_seq_file_read_;
protected:
std::unordered_map<std::string, std::string> files_;
};
// Randomly initialize the given DBOptions
void RandomInitDBOptions(DBOptions* db_opt, Random* rnd);

View File

@ -246,19 +246,21 @@ class ReadaheadRandomAccessFileTest
ReadaheadRandomAccessFileTest() : control_contents_() {}
std::string Read(uint64_t offset, size_t n) {
Slice result;
Status s = test_read_holder_->Read(offset, n, &result, scratch_.get());
Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
scratch_.get(), nullptr);
EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
return std::string(result.data(), result.size());
}
void ResetSourceStr(const std::string& str = "") {
auto write_holder =
std::unique_ptr<WritableFileWriter>(test::GetWritableFileWriter(
new test::StringSink(&control_contents_), "" /* don't care */));
std::unique_ptr<FSWritableFile> sink(
new test::StringSink(&control_contents_));
std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
std::move(sink), "" /* don't care */, FileOptions()));
Status s = write_holder->Append(Slice(str));
EXPECT_OK(s);
s = write_holder->Flush();
EXPECT_OK(s);
auto read_holder = std::unique_ptr<RandomAccessFile>(
std::unique_ptr<FSRandomAccessFile> read_holder(
new test::StringSource(control_contents_));
test_read_holder_ =
NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
@ -268,7 +270,7 @@ class ReadaheadRandomAccessFileTest
private:
size_t readahead_size_;
Slice control_contents_;
std::unique_ptr<RandomAccessFile> test_read_holder_;
std::unique_ptr<FSRandomAccessFile> test_read_holder_;
std::unique_ptr<char[]> scratch_;
};
@ -353,10 +355,10 @@ class ReadaheadSequentialFileTest : public testing::Test,
}
void Skip(size_t n) { test_read_holder_->Skip(n); }
void ResetSourceStr(const std::string& str = "") {
auto read_holder = std::unique_ptr<SequentialFile>(
auto read_holder = std::unique_ptr<FSSequentialFile>(
new test::SeqStringSource(str, &seq_read_count_));
test_read_holder_.reset(new SequentialFileReader(
NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_));
test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
"test", readahead_size_));
}
size_t GetReadaheadSize() const { return readahead_size_; }

View File

@ -5,17 +5,19 @@
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_dump_tool.h"
#include <stdio.h>
#include <cinttypes>
#include <iostream>
#include <memory>
#include <string>
#include "env/composite_env_wrapper.h"
#include "file/random_access_file_reader.h"
#include "file/readahead_raf.h"
#include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/string_util.h"
@ -32,18 +34,19 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key,
bool show_summary) {
constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
Status s;
Env* env = Env::Default();
s = env->FileExists(filename);
const auto fs = FileSystem::Default();
IOOptions io_opts;
s = fs->FileExists(filename, io_opts, nullptr);
if (!s.ok()) {
return s;
}
uint64_t file_size = 0;
s = env->GetFileSize(filename, &file_size);
s = fs->GetFileSize(filename, io_opts, &file_size, nullptr);
if (!s.ok()) {
return s;
}
std::unique_ptr<RandomAccessFile> file;
s = env->NewRandomAccessFile(filename, &file, EnvOptions());
std::unique_ptr<FSRandomAccessFile> file;
s = fs->NewRandomAccessFile(filename, FileOptions(), &file, nullptr);
if (!s.ok()) {
return s;
}
@ -51,8 +54,7 @@ Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key,
if (file_size == 0) {
return Status::Corruption("File is empty.");
}
reader_.reset(new RandomAccessFileReader(
NewLegacyRandomAccessFileWrapper(file), filename));
reader_.reset(new RandomAccessFileReader(std::move(file), filename));
uint64_t offset = 0;
uint64_t footer_offset = 0;
CompressionType compression = kNoCompression;