Remove Legacy and Custom FileWrapper classes from header files (#7851)

Summary:
Removed the uses of the Legacy FileWrapper classes from the source code.  The wrappers were creating an additional layer of indirection/wrapping, as the Env already has a FileSystem.

Moved the Custom FileWrapper classes into the CustomEnv, as these classes are really for the private use the the CustomEnv class.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7851

Reviewed By: anand1976

Differential Revision: D26114816

Pulled By: mrambacher

fbshipit-source-id: db32840e58d969d3a0fa6c25aaf13d6dcdc74150
This commit is contained in:
mrambacher 2021-01-28 22:08:46 -08:00 committed by Facebook GitHub Bot
parent 0a9a05ae12
commit 4a09d632c4
40 changed files with 714 additions and 668 deletions

View File

@ -5,6 +5,8 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "db/compaction/compaction_job.h"
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cinttypes> #include <cinttypes>
@ -14,13 +16,13 @@
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/error_handler.h" #include "db/error_handler.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h" #include "table/mock_table.h"
@ -277,12 +279,13 @@ class CompactionJobTestBase : public testing::Test {
new_db.SetLastSequence(0); new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; std::unique_ptr<WritableFileWriter> file_writer;
Status s = env_->NewWritableFile( const auto& fs = env_->GetFileSystem();
manifest, &file, env_->OptimizeForManifestWrite(env_options_)); Status s = WritableFileWriter::Create(
fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
{ {
log::Writer log(std::move(file_writer), 0, false); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;

View File

@ -19,7 +19,6 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/log_format.h" #include "db/log_format.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h" #include "file/filename.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
@ -539,14 +538,15 @@ TEST_F(CorruptionTest, RangeDeletionCorrupted) {
ASSERT_EQ(static_cast<size_t>(1), metadata.size()); ASSERT_EQ(static_cast<size_t>(1), metadata.size());
std::string filename = dbname_ + metadata[0].name; std::string filename = dbname_ + metadata[0].name;
std::unique_ptr<RandomAccessFile> file; FileOptions file_opts;
ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions())); const auto& fs = options_.env->GetFileSystem();
std::unique_ptr<RandomAccessFileReader> file_reader( std::unique_ptr<RandomAccessFileReader> file_reader;
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts,
filename)); &file_reader, nullptr));
uint64_t file_size; uint64_t file_size;
ASSERT_OK(options_.env->GetFileSize(filename, &file_size)); ASSERT_OK(
fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr));
BlockHandle range_del_handle; BlockHandle range_del_handle;
ASSERT_OK(FindMetaBlock( ASSERT_OK(FindMetaBlock(

View File

@ -8,10 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "env/composite_env_wrapper.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "utilities/fault_injection_env.h" #include "utilities/fault_injection_env.h"
@ -1147,7 +1147,7 @@ class RecoveryTestHelper {
*count = 0; *count = 0;
std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0); std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
EnvOptions env_options; FileOptions file_options;
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
std::unique_ptr<VersionSet> versions; std::unique_ptr<VersionSet> versions;
@ -1155,22 +1155,22 @@ class RecoveryTestHelper {
WriteController write_controller; WriteController write_controller;
versions.reset(new VersionSet( versions.reset(new VersionSet(
test->dbname_, &db_options, env_options, table_cache.get(), test->dbname_, &db_options, file_options, table_cache.get(),
&write_buffer_manager, &write_controller, &write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
wal_manager.reset( wal_manager.reset(
new WalManager(db_options, env_options, /*io_tracer=*/nullptr)); new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
std::unique_ptr<log::Writer> current_log_writer; std::unique_ptr<log::Writer> current_log_writer;
for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) { for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
uint64_t current_log_number = j; uint64_t current_log_number = j;
std::string fname = LogFileName(test->dbname_, current_log_number); std::string fname = LogFileName(test->dbname_, current_log_number);
std::unique_ptr<WritableFile> file; std::unique_ptr<WritableFileWriter> file_writer;
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options)); ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( fname, file_options, &file_writer,
NewLegacyWritableFileWrapper(std::move(file)), fname, env_options)); nullptr));
current_log_writer.reset( current_log_writer.reset(
new log::Writer(std::move(file_writer), current_log_number, new log::Writer(std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0)); db_options.recycle_log_file_num > 0));

View File

@ -16,6 +16,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/file_system.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
@ -74,12 +75,13 @@ class FlushJobTestBase : public testing::Test {
} }
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; const auto& fs = env_->GetFileSystem();
Status s = env_->NewWritableFile( std::unique_ptr<WritableFileWriter> file_writer;
manifest, &file, env_->OptimizeForManifestWrite(env_options_)); Status s = WritableFileWriter::Create(
fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, EnvOptions()));
{ {
log::Writer log(std::move(file_writer), 0, false); log::Writer log(std::move(file_writer), 0, false);
std::string record; std::string record;

View File

@ -71,7 +71,6 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "options/cf_options.h" #include "options/cf_options.h"
@ -358,14 +357,14 @@ class Repairer {
// Open the log file // Open the log file
std::string logname = LogFileName(db_options_.wal_dir, log); std::string logname = LogFileName(db_options_.wal_dir, log);
std::unique_ptr<SequentialFile> lfile; const auto& fs = env_->GetFileSystem();
Status status = env_->NewSequentialFile( std::unique_ptr<SequentialFileReader> lfile_reader;
logname, &lfile, env_->OptimizeForLogRead(env_options_)); Status status = SequentialFileReader::Create(
fs, logname, fs->OptimizeForLogRead(env_options_), &lfile_reader,
nullptr);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
std::unique_ptr<SequentialFileReader> lfile_reader(new SequentialFileReader(
NewLegacySequentialFileWrapper(lfile), logname));
// Create the log reader. // Create the log reader.
LogReporter reporter; LogReporter reporter;

View File

@ -11,6 +11,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "rocksdb/file_system.h"
#include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
@ -783,13 +784,13 @@ class VersionSetTestBase {
} }
*last_seqno = last_seq; *last_seqno = last_seq;
num_initial_edits_ = static_cast<int>(new_cfs.size() + 1); num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
std::unique_ptr<WritableFileWriter> file_writer;
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; const auto& fs = env_->GetFileSystem();
Status s = env_->NewWritableFile( Status s = WritableFileWriter::Create(
manifest, &file, env_->OptimizeForManifestWrite(env_options_)); fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
{ {
log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
std::string record; std::string record;
@ -2312,14 +2313,13 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase,
assert(log_writer != nullptr); assert(log_writer != nullptr);
VersionEdit new_db; VersionEdit new_db;
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
std::unique_ptr<WritableFile> file;
const std::string manifest_path = DescriptorFileName(dbname_, 1); const std::string manifest_path = DescriptorFileName(dbname_, 1);
Status s = env_->NewWritableFile( const auto& fs = env_->GetFileSystem();
manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); std::unique_ptr<WritableFileWriter> file_writer;
Status s = WritableFileWriter::Create(
fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
&file_writer, nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
manifest_path, env_options_));
log_writer->reset(new log::Writer(std::move(file_writer), 0, true)); log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
std::string record; std::string record;
ASSERT_TRUE(new_db.EncodeTo(&record)); ASSERT_TRUE(new_db.EncodeTo(&record));
@ -2387,13 +2387,12 @@ class VersionSetTestEmptyDb
new_db.SetDBId(db_id); new_db.SetDBId(db_id);
} }
const std::string manifest_path = DescriptorFileName(dbname_, 1); const std::string manifest_path = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; const auto& fs = env_->GetFileSystem();
Status s = env_->NewWritableFile( std::unique_ptr<WritableFileWriter> file_writer;
manifest_path, &file, env_->OptimizeForManifestWrite(env_options_)); Status s = WritableFileWriter::Create(
fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
&file_writer, nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
manifest_path, env_options_));
{ {
log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
std::string record; std::string record;
@ -2697,12 +2696,12 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
assert(last_seqno != nullptr); assert(last_seqno != nullptr);
assert(log_writer != nullptr); assert(log_writer != nullptr);
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; const auto& fs = env_->GetFileSystem();
Status s = env_->NewWritableFile( std::unique_ptr<WritableFileWriter> file_writer;
manifest, &file, env_->OptimizeForManifestWrite(env_options_)); Status s = WritableFileWriter::Create(
fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
nullptr);
ASSERT_OK(s); ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
log_writer->reset(new log::Writer(std::move(file_writer), 0, false)); log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
VersionEdit new_db; VersionEdit new_db;
if (db_options_.write_dbid_to_manifest) { if (db_options_.write_dbid_to_manifest) {

View File

@ -5,20 +5,21 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "db/wal_manager.h"
#include <map> #include <map>
#include <string> #include <string>
#include "rocksdb/cache.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/wal_manager.h"
#include "env/mock_env.h" #include "env/mock_env.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/cache.h"
#include "rocksdb/file_system.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -81,10 +82,10 @@ class WalManagerTest : public testing::Test {
void RollTheLog(bool /*archived*/) { void RollTheLog(bool /*archived*/) {
current_log_number_++; current_log_number_++;
std::string fname = ArchivedLogFileName(dbname_, current_log_number_); std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
std::unique_ptr<WritableFile> file; const auto& fs = env_->GetFileSystem();
ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( ASSERT_OK(WritableFileWriter::Create(fs, fname, env_options_, &file_writer,
NewLegacyWritableFileWrapper(std::move(file)), fname, env_options_)); nullptr));
current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false)); current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
} }
@ -123,8 +124,9 @@ class WalManagerTest : public testing::Test {
TEST_F(WalManagerTest, ReadFirstRecordCache) { TEST_F(WalManagerTest, ReadFirstRecordCache) {
Init(); Init();
std::string path = dbname_ + "/000001.log"; std::string path = dbname_ + "/000001.log";
std::unique_ptr<WritableFile> file; std::unique_ptr<FSWritableFile> file;
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); ASSERT_OK(env_->GetFileSystem()->NewWritableFile(path, FileOptions(), &file,
nullptr));
SequenceNumber s; SequenceNumber s;
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s)); ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
@ -134,8 +136,8 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s)); wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
ASSERT_EQ(s, 0U); ASSERT_EQ(s, 0U);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( std::unique_ptr<WritableFileWriter> file_writer(
NewLegacyWritableFileWrapper(std::move(file)), path, EnvOptions())); new WritableFileWriter(std::move(file), path, FileOptions()));
log::Writer writer(std::move(file_writer), 1, log::Writer writer(std::move(file_writer), 1,
db_options_.recycle_log_file_num > 0); db_options_.recycle_log_file_num > 0);
WriteBatch batch; WriteBatch batch;

View File

@ -18,7 +18,6 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class CompositeEnv : public Env { class CompositeEnv : public Env {
public: public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related // Initialize a CompositeEnvWrapper that delegates all thread/time related
@ -335,275 +334,4 @@ class CompositeEnvWrapper : public CompositeEnv {
private: private:
Env* env_target_; Env* env_target_;
}; };
class LegacySequentialFileWrapper : public FSSequentialFile {
public:
explicit LegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>&& _target)
: target_(std::move(_target)) {}
IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Read(n, result, scratch));
}
IOStatus Skip(uint64_t n) override {
return status_to_io_status(target_->Skip(n));
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus PositionedRead(uint64_t offset, size_t n,
const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->PositionedRead(offset, n, result, scratch));
}
SequentialFile* target() { return target_.get(); }
private:
std::unique_ptr<SequentialFile> target_;
};
class LegacyRandomAccessFileWrapper : public FSRandomAccessFile {
public:
explicit LegacyRandomAccessFileWrapper(
std::unique_ptr<RandomAccessFile>&& target)
: target_(std::move(target)) {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
std::vector<ReadRequest> reqs;
Status status;
reqs.reserve(num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
ReadRequest req;
req.offset = fs_reqs[i].offset;
req.len = fs_reqs[i].len;
req.scratch = fs_reqs[i].scratch;
req.status = Status::OK();
reqs.emplace_back(req);
}
status = target_->MultiRead(reqs.data(), num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
fs_reqs[i].result = reqs[i].result;
fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status));
}
return status_to_io_status(std::move(status));
;
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Prefetch(offset, n));
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
};
void Hint(AccessPattern pattern) override {
target_->Hint((RandomAccessFile::AccessPattern)pattern);
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
private:
std::unique_ptr<RandomAccessFile> target_;
};
class LegacyWritableFileWrapper : public FSWritableFile {
public:
explicit LegacyWritableFileWrapper(std::unique_ptr<WritableFile>&& _target)
: target_(std::move(_target)) {}
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Append(data));
}
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
const DataVerificationInfo& /*verification_info*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Append(data));
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->PositionedAppend(data, offset));
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& /*options*/,
const DataVerificationInfo& /*verification_info*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->PositionedAppend(data, offset));
}
IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(size));
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
target_->SetWriteLifeTimeHint(hint);
}
Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
return target_->GetWriteLifeTimeHint();
}
uint64_t GetFileSize(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return target_->GetFileSize();
}
void SetPreallocationBlockSize(size_t size) override {
target_->SetPreallocationBlockSize(size);
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
target_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RangeSync(offset, nbytes));
}
void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
target_->PrepareWrite(offset, len);
}
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Allocate(offset, len));
}
WritableFile* target() { return target_.get(); }
private:
std::unique_ptr<WritableFile> target_;
};
class LegacyRandomRWFileWrapper : public FSRandomRWFile {
public:
explicit LegacyRandomRWFileWrapper(std::unique_ptr<RandomRWFile>&& target)
: target_(std::move(target)) {}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus Write(uint64_t offset, const Slice& data,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Write(offset, data));
}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
private:
std::unique_ptr<RandomRWFile> target_;
};
class LegacyDirectoryWrapper : public FSDirectory {
public:
explicit LegacyDirectoryWrapper(std::unique_ptr<Directory>&& target)
: target_(std::move(target)) {}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
private:
std::unique_ptr<Directory> target_;
};
inline std::unique_ptr<FSSequentialFile> NewLegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>& file) {
return std::unique_ptr<FSSequentialFile>(
new LegacySequentialFileWrapper(std::move(file)));
}
inline std::unique_ptr<FSRandomAccessFile> NewLegacyRandomAccessFileWrapper(
std::unique_ptr<RandomAccessFile>& file) {
return std::unique_ptr<FSRandomAccessFile>(
new LegacyRandomAccessFileWrapper(std::move(file)));
}
inline std::unique_ptr<FSWritableFile> NewLegacyWritableFileWrapper(
std::unique_ptr<WritableFile>&& file) {
return std::unique_ptr<FSWritableFile>(
new LegacyWritableFileWrapper(std::move(file)));
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

262
env/env.cc vendored
View File

@ -62,6 +62,256 @@ class LegacySystemClock : public SystemClock {
} }
}; };
class LegacySequentialFileWrapper : public FSSequentialFile {
public:
explicit LegacySequentialFileWrapper(
std::unique_ptr<SequentialFile>&& _target)
: target_(std::move(_target)) {}
IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Read(n, result, scratch));
}
IOStatus Skip(uint64_t n) override {
return status_to_io_status(target_->Skip(n));
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus PositionedRead(uint64_t offset, size_t n,
const IOOptions& /*options*/, Slice* result,
char* scratch, IODebugContext* /*dbg*/) override {
return status_to_io_status(
target_->PositionedRead(offset, n, result, scratch));
}
private:
std::unique_ptr<SequentialFile> target_;
};
class LegacyRandomAccessFileWrapper : public FSRandomAccessFile {
public:
explicit LegacyRandomAccessFileWrapper(
std::unique_ptr<RandomAccessFile>&& target)
: target_(std::move(target)) {}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
std::vector<ReadRequest> reqs;
Status status;
reqs.reserve(num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
ReadRequest req;
req.offset = fs_reqs[i].offset;
req.len = fs_reqs[i].len;
req.scratch = fs_reqs[i].scratch;
req.status = Status::OK();
reqs.emplace_back(req);
}
status = target_->MultiRead(reqs.data(), num_reqs);
for (size_t i = 0; i < num_reqs; ++i) {
fs_reqs[i].result = reqs[i].result;
fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status));
}
return status_to_io_status(std::move(status));
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Prefetch(offset, n));
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
void Hint(AccessPattern pattern) override {
target_->Hint((RandomAccessFile::AccessPattern)pattern);
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
private:
std::unique_ptr<RandomAccessFile> target_;
};
class LegacyRandomRWFileWrapper : public FSRandomRWFile {
public:
explicit LegacyRandomRWFileWrapper(std::unique_ptr<RandomRWFile>&& target)
: target_(std::move(target)) {}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
IOStatus Write(uint64_t offset, const Slice& data,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Write(offset, data));
}
IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
Slice* result, char* scratch,
IODebugContext* /*dbg*/) const override {
return status_to_io_status(target_->Read(offset, n, result, scratch));
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
private:
std::unique_ptr<RandomRWFile> target_;
};
class LegacyWritableFileWrapper : public FSWritableFile {
public:
explicit LegacyWritableFileWrapper(std::unique_ptr<WritableFile>&& _target)
: target_(std::move(_target)) {}
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Append(data));
}
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
const DataVerificationInfo& /*verification_info*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Append(data));
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->PositionedAppend(data, offset));
}
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
const IOOptions& /*options*/,
const DataVerificationInfo& /*verification_info*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->PositionedAppend(data, offset));
}
IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Truncate(size));
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Close());
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Flush());
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Sync());
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
target_->SetWriteLifeTimeHint(hint);
}
Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
return target_->GetWriteLifeTimeHint();
}
uint64_t GetFileSize(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return target_->GetFileSize();
}
void SetPreallocationBlockSize(size_t size) override {
target_->SetPreallocationBlockSize(size);
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
target_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
IOStatus InvalidateCache(size_t offset, size_t length) override {
return status_to_io_status(target_->InvalidateCache(offset, length));
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->RangeSync(offset, nbytes));
}
void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
target_->PrepareWrite(offset, len);
}
IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Allocate(offset, len));
}
private:
std::unique_ptr<WritableFile> target_;
};
class LegacyDirectoryWrapper : public FSDirectory {
public:
explicit LegacyDirectoryWrapper(std::unique_ptr<Directory>&& target)
: target_(std::move(target)) {}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
private:
std::unique_ptr<Directory> target_;
};
class LegacyFileSystemWrapper : public FileSystem { class LegacyFileSystemWrapper : public FileSystem {
public: public:
// Initialize an EnvWrapper that delegates all calls to *t // Initialize an EnvWrapper that delegates all calls to *t
@ -759,18 +1009,18 @@ EnvOptions::EnvOptions() {
Status NewEnvLogger(const std::string& fname, Env* env, Status NewEnvLogger(const std::string& fname, Env* env,
std::shared_ptr<Logger>* result) { std::shared_ptr<Logger>* result) {
EnvOptions options; FileOptions options;
// TODO: Tune the buffer size. // TODO: Tune the buffer size.
options.writable_file_max_buffer_size = 1024 * 1024; options.writable_file_max_buffer_size = 1024 * 1024;
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<FSWritableFile> writable_file;
const auto status = env->NewWritableFile(fname, &writable_file, options); const auto status = env->GetFileSystem()->NewWritableFile(
fname, options, &writable_file, nullptr);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
*result = std::make_shared<EnvLogger>( *result = std::make_shared<EnvLogger>(std::move(writable_file), fname,
NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options, options, env);
env);
return Status::OK(); return Status::OK();
} }

1
env/env_posix.cc vendored
View File

@ -123,6 +123,7 @@ class PosixDynamicLibrary : public DynamicLibrary {
void* handle_; void* handle_;
}; };
#endif // !ROCKSDB_NO_DYNAMIC_EXTENSION #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
class PosixClock : public SystemClock { class PosixClock : public SystemClock {
public: public:
const char* Name() const override { return "PosixClock"; } const char* Name() const override { return "PosixClock"; }

View File

@ -22,6 +22,17 @@
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status RandomAccessFileReader::Create(
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
std::unique_ptr<FSRandomAccessFile> file;
Status s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
if (s.ok()) {
reader->reset(new RandomAccessFileReader(std::move(file), fname));
}
return s;
}
Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch, size_t n, Slice* result, char* scratch,

View File

@ -103,6 +103,10 @@ class RandomAccessFileReader {
#endif #endif
} }
static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<RandomAccessFileReader>* reader,
IODebugContext* dbg);
RandomAccessFileReader(const RandomAccessFileReader&) = delete; RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;

View File

@ -22,6 +22,18 @@
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status SequentialFileReader::Create(
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg) {
std::unique_ptr<FSSequentialFile> file;
Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok()) {
reader->reset(new SequentialFileReader(std::move(file), fname));
}
return s;
}
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s; Status s;
if (use_direct_io()) { if (use_direct_io()) {

View File

@ -41,6 +41,10 @@ class SequentialFileReader {
: file_name_(_file_name), : file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer, _file_name) {} io_tracer, _file_name) {}
static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg);
SequentialFileReader(const SequentialFileReader&) = delete; SequentialFileReader(const SequentialFileReader&) = delete;
SequentialFileReader& operator=(const SequentialFileReader&) = delete; SequentialFileReader& operator=(const SequentialFileReader&) = delete;

View File

@ -22,6 +22,19 @@
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg) {
std::unique_ptr<FSWritableFile> file;
Status s = fs->NewWritableFile(fname, file_opts, &file, dbg);
if (s.ok()) {
writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
}
return s;
}
IOStatus WritableFileWriter::Append(const Slice& data) { IOStatus WritableFileWriter::Append(const Slice& data) {
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();

View File

@ -191,6 +191,10 @@ class WritableFileWriter {
} }
} }
static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg);
WritableFileWriter(const WritableFileWriter&) = delete; WritableFileWriter(const WritableFileWriter&) = delete;
WritableFileWriter& operator=(const WritableFileWriter&) = delete; WritableFileWriter& operator=(const WritableFileWriter&) = delete;

View File

@ -58,16 +58,15 @@ TableReader* NewTableReader(const std::string& sst_file_path,
// This code block is similar to SstFileReader::Open. // This code block is similar to SstFileReader::Open.
uint64_t file_size = 0; uint64_t file_size = 0;
std::unique_ptr<RandomAccessFile> file;
std::unique_ptr<RandomAccessFileReader> file_reader; std::unique_ptr<RandomAccessFileReader> file_reader;
std::unique_ptr<TableReader> table_reader; std::unique_ptr<TableReader> table_reader;
Status s = options.env->GetFileSize(sst_file_path, &file_size); const auto& fs = options.env->GetFileSystem();
FileOptions fopts(env_options);
Status s = options.env->GetFileSize(sst_file_path, fopts.io_options,
&file_size, nullptr);
if (s.ok()) { if (s.ok()) {
s = options.env->NewRandomAccessFile(sst_file_path, &file, env_options); s = RandomAccessFileReader::Create(fs, sst_file_path, fopts, &file_reader,
} nullptr);
if (s.ok()) {
file_reader.reset(new RandomAccessFileReader(
NewLegacyRandomAccessFileWrapper(file), sst_file_path));
} }
if (s.ok()) { if (s.ok()) {
TableReaderOptions t_opt(cf_ioptions, /*prefix_extractor=*/nullptr, TableReaderOptions t_opt(cf_ioptions, /*prefix_extractor=*/nullptr,

View File

@ -1360,6 +1360,7 @@ Status WinEnv::GetThreadList(std::vector<ThreadStatus>* thread_list) {
Status WinEnv::GetHostName(char* name, uint64_t len) { Status WinEnv::GetHostName(char* name, uint64_t len) {
return winenv_io_.GetHostName(name, len); return winenv_io_.GetHostName(name, len);
} }
void WinEnv::Schedule(void (*function)(void*), void* arg, Env::Priority pri, void WinEnv::Schedule(void (*function)(void*), void* arg, Env::Priority pri,
void* tag, void (*unschedFunction)(void* arg)) { void* tag, void (*unschedFunction)(void* arg)) {
return winenv_threads_.Schedule(function, arg, pri, tag, unschedFunction); return winenv_threads_.Schedule(function, arg, pri, tag, unschedFunction);

View File

@ -24,7 +24,6 @@ namespace ROCKSDB_NAMESPACE {
class SystemClock; class SystemClock;
namespace port { namespace port {
class WinLogger : public ROCKSDB_NAMESPACE::Logger { class WinLogger : public ROCKSDB_NAMESPACE::Logger {
public: public:
WinLogger(uint64_t (*gettid)(), const std::shared_ptr<SystemClock>& clock, WinLogger(uint64_t (*gettid)(), const std::shared_ptr<SystemClock>& clock,

View File

@ -6,11 +6,11 @@
#include "table/block_fetcher.h" #include "table/block_fetcher.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/file_system.h"
#include "table/block_based/binary_search_index_reader.h" #include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_based_table_factory.h"
@ -248,11 +248,9 @@ class BlockFetcherTest : public testing::Test {
void NewFileWriter(const std::string& filename, void NewFileWriter(const std::string& filename,
std::unique_ptr<WritableFileWriter>* writer) { std::unique_ptr<WritableFileWriter>* writer) {
std::string path = Path(filename); std::string path = Path(filename);
EnvOptions env_options; FileOptions file_options;
std::unique_ptr<WritableFile> file; ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), path,
ASSERT_OK(env_->NewWritableFile(path, &file, env_options)); file_options, writer, nullptr));
writer->reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), path, env_options));
} }
void NewFileReader(const std::string& filename, const FileOptions& opt, void NewFileReader(const std::string& filename, const FileOptions& opt,

View File

@ -5,14 +5,16 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <vector> #include "table/cuckoo/cuckoo_table_builder.h"
#include <string>
#include <map> #include <map>
#include <string>
#include <utility> #include <utility>
#include <vector>
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "table/cuckoo/cuckoo_table_builder.h" #include "rocksdb/file_system.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -35,7 +37,7 @@ class CuckooBuilderTest : public testing::Test {
env_ = Env::Default(); env_ = Env::Default();
Options options; Options options;
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
env_options_ = EnvOptions(options); file_options_ = FileOptions(options);
} }
void CheckFileContents(const std::vector<std::string>& keys, void CheckFileContents(const std::vector<std::string>& keys,
@ -54,10 +56,11 @@ class CuckooBuilderTest : public testing::Test {
} }
} }
// Read file // Read file
std::unique_ptr<RandomAccessFile> read_file;
ASSERT_OK(env_->NewRandomAccessFile(fname, &read_file, env_options_));
uint64_t read_file_size; uint64_t read_file_size;
ASSERT_OK(env_->GetFileSize(fname, &read_file_size)); ASSERT_OK(env_->GetFileSize(fname, &read_file_size));
std::unique_ptr<RandomAccessFileReader> file_reader;
ASSERT_OK(RandomAccessFileReader::Create(
env_->GetFileSystem(), fname, file_options_, &file_reader, nullptr));
Options options; Options options;
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
@ -65,9 +68,6 @@ class CuckooBuilderTest : public testing::Test {
// Assert Table Properties. // Assert Table Properties.
TableProperties* props = nullptr; TableProperties* props = nullptr;
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file),
fname));
ASSERT_OK(ReadTableProperties(file_reader.get(), read_file_size, ASSERT_OK(ReadTableProperties(file_reader.get(), read_file_size,
kCuckooTableMagicNumber, ioptions, kCuckooTableMagicNumber, ioptions,
&props, true /* compression_type_missing */)); &props, true /* compression_type_missing */));
@ -158,7 +158,7 @@ class CuckooBuilderTest : public testing::Test {
Env* env_; Env* env_;
EnvOptions env_options_; FileOptions file_options_;
std::string fname; std::string fname;
const double kHashTableRatio = 0.9; const double kHashTableRatio = 0.9;
}; };
@ -166,10 +166,9 @@ class CuckooBuilderTest : public testing::Test {
TEST_F(CuckooBuilderTest, SuccessWithEmptyFile) { TEST_F(CuckooBuilderTest, SuccessWithEmptyFile) {
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFile> writable_file;
fname = test::PerThreadDBPath("EmptyFile"); fname = test::PerThreadDBPath("EmptyFile");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
NewLegacyWritableFileWrapper(std::move(writable_file)), fname, file_options_, &file_writer, nullptr));
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, 4, 100, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, 4, 100,
BytewiseComparator(), 1, false, false, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -207,12 +206,10 @@ TEST_F(CuckooBuilderTest, WriteSuccessNoCollisionFullKey) {
} }
uint64_t expected_table_size = GetExpectedTableSize(keys.size()); uint64_t expected_table_size = GetExpectedTableSize(keys.size());
std::unique_ptr<WritableFile> writable_file;
fname = test::PerThreadDBPath("NoCollisionFullKey"); fname = test::PerThreadDBPath("NoCollisionFullKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
NewLegacyWritableFileWrapper(std::move(writable_file)), fname, file_options_, &file_writer, nullptr));
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -257,12 +254,10 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionFullKey) {
} }
uint64_t expected_table_size = GetExpectedTableSize(keys.size()); uint64_t expected_table_size = GetExpectedTableSize(keys.size());
std::unique_ptr<WritableFile> writable_file;
fname = test::PerThreadDBPath("WithCollisionFullKey"); fname = test::PerThreadDBPath("WithCollisionFullKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
NewLegacyWritableFileWrapper(std::move(writable_file)), fname, file_options_, &file_writer, nullptr));
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -306,13 +301,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionAndCuckooBlock) {
} }
uint64_t expected_table_size = GetExpectedTableSize(keys.size()); uint64_t expected_table_size = GetExpectedTableSize(keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
uint32_t cuckoo_block_size = 2; uint32_t cuckoo_block_size = 2;
fname = test::PerThreadDBPath("WithCollisionFullKey2"); fname = test::PerThreadDBPath("WithCollisionFullKey2");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder( CuckooTableBuilder builder(
file_writer.get(), kHashTableRatio, num_hash_fun, 100, file_writer.get(), kHashTableRatio, num_hash_fun, 100,
BytewiseComparator(), cuckoo_block_size, false, false, GetSliceHash, BytewiseComparator(), cuckoo_block_size, false, false, GetSliceHash,
@ -361,12 +354,10 @@ TEST_F(CuckooBuilderTest, WithCollisionPathFullKey) {
} }
uint64_t expected_table_size = GetExpectedTableSize(keys.size()); uint64_t expected_table_size = GetExpectedTableSize(keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("WithCollisionPathFullKey"); fname = test::PerThreadDBPath("WithCollisionPathFullKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -412,12 +403,10 @@ TEST_F(CuckooBuilderTest, WithCollisionPathFullKeyAndCuckooBlock) {
} }
uint64_t expected_table_size = GetExpectedTableSize(keys.size()); uint64_t expected_table_size = GetExpectedTableSize(keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("WithCollisionPathFullKeyAndCuckooBlock"); fname = test::PerThreadDBPath("WithCollisionPathFullKeyAndCuckooBlock");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 2, false, false, 100, BytewiseComparator(), 2, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -456,12 +445,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessNoCollisionUserKey) {
std::vector<uint64_t> expected_locations = {0, 1, 2, 3}; std::vector<uint64_t> expected_locations = {0, 1, 2, 3};
uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); uint64_t expected_table_size = GetExpectedTableSize(user_keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("NoCollisionUserKey"); fname = test::PerThreadDBPath("NoCollisionUserKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -501,12 +489,11 @@ TEST_F(CuckooBuilderTest, WriteSuccessWithCollisionUserKey) {
std::vector<uint64_t> expected_locations = {0, 1, 2, 3}; std::vector<uint64_t> expected_locations = {0, 1, 2, 3};
uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); uint64_t expected_table_size = GetExpectedTableSize(user_keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("WithCollisionUserKey"); fname = test::PerThreadDBPath("WithCollisionUserKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -548,12 +535,11 @@ TEST_F(CuckooBuilderTest, WithCollisionPathUserKey) {
std::vector<uint64_t> expected_locations = {0, 1, 3, 4, 2}; std::vector<uint64_t> expected_locations = {0, 1, 3, 4, 2};
uint64_t expected_table_size = GetExpectedTableSize(user_keys.size()); uint64_t expected_table_size = GetExpectedTableSize(user_keys.size());
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("WithCollisionPathUserKey"); fname = test::PerThreadDBPath("WithCollisionPathUserKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
2, BytewiseComparator(), 1, false, false, 2, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -594,12 +580,10 @@ TEST_F(CuckooBuilderTest, FailWhenCollisionPathTooLong) {
}; };
hash_map = std::move(hm); hash_map = std::move(hm);
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("WithCollisionPathUserKey"); fname = test::PerThreadDBPath("WithCollisionPathUserKey");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
2, BytewiseComparator(), 1, false, false, 2, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,
@ -623,12 +607,10 @@ TEST_F(CuckooBuilderTest, FailWhenSameKeyInserted) {
uint32_t num_hash_fun = 4; uint32_t num_hash_fun = 4;
std::string user_key = "repeatedkey"; std::string user_key = "repeatedkey";
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
fname = test::PerThreadDBPath("FailWhenSameKeyInserted"); fname = test::PerThreadDBPath("FailWhenSameKeyInserted");
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options_, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
EnvOptions()));
CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun, CuckooTableBuilder builder(file_writer.get(), kHashTableRatio, num_hash_fun,
100, BytewiseComparator(), 1, false, false, 100, BytewiseComparator(), 1, false, false,
GetSliceHash, 0 /* column_family_id */, GetSliceHash, 0 /* column_family_id */,

View File

@ -68,7 +68,7 @@ class CuckooReaderTest : public testing::Test {
CuckooReaderTest() { CuckooReaderTest() {
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
env = options.env; env = options.env;
env_options = EnvOptions(options); file_options = FileOptions(options);
} }
void SetUp(int num) { void SetUp(int num) {
@ -88,12 +88,9 @@ class CuckooReaderTest : public testing::Test {
void CreateCuckooFileAndCheckReader( void CreateCuckooFileAndCheckReader(
const Comparator* ucomp = BytewiseComparator()) { const Comparator* ucomp = BytewiseComparator()) {
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); ASSERT_OK(WritableFileWriter::Create(env->GetFileSystem(), fname,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( file_options, &file_writer, nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
env_options));
CuckooTableBuilder builder( CuckooTableBuilder builder(
file_writer.get(), 0.9, kNumHashFunc, 100, ucomp, 2, false, false, file_writer.get(), 0.9, kNumHashFunc, 100, ucomp, 2, false, false,
GetSliceHash, 0 /* column_family_id */, kDefaultColumnFamilyName); GetSliceHash, 0 /* column_family_id */, kDefaultColumnFamilyName);
@ -109,11 +106,9 @@ class CuckooReaderTest : public testing::Test {
ASSERT_OK(file_writer->Close()); ASSERT_OK(file_writer->Close());
// Check reader now. // Check reader now.
std::unique_ptr<RandomAccessFile> read_file; std::unique_ptr<RandomAccessFileReader> file_reader;
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); ASSERT_OK(RandomAccessFileReader::Create(
std::unique_ptr<RandomAccessFileReader> file_reader( env->GetFileSystem(), fname, file_options, &file_reader, nullptr));
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file),
fname));
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp, CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp,
GetSliceHash); GetSliceHash);
@ -139,11 +134,9 @@ class CuckooReaderTest : public testing::Test {
} }
void CheckIterator(const Comparator* ucomp = BytewiseComparator()) { void CheckIterator(const Comparator* ucomp = BytewiseComparator()) {
std::unique_ptr<RandomAccessFile> read_file; std::unique_ptr<RandomAccessFileReader> file_reader;
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); ASSERT_OK(RandomAccessFileReader::Create(
std::unique_ptr<RandomAccessFileReader> file_reader( env->GetFileSystem(), fname, file_options, &file_reader, nullptr));
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file),
fname));
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp, CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucomp,
GetSliceHash); GetSliceHash);
@ -211,7 +204,7 @@ class CuckooReaderTest : public testing::Test {
uint64_t file_size; uint64_t file_size;
Options options; Options options;
Env* env; Env* env;
EnvOptions env_options; FileOptions file_options;
}; };
TEST_F(CuckooReaderTest, FileNotMmaped) { TEST_F(CuckooReaderTest, FileNotMmaped) {
@ -330,11 +323,11 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
} }
auto* ucmp = BytewiseComparator(); auto* ucmp = BytewiseComparator();
CreateCuckooFileAndCheckReader(); CreateCuckooFileAndCheckReader();
std::unique_ptr<RandomAccessFile> read_file;
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); std::unique_ptr<RandomAccessFileReader> file_reader;
std::unique_ptr<RandomAccessFileReader> file_reader( ASSERT_OK(RandomAccessFileReader::Create(
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), env->GetFileSystem(), fname, file_options, &file_reader, nullptr));
fname));
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucmp, CuckooTableReader reader(ioptions, std::move(file_reader), file_size, ucmp,
GetSliceHash); GetSliceHash);
@ -415,15 +408,13 @@ void WriteFile(const std::vector<std::string>& keys,
const uint64_t num, double hash_ratio) { const uint64_t num, double hash_ratio) {
Options options; Options options;
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
Env* env = options.env; const auto& fs = options.env->GetFileSystem();
EnvOptions env_options = EnvOptions(options); FileOptions file_options(options);
std::string fname = GetFileName(num); std::string fname = GetFileName(num);
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFileWriter> file_writer;
ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); ASSERT_OK(WritableFileWriter::Create(fs, fname, file_options, &file_writer,
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( nullptr));
NewLegacyWritableFileWrapper(std::move(writable_file)), fname,
env_options));
CuckooTableBuilder builder( CuckooTableBuilder builder(
file_writer.get(), hash_ratio, 64, 1000, test::Uint64Comparator(), 5, file_writer.get(), hash_ratio, 64, 1000, test::Uint64Comparator(), 5,
false, FLAGS_identity_as_first_hash, nullptr, 0 /* column_family_id */, false, FLAGS_identity_as_first_hash, nullptr, 0 /* column_family_id */,
@ -440,12 +431,11 @@ void WriteFile(const std::vector<std::string>& keys,
ASSERT_OK(file_writer->Close()); ASSERT_OK(file_writer->Close());
uint64_t file_size; uint64_t file_size;
env->GetFileSize(fname, &file_size); ASSERT_OK(
std::unique_ptr<RandomAccessFile> read_file; fs->GetFileSize(fname, file_options.io_options, &file_size, nullptr));
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); std::unique_ptr<RandomAccessFileReader> file_reader;
std::unique_ptr<RandomAccessFileReader> file_reader( ASSERT_OK(RandomAccessFileReader::Create(fs, fname, file_options,
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), &file_reader, nullptr));
fname));
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
CuckooTableReader reader(ioptions, std::move(file_reader), file_size, CuckooTableReader reader(ioptions, std::move(file_reader), file_size,
@ -469,16 +459,16 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
Options options; Options options;
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
Env* env = options.env; Env* env = options.env;
EnvOptions env_options = EnvOptions(options); const auto& fs = options.env->GetFileSystem();
FileOptions file_options(options);
std::string fname = GetFileName(num); std::string fname = GetFileName(num);
uint64_t file_size; uint64_t file_size;
env->GetFileSize(fname, &file_size); ASSERT_OK(
std::unique_ptr<RandomAccessFile> read_file; fs->GetFileSize(fname, file_options.io_options, &file_size, nullptr));
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options)); std::unique_ptr<RandomAccessFileReader> file_reader;
std::unique_ptr<RandomAccessFileReader> file_reader( ASSERT_OK(RandomAccessFileReader::Create(fs, fname, file_options,
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(read_file), &file_reader, nullptr));
fname));
const ImmutableCFOptions ioptions(options); const ImmutableCFOptions ioptions(options);
CuckooTableReader reader(ioptions, std::move(file_reader), file_size, CuckooTableReader reader(ioptions, std::move(file_reader), file_size,

View File

@ -265,17 +265,14 @@ TableBuilder* MockTableFactory::NewTableBuilder(
Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
KVVector file_contents) { KVVector file_contents) {
std::unique_ptr<WritableFile> file; std::unique_ptr<WritableFileWriter> file_writer;
auto s = env->NewWritableFile(fname, &file, EnvOptions()); auto s = WritableFileWriter::Create(env->GetFileSystem(), fname,
FileOptions(), &file_writer, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
WritableFileWriter file_writer(NewLegacyWritableFileWrapper(std::move(file)),
fname, EnvOptions());
uint32_t id; uint32_t id;
s = GetAndWriteNextID(&file_writer, &id); s = GetAndWriteNextID(file_writer.get(), &id);
if (s.ok()) { if (s.ok()) {
file_system_.files.insert({id, std::move(file_contents)}); file_system_.files.insert({id, std::move(file_contents)});
} }

View File

@ -18,7 +18,6 @@
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -80,11 +79,13 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
// read table magic number // read table magic number
Footer footer; Footer footer;
std::unique_ptr<RandomAccessFile> file; const auto& fs = options_.env->GetFileSystem();
std::unique_ptr<FSRandomAccessFile> file;
uint64_t file_size = 0; uint64_t file_size = 0;
Status s = options_.env->NewRandomAccessFile(file_path, &file, soptions_); Status s = fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file,
nullptr);
if (s.ok()) { if (s.ok()) {
s = options_.env->GetFileSize(file_path, &file_size); s = fs->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
} }
// check empty file // check empty file
@ -93,8 +94,7 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
return Status::Aborted(file_path, "Empty file"); return Status::Aborted(file_path, "Empty file");
} }
file_.reset(new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), file_.reset(new RandomAccessFileReader(std::move(file), file_path));
file_path));
FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */, FilePrefetchBuffer prefetch_buffer(nullptr, 0, 0, true /* enable */,
false /* track_min_offset */); false /* track_min_offset */);
@ -119,9 +119,10 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
if (magic_number == kPlainTableMagicNumber || if (magic_number == kPlainTableMagicNumber ||
magic_number == kLegacyPlainTableMagicNumber) { magic_number == kLegacyPlainTableMagicNumber) {
soptions_.use_mmap_reads = true; soptions_.use_mmap_reads = true;
options_.env->NewRandomAccessFile(file_path, &file, soptions_);
file_.reset(new RandomAccessFileReader( fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file,
NewLegacyRandomAccessFileWrapper(file), file_path)); nullptr);
file_.reset(new RandomAccessFileReader(std::move(file), file_path));
} }
options_.comparator = &internal_comparator_; options_.comparator = &internal_comparator_;
// For old sst format, ReadTableProperties might fail but file can be read // For old sst format, ReadTableProperties might fail but file can be read
@ -192,16 +193,14 @@ Status SstFileDumper::DumpTable(const std::string& out_filename) {
Status SstFileDumper::CalculateCompressedTableSize( Status SstFileDumper::CalculateCompressedTableSize(
const TableBuilderOptions& tb_options, size_t block_size, const TableBuilderOptions& tb_options, size_t block_size,
uint64_t* num_data_blocks, uint64_t* compressed_table_size) { uint64_t* num_data_blocks, uint64_t* compressed_table_size) {
std::unique_ptr<WritableFile> out_file;
std::unique_ptr<Env> env(NewMemEnv(options_.env)); std::unique_ptr<Env> env(NewMemEnv(options_.env));
Status s = env->NewWritableFile(testFileName, &out_file, soptions_); std::unique_ptr<WritableFileWriter> dest_writer;
Status s =
WritableFileWriter::Create(env->GetFileSystem(), testFileName,
FileOptions(soptions_), &dest_writer, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<WritableFileWriter> dest_writer;
dest_writer.reset(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)),
testFileName, soptions_));
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.block_size = block_size; table_options.block_size = block_size;
BlockBasedTableFactory block_based_tf(table_options); BlockBasedTableFactory block_based_tf(table_options);

View File

@ -10,9 +10,10 @@
#include "db/arena_wrapped_db_iter.h" #include "db/arena_wrapped_db_iter.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "env/composite_env_wrapper.h"
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "table/get_context.h" #include "table/get_context.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/table_reader.h" #include "table/table_reader.h"
@ -42,15 +43,17 @@ Status SstFileReader::Open(const std::string& file_path) {
auto r = rep_.get(); auto r = rep_.get();
Status s; Status s;
uint64_t file_size = 0; uint64_t file_size = 0;
std::unique_ptr<RandomAccessFile> file; std::unique_ptr<FSRandomAccessFile> file;
std::unique_ptr<RandomAccessFileReader> file_reader; std::unique_ptr<RandomAccessFileReader> file_reader;
s = r->options.env->GetFileSize(file_path, &file_size); FileOptions fopts(r->soptions);
const auto& fs = r->options.env->GetFileSystem();
s = fs->GetFileSize(file_path, fopts.io_options, &file_size, nullptr);
if (s.ok()) { if (s.ok()) {
s = r->options.env->NewRandomAccessFile(file_path, &file, r->soptions); s = fs->NewRandomAccessFile(file_path, fopts, &file, nullptr);
} }
if (s.ok()) { if (s.ok()) {
file_reader.reset(new RandomAccessFileReader( file_reader.reset(new RandomAccessFileReader(std::move(file), file_path));
NewLegacyRandomAccessFileWrapper(file), file_path));
} }
if (s.ok()) { if (s.ok()) {
TableReaderOptions t_opt(r->ioptions, r->moptions.prefix_extractor.get(), TableReaderOptions t_opt(r->ioptions, r->moptions.prefix_extractor.get(),

View File

@ -8,8 +8,8 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "env/composite_env_wrapper.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/file_system.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_builder.h"
#include "table/sst_file_writer_collectors.h" #include "table/sst_file_writer_collectors.h"
@ -182,8 +182,9 @@ SstFileWriter::~SstFileWriter() {
Status SstFileWriter::Open(const std::string& file_path) { Status SstFileWriter::Open(const std::string& file_path) {
Rep* r = rep_.get(); Rep* r = rep_.get();
Status s; Status s;
std::unique_ptr<WritableFile> sst_file; std::unique_ptr<FSWritableFile> sst_file;
s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); s = r->ioptions.env->GetFileSystem()->NewWritableFile(
file_path, r->env_options, &sst_file, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -255,10 +256,11 @@ Status SstFileWriter::Open(const std::string& file_path) {
r->column_family_name, unknown_level, 0 /* creation_time */, r->column_family_name, unknown_level, 0 /* creation_time */,
0 /* oldest_key_time */, 0 /* target_file_size */, 0 /* oldest_key_time */, 0 /* target_file_size */,
0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id); 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id);
r->file_writer.reset(new WritableFileWriter( r->file_writer.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(sst_file)), file_path, std::move(sst_file), file_path, r->env_options,
r->env_options, r->ioptions.env->GetSystemClock(), r->ioptions.env->GetSystemClock(), nullptr /* io_tracer */,
nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners, nullptr /* stats */, r->ioptions.listeners,
r->ioptions.file_checksum_gen_factory)); r->ioptions.file_checksum_gen_factory));
// TODO(tec) : If table_factory is using compressed block cache, we will // TODO(tec) : If table_factory is using compressed block cache, we will

View File

@ -13,10 +13,10 @@ int main() {
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "env/composite_env_wrapper.h"
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/file_system.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
@ -92,14 +92,13 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
const MutableCFOptions moptions(cfo); const MutableCFOptions moptions(cfo);
std::unique_ptr<WritableFileWriter> file_writer; std::unique_ptr<WritableFileWriter> file_writer;
if (!through_db) { if (!through_db) {
std::unique_ptr<WritableFile> file; ASSERT_OK(WritableFileWriter::Create(env->GetFileSystem(), file_name,
env->NewWritableFile(file_name, &file, env_options); FileOptions(env_options), &file_writer,
nullptr));
std::vector<std::unique_ptr<IntTblPropCollectorFactory> > std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
int_tbl_prop_collector_factories; int_tbl_prop_collector_factories;
file_writer.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), file_name, env_options));
int unknown_level = -1; int unknown_level = -1;
tb = opts.table_factory->NewTableBuilder( tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions( TableBuilderOptions(
@ -133,17 +132,19 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::unique_ptr<TableReader> table_reader; std::unique_ptr<TableReader> table_reader;
if (!through_db) { if (!through_db) {
std::unique_ptr<RandomAccessFile> raf; const auto& fs = env->GetFileSystem();
s = env->NewRandomAccessFile(file_name, &raf, env_options); FileOptions fopts(env_options);
std::unique_ptr<FSRandomAccessFile> raf;
s = fs->NewRandomAccessFile(file_name, fopts, &raf, nullptr);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Create File Error: %s\n", s.ToString().c_str()); fprintf(stderr, "Create File Error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
uint64_t file_size; uint64_t file_size;
env->GetFileSize(file_name, &file_size); fs->GetFileSize(file_name, fopts.io_options, &file_size, nullptr);
std::unique_ptr<RandomAccessFileReader> file_reader( std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(raf), new RandomAccessFileReader(std::move(raf), file_name));
file_name));
s = opts.table_factory->NewTableReader( s = opts.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), TableReaderOptions(ioptions, moptions.prefix_extractor.get(),
env_options, ikc), env_options, ikc),

View File

@ -22,7 +22,6 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h" #include "file/filename.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/file_checksum.h" #include "rocksdb/file_checksum.h"
@ -2290,19 +2289,11 @@ class InMemoryHandler : public WriteBatch::Handler {
void DumpWalFile(Options options, std::string wal_file, bool print_header, void DumpWalFile(Options options, std::string wal_file, bool print_header,
bool print_values, bool is_write_committed, bool print_values, bool is_write_committed,
LDBCommandExecuteResult* exec_state) { LDBCommandExecuteResult* exec_state) {
Env* env = options.env; const auto& fs = options.env->GetFileSystem();
EnvOptions soptions(options); FileOptions soptions(options);
std::unique_ptr<SequentialFileReader> wal_file_reader; std::unique_ptr<SequentialFileReader> wal_file_reader;
Status status = SequentialFileReader::Create(fs, wal_file, soptions,
Status status; &wal_file_reader, nullptr);
{
std::unique_ptr<SequentialFile> file;
status = env->NewSequentialFile(wal_file, &file, soptions);
if (status.ok()) {
wal_file_reader.reset(new SequentialFileReader(
NewLegacySequentialFileWrapper(file), wal_file));
}
}
if (!status.ok()) { if (!status.ok()) {
if (exec_state) { if (exec_state) {
*exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " + *exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " +

View File

@ -94,21 +94,20 @@ class SSTDumpToolTest : public testing::Test {
void createSST(const Options& opts, const std::string& file_name) { void createSST(const Options& opts, const std::string& file_name) {
Env* test_env = opts.env; Env* test_env = opts.env;
EnvOptions env_options(opts); FileOptions file_options(opts);
ReadOptions read_options; ReadOptions read_options;
const ImmutableCFOptions imoptions(opts); const ImmutableCFOptions imoptions(opts);
const MutableCFOptions moptions(opts); const MutableCFOptions moptions(opts);
ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator); ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator);
std::unique_ptr<TableBuilder> tb; std::unique_ptr<TableBuilder> tb;
std::unique_ptr<WritableFile> file;
ASSERT_OK(test_env->NewWritableFile(file_name, &file, env_options));
std::vector<std::unique_ptr<IntTblPropCollectorFactory> > std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
int_tbl_prop_collector_factories; int_tbl_prop_collector_factories;
std::unique_ptr<WritableFileWriter> file_writer( std::unique_ptr<WritableFileWriter> file_writer;
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)), ASSERT_OK(WritableFileWriter::Create(test_env->GetFileSystem(), file_name,
file_name, EnvOptions())); file_options, &file_writer, nullptr));
std::string column_family_name; std::string column_family_name;
int unknown_level = -1; int unknown_level = -1;
tb.reset(opts.table_factory->NewTableBuilder( tb.reset(opts.table_factory->NewTableBuilder(

View File

@ -120,9 +120,12 @@ class TraceAnalyzerTest : public testing::Test {
void CheckFileContent(const std::vector<std::string>& cnt, void CheckFileContent(const std::vector<std::string>& cnt,
std::string file_path, bool full_content) { std::string file_path, bool full_content) {
ASSERT_OK(env_->FileExists(file_path)); const auto& fs = env_->GetFileSystem();
std::unique_ptr<SequentialFile> f_ptr; FileOptions fopts(env_options_);
ASSERT_OK(env_->NewSequentialFile(file_path, &f_ptr, env_options_));
ASSERT_OK(fs->FileExists(file_path, fopts.io_options, nullptr));
std::unique_ptr<FSSequentialFile> file;
ASSERT_OK(fs->NewSequentialFile(file_path, fopts, &file, nullptr));
std::string get_line; std::string get_line;
std::istringstream iss; std::istringstream iss;
@ -130,8 +133,6 @@ class TraceAnalyzerTest : public testing::Test {
std::vector<std::string> result; std::vector<std::string> result;
uint32_t count; uint32_t count;
Status s; Status s;
std::unique_ptr<FSSequentialFile> file =
NewLegacySequentialFileWrapper(f_ptr);
SequentialFileReader sf_reader(std::move(file), file_path, SequentialFileReader sf_reader(std::move(file), file_path,
4096 /* filereadahead_size */); 4096 /* filereadahead_size */);

View File

@ -1047,18 +1047,17 @@ Status TraceAnalyzer::ReProcessing() {
std::vector<std::string> prefix(kTaTypeNum); std::vector<std::string> prefix(kTaTypeNum);
std::istringstream iss; std::istringstream iss;
bool has_data = true; bool has_data = true;
std::unique_ptr<SequentialFile> wkey_input_f; std::unique_ptr<FSSequentialFile> file;
s = env_->NewSequentialFile(whole_key_path, &wkey_input_f, env_options_); s = env_->GetFileSystem()->NewSequentialFile(
whole_key_path, FileOptions(env_options_), &file, nullptr);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot open the whole key space file of CF: %u\n", fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
cf_id); cf_id);
wkey_input_f.reset(); file.reset();
} }
if (wkey_input_f) { if (file) {
std::unique_ptr<FSSequentialFile> file;
file = NewLegacySequentialFileWrapper(wkey_input_f);
size_t kTraceFileReadaheadSize = 2 * 1024 * 1024; size_t kTraceFileReadaheadSize = 2 * 1024 * 1024;
SequentialFileReader sf_reader( SequentialFileReader sf_reader(
std::move(file), whole_key_path, std::move(file), whole_key_path,

View File

@ -5,11 +5,12 @@
// //
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include "env/composite_env_wrapper.h"
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "file/readahead_raf.h" #include "file/readahead_raf.h"
#include "file/sequence_file_reader.h" #include "file/sequence_file_reader.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/file_system.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "util/random.h" #include "util/random.h"
@ -21,42 +22,64 @@ class WritableFileWriterTest : public testing::Test {};
const uint32_t kMb = 1 << 20; const uint32_t kMb = 1 << 20;
TEST_F(WritableFileWriterTest, RangeSync) { TEST_F(WritableFileWriterTest, RangeSync) {
class FakeWF : public WritableFile { class FakeWF : public FSWritableFile {
public: public:
explicit FakeWF() : size_(0), last_synced_(0) {} explicit FakeWF() : size_(0), last_synced_(0) {}
~FakeWF() override {} ~FakeWF() override {}
Status Append(const Slice& data) override { using FSWritableFile::Append;
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
size_ += data.size(); size_ += data.size();
return Status::OK(); return IOStatus::OK();
} }
Status Truncate(uint64_t /*size*/) override { return Status::OK(); } IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
Status Close() override { IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
EXPECT_GE(size_, last_synced_ + kMb); EXPECT_GE(size_, last_synced_ + kMb);
EXPECT_LT(size_, last_synced_ + 2 * kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb);
// Make sure random writes generated enough writes. // Make sure random writes generated enough writes.
EXPECT_GT(size_, 10 * kMb); EXPECT_GT(size_, 10 * kMb);
return Status::OK(); return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
} }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
Status Fsync() override { return Status::OK(); }
void SetIOPriority(Env::IOPriority /*pri*/) override {} void SetIOPriority(Env::IOPriority /*pri*/) override {}
uint64_t GetFileSize() override { return size_; } uint64_t GetFileSize(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return size_;
}
void GetPreallocationStatus(size_t* /*block_size*/, void GetPreallocationStatus(size_t* /*block_size*/,
size_t* /*last_allocated_block*/) override {} size_t* /*last_allocated_block*/) override {}
size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
return 0; return 0;
} }
Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override { IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
return Status::OK(); return IOStatus::OK();
} }
protected: protected:
Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override { IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
return Status::OK(); const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
} }
Status RangeSync(uint64_t offset, uint64_t nbytes) override { IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
EXPECT_EQ(offset % 4096, 0u); EXPECT_EQ(offset % 4096, 0u);
EXPECT_EQ(nbytes % 4096, 0u); EXPECT_EQ(nbytes % 4096, 0u);
@ -66,7 +89,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
if (size_ > 2 * kMb) { if (size_ > 2 * kMb) {
EXPECT_LT(size_, last_synced_ + 2 * kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb);
} }
return Status::OK(); return IOStatus::OK();
} }
uint64_t size_; uint64_t size_;
@ -77,8 +100,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
env_options.bytes_per_sync = kMb; env_options.bytes_per_sync = kMb;
std::unique_ptr<FakeWF> wf(new FakeWF); std::unique_ptr<FakeWF> wf(new FakeWF);
std::unique_ptr<WritableFileWriter> writer( std::unique_ptr<WritableFileWriter> writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
"" /* don't care */, env_options));
Random r(301); Random r(301);
Status s; Status s;
std::unique_ptr<char[]> large_buf(new char[10 * kMb]); std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
@ -99,7 +121,7 @@ TEST_F(WritableFileWriterTest, RangeSync) {
} }
TEST_F(WritableFileWriterTest, IncrementalBuffer) { TEST_F(WritableFileWriterTest, IncrementalBuffer) {
class FakeWF : public WritableFile { class FakeWF : public FSWritableFile {
public: public:
explicit FakeWF(std::string* _file_data, bool _use_direct_io, explicit FakeWF(std::string* _file_data, bool _use_direct_io,
bool _no_flush) bool _no_flush)
@ -108,37 +130,58 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
no_flush_(_no_flush) {} no_flush_(_no_flush) {}
~FakeWF() override {} ~FakeWF() override {}
Status Append(const Slice& data) override { using FSWritableFile::Append;
IOStatus Append(const Slice& data, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
file_data_->append(data.data(), data.size()); file_data_->append(data.data(), data.size());
size_ += data.size(); size_ += data.size();
return Status::OK(); return IOStatus::OK();
} }
Status PositionedAppend(const Slice& data, uint64_t pos) override { using FSWritableFile::PositionedAppend;
IOStatus PositionedAppend(const Slice& data, uint64_t pos,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
EXPECT_TRUE(pos % 512 == 0); EXPECT_TRUE(pos % 512 == 0);
EXPECT_TRUE(data.size() % 512 == 0); EXPECT_TRUE(data.size() % 512 == 0);
file_data_->resize(pos); file_data_->resize(pos);
file_data_->append(data.data(), data.size()); file_data_->append(data.data(), data.size());
size_ += data.size(); size_ += data.size();
return Status::OK(); return IOStatus::OK();
} }
Status Truncate(uint64_t size) override { IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
file_data_->resize(size); file_data_->resize(size);
return Status::OK(); return IOStatus::OK();
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Fsync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
} }
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
Status Fsync() override { return Status::OK(); }
void SetIOPriority(Env::IOPriority /*pri*/) override {} void SetIOPriority(Env::IOPriority /*pri*/) override {}
uint64_t GetFileSize() override { return size_; } uint64_t GetFileSize(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return size_;
}
void GetPreallocationStatus(size_t* /*block_size*/, void GetPreallocationStatus(size_t* /*block_size*/,
size_t* /*last_allocated_block*/) override {} size_t* /*last_allocated_block*/) override {}
size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
return 0; return 0;
} }
Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override { IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
return Status::OK(); return IOStatus::OK();
} }
bool use_direct_io() const override { return use_direct_io_; } bool use_direct_io() const override { return use_direct_io_; }
@ -163,9 +206,8 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
false, false,
#endif #endif
no_flush)); no_flush));
std::unique_ptr<WritableFileWriter> writer( std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), std::move(wf), "" /* don't care */, env_options));
"" /* don't care */, env_options));
std::string target; std::string target;
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
@ -188,26 +230,41 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest, AppendStatusReturn) { TEST_F(WritableFileWriterTest, AppendStatusReturn) {
class FakeWF : public WritableFile { class FakeWF : public FSWritableFile {
public: public:
explicit FakeWF() : use_direct_io_(false), io_error_(false) {} explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
bool use_direct_io() const override { return use_direct_io_; } bool use_direct_io() const override { return use_direct_io_; }
Status Append(const Slice& /*data*/) override {
using FSWritableFile::Append;
IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (io_error_) { if (io_error_) {
return Status::IOError("Fake IO error"); return IOStatus::IOError("Fake IO error");
} }
return Status::OK(); return IOStatus::OK();
} }
Status PositionedAppend(const Slice& /*data*/, uint64_t) override { using FSWritableFile::PositionedAppend;
IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
if (io_error_) { if (io_error_) {
return Status::IOError("Fake IO error"); return IOStatus::IOError("Fake IO error");
} }
return Status::OK(); return IOStatus::OK();
}
IOStatus Close(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Flush(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
IOStatus Sync(const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
} }
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
void Setuse_direct_io(bool val) { use_direct_io_ = val; } void Setuse_direct_io(bool val) { use_direct_io_ = val; }
void SetIOError(bool val) { io_error_ = val; } void SetIOError(bool val) { io_error_ = val; }
@ -218,15 +275,13 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
std::unique_ptr<FakeWF> wf(new FakeWF()); std::unique_ptr<FakeWF> wf(new FakeWF());
wf->Setuse_direct_io(true); wf->Setuse_direct_io(true);
std::unique_ptr<WritableFileWriter> writer( std::unique_ptr<WritableFileWriter> writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)), new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
"" /* don't care */, EnvOptions()));
ASSERT_OK(writer->Append(std::string(2 * kMb, 'a'))); ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));
// Next call to WritableFile::Append() should fail // Next call to WritableFile::Append() should fail
LegacyWritableFileWrapper* file = FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
static_cast<LegacyWritableFileWrapper*>(writer->writable_file()); fwf->SetIOError(true);
static_cast<FakeWF*>(file->target())->SetIOError(true);
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b'))); ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
} }
#endif #endif

View File

@ -1505,10 +1505,10 @@ Status BackupEngineImpl::CopyOrCreateFile(
uint64_t size_limit, std::function<void()> progress_callback) { uint64_t size_limit, std::function<void()> progress_callback) {
assert(src.empty() != contents.empty()); assert(src.empty() != contents.empty());
Status s; Status s;
std::unique_ptr<WritableFile> dst_file; std::unique_ptr<FSWritableFile> dst_file;
std::unique_ptr<SequentialFile> src_file; std::unique_ptr<FSSequentialFile> src_file;
EnvOptions dst_env_options; FileOptions dst_file_options;
dst_env_options.use_mmap_writes = false; dst_file_options.use_mmap_writes = false;
// TODO:(gzh) maybe use direct reads/writes here if possible // TODO:(gzh) maybe use direct reads/writes here if possible
if (size != nullptr) { if (size != nullptr) {
*size = 0; *size = 0;
@ -1520,21 +1520,22 @@ Status BackupEngineImpl::CopyOrCreateFile(
size_limit = std::numeric_limits<uint64_t>::max(); size_limit = std::numeric_limits<uint64_t>::max();
} }
s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options); s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
&dst_file, nullptr);
if (s.ok() && !src.empty()) { if (s.ok() && !src.empty()) {
s = src_env->NewSequentialFile(src, &src_file, src_env_options); s = src_env->GetFileSystem()->NewSequentialFile(
src, FileOptions(src_env_options), &src_file, nullptr);
} }
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter( std::unique_ptr<WritableFileWriter> dest_writer(
NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options)); new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
std::unique_ptr<SequentialFileReader> src_reader; std::unique_ptr<SequentialFileReader> src_reader;
std::unique_ptr<char[]> buf; std::unique_ptr<char[]> buf;
if (!src.empty()) { if (!src.empty()) {
src_reader.reset(new SequentialFileReader( src_reader.reset(new SequentialFileReader(std::move(src_file), src));
NewLegacySequentialFileWrapper(src_file), src));
buf.reset(new char[copy_file_buffer_size_]); buf.reset(new char[copy_file_buffer_size_]);
} }
@ -1825,14 +1826,14 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
size_limit = std::numeric_limits<uint64_t>::max(); size_limit = std::numeric_limits<uint64_t>::max();
} }
std::unique_ptr<SequentialFile> src_file; std::unique_ptr<SequentialFileReader> src_reader;
Status s = src_env->NewSequentialFile(src, &src_file, src_env_options); Status s = SequentialFileReader::Create(src_env->GetFileSystem(), src,
FileOptions(src_env_options),
&src_reader, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<SequentialFileReader> src_reader(
new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]); std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data; Slice data;
@ -2142,15 +2143,12 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::unordered_map<std::string, uint64_t>& abs_path_to_size) { const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
assert(Empty()); assert(Empty());
Status s; Status s;
std::unique_ptr<SequentialFile> backup_meta_file; std::unique_ptr<SequentialFileReader> backup_meta_reader;
s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions()); s = SequentialFileReader::Create(env_->GetFileSystem(), meta_filename_,
FileOptions(), &backup_meta_reader, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<SequentialFileReader> backup_meta_reader(
new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
meta_filename_));
std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]); std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
Slice data; Slice data;
s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get()); s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());

View File

@ -15,7 +15,6 @@
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
@ -81,7 +80,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
bdb_options_(blob_db_options), bdb_options_(blob_db_options),
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
env_options_(db_options), file_options_(db_options),
statistics_(db_options_.statistics.get()), statistics_(db_options_.statistics.get()),
next_file_number_(1), next_file_number_(1),
flush_sequence_(0), flush_sequence_(0),
@ -96,7 +95,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
blob_dir_ = (bdb_options_.path_relative) blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir ? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir; : bdb_options_.blob_dir;
env_options_.bytes_per_sync = blob_db_options.bytes_per_sync; file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
} }
BlobDBImpl::~BlobDBImpl() { BlobDBImpl::~BlobDBImpl() {
@ -346,7 +345,8 @@ Status BlobDBImpl::OpenAllBlobFiles() {
blob_file->MarkImmutable(/* sequence */ 0); blob_file->MarkImmutable(/* sequence */ 0);
// Read file header and footer // Read file header and footer
Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); Status read_metadata_status =
blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
if (read_metadata_status.IsCorruption()) { if (read_metadata_status.IsCorruption()) {
// Remove incomplete file. // Remove incomplete file.
if (!obsolete_files_.empty()) { if (!obsolete_files_.empty()) {
@ -679,7 +679,7 @@ Status BlobDBImpl::GetBlobFileReader(
std::shared_ptr<RandomAccessFileReader>* reader) { std::shared_ptr<RandomAccessFileReader>* reader) {
assert(reader != nullptr); assert(reader != nullptr);
bool fresh_open = false; bool fresh_open = false;
Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open); Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
if (s.ok() && fresh_open) { if (s.ok() && fresh_open) {
assert(*reader != nullptr); assert(*reader != nullptr);
open_file_count_++; open_file_count_++;
@ -720,21 +720,23 @@ void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) { Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
std::string fpath(bfile->PathName()); std::string fpath(bfile->PathName());
std::unique_ptr<WritableFile> wfile; std::unique_ptr<FSWritableFile> wfile;
const auto& fs = env_->GetFileSystem();
Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_); Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to open blob file for write: %s status: '%s'" "Failed to open blob file for write: %s status: '%s'"
" exists: '%s'", " exists: '%s'",
fpath.c_str(), s.ToString().c_str(), fpath.c_str(), s.ToString().c_str(),
env_->FileExists(fpath).ToString().c_str()); fs->FileExists(fpath, file_options_.io_options, nullptr)
.ToString()
.c_str());
return s; return s;
} }
std::unique_ptr<WritableFileWriter> fwriter; std::unique_ptr<WritableFileWriter> fwriter;
fwriter.reset(new WritableFileWriter( fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_));
NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_));
uint64_t boffset = bfile->GetFileSize(); uint64_t boffset = bfile->GetFileSize();
if (debug_level_ >= 2 && boffset) { if (debug_level_ >= 2 && boffset) {

View File

@ -24,6 +24,7 @@
#include "db/db_iter.h" #include "db/db_iter.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/file_system.h"
#include "rocksdb/listener.h" #include "rocksdb/listener.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
@ -407,7 +408,7 @@ class BlobDBImpl : public BlobDB {
BlobDBOptions bdb_options_; BlobDBOptions bdb_options_;
DBOptions db_options_; DBOptions db_options_;
ColumnFamilyOptions cf_options_; ColumnFamilyOptions cf_options_;
EnvOptions env_options_; FileOptions file_options_;
// Raw pointer of statistic. db_options_ has a std::shared_ptr to hold // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
// ownership. // ownership.

View File

@ -15,7 +15,6 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h" #include "file/filename.h"
#include "file/readahead_raf.h" #include "file/readahead_raf.h"
#include "logging/logging.h" #include "logging/logging.h"
@ -151,7 +150,7 @@ void BlobFile::CloseRandomAccessLocked() {
last_access_ = -1; last_access_ = -1;
} }
Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, Status BlobFile::GetReader(Env* env, const FileOptions& file_options,
std::shared_ptr<RandomAccessFileReader>* reader, std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open) { bool* fresh_open) {
assert(reader != nullptr); assert(reader != nullptr);
@ -178,8 +177,9 @@ Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
return s; return s;
} }
std::unique_ptr<RandomAccessFile> rfile; std::unique_ptr<FSRandomAccessFile> rfile;
s = env->NewRandomAccessFile(PathName(), &rfile, env_options); s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options,
&rfile, nullptr);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_, ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file for random-read: %s status: '%s'" "Failed to open blob file for random-read: %s status: '%s'"
@ -189,18 +189,20 @@ Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
return s; return s;
} }
ra_file_reader_ = std::make_shared<RandomAccessFileReader>( ra_file_reader_ =
NewLegacyRandomAccessFileWrapper(rfile), PathName()); std::make_shared<RandomAccessFileReader>(std::move(rfile), PathName());
*reader = ra_file_reader_; *reader = ra_file_reader_;
*fresh_open = true; *fresh_open = true;
return s; return s;
} }
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_options) {
assert(Immutable()); assert(Immutable());
// Get file size. // Get file size.
uint64_t file_size = 0; uint64_t file_size = 0;
Status s = env->GetFileSize(PathName(), &file_size); Status s =
fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr);
if (s.ok()) { if (s.ok()) {
file_size_ = file_size; file_size_ = file_size;
} else { } else {
@ -219,17 +221,15 @@ Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
} }
// Create file reader. // Create file reader.
std::unique_ptr<RandomAccessFile> file; std::unique_ptr<RandomAccessFileReader> file_reader;
s = env->NewRandomAccessFile(PathName(), &file, env_options); s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader,
nullptr);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_, ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file %" PRIu64 ", status: %s", "Failed to open blob file %" PRIu64 ", status: %s",
file_number_, s.ToString().c_str()); file_number_, s.ToString().c_str());
return s; return s;
} }
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
PathName()));
// Read file header. // Read file header.
std::string header_buf; std::string header_buf;

View File

@ -15,6 +15,7 @@
#include "file/random_access_file_reader.h" #include "file/random_access_file_reader.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -208,9 +209,10 @@ class BlobFile {
// Read blob file header and footer. Return corruption if file header is // Read blob file header and footer. Return corruption if file header is
// malform or incomplete. If footer is malform or incomplete, set // malform or incomplete. If footer is malform or incomplete, set
// footer_valid_ to false and return Status::OK. // footer_valid_ to false and return Status::OK.
Status ReadMetadata(Env* env, const EnvOptions& env_options); Status ReadMetadata(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_options);
Status GetReader(Env* env, const EnvOptions& env_options, Status GetReader(Env* env, const FileOptions& file_options,
std::shared_ptr<RandomAccessFileReader>* reader, std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open); bool* fresh_open);

View File

@ -33,15 +33,15 @@ Status NewWritableCacheFile(Env* const env, const std::string& filepath,
return s; return s;
} }
Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath, Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs,
std::unique_ptr<RandomAccessFile>* file, const std::string& filepath,
std::unique_ptr<FSRandomAccessFile>* file,
const bool use_direct_reads = true) { const bool use_direct_reads = true) {
assert(env); assert(fs.get());
EnvOptions opt; FileOptions opt;
opt.use_direct_reads = use_direct_reads; opt.use_direct_reads = use_direct_reads;
Status s = env->NewRandomAccessFile(filepath, file, opt); return fs->NewRandomAccessFile(filepath, opt, file, nullptr);
return s;
} }
// //
@ -210,17 +210,18 @@ bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
rwlock_.AssertHeld(); rwlock_.AssertHeld();
ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str()); ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
assert(env_);
std::unique_ptr<RandomAccessFile> file; std::unique_ptr<FSRandomAccessFile> file;
Status status = Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file,
NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads); enable_direct_reads);
if (!status.ok()) { if (!status.ok()) {
Error(log_, "Error opening random access file %s. %s", Path().c_str(), Error(log_, "Error opening random access file %s. %s", Path().c_str(),
status.ToString().c_str()); status.ToString().c_str());
return false; return false;
} }
freader_.reset(new RandomAccessFileReader( freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
NewLegacyRandomAccessFileWrapper(file), Path(), env_->GetSystemClock())); env_->GetSystemClock()));
return true; return true;
} }

View File

@ -4,12 +4,14 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/sim_cache.h"
#include <atomic> #include <atomic>
#include "env/composite_env_wrapper.h"
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -35,8 +37,7 @@ class CacheActivityLogger {
assert(env != nullptr); assert(env != nullptr);
Status status; Status status;
EnvOptions env_opts; FileOptions file_opts;
std::unique_ptr<WritableFile> log_file;
MutexLock l(&mutex_); MutexLock l(&mutex_);
@ -44,13 +45,11 @@ class CacheActivityLogger {
StopLoggingInternal(); StopLoggingInternal();
// Open log file // Open log file
status = env->NewWritableFile(activity_log_file, &log_file, env_opts); status = WritableFileWriter::Create(env->GetFileSystem(), activity_log_file,
file_opts, &file_writer_, nullptr);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
file_writer_.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(log_file)), activity_log_file,
env_opts));
max_logging_size_ = max_logging_size; max_logging_size_ = max_logging_size;
activity_logging_enabled_.store(true); activity_logging_enabled_.store(true);

View File

@ -92,15 +92,13 @@ uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
Status NewFileTraceReader(Env* env, const EnvOptions& env_options, Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename, const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader) { std::unique_ptr<TraceReader>* trace_reader) {
std::unique_ptr<RandomAccessFile> trace_file; std::unique_ptr<RandomAccessFileReader> file_reader;
Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options); Status s = RandomAccessFileReader::Create(
env->GetFileSystem(), trace_filename, FileOptions(env_options),
&file_reader, nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<RandomAccessFileReader> file_reader;
file_reader.reset(new RandomAccessFileReader(
NewLegacyRandomAccessFileWrapper(trace_file), trace_filename));
trace_reader->reset(new FileTraceReader(std::move(file_reader))); trace_reader->reset(new FileTraceReader(std::move(file_reader)));
return s; return s;
} }
@ -108,16 +106,13 @@ Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
const std::string& trace_filename, const std::string& trace_filename,
std::unique_ptr<TraceWriter>* trace_writer) { std::unique_ptr<TraceWriter>* trace_writer) {
std::unique_ptr<WritableFile> trace_file; std::unique_ptr<WritableFileWriter> file_writer;
Status s = env->NewWritableFile(trace_filename, &trace_file, env_options); Status s = WritableFileWriter::Create(env->GetFileSystem(), trace_filename,
FileOptions(env_options), &file_writer,
nullptr);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::unique_ptr<WritableFileWriter> file_writer;
file_writer.reset(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(trace_file)), trace_filename,
env_options));
trace_writer->reset(new FileTraceWriter(std::move(file_writer))); trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
return s; return s;
} }