Basic RocksDB follower implementation (#12540)

Summary:
A basic implementation of RocksDB follower mode, which opens a remote database (referred to as leader) on a distributed file system by tailing its MANIFEST. It leverages the secondary instance mode, but is different in some key ways -
1. It has its own directory with links to the leader's database
2. Periodically refreshes itself
3. (Future) Snapshot support
4. (Future) Garbage collection of obsolete links
5. (Long term) Memtable replication

There are two main classes implementing this functionality - `DBImplFollower` and `OnDemandFileSystem`. The former is derived from `DBImplSecondary`. Similar to `DBImplSecondary`, it implements recovery and catch up through MANIFEST tailing using the `ReactiveVersionSet`, but does not consider logs. In a future PR, we will implement memtable replication, which will eliminate the need to catch up using logs. In addition, the recovery and catch-up tries to avoid directory listing as repeated metadata operations are expensive.

The second main piece is the `OnDemandFileSystem`, which plugs in as an `Env` for the follower instance and creates the illusion of the follower directory as a clone of the leader directory. It creates links to SSTs on first reference. When the follower tails the MANIFEST and attempts to create a new `Version`, it calls `VerifyFileMetadata` to verify the size of the file, and optionally the unique ID of the file. During this process, links are created which prevent the underlying files from getting deallocated even if the leader deletes the files.

TODOs: Deletion of obsolete links, snapshots, robust checking against misconfigurations, better observability etc.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12540

Reviewed By: jowlyzhang

Differential Revision: D56315718

Pulled By: anand1976

fbshipit-source-id: d19e1aca43a6af4000cb8622a718031b69ebd97b
This commit is contained in:
anand76 2024-04-19 19:13:31 -07:00 committed by Facebook GitHub Bot
parent f0864d3eec
commit d8fb849b7e
17 changed files with 1009 additions and 6 deletions

View File

@ -1037,8 +1037,10 @@ endif()
else()
list(APPEND SOURCES
db/db_impl/db_impl_follower.cc
port/port_posix.cc
env/env_posix.cc
env/fs_on_demand.cc
env/fs_posix.cc
env/io_posix.cc)
endif()
@ -1363,6 +1365,7 @@ if(WITH_TESTS)
db/file_indexer_test.cc
db/filename_test.cc
db/flush_job_test.cc
db/db_follower_test.cc
db/import_column_family_test.cc
db/listener_test.cc
db/log_test.cc

View File

@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $(
db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

View File

@ -60,6 +60,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_follower.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
@ -117,6 +118,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"env/env_posix.cc",
"env/file_system.cc",
"env/file_system_tracer.cc",
"env/fs_on_demand.cc",
"env/fs_posix.cc",
"env/fs_remap.cc",
"env/io_posix.cc",
@ -4795,6 +4797,12 @@ cpp_unittest_wrapper(name="db_flush_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_follower_test",
srcs=["db/db_follower_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_inplace_update_test",
srcs=["db/db_inplace_update_test.cc"],
deps=[":rocksdb_test_lib"],

63
db/db_follower_test.cc Normal file
View File

@ -0,0 +1,63 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
#ifdef OS_LINUX
class DBFollowerTest : public DBTestBase {
public:
// Create directories for leader and follower
// Create the leader DB object
DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) {
follower_name_ = dbname_ + "/follower";
Close();
Destroy(CurrentOptions());
EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
dbname_ = dbname_ + "/leader";
Reopen(CurrentOptions());
}
~DBFollowerTest() {
follower_.reset();
EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
}
protected:
Status OpenAsFollower() {
return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_,
&follower_);
}
DB* follower() { return follower_.get(); }
private:
std::string follower_name_;
std::unique_ptr<DB> follower_;
};
TEST_F(DBFollowerTest, Basic) {
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(OpenAsFollower());
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
}
#endif
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1535,7 +1535,7 @@ class DBImpl : public DB {
Status WriteRecoverableState();
// Actual implementation of Close()
Status CloseImpl();
virtual Status CloseImpl();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to

View File

@ -0,0 +1,309 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_impl/db_impl_follower.h"
#include <cinttypes>
#include "db/arena_wrapped_db_iter.h"
#include "db/merge_context.h"
#include "env/composite_env_wrapper.h"
#include "env/fs_on_demand.h"
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "rocksdb/db.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {
DBImplFollower::DBImplFollower(const DBOptions& db_options,
std::unique_ptr<Env>&& env,
const std::string& dbname, std::string src_path)
: DBImplSecondary(db_options, dbname, ""),
env_guard_(std::move(env)),
src_path_(std::move(src_path)),
cv_(&mu_) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Opening the db in follower mode");
LogFlush(immutable_db_options_.info_log);
}
DBImplFollower::~DBImplFollower() {
Status s = Close();
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s",
s.ToString().c_str());
}
}
// Recover a follower DB instance by reading the MANIFEST. The verification
// as part of the MANIFEST replay will ensure that local links to the
// leader's files are created, thus ensuring we can continue reading them
// even if the leader deletes those files due to compaction.
// TODO:
// 1. Devise a mechanism to prevent misconfiguration by, for example,
// keeping a local copy of the IDENTITY file and cross checking
// 2. Make the recovery more robust by retrying if the first attempt
// fails.
Status DBImplFollower::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
mutex_.AssertHeld();
JobContext job_context(0);
Status s;
s = static_cast<ReactiveVersionSet*>(versions_.get())
->Recover(column_families, &manifest_reader_, &manifest_reporter_,
&manifest_reader_status_);
if (!s.ok()) {
if (manifest_reader_status_) {
manifest_reader_status_->PermitUncheckedError();
}
return s;
}
if (immutable_db_options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
if (s.ok()) {
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
// Start the periodic catch-up thread
// TODO: See if it makes sense to have a threadpool, rather than a thread
// per follower DB instance
catch_up_thread_.reset(
new port::Thread(&DBImplFollower::PeriodicRefresh, this));
}
return s;
}
// Try to catch up by tailing the MANIFEST.
// TODO:
// 1. Cleanup obsolete files afterward
// 2. Add some error notifications and statistics
Status DBImplFollower::TryCatchUpWithLeader() {
assert(versions_.get() != nullptr);
assert(manifest_reader_.get() != nullptr);
Status s;
// read the manifest and apply new changes to the follower instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
{
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_,
manifest_reader_status_.get(), &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
if (cfd->mem()->GetEarliestSequenceNumber() <
versions_->LastSequence()) {
// Construct a new memtable with earliest sequence number set to the
// last sequence number in the VersionSet. This matters when
// DBImpl::MultiCFSnapshot tries to get consistent references
// to super versions in a lock free manner, it checks the earliest
// sequence number to detect if there was a change in version in
// the meantime.
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
MemTable* new_mem = cfd->ConstructNewMemtable(
mutable_cf_options, versions_->LastSequence());
cfd->mem()->SetNextLogNumber(cfd->GetLogNumber());
cfd->mem()->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free);
new_mem->Ref();
cfd->SetMemtable(new_mem);
}
// This will check if the old memtable is still referenced
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
}
}
job_context.Clean();
return s;
}
void DBImplFollower::PeriodicRefresh() {
while (!stop_requested_.load()) {
MutexLock l(&mu_);
int64_t wait_until =
immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_refresh_catchup_period_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
if (stop_requested_.load()) {
break;
}
Status s;
for (uint64_t i = 0;
i < immutable_db_options_.follower_catchup_retry_count &&
!stop_requested_.load();
++i) {
s = TryCatchUpWithLeader();
if (s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Successful catch up on attempt %llu",
static_cast<unsigned long long>(i));
break;
}
wait_until = immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_catchup_retry_wait_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
}
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful");
}
}
}
Status DBImplFollower::Close() {
if (catch_up_thread_) {
stop_requested_.store(true);
{
MutexLock l(&mu_);
cv_.SignalAll();
}
catch_up_thread_->join();
catch_up_thread_.reset();
}
return DBImpl::Close();
}
Status DB::OpenAsFollower(const Options& options, const std::string& dbname,
const std::string& leader_path,
std::unique_ptr<DB>* dbptr) {
dbptr->reset();
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
std::vector<ColumnFamilyHandle*> handles;
Status s = DB::OpenAsFollower(db_options, dbname, leader_path,
column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
delete handles[0];
}
return s;
}
Status DB::OpenAsFollower(
const DBOptions& db_options, const std::string& dbname,
const std::string& src_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr) {
dbptr->reset();
FileSystem* fs = db_options.env->GetFileSystem().get();
{
IOStatus io_s;
if (db_options.create_if_missing) {
io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr);
} else {
io_s = fs->FileExists(dbname, IOOptions(), nullptr);
}
if (!io_s.ok()) {
return static_cast<Status>(io_s);
}
}
std::unique_ptr<Env> new_env(new CompositeEnvWrapper(
db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(),
src_path, dbname)));
DBOptions tmp_opts(db_options);
Status s;
tmp_opts.env = new_env.get();
if (nullptr == tmp_opts.info_log) {
s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log);
if (!s.ok()) {
tmp_opts.info_log = nullptr;
return s;
}
}
handles->clear();
DBImplFollower* impl =
new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path);
impl->versions_.reset(new ReactiveVersionSet(
dbname, &impl->immutable_db_options_, impl->file_options_,
impl->table_cache_.get(), impl->write_buffer_manager_,
&impl->write_controller_, impl->io_tracer_));
impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock();
s = impl->Recover(column_families, /*read_only=*/true,
/*error_if_wal_file_exists=*/false,
/*error_if_data_exists_in_wals=*/false);
if (s.ok()) {
for (const auto& cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (nullptr == cfd) {
s = Status::InvalidArgument("Column family not found", cf.name);
break;
}
handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
}
SuperVersionContext sv_context(false /* create_superversion */);
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
sv_context.NewSuperVersion();
cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
}
}
impl->mutex_.Unlock();
sv_context.Clean();
if (s.ok()) {
dbptr->reset(impl);
for (auto h : *handles) {
impl->NewThreadStatusCfInfo(
static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
}
} else {
for (auto h : *handles) {
delete h;
}
handles->clear();
delete impl;
}
return s;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,53 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "db/db_impl/db_impl_secondary.h"
#include "logging/logging.h"
#include "port/port.h"
namespace ROCKSDB_NAMESPACE {
class DBImplFollower : public DBImplSecondary {
public:
DBImplFollower(const DBOptions& db_options, std::unique_ptr<Env>&& env,
const std::string& dbname, std::string src_path);
~DBImplFollower();
Status Close() override;
protected:
bool OwnTablesAndLogs() const override {
// TODO: Change this to true once we've properly implemented file
// deletion for the read scaling case
return false;
}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/,
bool /*is_retry*/ = false, uint64_t* = nullptr,
RecoveryContext* /*recovery_ctx*/ = nullptr,
bool* /*can_retry*/ = nullptr) override;
private:
friend class DB;
Status TryCatchUpWithLeader();
void PeriodicRefresh();
std::unique_ptr<Env> env_guard_;
std::unique_ptr<port::Thread> catch_up_thread_;
std::atomic<bool> stop_requested_;
std::string src_path_;
port::Mutex mu_;
port::CondVar cv_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -277,6 +277,10 @@ class DBImplSecondary : public DBImpl {
return false;
}
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
private:
friend class DB;
@ -305,10 +309,6 @@ class DBImplSecondary : public DBImpl {
const CompactionServiceInput& input,
CompactionServiceResult* result);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
// Cache log readers for each log number, used for continue WAL replay
// after recovery
std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;

330
env/fs_on_demand.cc vendored Normal file
View File

@ -0,0 +1,330 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "env/fs_on_demand.h"
#include <set>
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/types.h"
namespace ROCKSDB_NAMESPACE {
// Check if the input path is under orig (typically the local directory), and if
// so, change it to the equivalent path under replace (typically the remote
// directory). For example, if orig is "/data/follower", replace is
// "/data/leader", and the given path is "/data/follower/000010.sst", on return
// the path would be changed to
// "/data/leader/000010.sst".
// Return value is true if the path was modified, false otherwise
bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig,
const std::string& replace,
std::string& path) {
size_t pos = path.find(orig);
if (pos > 0) {
return false;
}
path.replace(pos, orig.length(), replace);
return true;
}
bool OnDemandFileSystem::LookupFileType(const std::string& name,
FileType* type) {
std::size_t found = name.find_last_of('/');
std::string file_name = name.substr(found);
uint64_t number = 0;
return ParseFileName(file_name, &number, type);
}
// RocksDB opens non-SST files for reading in sequential file mode. This
// includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them
// in place in the source directory. For files that are appendable or
// can be renamed, which is MANIFEST and CURRENT files, we wrap the
// underlying FSSequentialFile with another class that checks when EOF
// has been reached and re-opens the file to see the latest data. On some
// distributed file systems, this is necessary.
IOStatus OnDemandFileSystem::NewSequentialFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
FileType type;
static std::unordered_set<FileType> valid_types(
{kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile});
if (!LookupFileType(fname, &type) ||
(valid_types.find(type) == valid_types.end())) {
return IOStatus::NotSupported();
}
IOStatus s;
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
std::unique_ptr<FSSequentialFile> inner_file;
s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg);
if (s.ok() && type == kDescriptorFile) {
result->reset(new OnDemandSequentialFile(std::move(inner_file), this,
file_opts, rname));
} else {
*result = std::move(inner_file);
}
} else {
s = target()->NewSequentialFile(fname, file_opts, result, dbg);
}
return s;
}
// This is only supported for SST files. If the file is present locally,
// i.e in the destination dir, we just open it and return. If its in the
// remote, i.e source dir, we link it locally and open the link.
// TODO: Add support for blob files belonging to the new BlobDB
IOStatus OnDemandFileSystem::NewRandomAccessFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
FileType type;
if (!LookupFileType(fname, &type) || type != kTableFile) {
return IOStatus::NotSupported();
}
IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr);
if (s.IsNotFound() || s.IsPathNotFound()) {
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
s = target()->LinkFile(rname, fname, IOOptions(), nullptr);
if (!s.ok()) {
return s;
}
}
}
return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg)
: s;
}
// We don't expect to create any writable file other than info LOG files.
IOStatus OnDemandFileSystem::NewWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
FileType type;
if (!LookupFileType(fname, &type) || type != kInfoLogFile) {
return IOStatus::NotSupported();
}
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg);
if (s.ok()) {
return IOStatus::InvalidArgument(
"Writing to a file present in the remote directory not supoprted");
}
}
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
// Currently not supported, as there's no need for RocksDB to create a
// directory object for a DB in read-only mode.
IOStatus OnDemandFileSystem::NewDirectory(
const std::string& /*name*/, const IOOptions& /*io_opts*/,
std::unique_ptr<FSDirectory>* /*result*/, IODebugContext* /*dbg*/) {
return IOStatus::NotSupported();
}
// Check if the given file exists, either locally or remote. If the file is an
// SST file, then link it locally. We assume if the file existence is being
// checked, its for verification purposes, for example while replaying the
// MANIFEST. The file will be opened for reading some time in the future.
IOStatus OnDemandFileSystem::FileExists(const std::string& fname,
const IOOptions& options,
IODebugContext* dbg) {
IOStatus s = target()->FileExists(fname, options, dbg);
if (!s.IsNotFound() && !s.IsPathNotFound()) {
return s;
}
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
FileType type;
if (LookupFileType(fname, &type) && type == kTableFile) {
s = target()->LinkFile(rname, fname, options, dbg);
} else {
s = target()->FileExists(rname, options, dbg);
}
}
return s;
}
// Doa listing of both the local and remote directories and merge the two.
IOStatus OnDemandFileSystem::GetChildren(const std::string& dir,
const IOOptions& options,
std::vector<std::string>* result,
IODebugContext* dbg) {
std::string rdir = dir;
IOStatus s = target()->GetChildren(dir, options, result, dbg);
if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
return s;
}
std::vector<std::string> rchildren;
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rdir);
s = target()->GetChildren(rdir, options, &rchildren, dbg);
if (s.ok()) {
std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) {
// Adjust name
(void)CheckPathAndAdjust(remote_path_, local_path_, name);
});
std::sort(result->begin(), result->end());
std::sort(rchildren.begin(), rchildren.end());
std::vector<std::string> output;
output.reserve(result->size() + rchildren.size());
std::set_union(result->begin(), result->end(), rchildren.begin(),
rchildren.end(), std::back_inserter(output));
*result = std::move(output);
}
return s;
}
// Doa listing of both the local and remote directories and merge the two.
IOStatus OnDemandFileSystem::GetChildrenFileAttributes(
const std::string& dir, const IOOptions& options,
std::vector<FileAttributes>* result, IODebugContext* dbg) {
std::string rdir = dir;
IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
return s;
}
std::vector<FileAttributes> rchildren;
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rdir);
s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg);
if (s.ok()) {
struct FileAttributeSorter {
bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) {
return lhs.name < rhs.name;
}
} file_attr_sorter;
std::for_each(
rchildren.begin(), rchildren.end(), [&](FileAttributes& file) {
// Adjust name
(void)CheckPathAndAdjust(remote_path_, local_path_, file.name);
});
std::sort(result->begin(), result->end(), file_attr_sorter);
std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter);
std::vector<FileAttributes> output;
output.reserve(result->size() + rchildren.size());
std::set_union(rchildren.begin(), rchildren.end(), result->begin(),
result->end(), std::back_inserter(output), file_attr_sorter);
*result = std::move(output);
}
return s;
}
IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname,
const IOOptions& options,
uint64_t* file_size,
IODebugContext* dbg) {
uint64_t local_size = 0;
IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg);
if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) {
return s;
}
if (s.IsNotFound() || s.IsPathNotFound()) {
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
FileType type;
if (LookupFileType(fname, &type) && type == kTableFile) {
s = target()->LinkFile(rname, fname, options, dbg);
if (s.ok()) {
s = target()->GetFileSize(fname, options, &local_size, dbg);
}
} else {
s = target()->GetFileSize(rname, options, &local_size, dbg);
}
}
}
*file_size = local_size;
return s;
}
// An implementation of Read that tracks whether we've reached EOF. If so,
// re-open the file to try to read past the previous EOF offset. After
// re-opening, positing it back to the last read offset.
IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
IOStatus s;
if (eof_) {
// Reopen the file. With some distributed file systems, this is required
// in order to get the new size
file_.reset();
s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg);
if (!s.ok()) {
return IOStatus::IOError("While opening file after relinking, got error ",
s.ToString());
}
s = file_->Skip(offset_);
if (!s.ok()) {
return IOStatus::IOError(
"While seeking to offset" + std::to_string(offset_) + "got error",
s.ToString());
}
eof_ = false;
}
s = file_->Read(n, options, result, scratch, dbg);
if (s.ok()) {
offset_ += result->size();
if (result->size() < n) {
// We reached EOF. Mark it so we know to relink next time
eof_ = true;
}
}
return s;
}
IOStatus OnDemandSequentialFile::Skip(uint64_t n) {
IOStatus s = file_->Skip(n);
if (s.ok()) {
offset_ += n;
}
return s;
}
bool OnDemandSequentialFile::use_direct_io() const {
return file_->use_direct_io();
}
size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const {
return file_->GetRequiredBufferAlignment();
}
Temperature OnDemandSequentialFile::GetTemperature() const {
return file_->GetTemperature();
}
std::shared_ptr<FileSystem> NewOnDemandFileSystem(
const std::shared_ptr<FileSystem>& fs, std::string src_path,
std::string dest_path) {
return std::make_shared<OnDemandFileSystem>(fs, src_path, dest_path);
}
} // namespace ROCKSDB_NAMESPACE

139
env/fs_on_demand.h vendored Normal file
View File

@ -0,0 +1,139 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include "rocksdb/file_system.h"
namespace ROCKSDB_NAMESPACE {
// A FileSystem that links files to a local (destination) directory from a
// corresponding remote (source) directory on demand. The decision to link
// depends on the file type, with appendable or rename-able files, such as,
// descriptors, logs, CURRENT, being read in place in the remote directory,
// and SST files being linked. In the future, files read in place may be
// mirrored to the local directory, so the local dir has a complete database
// for troubleshooting purposes.
class OnDemandFileSystem : public FileSystemWrapper {
public:
OnDemandFileSystem(const std::shared_ptr<FileSystem>& target,
const std::string& remote_path,
const std::string& local_path)
: FileSystemWrapper(target),
remote_path_(remote_path),
local_path_(local_path) {}
const char* Name() const override { return "OnDemandFileSystem"; }
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg) override;
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override;
IOStatus NewWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus ReuseWritableFile(const std::string& /*fname*/,
const std::string& /*old_fname*/,
const FileOptions& /*fopts*/,
std::unique_ptr<FSWritableFile>* /*result*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported("ReuseWritableFile");
}
IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) override;
IOStatus FileExists(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus GetChildren(const std::string& dir, const IOOptions& options,
std::vector<std::string>* result,
IODebugContext* dbg) override;
IOStatus GetChildrenFileAttributes(const std::string& dir,
const IOOptions& options,
std::vector<FileAttributes>* result,
IODebugContext* dbg) override;
IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
uint64_t* file_size, IODebugContext* dbg) override;
private:
bool CheckPathAndAdjust(const std::string& orig, const std::string& replace,
std::string& path);
bool LookupFileType(const std::string& name, FileType* type);
const std::string remote_path_;
const std::string local_path_;
};
// A wrapper class around an FSSequentialFile object. Its mainly
// intended to be used for appendable files like MANIFEST and logs.
// Beneath the covers, it tracks when EOF is reached, and reopens
// the file in order to read the latest appended data. This is
// necessary on some distributed file systems as they may have
// stale metadata about the file.
// TODO: Mirror the data read to a local file for troubleshooting
// purposes, as well as recovery in case the source dir is
// deleted.
class OnDemandSequentialFile : public FSSequentialFile {
public:
OnDemandSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
OnDemandFileSystem* fs, const FileOptions& file_opts,
const std::string& path)
: file_(std::move(file)),
fs_(fs),
file_opts_(file_opts),
path_(path),
eof_(false),
offset_(0) {}
virtual ~OnDemandSequentialFile() {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override;
IOStatus Skip(uint64_t n) override;
bool use_direct_io() const override;
size_t GetRequiredBufferAlignment() const override;
IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
return IOStatus::NotSupported("InvalidateCache not supported.");
}
IOStatus PositionedRead(uint64_t /*offset*/, size_t /*n*/,
const IOOptions& /*options*/, Slice* /*result*/,
char* /*scratch*/, IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported("PositionedRead");
}
Temperature GetTemperature() const override;
private:
std::unique_ptr<FSSequentialFile> file_;
OnDemandFileSystem* fs_;
const FileOptions file_opts_;
const std::string path_;
bool eof_;
uint64_t offset_;
};
std::shared_ptr<FileSystem> NewOnDemandFileSystem(
const std::shared_ptr<FileSystem>& fs, std::string remote_path,
std::string local_path);
} // namespace ROCKSDB_NAMESPACE

View File

@ -298,6 +298,29 @@ class DB {
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
// EXPERIMENTAL
// Open a database as a follower. The difference between this and opening
// as secondary is that the follower database has its own directory with
// links to the actual files, and can tolarate obsolete file deletions by
// the leader to its own database. Another difference is the follower
// tries to keep up with the leader by periodically tailing the leader's
// MANIFEST, and (in the future) memtable updates, rather than relying on
// the user to manually call TryCatchupWithPrimary().
// Open as a follower with the default column family
static Status OpenAsFollower(const Options& options, const std::string& name,
const std::string& leader_path,
std::unique_ptr<DB>* dbptr);
// Open as a follower with multiple column families
static Status OpenAsFollower(
const DBOptions& db_options, const std::string& name,
const std::string& leader_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr);
// End EXPERIMENTAL
// Open DB and run the compaction.
// It's a read-only operation, the result won't be installed to the DB, it
// will be output to the `output_directory`. The API should only be used with

View File

@ -702,6 +702,16 @@ class FileSystem : public Customizable {
return IOStatus::OK();
}
// EXPERIMENTAL
// Discard any directory metadata cached in memory for the specified
// directory and its descendants. Useful for distributed file systems
// where the local cache may be out of sync with the actual directory state.
//
// The implementation is not required to be thread safe. Its the caller's
// responsibility to ensure that no directory operations happen
// concurrently.
virtual void DiscardCacheForDirectory(const std::string& /*path*/) {}
// Indicates to upper layers which FileSystem operations mentioned in
// FSSupportedOps are supported by underlying FileSystem. Each bit in
// supported_ops argument represent corresponding FSSupportedOps operation.
@ -1624,6 +1634,10 @@ class FileSystemWrapper : public FileSystem {
return target_->AbortIO(io_handles);
}
void DiscardCacheForDirectory(const std::string& path) override {
target_->DiscardCacheForDirectory(path);
}
void SupportedOps(int64_t& supported_ops) override {
return target_->SupportedOps(supported_ops);
}

View File

@ -1487,6 +1487,30 @@ struct DBOptions {
// use "0:00-23:59". To make an entire day have no offpeak period, leave
// this field blank. Default: Empty string (no offpeak).
std::string daily_offpeak_time_utc = "";
// EXPERIMENTAL
// When a RocksDB database is opened in follower mode, this option
// is set by the user to request the frequency of the follower
// attempting to refresh its view of the leader. RocksDB may choose to
// trigger catch ups more frequently if it detects any changes in the
// database state.
// Default every 10s.
uint64_t follower_refresh_catchup_period_ms = 10000;
// For a given catch up attempt, this option specifies the number of times
// to tail the MANIFEST and try to install a new, consistent version before
// giving up. Though it should be extremely rare, the catch up may fail if
// the leader is mutating the LSM at a very high rate and the follower is
// unable to get a consistent view.
// Default to 10 attempts
uint64_t follower_catchup_retry_count = 10;
// Time to wait between consecutive catch up attempts
// Default 100ms
uint64_t follower_catchup_retry_wait_ms = 100;
// End EXPERIMENTAL
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -559,6 +559,19 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, enforce_single_del_contracts),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_refresh_catchup_period_ms",
{offsetof(struct ImmutableDBOptions,
follower_refresh_catchup_period_ms),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_catchup_retry_count",
{offsetof(struct ImmutableDBOptions, follower_catchup_retry_count),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_catchup_retry_wait_ms",
{offsetof(struct ImmutableDBOptions, follower_catchup_retry_wait_ms),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";
@ -756,7 +769,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
checksum_handoff_file_types(options.checksum_handoff_file_types),
lowest_used_cache_tier(options.lowest_used_cache_tier),
compaction_service(options.compaction_service),
enforce_single_del_contracts(options.enforce_single_del_contracts) {
enforce_single_del_contracts(options.enforce_single_del_contracts),
follower_refresh_catchup_period_ms(
options.follower_refresh_catchup_period_ms),
follower_catchup_retry_count(options.follower_catchup_retry_count),
follower_catchup_retry_wait_ms(options.follower_catchup_retry_wait_ms) {
fs = env->GetFileSystem();
clock = env->GetSystemClock().get();
logger = info_log.get();

View File

@ -104,6 +104,9 @@ struct ImmutableDBOptions {
Logger* logger;
std::shared_ptr<CompactionService> compaction_service;
bool enforce_single_del_contracts;
uint64_t follower_refresh_catchup_period_ms;
uint64_t follower_catchup_retry_count;
uint64_t follower_catchup_retry_wait_ms;
bool IsWalDirSameAsDBPath() const;
bool IsWalDirSameAsDBPath(const std::string& path) const;

3
src.mk
View File

@ -53,6 +53,7 @@ LIB_SOURCES = \
db/db_impl/db_impl_debug.cc \
db/db_impl/db_impl_experimental.cc \
db/db_impl/db_impl_files.cc \
db/db_impl/db_impl_follower.cc \
db/db_impl/db_impl_open.cc \
db/db_impl/db_impl_readonly.cc \
db/db_impl/db_impl_secondary.cc \
@ -109,6 +110,7 @@ LIB_SOURCES = \
env/env_encryption.cc \
env/env_posix.cc \
env/file_system.cc \
env/fs_on_demand.cc \
env/fs_posix.cc \
env/fs_remap.cc \
env/file_system_tracer.cc \
@ -474,6 +476,7 @@ TEST_MAIN_SOURCES = \
db/db_dynamic_level_test.cc \
db/db_encryption_test.cc \
db/db_flush_test.cc \
db/db_follower_test.cc \
db/db_readonly_with_timestamp_test.cc \
db/db_with_timestamp_basic_test.cc \
db/import_column_family_test.cc \

View File

@ -1135,6 +1135,11 @@ DEFINE_int32(secondary_update_interval, 5,
"Secondary instance attempts to catch up with the primary every "
"secondary_update_interval seconds.");
DEFINE_bool(open_as_follower, false,
"Open a RocksDB DB as a follower. The leader instance can be "
"running in another db_bench process.");
DEFINE_string(leader_path, "", "Path to the directory of the leader DB");
DEFINE_bool(report_bg_io_stats, false,
"Measure times spents on I/Os while in compactions. ");
@ -4979,6 +4984,12 @@ class Benchmark {
},
FLAGS_secondary_update_interval, db));
}
} else if (FLAGS_open_as_follower) {
std::unique_ptr<DB> dbptr;
s = DB::OpenAsFollower(options, db_name, FLAGS_leader_path, &dbptr);
if (s.ok()) {
db->db = dbptr.release();
}
} else {
s = DB::Open(options, db_name, &db->db);
}