diff --git a/CMakeLists.txt b/CMakeLists.txt index 576ad4e5ee..50cc40b0d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1037,8 +1037,10 @@ endif() else() list(APPEND SOURCES + db/db_impl/db_impl_follower.cc port/port_posix.cc env/env_posix.cc + env/fs_on_demand.cc env/fs_posix.cc env/io_posix.cc) endif() @@ -1363,6 +1365,7 @@ if(WITH_TESTS) db/file_indexer_test.cc db/filename_test.cc db/flush_job_test.cc + db/db_follower_test.cc db/import_column_family_test.cc db/listener_test.cc db/log_test.cc diff --git a/Makefile b/Makefile index 850c18af4f..f22727f925 100644 --- a/Makefile +++ b/Makefile @@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $( db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 7cb9f793e8..cdfcdc701c 100644 --- a/TARGETS +++ b/TARGETS @@ -60,6 +60,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/db_impl/db_impl_debug.cc", "db/db_impl/db_impl_experimental.cc", "db/db_impl/db_impl_files.cc", + "db/db_impl/db_impl_follower.cc", "db/db_impl/db_impl_open.cc", "db/db_impl/db_impl_readonly.cc", "db/db_impl/db_impl_secondary.cc", @@ -117,6 +118,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "env/env_posix.cc", "env/file_system.cc", "env/file_system_tracer.cc", + "env/fs_on_demand.cc", "env/fs_posix.cc", "env/fs_remap.cc", "env/io_posix.cc", @@ -4795,6 +4797,12 @@ cpp_unittest_wrapper(name="db_flush_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="db_follower_test", + srcs=["db/db_follower_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="db_inplace_update_test", srcs=["db/db_inplace_update_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc new file mode 100644 index 0000000000..86bf8cc7c5 --- /dev/null +++ b/db/db_follower_test.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef OS_LINUX + +class DBFollowerTest : public DBTestBase { + public: + // Create directories for leader and follower + // Create the leader DB object + DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) { + follower_name_ = dbname_ + "/follower"; + Close(); + Destroy(CurrentOptions()); + EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); + dbname_ = dbname_ + "/leader"; + Reopen(CurrentOptions()); + } + + ~DBFollowerTest() { + follower_.reset(); + EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + } + + protected: + Status OpenAsFollower() { + return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, + &follower_); + } + DB* follower() { return follower_.get(); } + + private: + std::string follower_name_; + std::unique_ptr follower_; +}; + +TEST_F(DBFollowerTest, Basic) { + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + + ASSERT_OK(OpenAsFollower()); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); +} + +#endif +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9803a39afc..ae9a8d5a1d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1535,7 +1535,7 @@ class DBImpl : public DB { Status WriteRecoverableState(); // Actual implementation of Close() - Status CloseImpl(); + virtual Status CloseImpl(); // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc new file mode 100644 index 0000000000..6abd40731b --- /dev/null +++ b/db/db_impl/db_impl_follower.cc @@ -0,0 +1,309 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl/db_impl_follower.h" + +#include + +#include "db/arena_wrapped_db_iter.h" +#include "db/merge_context.h" +#include "env/composite_env_wrapper.h" +#include "env/fs_on_demand.h" +#include "logging/auto_roll_logger.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/configurable.h" +#include "rocksdb/db.h" +#include "util/cast_util.h" +#include "util/write_batch_util.h" + +namespace ROCKSDB_NAMESPACE { + +DBImplFollower::DBImplFollower(const DBOptions& db_options, + std::unique_ptr&& env, + const std::string& dbname, std::string src_path) + : DBImplSecondary(db_options, dbname, ""), + env_guard_(std::move(env)), + src_path_(std::move(src_path)), + cv_(&mu_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in follower mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplFollower::~DBImplFollower() { + Status s = Close(); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s", + s.ToString().c_str()); + } +} + +// Recover a follower DB instance by reading the MANIFEST. The verification +// as part of the MANIFEST replay will ensure that local links to the +// leader's files are created, thus ensuring we can continue reading them +// even if the leader deletes those files due to compaction. +// TODO: +// 1. Devise a mechanism to prevent misconfiguration by, for example, +// keeping a local copy of the IDENTITY file and cross checking +// 2. Make the recovery more robust by retrying if the first attempt +// fails. +Status DBImplFollower::Recover( + const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) { + mutex_.AssertHeld(); + + JobContext job_context(0); + Status s; + s = static_cast(versions_.get()) + ->Recover(column_families, &manifest_reader_, &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + if (manifest_reader_status_) { + manifest_reader_status_->PermitUncheckedError(); + } + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + + // Start the periodic catch-up thread + // TODO: See if it makes sense to have a threadpool, rather than a thread + // per follower DB instance + catch_up_thread_.reset( + new port::Thread(&DBImplFollower::PeriodicRefresh, this)); + } + + return s; +} + +// Try to catch up by tailing the MANIFEST. +// TODO: +// 1. Cleanup obsolete files afterward +// 2. Add some error notifications and statistics +Status DBImplFollower::TryCatchUpWithLeader() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + // read the manifest and apply new changes to the follower instance + std::unordered_set cfds_changed; + JobContext job_context(0, true /*create_superversion*/); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, + manifest_reader_status_.get(), &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + + if (s.ok()) { + for (auto cfd : cfds_changed) { + if (cfd->mem()->GetEarliestSequenceNumber() < + versions_->LastSequence()) { + // Construct a new memtable with earliest sequence number set to the + // last sequence number in the VersionSet. This matters when + // DBImpl::MultiCFSnapshot tries to get consistent references + // to super versions in a lock free manner, it checks the earliest + // sequence number to detect if there was a change in version in + // the meantime. + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + MemTable* new_mem = cfd->ConstructNewMemtable( + mutable_cf_options, versions_->LastSequence()); + cfd->mem()->SetNextLogNumber(cfd->GetLogNumber()); + cfd->mem()->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + } + + // This will check if the old memtable is still referenced + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } + } + } + job_context.Clean(); + + return s; +} + +void DBImplFollower::PeriodicRefresh() { + while (!stop_requested_.load()) { + MutexLock l(&mu_); + int64_t wait_until = + immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_refresh_catchup_period_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + if (stop_requested_.load()) { + break; + } + Status s; + for (uint64_t i = 0; + i < immutable_db_options_.follower_catchup_retry_count && + !stop_requested_.load(); + ++i) { + s = TryCatchUpWithLeader(); + + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Successful catch up on attempt %llu", + static_cast(i)); + break; + } + wait_until = immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_catchup_retry_wait_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + } + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful"); + } + } +} + +Status DBImplFollower::Close() { + if (catch_up_thread_) { + stop_requested_.store(true); + { + MutexLock l(&mu_); + cv_.SignalAll(); + } + catch_up_thread_->join(); + catch_up_thread_.reset(); + } + + return DBImpl::Close(); +} + +Status DB::OpenAsFollower(const Options& options, const std::string& dbname, + const std::string& leader_path, + std::unique_ptr* dbptr) { + dbptr->reset(); + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector handles; + + Status s = DB::OpenAsFollower(db_options, dbname, leader_path, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsFollower( + const DBOptions& db_options, const std::string& dbname, + const std::string& src_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr) { + dbptr->reset(); + + FileSystem* fs = db_options.env->GetFileSystem().get(); + { + IOStatus io_s; + if (db_options.create_if_missing) { + io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr); + } else { + io_s = fs->FileExists(dbname, IOOptions(), nullptr); + } + if (!io_s.ok()) { + return static_cast(io_s); + } + } + std::unique_ptr new_env(new CompositeEnvWrapper( + db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(), + src_path, dbname))); + + DBOptions tmp_opts(db_options); + Status s; + tmp_opts.env = new_env.get(); + if (nullptr == tmp_opts.info_log) { + s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log); + if (!s.ok()) { + tmp_opts.info_log = nullptr; + return s; + } + } + + handles->clear(); + DBImplFollower* impl = + new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path); + impl->versions_.reset(new ReactiveVersionSet( + dbname, &impl->immutable_db_options_, impl->file_options_, + impl->table_cache_.get(), impl->write_buffer_manager_, + &impl->write_controller_, impl->io_tracer_)); + impl->column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); + impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); + + impl->mutex_.Lock(); + s = impl->Recover(column_families, /*read_only=*/true, + /*error_if_wal_file_exists=*/false, + /*error_if_data_exists_in_wals=*/false); + if (s.ok()) { + for (const auto& cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(false /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + dbptr->reset(impl); + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + static_cast_with_check(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h new file mode 100644 index 0000000000..60992c111e --- /dev/null +++ b/db/db_impl/db_impl_follower.h @@ -0,0 +1,53 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "db/db_impl/db_impl.h" +#include "db/db_impl/db_impl_secondary.h" +#include "logging/logging.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +class DBImplFollower : public DBImplSecondary { + public: + DBImplFollower(const DBOptions& db_options, std::unique_ptr&& env, + const std::string& dbname, std::string src_path); + ~DBImplFollower(); + + Status Close() override; + + protected: + bool OwnTablesAndLogs() const override { + // TODO: Change this to true once we've properly implemented file + // deletion for the read scaling case + return false; + } + + Status Recover(const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, + bool /*is_retry*/ = false, uint64_t* = nullptr, + RecoveryContext* /*recovery_ctx*/ = nullptr, + bool* /*can_retry*/ = nullptr) override; + + private: + friend class DB; + + Status TryCatchUpWithLeader(); + void PeriodicRefresh(); + + std::unique_ptr env_guard_; + std::unique_ptr catch_up_thread_; + std::atomic stop_requested_; + std::string src_path_; + port::Mutex mu_; + port::CondVar cv_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 5801cf7f9e..124cee3f3b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -277,6 +277,10 @@ class DBImplSecondary : public DBImpl { return false; } + std::unique_ptr manifest_reader_; + std::unique_ptr manifest_reporter_; + std::unique_ptr manifest_reader_status_; + private: friend class DB; @@ -305,10 +309,6 @@ class DBImplSecondary : public DBImpl { const CompactionServiceInput& input, CompactionServiceResult* result); - std::unique_ptr manifest_reader_; - std::unique_ptr manifest_reporter_; - std::unique_ptr manifest_reader_status_; - // Cache log readers for each log number, used for continue WAL replay // after recovery std::map> log_readers_; diff --git a/env/fs_on_demand.cc b/env/fs_on_demand.cc new file mode 100644 index 0000000000..bac424264a --- /dev/null +++ b/env/fs_on_demand.cc @@ -0,0 +1,330 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "env/fs_on_demand.h" + +#include + +#include "file/filename.h" +#include "port/port.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { +// Check if the input path is under orig (typically the local directory), and if +// so, change it to the equivalent path under replace (typically the remote +// directory). For example, if orig is "/data/follower", replace is +// "/data/leader", and the given path is "/data/follower/000010.sst", on return +// the path would be changed to +// "/data/leader/000010.sst". +// Return value is true if the path was modified, false otherwise +bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig, + const std::string& replace, + std::string& path) { + size_t pos = path.find(orig); + if (pos > 0) { + return false; + } + path.replace(pos, orig.length(), replace); + return true; +} + +bool OnDemandFileSystem::LookupFileType(const std::string& name, + FileType* type) { + std::size_t found = name.find_last_of('/'); + std::string file_name = name.substr(found); + uint64_t number = 0; + return ParseFileName(file_name, &number, type); +} + +// RocksDB opens non-SST files for reading in sequential file mode. This +// includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them +// in place in the source directory. For files that are appendable or +// can be renamed, which is MANIFEST and CURRENT files, we wrap the +// underlying FSSequentialFile with another class that checks when EOF +// has been reached and re-opens the file to see the latest data. On some +// distributed file systems, this is necessary. +IOStatus OnDemandFileSystem::NewSequentialFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + static std::unordered_set valid_types( + {kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile}); + if (!LookupFileType(fname, &type) || + (valid_types.find(type) == valid_types.end())) { + return IOStatus::NotSupported(); + } + + IOStatus s; + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + std::unique_ptr inner_file; + s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg); + if (s.ok() && type == kDescriptorFile) { + result->reset(new OnDemandSequentialFile(std::move(inner_file), this, + file_opts, rname)); + } else { + *result = std::move(inner_file); + } + } else { + s = target()->NewSequentialFile(fname, file_opts, result, dbg); + } + return s; +} + +// This is only supported for SST files. If the file is present locally, +// i.e in the destination dir, we just open it and return. If its in the +// remote, i.e source dir, we link it locally and open the link. +// TODO: Add support for blob files belonging to the new BlobDB +IOStatus OnDemandFileSystem::NewRandomAccessFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kTableFile) { + return IOStatus::NotSupported(); + } + + IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr); + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + s = target()->LinkFile(rname, fname, IOOptions(), nullptr); + if (!s.ok()) { + return s; + } + } + } + + return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg) + : s; +} + +// We don't expect to create any writable file other than info LOG files. +IOStatus OnDemandFileSystem::NewWritableFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kInfoLogFile) { + return IOStatus::NotSupported(); + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg); + if (s.ok()) { + return IOStatus::InvalidArgument( + "Writing to a file present in the remote directory not supoprted"); + } + } + + return target()->NewWritableFile(fname, file_opts, result, dbg); +} + +// Currently not supported, as there's no need for RocksDB to create a +// directory object for a DB in read-only mode. +IOStatus OnDemandFileSystem::NewDirectory( + const std::string& /*name*/, const IOOptions& /*io_opts*/, + std::unique_ptr* /*result*/, IODebugContext* /*dbg*/) { + return IOStatus::NotSupported(); +} + +// Check if the given file exists, either locally or remote. If the file is an +// SST file, then link it locally. We assume if the file existence is being +// checked, its for verification purposes, for example while replaying the +// MANIFEST. The file will be opened for reading some time in the future. +IOStatus OnDemandFileSystem::FileExists(const std::string& fname, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus s = target()->FileExists(fname, options, dbg); + if (!s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + } else { + s = target()->FileExists(rname, options, dbg); + } + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildren(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildren(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildren(rdir, options, &rchildren, dbg); + if (s.ok()) { + std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, name); + }); + std::sort(result->begin(), result->end()); + std::sort(rchildren.begin(), rchildren.end()); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(result->begin(), result->end(), rchildren.begin(), + rchildren.end(), std::back_inserter(output)); + *result = std::move(output); + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildrenFileAttributes( + const std::string& dir, const IOOptions& options, + std::vector* result, IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg); + if (s.ok()) { + struct FileAttributeSorter { + bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) { + return lhs.name < rhs.name; + } + } file_attr_sorter; + + std::for_each( + rchildren.begin(), rchildren.end(), [&](FileAttributes& file) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, file.name); + }); + std::sort(result->begin(), result->end(), file_attr_sorter); + std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(rchildren.begin(), rchildren.end(), result->begin(), + result->end(), std::back_inserter(output), file_attr_sorter); + *result = std::move(output); + } + return s; +} + +IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname, + const IOOptions& options, + uint64_t* file_size, + IODebugContext* dbg) { + uint64_t local_size = 0; + IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg); + if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + if (s.ok()) { + s = target()->GetFileSize(fname, options, &local_size, dbg); + } + } else { + s = target()->GetFileSize(rname, options, &local_size, dbg); + } + } + } + *file_size = local_size; + return s; +} + +// An implementation of Read that tracks whether we've reached EOF. If so, +// re-open the file to try to read past the previous EOF offset. After +// re-opening, positing it back to the last read offset. +IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) { + IOStatus s; + if (eof_) { + // Reopen the file. With some distributed file systems, this is required + // in order to get the new size + file_.reset(); + s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg); + if (!s.ok()) { + return IOStatus::IOError("While opening file after relinking, got error ", + s.ToString()); + } + s = file_->Skip(offset_); + if (!s.ok()) { + return IOStatus::IOError( + "While seeking to offset" + std::to_string(offset_) + "got error", + s.ToString()); + } + eof_ = false; + } + + s = file_->Read(n, options, result, scratch, dbg); + if (s.ok()) { + offset_ += result->size(); + if (result->size() < n) { + // We reached EOF. Mark it so we know to relink next time + eof_ = true; + } + } + return s; +} + +IOStatus OnDemandSequentialFile::Skip(uint64_t n) { + IOStatus s = file_->Skip(n); + if (s.ok()) { + offset_ += n; + } + return s; +} + +bool OnDemandSequentialFile::use_direct_io() const { + return file_->use_direct_io(); +} + +size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const { + return file_->GetRequiredBufferAlignment(); +} + +Temperature OnDemandSequentialFile::GetTemperature() const { + return file_->GetTemperature(); +} + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string src_path, + std::string dest_path) { + return std::make_shared(fs, src_path, dest_path); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/env/fs_on_demand.h b/env/fs_on_demand.h new file mode 100644 index 0000000000..e313103d6c --- /dev/null +++ b/env/fs_on_demand.h @@ -0,0 +1,139 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include "rocksdb/file_system.h" + +namespace ROCKSDB_NAMESPACE { +// A FileSystem that links files to a local (destination) directory from a +// corresponding remote (source) directory on demand. The decision to link +// depends on the file type, with appendable or rename-able files, such as, +// descriptors, logs, CURRENT, being read in place in the remote directory, +// and SST files being linked. In the future, files read in place may be +// mirrored to the local directory, so the local dir has a complete database +// for troubleshooting purposes. + +class OnDemandFileSystem : public FileSystemWrapper { + public: + OnDemandFileSystem(const std::shared_ptr& target, + const std::string& remote_path, + const std::string& local_path) + : FileSystemWrapper(target), + remote_path_(remote_path), + local_path_(local_path) {} + + const char* Name() const override { return "OnDemandFileSystem"; } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus ReuseWritableFile(const std::string& /*fname*/, + const std::string& /*old_fname*/, + const FileOptions& /*fopts*/, + std::unique_ptr* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("ReuseWritableFile"); + } + + IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus FileExists(const std::string& fname, const IOOptions& options, + IODebugContext* dbg) override; + + IOStatus GetChildren(const std::string& dir, const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetFileSize(const std::string& fname, const IOOptions& options, + uint64_t* file_size, IODebugContext* dbg) override; + + private: + bool CheckPathAndAdjust(const std::string& orig, const std::string& replace, + std::string& path); + bool LookupFileType(const std::string& name, FileType* type); + + const std::string remote_path_; + const std::string local_path_; +}; + +// A wrapper class around an FSSequentialFile object. Its mainly +// intended to be used for appendable files like MANIFEST and logs. +// Beneath the covers, it tracks when EOF is reached, and reopens +// the file in order to read the latest appended data. This is +// necessary on some distributed file systems as they may have +// stale metadata about the file. +// TODO: Mirror the data read to a local file for troubleshooting +// purposes, as well as recovery in case the source dir is +// deleted. +class OnDemandSequentialFile : public FSSequentialFile { + public: + OnDemandSequentialFile(std::unique_ptr&& file, + OnDemandFileSystem* fs, const FileOptions& file_opts, + const std::string& path) + : file_(std::move(file)), + fs_(fs), + file_opts_(file_opts), + path_(path), + eof_(false), + offset_(0) {} + + virtual ~OnDemandSequentialFile() {} + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override; + + IOStatus Skip(uint64_t n) override; + + bool use_direct_io() const override; + + size_t GetRequiredBufferAlignment() const override; + + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::NotSupported("InvalidateCache not supported."); + } + + IOStatus PositionedRead(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, Slice* /*result*/, + char* /*scratch*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("PositionedRead"); + } + + Temperature GetTemperature() const override; + + private: + std::unique_ptr file_; + OnDemandFileSystem* fs_; + const FileOptions file_opts_; + const std::string path_; + bool eof_; + uint64_t offset_; +}; + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string remote_path, + std::string local_path); + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 54343236e9..e413fa044d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -298,6 +298,29 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + // EXPERIMENTAL + + // Open a database as a follower. The difference between this and opening + // as secondary is that the follower database has its own directory with + // links to the actual files, and can tolarate obsolete file deletions by + // the leader to its own database. Another difference is the follower + // tries to keep up with the leader by periodically tailing the leader's + // MANIFEST, and (in the future) memtable updates, rather than relying on + // the user to manually call TryCatchupWithPrimary(). + + // Open as a follower with the default column family + static Status OpenAsFollower(const Options& options, const std::string& name, + const std::string& leader_path, + std::unique_ptr* dbptr); + + // Open as a follower with multiple column families + static Status OpenAsFollower( + const DBOptions& db_options, const std::string& name, + const std::string& leader_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr); + // End EXPERIMENTAL + // Open DB and run the compaction. // It's a read-only operation, the result won't be installed to the DB, it // will be output to the `output_directory`. The API should only be used with diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index d56eed1adc..8d21c91946 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -702,6 +702,16 @@ class FileSystem : public Customizable { return IOStatus::OK(); } + // EXPERIMENTAL + // Discard any directory metadata cached in memory for the specified + // directory and its descendants. Useful for distributed file systems + // where the local cache may be out of sync with the actual directory state. + // + // The implementation is not required to be thread safe. Its the caller's + // responsibility to ensure that no directory operations happen + // concurrently. + virtual void DiscardCacheForDirectory(const std::string& /*path*/) {} + // Indicates to upper layers which FileSystem operations mentioned in // FSSupportedOps are supported by underlying FileSystem. Each bit in // supported_ops argument represent corresponding FSSupportedOps operation. @@ -1624,6 +1634,10 @@ class FileSystemWrapper : public FileSystem { return target_->AbortIO(io_handles); } + void DiscardCacheForDirectory(const std::string& path) override { + target_->DiscardCacheForDirectory(path); + } + void SupportedOps(int64_t& supported_ops) override { return target_->SupportedOps(supported_ops); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b10dc01526..5de241112e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1487,6 +1487,30 @@ struct DBOptions { // use "0:00-23:59". To make an entire day have no offpeak period, leave // this field blank. Default: Empty string (no offpeak). std::string daily_offpeak_time_utc = ""; + + // EXPERIMENTAL + + // When a RocksDB database is opened in follower mode, this option + // is set by the user to request the frequency of the follower + // attempting to refresh its view of the leader. RocksDB may choose to + // trigger catch ups more frequently if it detects any changes in the + // database state. + // Default every 10s. + uint64_t follower_refresh_catchup_period_ms = 10000; + + // For a given catch up attempt, this option specifies the number of times + // to tail the MANIFEST and try to install a new, consistent version before + // giving up. Though it should be extremely rare, the catch up may fail if + // the leader is mutating the LSM at a very high rate and the follower is + // unable to get a consistent view. + // Default to 10 attempts + uint64_t follower_catchup_retry_count = 10; + + // Time to wait between consecutive catch up attempts + // Default 100ms + uint64_t follower_catchup_retry_wait_ms = 100; + + // End EXPERIMENTAL }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index cdc97cd1f7..6da12a1215 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -559,6 +559,19 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"follower_refresh_catchup_period_ms", + {offsetof(struct ImmutableDBOptions, + follower_refresh_catchup_period_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_count", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_count), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_wait_ms", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_wait_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -756,7 +769,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), compaction_service(options.compaction_service), - enforce_single_del_contracts(options.enforce_single_del_contracts) { + enforce_single_del_contracts(options.enforce_single_del_contracts), + follower_refresh_catchup_period_ms( + options.follower_refresh_catchup_period_ms), + follower_catchup_retry_count(options.follower_catchup_retry_count), + follower_catchup_retry_wait_ms(options.follower_catchup_retry_wait_ms) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); logger = info_log.get(); diff --git a/options/db_options.h b/options/db_options.h index b0432df81f..ff7ddc880f 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -104,6 +104,9 @@ struct ImmutableDBOptions { Logger* logger; std::shared_ptr compaction_service; bool enforce_single_del_contracts; + uint64_t follower_refresh_catchup_period_ms; + uint64_t follower_catchup_retry_count; + uint64_t follower_catchup_retry_wait_ms; bool IsWalDirSameAsDBPath() const; bool IsWalDirSameAsDBPath(const std::string& path) const; diff --git a/src.mk b/src.mk index 365f5ba777..23cf348e1e 100644 --- a/src.mk +++ b/src.mk @@ -53,6 +53,7 @@ LIB_SOURCES = \ db/db_impl/db_impl_debug.cc \ db/db_impl/db_impl_experimental.cc \ db/db_impl/db_impl_files.cc \ + db/db_impl/db_impl_follower.cc \ db/db_impl/db_impl_open.cc \ db/db_impl/db_impl_readonly.cc \ db/db_impl/db_impl_secondary.cc \ @@ -109,6 +110,7 @@ LIB_SOURCES = \ env/env_encryption.cc \ env/env_posix.cc \ env/file_system.cc \ + env/fs_on_demand.cc \ env/fs_posix.cc \ env/fs_remap.cc \ env/file_system_tracer.cc \ @@ -474,6 +476,7 @@ TEST_MAIN_SOURCES = \ db/db_dynamic_level_test.cc \ db/db_encryption_test.cc \ db/db_flush_test.cc \ + db/db_follower_test.cc \ db/db_readonly_with_timestamp_test.cc \ db/db_with_timestamp_basic_test.cc \ db/import_column_family_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index d47ffb5385..ce14bce5da 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1135,6 +1135,11 @@ DEFINE_int32(secondary_update_interval, 5, "Secondary instance attempts to catch up with the primary every " "secondary_update_interval seconds."); +DEFINE_bool(open_as_follower, false, + "Open a RocksDB DB as a follower. The leader instance can be " + "running in another db_bench process."); + +DEFINE_string(leader_path, "", "Path to the directory of the leader DB"); DEFINE_bool(report_bg_io_stats, false, "Measure times spents on I/Os while in compactions. "); @@ -4979,6 +4984,12 @@ class Benchmark { }, FLAGS_secondary_update_interval, db)); } + } else if (FLAGS_open_as_follower) { + std::unique_ptr dbptr; + s = DB::OpenAsFollower(options, db_name, FLAGS_leader_path, &dbptr); + if (s.ok()) { + db->db = dbptr.release(); + } } else { s = DB::Open(options, db_name, &db->db); }