From d8fb849b7ead31afd45e1a070778768f4bd58b81 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 19 Apr 2024 19:13:31 -0700 Subject: [PATCH] Basic RocksDB follower implementation (#12540) Summary: A basic implementation of RocksDB follower mode, which opens a remote database (referred to as leader) on a distributed file system by tailing its MANIFEST. It leverages the secondary instance mode, but is different in some key ways - 1. It has its own directory with links to the leader's database 2. Periodically refreshes itself 3. (Future) Snapshot support 4. (Future) Garbage collection of obsolete links 5. (Long term) Memtable replication There are two main classes implementing this functionality - `DBImplFollower` and `OnDemandFileSystem`. The former is derived from `DBImplSecondary`. Similar to `DBImplSecondary`, it implements recovery and catch up through MANIFEST tailing using the `ReactiveVersionSet`, but does not consider logs. In a future PR, we will implement memtable replication, which will eliminate the need to catch up using logs. In addition, the recovery and catch-up tries to avoid directory listing as repeated metadata operations are expensive. The second main piece is the `OnDemandFileSystem`, which plugs in as an `Env` for the follower instance and creates the illusion of the follower directory as a clone of the leader directory. It creates links to SSTs on first reference. When the follower tails the MANIFEST and attempts to create a new `Version`, it calls `VerifyFileMetadata` to verify the size of the file, and optionally the unique ID of the file. During this process, links are created which prevent the underlying files from getting deallocated even if the leader deletes the files. TODOs: Deletion of obsolete links, snapshots, robust checking against misconfigurations, better observability etc. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12540 Reviewed By: jowlyzhang Differential Revision: D56315718 Pulled By: anand1976 fbshipit-source-id: d19e1aca43a6af4000cb8622a718031b69ebd97b --- CMakeLists.txt | 3 + Makefile | 3 + TARGETS | 8 + db/db_follower_test.cc | 63 +++++++ db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_follower.cc | 309 ++++++++++++++++++++++++++++++ db/db_impl/db_impl_follower.h | 53 ++++++ db/db_impl/db_impl_secondary.h | 8 +- env/fs_on_demand.cc | 330 +++++++++++++++++++++++++++++++++ env/fs_on_demand.h | 139 ++++++++++++++ include/rocksdb/db.h | 23 +++ include/rocksdb/file_system.h | 14 ++ include/rocksdb/options.h | 24 +++ options/db_options.cc | 19 +- options/db_options.h | 3 + src.mk | 3 + tools/db_bench_tool.cc | 11 ++ 17 files changed, 1009 insertions(+), 6 deletions(-) create mode 100644 db/db_follower_test.cc create mode 100644 db/db_impl/db_impl_follower.cc create mode 100644 db/db_impl/db_impl_follower.h create mode 100644 env/fs_on_demand.cc create mode 100644 env/fs_on_demand.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 576ad4e5ee..50cc40b0d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1037,8 +1037,10 @@ endif() else() list(APPEND SOURCES + db/db_impl/db_impl_follower.cc port/port_posix.cc env/env_posix.cc + env/fs_on_demand.cc env/fs_posix.cc env/io_posix.cc) endif() @@ -1363,6 +1365,7 @@ if(WITH_TESTS) db/file_indexer_test.cc db/filename_test.cc db/flush_job_test.cc + db/db_follower_test.cc db/import_column_family_test.cc db/listener_test.cc db/log_test.cc diff --git a/Makefile b/Makefile index 850c18af4f..f22727f925 100644 --- a/Makefile +++ b/Makefile @@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $( db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 7cb9f793e8..cdfcdc701c 100644 --- a/TARGETS +++ b/TARGETS @@ -60,6 +60,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/db_impl/db_impl_debug.cc", "db/db_impl/db_impl_experimental.cc", "db/db_impl/db_impl_files.cc", + "db/db_impl/db_impl_follower.cc", "db/db_impl/db_impl_open.cc", "db/db_impl/db_impl_readonly.cc", "db/db_impl/db_impl_secondary.cc", @@ -117,6 +118,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "env/env_posix.cc", "env/file_system.cc", "env/file_system_tracer.cc", + "env/fs_on_demand.cc", "env/fs_posix.cc", "env/fs_remap.cc", "env/io_posix.cc", @@ -4795,6 +4797,12 @@ cpp_unittest_wrapper(name="db_flush_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="db_follower_test", + srcs=["db/db_follower_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="db_inplace_update_test", srcs=["db/db_inplace_update_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc new file mode 100644 index 0000000000..86bf8cc7c5 --- /dev/null +++ b/db/db_follower_test.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef OS_LINUX + +class DBFollowerTest : public DBTestBase { + public: + // Create directories for leader and follower + // Create the leader DB object + DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) { + follower_name_ = dbname_ + "/follower"; + Close(); + Destroy(CurrentOptions()); + EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); + dbname_ = dbname_ + "/leader"; + Reopen(CurrentOptions()); + } + + ~DBFollowerTest() { + follower_.reset(); + EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + } + + protected: + Status OpenAsFollower() { + return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, + &follower_); + } + DB* follower() { return follower_.get(); } + + private: + std::string follower_name_; + std::unique_ptr follower_; +}; + +TEST_F(DBFollowerTest, Basic) { + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + + ASSERT_OK(OpenAsFollower()); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); +} + +#endif +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9803a39afc..ae9a8d5a1d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1535,7 +1535,7 @@ class DBImpl : public DB { Status WriteRecoverableState(); // Actual implementation of Close() - Status CloseImpl(); + virtual Status CloseImpl(); // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc new file mode 100644 index 0000000000..6abd40731b --- /dev/null +++ b/db/db_impl/db_impl_follower.cc @@ -0,0 +1,309 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl/db_impl_follower.h" + +#include + +#include "db/arena_wrapped_db_iter.h" +#include "db/merge_context.h" +#include "env/composite_env_wrapper.h" +#include "env/fs_on_demand.h" +#include "logging/auto_roll_logger.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/configurable.h" +#include "rocksdb/db.h" +#include "util/cast_util.h" +#include "util/write_batch_util.h" + +namespace ROCKSDB_NAMESPACE { + +DBImplFollower::DBImplFollower(const DBOptions& db_options, + std::unique_ptr&& env, + const std::string& dbname, std::string src_path) + : DBImplSecondary(db_options, dbname, ""), + env_guard_(std::move(env)), + src_path_(std::move(src_path)), + cv_(&mu_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in follower mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplFollower::~DBImplFollower() { + Status s = Close(); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s", + s.ToString().c_str()); + } +} + +// Recover a follower DB instance by reading the MANIFEST. The verification +// as part of the MANIFEST replay will ensure that local links to the +// leader's files are created, thus ensuring we can continue reading them +// even if the leader deletes those files due to compaction. +// TODO: +// 1. Devise a mechanism to prevent misconfiguration by, for example, +// keeping a local copy of the IDENTITY file and cross checking +// 2. Make the recovery more robust by retrying if the first attempt +// fails. +Status DBImplFollower::Recover( + const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) { + mutex_.AssertHeld(); + + JobContext job_context(0); + Status s; + s = static_cast(versions_.get()) + ->Recover(column_families, &manifest_reader_, &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + if (manifest_reader_status_) { + manifest_reader_status_->PermitUncheckedError(); + } + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + + // Start the periodic catch-up thread + // TODO: See if it makes sense to have a threadpool, rather than a thread + // per follower DB instance + catch_up_thread_.reset( + new port::Thread(&DBImplFollower::PeriodicRefresh, this)); + } + + return s; +} + +// Try to catch up by tailing the MANIFEST. +// TODO: +// 1. Cleanup obsolete files afterward +// 2. Add some error notifications and statistics +Status DBImplFollower::TryCatchUpWithLeader() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + // read the manifest and apply new changes to the follower instance + std::unordered_set cfds_changed; + JobContext job_context(0, true /*create_superversion*/); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, + manifest_reader_status_.get(), &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + + if (s.ok()) { + for (auto cfd : cfds_changed) { + if (cfd->mem()->GetEarliestSequenceNumber() < + versions_->LastSequence()) { + // Construct a new memtable with earliest sequence number set to the + // last sequence number in the VersionSet. This matters when + // DBImpl::MultiCFSnapshot tries to get consistent references + // to super versions in a lock free manner, it checks the earliest + // sequence number to detect if there was a change in version in + // the meantime. + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + MemTable* new_mem = cfd->ConstructNewMemtable( + mutable_cf_options, versions_->LastSequence()); + cfd->mem()->SetNextLogNumber(cfd->GetLogNumber()); + cfd->mem()->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + } + + // This will check if the old memtable is still referenced + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } + } + } + job_context.Clean(); + + return s; +} + +void DBImplFollower::PeriodicRefresh() { + while (!stop_requested_.load()) { + MutexLock l(&mu_); + int64_t wait_until = + immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_refresh_catchup_period_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + if (stop_requested_.load()) { + break; + } + Status s; + for (uint64_t i = 0; + i < immutable_db_options_.follower_catchup_retry_count && + !stop_requested_.load(); + ++i) { + s = TryCatchUpWithLeader(); + + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Successful catch up on attempt %llu", + static_cast(i)); + break; + } + wait_until = immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_catchup_retry_wait_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + } + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful"); + } + } +} + +Status DBImplFollower::Close() { + if (catch_up_thread_) { + stop_requested_.store(true); + { + MutexLock l(&mu_); + cv_.SignalAll(); + } + catch_up_thread_->join(); + catch_up_thread_.reset(); + } + + return DBImpl::Close(); +} + +Status DB::OpenAsFollower(const Options& options, const std::string& dbname, + const std::string& leader_path, + std::unique_ptr* dbptr) { + dbptr->reset(); + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector handles; + + Status s = DB::OpenAsFollower(db_options, dbname, leader_path, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsFollower( + const DBOptions& db_options, const std::string& dbname, + const std::string& src_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr) { + dbptr->reset(); + + FileSystem* fs = db_options.env->GetFileSystem().get(); + { + IOStatus io_s; + if (db_options.create_if_missing) { + io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr); + } else { + io_s = fs->FileExists(dbname, IOOptions(), nullptr); + } + if (!io_s.ok()) { + return static_cast(io_s); + } + } + std::unique_ptr new_env(new CompositeEnvWrapper( + db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(), + src_path, dbname))); + + DBOptions tmp_opts(db_options); + Status s; + tmp_opts.env = new_env.get(); + if (nullptr == tmp_opts.info_log) { + s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log); + if (!s.ok()) { + tmp_opts.info_log = nullptr; + return s; + } + } + + handles->clear(); + DBImplFollower* impl = + new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path); + impl->versions_.reset(new ReactiveVersionSet( + dbname, &impl->immutable_db_options_, impl->file_options_, + impl->table_cache_.get(), impl->write_buffer_manager_, + &impl->write_controller_, impl->io_tracer_)); + impl->column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); + impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); + + impl->mutex_.Lock(); + s = impl->Recover(column_families, /*read_only=*/true, + /*error_if_wal_file_exists=*/false, + /*error_if_data_exists_in_wals=*/false); + if (s.ok()) { + for (const auto& cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(false /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + dbptr->reset(impl); + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + static_cast_with_check(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h new file mode 100644 index 0000000000..60992c111e --- /dev/null +++ b/db/db_impl/db_impl_follower.h @@ -0,0 +1,53 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "db/db_impl/db_impl.h" +#include "db/db_impl/db_impl_secondary.h" +#include "logging/logging.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +class DBImplFollower : public DBImplSecondary { + public: + DBImplFollower(const DBOptions& db_options, std::unique_ptr&& env, + const std::string& dbname, std::string src_path); + ~DBImplFollower(); + + Status Close() override; + + protected: + bool OwnTablesAndLogs() const override { + // TODO: Change this to true once we've properly implemented file + // deletion for the read scaling case + return false; + } + + Status Recover(const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, + bool /*is_retry*/ = false, uint64_t* = nullptr, + RecoveryContext* /*recovery_ctx*/ = nullptr, + bool* /*can_retry*/ = nullptr) override; + + private: + friend class DB; + + Status TryCatchUpWithLeader(); + void PeriodicRefresh(); + + std::unique_ptr env_guard_; + std::unique_ptr catch_up_thread_; + std::atomic stop_requested_; + std::string src_path_; + port::Mutex mu_; + port::CondVar cv_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 5801cf7f9e..124cee3f3b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -277,6 +277,10 @@ class DBImplSecondary : public DBImpl { return false; } + std::unique_ptr manifest_reader_; + std::unique_ptr manifest_reporter_; + std::unique_ptr manifest_reader_status_; + private: friend class DB; @@ -305,10 +309,6 @@ class DBImplSecondary : public DBImpl { const CompactionServiceInput& input, CompactionServiceResult* result); - std::unique_ptr manifest_reader_; - std::unique_ptr manifest_reporter_; - std::unique_ptr manifest_reader_status_; - // Cache log readers for each log number, used for continue WAL replay // after recovery std::map> log_readers_; diff --git a/env/fs_on_demand.cc b/env/fs_on_demand.cc new file mode 100644 index 0000000000..bac424264a --- /dev/null +++ b/env/fs_on_demand.cc @@ -0,0 +1,330 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "env/fs_on_demand.h" + +#include + +#include "file/filename.h" +#include "port/port.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { +// Check if the input path is under orig (typically the local directory), and if +// so, change it to the equivalent path under replace (typically the remote +// directory). For example, if orig is "/data/follower", replace is +// "/data/leader", and the given path is "/data/follower/000010.sst", on return +// the path would be changed to +// "/data/leader/000010.sst". +// Return value is true if the path was modified, false otherwise +bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig, + const std::string& replace, + std::string& path) { + size_t pos = path.find(orig); + if (pos > 0) { + return false; + } + path.replace(pos, orig.length(), replace); + return true; +} + +bool OnDemandFileSystem::LookupFileType(const std::string& name, + FileType* type) { + std::size_t found = name.find_last_of('/'); + std::string file_name = name.substr(found); + uint64_t number = 0; + return ParseFileName(file_name, &number, type); +} + +// RocksDB opens non-SST files for reading in sequential file mode. This +// includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them +// in place in the source directory. For files that are appendable or +// can be renamed, which is MANIFEST and CURRENT files, we wrap the +// underlying FSSequentialFile with another class that checks when EOF +// has been reached and re-opens the file to see the latest data. On some +// distributed file systems, this is necessary. +IOStatus OnDemandFileSystem::NewSequentialFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + static std::unordered_set valid_types( + {kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile}); + if (!LookupFileType(fname, &type) || + (valid_types.find(type) == valid_types.end())) { + return IOStatus::NotSupported(); + } + + IOStatus s; + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + std::unique_ptr inner_file; + s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg); + if (s.ok() && type == kDescriptorFile) { + result->reset(new OnDemandSequentialFile(std::move(inner_file), this, + file_opts, rname)); + } else { + *result = std::move(inner_file); + } + } else { + s = target()->NewSequentialFile(fname, file_opts, result, dbg); + } + return s; +} + +// This is only supported for SST files. If the file is present locally, +// i.e in the destination dir, we just open it and return. If its in the +// remote, i.e source dir, we link it locally and open the link. +// TODO: Add support for blob files belonging to the new BlobDB +IOStatus OnDemandFileSystem::NewRandomAccessFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kTableFile) { + return IOStatus::NotSupported(); + } + + IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr); + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + s = target()->LinkFile(rname, fname, IOOptions(), nullptr); + if (!s.ok()) { + return s; + } + } + } + + return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg) + : s; +} + +// We don't expect to create any writable file other than info LOG files. +IOStatus OnDemandFileSystem::NewWritableFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kInfoLogFile) { + return IOStatus::NotSupported(); + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg); + if (s.ok()) { + return IOStatus::InvalidArgument( + "Writing to a file present in the remote directory not supoprted"); + } + } + + return target()->NewWritableFile(fname, file_opts, result, dbg); +} + +// Currently not supported, as there's no need for RocksDB to create a +// directory object for a DB in read-only mode. +IOStatus OnDemandFileSystem::NewDirectory( + const std::string& /*name*/, const IOOptions& /*io_opts*/, + std::unique_ptr* /*result*/, IODebugContext* /*dbg*/) { + return IOStatus::NotSupported(); +} + +// Check if the given file exists, either locally or remote. If the file is an +// SST file, then link it locally. We assume if the file existence is being +// checked, its for verification purposes, for example while replaying the +// MANIFEST. The file will be opened for reading some time in the future. +IOStatus OnDemandFileSystem::FileExists(const std::string& fname, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus s = target()->FileExists(fname, options, dbg); + if (!s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + } else { + s = target()->FileExists(rname, options, dbg); + } + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildren(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildren(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildren(rdir, options, &rchildren, dbg); + if (s.ok()) { + std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, name); + }); + std::sort(result->begin(), result->end()); + std::sort(rchildren.begin(), rchildren.end()); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(result->begin(), result->end(), rchildren.begin(), + rchildren.end(), std::back_inserter(output)); + *result = std::move(output); + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildrenFileAttributes( + const std::string& dir, const IOOptions& options, + std::vector* result, IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg); + if (s.ok()) { + struct FileAttributeSorter { + bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) { + return lhs.name < rhs.name; + } + } file_attr_sorter; + + std::for_each( + rchildren.begin(), rchildren.end(), [&](FileAttributes& file) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, file.name); + }); + std::sort(result->begin(), result->end(), file_attr_sorter); + std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(rchildren.begin(), rchildren.end(), result->begin(), + result->end(), std::back_inserter(output), file_attr_sorter); + *result = std::move(output); + } + return s; +} + +IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname, + const IOOptions& options, + uint64_t* file_size, + IODebugContext* dbg) { + uint64_t local_size = 0; + IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg); + if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + if (s.ok()) { + s = target()->GetFileSize(fname, options, &local_size, dbg); + } + } else { + s = target()->GetFileSize(rname, options, &local_size, dbg); + } + } + } + *file_size = local_size; + return s; +} + +// An implementation of Read that tracks whether we've reached EOF. If so, +// re-open the file to try to read past the previous EOF offset. After +// re-opening, positing it back to the last read offset. +IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) { + IOStatus s; + if (eof_) { + // Reopen the file. With some distributed file systems, this is required + // in order to get the new size + file_.reset(); + s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg); + if (!s.ok()) { + return IOStatus::IOError("While opening file after relinking, got error ", + s.ToString()); + } + s = file_->Skip(offset_); + if (!s.ok()) { + return IOStatus::IOError( + "While seeking to offset" + std::to_string(offset_) + "got error", + s.ToString()); + } + eof_ = false; + } + + s = file_->Read(n, options, result, scratch, dbg); + if (s.ok()) { + offset_ += result->size(); + if (result->size() < n) { + // We reached EOF. Mark it so we know to relink next time + eof_ = true; + } + } + return s; +} + +IOStatus OnDemandSequentialFile::Skip(uint64_t n) { + IOStatus s = file_->Skip(n); + if (s.ok()) { + offset_ += n; + } + return s; +} + +bool OnDemandSequentialFile::use_direct_io() const { + return file_->use_direct_io(); +} + +size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const { + return file_->GetRequiredBufferAlignment(); +} + +Temperature OnDemandSequentialFile::GetTemperature() const { + return file_->GetTemperature(); +} + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string src_path, + std::string dest_path) { + return std::make_shared(fs, src_path, dest_path); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/env/fs_on_demand.h b/env/fs_on_demand.h new file mode 100644 index 0000000000..e313103d6c --- /dev/null +++ b/env/fs_on_demand.h @@ -0,0 +1,139 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include "rocksdb/file_system.h" + +namespace ROCKSDB_NAMESPACE { +// A FileSystem that links files to a local (destination) directory from a +// corresponding remote (source) directory on demand. The decision to link +// depends on the file type, with appendable or rename-able files, such as, +// descriptors, logs, CURRENT, being read in place in the remote directory, +// and SST files being linked. In the future, files read in place may be +// mirrored to the local directory, so the local dir has a complete database +// for troubleshooting purposes. + +class OnDemandFileSystem : public FileSystemWrapper { + public: + OnDemandFileSystem(const std::shared_ptr& target, + const std::string& remote_path, + const std::string& local_path) + : FileSystemWrapper(target), + remote_path_(remote_path), + local_path_(local_path) {} + + const char* Name() const override { return "OnDemandFileSystem"; } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus ReuseWritableFile(const std::string& /*fname*/, + const std::string& /*old_fname*/, + const FileOptions& /*fopts*/, + std::unique_ptr* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("ReuseWritableFile"); + } + + IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus FileExists(const std::string& fname, const IOOptions& options, + IODebugContext* dbg) override; + + IOStatus GetChildren(const std::string& dir, const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetFileSize(const std::string& fname, const IOOptions& options, + uint64_t* file_size, IODebugContext* dbg) override; + + private: + bool CheckPathAndAdjust(const std::string& orig, const std::string& replace, + std::string& path); + bool LookupFileType(const std::string& name, FileType* type); + + const std::string remote_path_; + const std::string local_path_; +}; + +// A wrapper class around an FSSequentialFile object. Its mainly +// intended to be used for appendable files like MANIFEST and logs. +// Beneath the covers, it tracks when EOF is reached, and reopens +// the file in order to read the latest appended data. This is +// necessary on some distributed file systems as they may have +// stale metadata about the file. +// TODO: Mirror the data read to a local file for troubleshooting +// purposes, as well as recovery in case the source dir is +// deleted. +class OnDemandSequentialFile : public FSSequentialFile { + public: + OnDemandSequentialFile(std::unique_ptr&& file, + OnDemandFileSystem* fs, const FileOptions& file_opts, + const std::string& path) + : file_(std::move(file)), + fs_(fs), + file_opts_(file_opts), + path_(path), + eof_(false), + offset_(0) {} + + virtual ~OnDemandSequentialFile() {} + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override; + + IOStatus Skip(uint64_t n) override; + + bool use_direct_io() const override; + + size_t GetRequiredBufferAlignment() const override; + + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::NotSupported("InvalidateCache not supported."); + } + + IOStatus PositionedRead(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, Slice* /*result*/, + char* /*scratch*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("PositionedRead"); + } + + Temperature GetTemperature() const override; + + private: + std::unique_ptr file_; + OnDemandFileSystem* fs_; + const FileOptions file_opts_; + const std::string path_; + bool eof_; + uint64_t offset_; +}; + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string remote_path, + std::string local_path); + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 54343236e9..e413fa044d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -298,6 +298,29 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + // EXPERIMENTAL + + // Open a database as a follower. The difference between this and opening + // as secondary is that the follower database has its own directory with + // links to the actual files, and can tolarate obsolete file deletions by + // the leader to its own database. Another difference is the follower + // tries to keep up with the leader by periodically tailing the leader's + // MANIFEST, and (in the future) memtable updates, rather than relying on + // the user to manually call TryCatchupWithPrimary(). + + // Open as a follower with the default column family + static Status OpenAsFollower(const Options& options, const std::string& name, + const std::string& leader_path, + std::unique_ptr* dbptr); + + // Open as a follower with multiple column families + static Status OpenAsFollower( + const DBOptions& db_options, const std::string& name, + const std::string& leader_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr); + // End EXPERIMENTAL + // Open DB and run the compaction. // It's a read-only operation, the result won't be installed to the DB, it // will be output to the `output_directory`. The API should only be used with diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index d56eed1adc..8d21c91946 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -702,6 +702,16 @@ class FileSystem : public Customizable { return IOStatus::OK(); } + // EXPERIMENTAL + // Discard any directory metadata cached in memory for the specified + // directory and its descendants. Useful for distributed file systems + // where the local cache may be out of sync with the actual directory state. + // + // The implementation is not required to be thread safe. Its the caller's + // responsibility to ensure that no directory operations happen + // concurrently. + virtual void DiscardCacheForDirectory(const std::string& /*path*/) {} + // Indicates to upper layers which FileSystem operations mentioned in // FSSupportedOps are supported by underlying FileSystem. Each bit in // supported_ops argument represent corresponding FSSupportedOps operation. @@ -1624,6 +1634,10 @@ class FileSystemWrapper : public FileSystem { return target_->AbortIO(io_handles); } + void DiscardCacheForDirectory(const std::string& path) override { + target_->DiscardCacheForDirectory(path); + } + void SupportedOps(int64_t& supported_ops) override { return target_->SupportedOps(supported_ops); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b10dc01526..5de241112e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1487,6 +1487,30 @@ struct DBOptions { // use "0:00-23:59". To make an entire day have no offpeak period, leave // this field blank. Default: Empty string (no offpeak). std::string daily_offpeak_time_utc = ""; + + // EXPERIMENTAL + + // When a RocksDB database is opened in follower mode, this option + // is set by the user to request the frequency of the follower + // attempting to refresh its view of the leader. RocksDB may choose to + // trigger catch ups more frequently if it detects any changes in the + // database state. + // Default every 10s. + uint64_t follower_refresh_catchup_period_ms = 10000; + + // For a given catch up attempt, this option specifies the number of times + // to tail the MANIFEST and try to install a new, consistent version before + // giving up. Though it should be extremely rare, the catch up may fail if + // the leader is mutating the LSM at a very high rate and the follower is + // unable to get a consistent view. + // Default to 10 attempts + uint64_t follower_catchup_retry_count = 10; + + // Time to wait between consecutive catch up attempts + // Default 100ms + uint64_t follower_catchup_retry_wait_ms = 100; + + // End EXPERIMENTAL }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index cdc97cd1f7..6da12a1215 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -559,6 +559,19 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"follower_refresh_catchup_period_ms", + {offsetof(struct ImmutableDBOptions, + follower_refresh_catchup_period_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_count", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_count), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_wait_ms", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_wait_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -756,7 +769,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), compaction_service(options.compaction_service), - enforce_single_del_contracts(options.enforce_single_del_contracts) { + enforce_single_del_contracts(options.enforce_single_del_contracts), + follower_refresh_catchup_period_ms( + options.follower_refresh_catchup_period_ms), + follower_catchup_retry_count(options.follower_catchup_retry_count), + follower_catchup_retry_wait_ms(options.follower_catchup_retry_wait_ms) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); logger = info_log.get(); diff --git a/options/db_options.h b/options/db_options.h index b0432df81f..ff7ddc880f 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -104,6 +104,9 @@ struct ImmutableDBOptions { Logger* logger; std::shared_ptr compaction_service; bool enforce_single_del_contracts; + uint64_t follower_refresh_catchup_period_ms; + uint64_t follower_catchup_retry_count; + uint64_t follower_catchup_retry_wait_ms; bool IsWalDirSameAsDBPath() const; bool IsWalDirSameAsDBPath(const std::string& path) const; diff --git a/src.mk b/src.mk index 365f5ba777..23cf348e1e 100644 --- a/src.mk +++ b/src.mk @@ -53,6 +53,7 @@ LIB_SOURCES = \ db/db_impl/db_impl_debug.cc \ db/db_impl/db_impl_experimental.cc \ db/db_impl/db_impl_files.cc \ + db/db_impl/db_impl_follower.cc \ db/db_impl/db_impl_open.cc \ db/db_impl/db_impl_readonly.cc \ db/db_impl/db_impl_secondary.cc \ @@ -109,6 +110,7 @@ LIB_SOURCES = \ env/env_encryption.cc \ env/env_posix.cc \ env/file_system.cc \ + env/fs_on_demand.cc \ env/fs_posix.cc \ env/fs_remap.cc \ env/file_system_tracer.cc \ @@ -474,6 +476,7 @@ TEST_MAIN_SOURCES = \ db/db_dynamic_level_test.cc \ db/db_encryption_test.cc \ db/db_flush_test.cc \ + db/db_follower_test.cc \ db/db_readonly_with_timestamp_test.cc \ db/db_with_timestamp_basic_test.cc \ db/import_column_family_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index d47ffb5385..ce14bce5da 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1135,6 +1135,11 @@ DEFINE_int32(secondary_update_interval, 5, "Secondary instance attempts to catch up with the primary every " "secondary_update_interval seconds."); +DEFINE_bool(open_as_follower, false, + "Open a RocksDB DB as a follower. The leader instance can be " + "running in another db_bench process."); + +DEFINE_string(leader_path, "", "Path to the directory of the leader DB"); DEFINE_bool(report_bg_io_stats, false, "Measure times spents on I/Os while in compactions. "); @@ -4979,6 +4984,12 @@ class Benchmark { }, FLAGS_secondary_update_interval, db)); } + } else if (FLAGS_open_as_follower) { + std::unique_ptr dbptr; + s = DB::OpenAsFollower(options, db_name, FLAGS_leader_path, &dbptr); + if (s.ok()) { + db->db = dbptr.release(); + } } else { s = DB::Open(options, db_name, &db->db); }