diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 1749b52ba7..3c0ad9f255 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -96,3 +96,13 @@ jobs: steps: - uses: actions/checkout@v4.1.0 - uses: "./.github/actions/windows-build-steps" + build-linux-arm-test-full: + if: ${{ github.repository_owner == 'facebook' }} + runs-on: + labels: 4-core-ubuntu-arm + steps: + - uses: actions/checkout@v4.1.0 + - uses: "./.github/actions/pre-steps" + - run: sudo apt-get update && sudo apt-get install -y build-essential libgflags-dev + - run: make V=1 J=4 -j4 check + - uses: "./.github/actions/post-steps" diff --git a/.github/workflows/pr-jobs.yml b/.github/workflows/pr-jobs.yml index 00f1305a2e..eaab8e6d73 100644 --- a/.github/workflows/pr-jobs.yml +++ b/.github/workflows/pr-jobs.yml @@ -607,3 +607,13 @@ jobs: with: name: maven-site path: "${{ github.workspace }}/java/target/site" + build-linux-arm: + if: ${{ github.repository_owner == 'facebook' }} + runs-on: + labels: 4-core-ubuntu-arm + steps: + - uses: actions/checkout@v4.1.0 + - uses: "./.github/actions/pre-steps" + - run: sudo apt-get update && sudo apt-get install -y build-essential + - run: ROCKSDBTESTS_PLATFORM_DEPENDENT=only make V=1 J=4 -j4 all_but_some_tests check_some + - uses: "./.github/actions/post-steps" diff --git a/CMakeLists.txt b/CMakeLists.txt index 576ad4e5ee..50cc40b0d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1037,8 +1037,10 @@ endif() else() list(APPEND SOURCES + db/db_impl/db_impl_follower.cc port/port_posix.cc env/env_posix.cc + env/fs_on_demand.cc env/fs_posix.cc env/io_posix.cc) endif() @@ -1363,6 +1365,7 @@ if(WITH_TESTS) db/file_indexer_test.cc db/filename_test.cc db/flush_job_test.cc + db/db_follower_test.cc db/import_column_family_test.cc db/listener_test.cc db/log_test.cc diff --git a/Makefile b/Makefile index 850c18af4f..f22727f925 100644 --- a/Makefile +++ b/Makefile @@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $( db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 7cb9f793e8..cdfcdc701c 100644 --- a/TARGETS +++ b/TARGETS @@ -60,6 +60,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/db_impl/db_impl_debug.cc", "db/db_impl/db_impl_experimental.cc", "db/db_impl/db_impl_files.cc", + "db/db_impl/db_impl_follower.cc", "db/db_impl/db_impl_open.cc", "db/db_impl/db_impl_readonly.cc", "db/db_impl/db_impl_secondary.cc", @@ -117,6 +118,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "env/env_posix.cc", "env/file_system.cc", "env/file_system_tracer.cc", + "env/fs_on_demand.cc", "env/fs_posix.cc", "env/fs_remap.cc", "env/io_posix.cc", @@ -4795,6 +4797,12 @@ cpp_unittest_wrapper(name="db_flush_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="db_follower_test", + srcs=["db/db_follower_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="db_inplace_update_test", srcs=["db/db_inplace_update_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/build_tools/fbcode_config.sh b/build_tools/fbcode_config.sh index fa629af978..02732bde3d 100644 --- a/build_tools/fbcode_config.sh +++ b/build_tools/fbcode_config.sh @@ -113,7 +113,7 @@ CLANG_LIB="$CLANG_BASE/lib" CLANG_SRC="$CLANG_BASE/../../src" CLANG_ANALYZER="$CLANG_BIN/clang++" -CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build" +CLANG_SCAN_BUILD="$CLANG_BIN/scan-build if [ -z "$USE_CLANG" ]; then # gcc diff --git a/build_tools/fbcode_config_platform010.sh b/build_tools/fbcode_config_platform010.sh index 25835d0910..143954d210 100644 --- a/build_tools/fbcode_config_platform010.sh +++ b/build_tools/fbcode_config_platform010.sh @@ -110,7 +110,7 @@ CLANG_LIB="$CLANG_BASE/lib" CLANG_SRC="$CLANG_BASE/../../src" CLANG_ANALYZER="$CLANG_BIN/clang++" -CLANG_SCAN_BUILD="$CLANG_SRC/llvm/clang/tools/scan-build/bin/scan-build" +CLANG_SCAN_BUILD="$CLANG_BIN/scan-build" if [ -z "$USE_CLANG" ]; then # gcc diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 129b29c99f..5a8100041f 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -441,6 +441,50 @@ TEST_F(CompactFilesTest, SentinelCompressionType) { } } +TEST_F(CompactFilesTest, CompressionWithBlockAlign) { + Options options; + options.compression = CompressionType::kNoCompression; + options.create_if_missing = true; + options.disable_auto_compactions = true; + + std::shared_ptr collector = + std::make_shared(); + options.listeners.push_back(collector); + + { + BlockBasedTableOptions bbto; + bbto.block_align = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + } + + std::unique_ptr db; + { + DB* _db = nullptr; + ASSERT_OK(DB::Open(options, db_name_, &_db)); + db.reset(_db); + } + + ASSERT_OK(db->Put(WriteOptions(), "key", "val")); + ASSERT_OK(db->Flush(FlushOptions())); + + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK( + static_cast_with_check(db.get())->TEST_WaitForBackgroundWork()); + auto l0_files = collector->GetFlushedFiles(); + ASSERT_EQ(1, l0_files.size()); + + // We can run this test even without Snappy support because we expect the + // `CompactFiles()` to fail before actually invoking Snappy compression. + CompactionOptions compaction_opts; + compaction_opts.compression = CompressionType::kSnappyCompression; + ASSERT_TRUE(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */) + .IsInvalidArgument()); + + compaction_opts.compression = CompressionType::kDisableCompressionOption; + ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */)); +} + TEST_F(CompactFilesTest, GetCompactionJobInfo) { Options options; options.create_if_missing = true; diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index dec2cfb9ca..7a386088a2 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { int get_sv_count = 0; ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check(db_); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (++get_sv_count == 2) { // After MultiGet refs a couple of CFs, flush all CFs so MultiGet // is forced to repeat the process @@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { int retries = 0; bool last_try = false; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; }); + "DBImpl::MultiCFSnapshot::LastTry", + [&](void* /*arg*/) { last_try = true; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (last_try) { return; } @@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ - {"DBImpl::MultiGet::AfterLastTryRefSV", + {"DBImpl::MultiCFSnapshot::AfterLastTryRefSV", "DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"}, {"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV", - "DBImpl::MultiGet::BeforeLastTryUnRefSV"}, + "DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { int get_sv_count = 0; ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check(db_); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { if (++get_sv_count == 2) { for (int i = 0; i < 8; ++i) { ASSERT_OK(Flush(i)); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 05cc0e3c8b..e03093077f 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3997,7 +3997,7 @@ TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) { Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); } -TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) { +TEST_F(DBCompactionTest, CancelCompactionWaitingOnRunningConflict) { // This test verifies cancellation of a compaction waiting to be scheduled due // to conflict with a running compaction. // @@ -4036,7 +4036,7 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) { // Make sure the manual compaction has seen the conflict before being canceled ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"ColumnFamilyData::CompactRange:Return", - "DBCompactionTest::CancelCompactionWaitingOnConflict:" + "DBCompactionTest::CancelCompactionWaitingOnRunningConflict:" "PreDisableManualCompaction"}}); auto manual_compaction_thread = port::Thread([this]() { ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr) @@ -4047,12 +4047,73 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) { // despite finding a conflict with an automatic compaction that is still // running TEST_SYNC_POINT( - "DBCompactionTest::CancelCompactionWaitingOnConflict:" + "DBCompactionTest::CancelCompactionWaitingOnRunningConflict:" "PreDisableManualCompaction"); db_->DisableManualCompaction(); manual_compaction_thread.join(); } +TEST_F(DBCompactionTest, CancelCompactionWaitingOnScheduledConflict) { + // This test verifies cancellation of a compaction waiting to be scheduled due + // to conflict with a scheduled (but not running) compaction. + // + // A `CompactRange()` in universal compacts all files, waiting for files to + // become available if they are locked for another compaction. This test + // blocks the compaction thread pool and then calls `CompactRange()` twice. + // The first call to `CompactRange()` schedules a compaction that is queued + // in the thread pool. The second call to `CompactRange()` blocks on the first + // call due to the conflict in file picking. The test verifies that + // `DisableManualCompaction()` can cancel both while the thread pool remains + // blocked. + const int kNumSortedRuns = 4; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.disable_auto_compactions = true; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); + Reopen(options); + + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + + // Fill overlapping files in L0 + Random rnd(301); + for (int i = 0; i < kNumSortedRuns; ++i) { + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx, false /* nowait */); + } + + std::atomic num_compact_range_calls{0}; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ColumnFamilyData::CompactRange:Return", + [&](void* /* arg */) { num_compact_range_calls++; }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + const int kNumManualCompactions = 2; + port::Thread manual_compaction_threads[kNumManualCompactions]; + for (int i = 0; i < kNumManualCompactions; i++) { + manual_compaction_threads[i] = port::Thread([this]() { + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr) + .IsIncomplete()); + }); + } + while (num_compact_range_calls < kNumManualCompactions) { + } + + // Cancel it. Threads should be joinable, i.e., both the scheduled and blocked + // manual compactions were canceled despite no compaction could have ever run. + db_->DisableManualCompaction(); + for (int i = 0; i < kNumManualCompactions; i++) { + manual_compaction_threads[i].join(); + } + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); +} + TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) { // Deletions can be dropped when compacted to non-last level if they fall // outside the lower-level files' key-ranges. diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc new file mode 100644 index 0000000000..86bf8cc7c5 --- /dev/null +++ b/db/db_follower_test.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef OS_LINUX + +class DBFollowerTest : public DBTestBase { + public: + // Create directories for leader and follower + // Create the leader DB object + DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) { + follower_name_ = dbname_ + "/follower"; + Close(); + Destroy(CurrentOptions()); + EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); + dbname_ = dbname_ + "/leader"; + Reopen(CurrentOptions()); + } + + ~DBFollowerTest() { + follower_.reset(); + EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + } + + protected: + Status OpenAsFollower() { + return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, + &follower_); + } + DB* follower() { return follower_.get(); } + + private: + std::string follower_name_; + std::unique_ptr follower_; +}; + +TEST_F(DBFollowerTest, Basic) { + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + + ASSERT_OK(OpenAsFollower()); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); +} + +#endif +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 93342d3642..12b0afc2bb 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2517,12 +2517,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, return s; } -template -Status DBImpl::MultiCFSnapshot( - const ReadOptions& read_options, ReadCallback* callback, - std::function& - iter_deref_func, - T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local) { +template +Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, + ReadCallback* callback, + IterDerefFuncType iter_deref_func, T* cf_list, + bool extra_sv_ref, SequenceNumber* snapshot, + bool* sv_from_thread_local) { PERF_TIMER_GUARD(get_snapshot_time); assert(sv_from_thread_local); @@ -2539,7 +2539,7 @@ Status DBImpl::MultiCFSnapshot( SuperVersion* super_version = node->super_version; ColumnFamilyData* cfd = node->cfd; if (super_version != nullptr) { - if (*sv_from_thread_local) { + if (*sv_from_thread_local && !extra_sv_ref) { ReturnAndCleanupSuperVersion(cfd, super_version); } else { CleanupSuperVersion(super_version); @@ -2555,7 +2555,11 @@ Status DBImpl::MultiCFSnapshot( // super version auto cf_iter = cf_list->begin(); auto node = iter_deref_func(cf_iter); - node->super_version = GetAndRefSuperVersion(node->cfd); + if (extra_sv_ref) { + node->super_version = node->cfd->GetReferencedSuperVersion(this); + } else { + node->super_version = GetAndRefSuperVersion(node->cfd); + } if (check_read_ts) { s = FailIfReadCollapsedHistory(node->cfd, node->super_version, *(read_options.timestamp)); @@ -2602,7 +2606,7 @@ Status DBImpl::MultiCFSnapshot( } if (read_options.snapshot == nullptr) { if (last_try) { - TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry"); // We're close to max number of retries. For the last retry, // acquire the lock so we're sure to succeed mutex_.Lock(); @@ -2617,11 +2621,15 @@ Status DBImpl::MultiCFSnapshot( ++cf_iter) { auto node = iter_deref_func(cf_iter); if (!last_try) { - node->super_version = GetAndRefSuperVersion(node->cfd); + if (extra_sv_ref) { + node->super_version = node->cfd->GetReferencedSuperVersion(this); + } else { + node->super_version = GetAndRefSuperVersion(node->cfd); + } } else { node->super_version = node->cfd->GetSuperVersion()->Ref(); } - TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV"); if (check_read_ts) { s = FailIfReadCollapsedHistory(node->cfd, node->super_version, *(read_options.timestamp)); @@ -2635,6 +2643,7 @@ Status DBImpl::MultiCFSnapshot( break; } } + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"); if (read_options.snapshot != nullptr || last_try) { // If user passed a snapshot, then we don't care if a memtable is // sealed or compaction happens because the snapshot would ensure @@ -2658,7 +2667,7 @@ Status DBImpl::MultiCFSnapshot( if (!retry) { if (last_try) { mutex_.Unlock(); - TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV"); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV"); } break; } @@ -2770,37 +2779,37 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, } PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys); - autovector - multiget_cf_data; + autovector + key_range_per_cf; + autovector + cf_sv_pairs; size_t cf_start = 0; ColumnFamilyHandle* cf = sorted_keys[0]->column_family; for (size_t i = 0; i < num_keys; ++i) { KeyContext* key_ctx = sorted_keys[i]; if (key_ctx->column_family != cf) { - multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr); + key_range_per_cf.emplace_back(cf_start, i - cf_start); + cf_sv_pairs.emplace_back(cf, nullptr); cf_start = i; cf = key_ctx->column_family; } } - multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr); + key_range_per_cf.emplace_back(cf_start, num_keys - cf_start); + cf_sv_pairs.emplace_back(cf, nullptr); - std::function::iterator&)> - iter_deref_lambda = - [](autovector::iterator& cf_iter) { - return &(*cf_iter); - }; - - SequenceNumber consistent_seqnum; - bool sv_from_thread_local; - Status s = MultiCFSnapshot< - autovector>( - read_options, nullptr, iter_deref_lambda, &multiget_cf_data, - &consistent_seqnum, &sv_from_thread_local); + SequenceNumber consistent_seqnum = kMaxSequenceNumber; + bool sv_from_thread_local = false; + Status s = MultiCFSnapshot>( + read_options, nullptr, + [](autovector::iterator& cf_iter) { + return &(*cf_iter); + }, + &cf_sv_pairs, + /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { for (size_t i = 0; i < num_keys; ++i) { @@ -2818,31 +2827,40 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, read_callback = ×tamp_read_callback; } - auto cf_iter = multiget_cf_data.begin(); - for (; cf_iter != multiget_cf_data.end(); ++cf_iter) { - s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, - &sorted_keys, cf_iter->super_version, consistent_seqnum, + assert(key_range_per_cf.size() == cf_sv_pairs.size()); + auto key_range_per_cf_iter = key_range_per_cf.begin(); + auto cf_sv_pair_iter = cf_sv_pairs.begin(); + while (key_range_per_cf_iter != key_range_per_cf.end() && + cf_sv_pair_iter != cf_sv_pairs.end()) { + s = MultiGetImpl(read_options, key_range_per_cf_iter->start, + key_range_per_cf_iter->num_keys, &sorted_keys, + cf_sv_pair_iter->super_version, consistent_seqnum, read_callback); if (!s.ok()) { break; } + ++key_range_per_cf_iter; + ++cf_sv_pair_iter; } if (!s.ok()) { assert(s.IsTimedOut() || s.IsAborted()); - for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) { - for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys; + for (++key_range_per_cf_iter; + key_range_per_cf_iter != key_range_per_cf.end(); + ++key_range_per_cf_iter) { + for (size_t i = key_range_per_cf_iter->start; + i < key_range_per_cf_iter->start + key_range_per_cf_iter->num_keys; ++i) { *sorted_keys[i]->s = s; } } } - for (const auto& iter : multiget_cf_data) { + for (const auto& cf_sv_pair : cf_sv_pairs) { if (sv_from_thread_local) { - ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version); + ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version); } else { - TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV"); - CleanupSuperVersion(iter.super_version); + TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"); + CleanupSuperVersion(cf_sv_pair.super_version); } } } @@ -2974,21 +2992,18 @@ void DBImpl::MultiGetWithCallbackImpl( const ReadOptions& read_options, ColumnFamilyHandle* column_family, ReadCallback* callback, autovector* sorted_keys) { - std::array multiget_cf_data; - multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr); - std::function::iterator&)> - iter_deref_lambda = - [](std::array::iterator& cf_iter) { - return &(*cf_iter); - }; - + std::array cf_sv_pairs; + cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr); size_t num_keys = sorted_keys->size(); - SequenceNumber consistent_seqnum; - bool sv_from_thread_local; - Status s = MultiCFSnapshot>( - read_options, callback, iter_deref_lambda, &multiget_cf_data, - &consistent_seqnum, &sv_from_thread_local); + SequenceNumber consistent_seqnum = kMaxSequenceNumber; + bool sv_from_thread_local = false; + Status s = MultiCFSnapshot>( + read_options, callback, + [](std::array::iterator& cf_iter) { + return &(*cf_iter); + }, + &cf_sv_pairs, + /* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { return; } @@ -3027,11 +3042,11 @@ void DBImpl::MultiGetWithCallbackImpl( } s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, - multiget_cf_data[0].super_version, consistent_seqnum, + cf_sv_pairs[0].super_version, consistent_seqnum, read_callback); assert(s.ok() || s.IsTimedOut() || s.IsAborted()); - ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, - multiget_cf_data[0].super_version); + ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd, + cf_sv_pairs[0].super_version); } // The actual implementation of batched MultiGet. Parameters - @@ -3813,69 +3828,62 @@ Status DBImpl::NewIterators( "ReadTier::kPersistedData is not yet supported in iterators."); } - if (read_options.timestamp) { - for (auto* cf : column_families) { - assert(cf); - const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp)); - if (!s.ok()) { - return s; - } - } - } else { - for (auto* cf : column_families) { - assert(cf); - const Status s = FailIfCfHasTs(cf); - if (!s.ok()) { - return s; - } - } - } + autovector + cf_sv_pairs; + Status s; + for (auto* cf : column_families) { + assert(cf); + if (read_options.timestamp) { + s = FailIfTsMismatchCf(cf, *(read_options.timestamp)); + } else { + s = FailIfCfHasTs(cf); + } + if (!s.ok()) { + return s; + } + cf_sv_pairs.emplace_back(cf, nullptr); + } iterators->clear(); iterators->reserve(column_families.size()); - autovector> cfh_to_sv; - const bool check_read_ts = - read_options.timestamp && read_options.timestamp->size() > 0; - for (auto cf : column_families) { - auto cfh = static_cast_with_check(cf); - auto cfd = cfh->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - cfh_to_sv.emplace_back(cfh, sv); - if (check_read_ts) { - const Status s = - FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp)); - if (!s.ok()) { - for (auto prev_entry : cfh_to_sv) { - CleanupSuperVersion(std::get<1>(prev_entry)); - } - return s; - } - } - } - assert(cfh_to_sv.size() == column_families.size()); - if (read_options.tailing) { - for (auto [cfh, sv] : cfh_to_sv) { - auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv, - /* allow_unprepared_value */ true); - iterators->push_back(NewDBIterator( - env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options, - cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - nullptr /*read_callback*/, cfh)); - } - } else { - // Note: no need to consider the special case of - // last_seq_same_as_publish_seq_==false since NewIterators is overridden - // in WritePreparedTxnDB - auto snapshot = read_options.snapshot != nullptr - ? read_options.snapshot->GetSequenceNumber() - : versions_->LastSequence(); - for (auto [cfh, sv] : cfh_to_sv) { - iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot, - nullptr /*read_callback*/)); - } + + SequenceNumber consistent_seqnum = kMaxSequenceNumber; + bool sv_from_thread_local = false; + s = MultiCFSnapshot>( + read_options, nullptr /* read_callback*/, + [](autovector::iterator& cf_iter) { + return &(*cf_iter); + }, + &cf_sv_pairs, + /* extra_sv_ref */ true, &consistent_seqnum, &sv_from_thread_local); + if (!s.ok()) { + return s; } + assert(cf_sv_pairs.size() == column_families.size()); + if (read_options.tailing) { + for (const auto& cf_sv_pair : cf_sv_pairs) { + auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd, + cf_sv_pair.super_version, + /* allow_unprepared_value */ true); + iterators->push_back( + NewDBIterator(env_, read_options, *cf_sv_pair.cfd->ioptions(), + cf_sv_pair.super_version->mutable_cf_options, + cf_sv_pair.cfd->user_comparator(), iter, + cf_sv_pair.super_version->current, kMaxSequenceNumber, + cf_sv_pair.super_version->mutable_cf_options + .max_sequential_skip_in_iterations, + nullptr /*read_callback*/, cf_sv_pair.cfh)); + } + } else { + for (const auto& cf_sv_pair : cf_sv_pairs) { + iterators->push_back(NewIteratorImpl( + read_options, cf_sv_pair.cfh, cf_sv_pair.super_version, + consistent_seqnum, nullptr /*read_callback*/)); + } + } return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e0ac43ddb9..504d7ec608 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1535,7 +1535,7 @@ class DBImpl : public DB { Status WriteRecoverableState(); // Actual implementation of Close() - Status CloseImpl(); + virtual Status CloseImpl(); // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to @@ -2337,10 +2337,7 @@ class DBImpl : public DB { // A structure to hold the information required to process MultiGet of keys // belonging to one column family. For a multi column family MultiGet, there // will be a container of these objects. - struct MultiGetColumnFamilyData { - ColumnFamilyHandle* cf; - ColumnFamilyData* cfd; - + struct MultiGetKeyRangePerCf { // For the batched MultiGet which relies on sorted keys, start specifies // the index of first key belonging to this column family in the sorted // list. @@ -2350,31 +2347,33 @@ class DBImpl : public DB { // belonging to this column family in the sorted list size_t num_keys; + MultiGetKeyRangePerCf() : start(0), num_keys(0) {} + + MultiGetKeyRangePerCf(size_t first, size_t count) + : start(first), num_keys(count) {} + }; + + // A structure to contain ColumnFamilyData and the SuperVersion obtained for + // the consistent view of DB + struct ColumnFamilySuperVersionPair { + ColumnFamilyHandleImpl* cfh; + ColumnFamilyData* cfd; + // SuperVersion for the column family obtained in a manner that ensures a // consistent view across all column families in the DB SuperVersion* super_version; - MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, - SuperVersion* sv) - : cf(column_family), - cfd(static_cast(cf)->cfd()), - start(0), - num_keys(0), + ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family, + SuperVersion* sv) + : cfh(static_cast(column_family)), + cfd(cfh->cfd()), super_version(sv) {} - MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first, - size_t count, SuperVersion* sv) - : cf(column_family), - cfd(static_cast(cf)->cfd()), - start(first), - num_keys(count), - super_version(sv) {} - - MultiGetColumnFamilyData() = default; + ColumnFamilySuperVersionPair() = default; }; // A common function to obtain a consistent snapshot, which can be implicit // if the user doesn't specify a snapshot in read_options, across - // multiple column families for MultiGet. It will attempt to get an implicit + // multiple column families. It will attempt to get an implicit // snapshot without acquiring the db_mutes, but will give up after a few // tries and acquire the mutex if a memtable flush happens. The template // allows both the batched and non-batched MultiGet to call this with @@ -2383,18 +2382,26 @@ class DBImpl : public DB { // If callback is non-null, the callback is refreshed with the snapshot // sequence number // + // `extra_sv_ref` is used to indicate whether thread-local SuperVersion + // should be obtained with an extra ref (by GetReferencedSuperVersion()) or + // not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet + // does not require SuperVersion to be re-acquired throughout the entire + // invocation (no need extra ref), while MultiCfIterators may need the + // SuperVersion to be updated during Refresh() (requires extra ref). + // // `sv_from_thread_local` being set to false indicates that the SuperVersion // obtained from the ColumnFamilyData, whereas true indicates they are thread // local. + // // A non-OK status will be returned if for a column family that enables // user-defined timestamp feature, the specified `ReadOptions.timestamp` // attemps to read collapsed history. - template - Status MultiCFSnapshot( - const ReadOptions& read_options, ReadCallback* callback, - std::function& - iter_deref_func, - T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local); + template + Status MultiCFSnapshot(const ReadOptions& read_options, + ReadCallback* callback, + IterDerefFuncType iter_deref_func, T* cf_list, + bool extra_sv_ref, SequenceNumber* snapshot, + bool* sv_from_thread_local); // The actual implementation of the batching MultiGet. The caller is expected // to have acquired the SuperVersion and pass in a snapshot sequence number diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 43137cd87e..c3e29c200b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1420,6 +1420,14 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log.get()); + if (compact_options.compression != + CompressionType::kDisableCompressionOption) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "[%s] [JOB %d] Found use of deprecated option " + "`CompactionOptions::compression`", + cfd->GetName().c_str(), job_context.job_id); + } + // Perform CompactFiles TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2"); TEST_SYNC_POINT_CALLBACK("TestCompactFiles:PausingManualCompaction:3", @@ -2164,16 +2172,6 @@ Status DBImpl::RunManualCompaction( manual.begin, manual.end, &manual.manual_end, &manual_conflict, max_file_num_to_ignore, trim_ts)) == nullptr && manual_conflict))) { - if (!scheduled) { - // There is a conflicting compaction - if (manual_compaction_paused_ > 0 || manual.canceled == true) { - // Stop waiting since it was canceled. Pretend the error came from - // compaction so the below cleanup/error handling code can process it. - manual.done = true; - manual.status = - Status::Incomplete(Status::SubCode::kManualCompactionPaused); - } - } if (!manual.done) { bg_cv_.Wait(); } @@ -2248,6 +2246,17 @@ Status DBImpl::RunManualCompaction( *final_output_level = compaction->output_level(); } } + if (!scheduled) { + // There is nothing scheduled to wait on, so any cancellation can end the + // manual now. + if (manual_compaction_paused_ > 0 || manual.canceled == true) { + // Stop waiting since it was canceled. Pretend the error came from + // compaction so the below cleanup/error handling code can process it. + manual.done = true; + manual.status = + Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } + } } log_buffer.FlushBufferToLog(); diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc new file mode 100644 index 0000000000..8d21f530cd --- /dev/null +++ b/db/db_impl/db_impl_follower.cc @@ -0,0 +1,310 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl/db_impl_follower.h" + +#include + +#include "db/arena_wrapped_db_iter.h" +#include "db/merge_context.h" +#include "env/composite_env_wrapper.h" +#include "env/fs_on_demand.h" +#include "logging/auto_roll_logger.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/configurable.h" +#include "rocksdb/db.h" +#include "util/cast_util.h" +#include "util/write_batch_util.h" + +namespace ROCKSDB_NAMESPACE { + +DBImplFollower::DBImplFollower(const DBOptions& db_options, + std::unique_ptr&& env, + const std::string& dbname, std::string src_path) + : DBImplSecondary(db_options, dbname, ""), + env_guard_(std::move(env)), + stop_requested_(false), + src_path_(std::move(src_path)), + cv_(&mu_) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in follower mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplFollower::~DBImplFollower() { + Status s = Close(); + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s", + s.ToString().c_str()); + } +} + +// Recover a follower DB instance by reading the MANIFEST. The verification +// as part of the MANIFEST replay will ensure that local links to the +// leader's files are created, thus ensuring we can continue reading them +// even if the leader deletes those files due to compaction. +// TODO: +// 1. Devise a mechanism to prevent misconfiguration by, for example, +// keeping a local copy of the IDENTITY file and cross checking +// 2. Make the recovery more robust by retrying if the first attempt +// fails. +Status DBImplFollower::Recover( + const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) { + mutex_.AssertHeld(); + + JobContext job_context(0); + Status s; + s = static_cast(versions_.get()) + ->Recover(column_families, &manifest_reader_, &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + if (manifest_reader_status_) { + manifest_reader_status_->PermitUncheckedError(); + } + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + + // Start the periodic catch-up thread + // TODO: See if it makes sense to have a threadpool, rather than a thread + // per follower DB instance + catch_up_thread_.reset( + new port::Thread(&DBImplFollower::PeriodicRefresh, this)); + } + + return s; +} + +// Try to catch up by tailing the MANIFEST. +// TODO: +// 1. Cleanup obsolete files afterward +// 2. Add some error notifications and statistics +Status DBImplFollower::TryCatchUpWithLeader() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + // read the manifest and apply new changes to the follower instance + std::unordered_set cfds_changed; + JobContext job_context(0, true /*create_superversion*/); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, + manifest_reader_status_.get(), &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + + if (s.ok()) { + for (auto cfd : cfds_changed) { + if (cfd->mem()->GetEarliestSequenceNumber() < + versions_->LastSequence()) { + // Construct a new memtable with earliest sequence number set to the + // last sequence number in the VersionSet. This matters when + // DBImpl::MultiCFSnapshot tries to get consistent references + // to super versions in a lock free manner, it checks the earliest + // sequence number to detect if there was a change in version in + // the meantime. + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + MemTable* new_mem = cfd->ConstructNewMemtable( + mutable_cf_options, versions_->LastSequence()); + cfd->mem()->SetNextLogNumber(cfd->GetLogNumber()); + cfd->mem()->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + } + + // This will check if the old memtable is still referenced + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } + } + } + job_context.Clean(); + + return s; +} + +void DBImplFollower::PeriodicRefresh() { + while (!stop_requested_.load()) { + MutexLock l(&mu_); + int64_t wait_until = + immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_refresh_catchup_period_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + if (stop_requested_.load()) { + break; + } + Status s; + for (uint64_t i = 0; + i < immutable_db_options_.follower_catchup_retry_count && + !stop_requested_.load(); + ++i) { + s = TryCatchUpWithLeader(); + + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Successful catch up on attempt %llu", + static_cast(i)); + break; + } + wait_until = immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_catchup_retry_wait_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + } + if (!s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful"); + } + } +} + +Status DBImplFollower::Close() { + if (catch_up_thread_) { + stop_requested_.store(true); + { + MutexLock l(&mu_); + cv_.SignalAll(); + } + catch_up_thread_->join(); + catch_up_thread_.reset(); + } + + return DBImpl::Close(); +} + +Status DB::OpenAsFollower(const Options& options, const std::string& dbname, + const std::string& leader_path, + std::unique_ptr* dbptr) { + dbptr->reset(); + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector handles; + + Status s = DB::OpenAsFollower(db_options, dbname, leader_path, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsFollower( + const DBOptions& db_options, const std::string& dbname, + const std::string& src_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr) { + dbptr->reset(); + + FileSystem* fs = db_options.env->GetFileSystem().get(); + { + IOStatus io_s; + if (db_options.create_if_missing) { + io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr); + } else { + io_s = fs->FileExists(dbname, IOOptions(), nullptr); + } + if (!io_s.ok()) { + return static_cast(io_s); + } + } + std::unique_ptr new_env(new CompositeEnvWrapper( + db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(), + src_path, dbname))); + + DBOptions tmp_opts(db_options); + Status s; + tmp_opts.env = new_env.get(); + if (nullptr == tmp_opts.info_log) { + s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log); + if (!s.ok()) { + tmp_opts.info_log = nullptr; + return s; + } + } + + handles->clear(); + DBImplFollower* impl = + new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path); + impl->versions_.reset(new ReactiveVersionSet( + dbname, &impl->immutable_db_options_, impl->file_options_, + impl->table_cache_.get(), impl->write_buffer_manager_, + &impl->write_controller_, impl->io_tracer_)); + impl->column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); + impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); + + impl->mutex_.Lock(); + s = impl->Recover(column_families, /*read_only=*/true, + /*error_if_wal_file_exists=*/false, + /*error_if_data_exists_in_wals=*/false); + if (s.ok()) { + for (const auto& cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(false /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + dbptr->reset(impl); + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + static_cast_with_check(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h new file mode 100644 index 0000000000..60992c111e --- /dev/null +++ b/db/db_impl/db_impl_follower.h @@ -0,0 +1,53 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "db/db_impl/db_impl.h" +#include "db/db_impl/db_impl_secondary.h" +#include "logging/logging.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +class DBImplFollower : public DBImplSecondary { + public: + DBImplFollower(const DBOptions& db_options, std::unique_ptr&& env, + const std::string& dbname, std::string src_path); + ~DBImplFollower(); + + Status Close() override; + + protected: + bool OwnTablesAndLogs() const override { + // TODO: Change this to true once we've properly implemented file + // deletion for the read scaling case + return false; + } + + Status Recover(const std::vector& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, + bool /*is_retry*/ = false, uint64_t* = nullptr, + RecoveryContext* /*recovery_ctx*/ = nullptr, + bool* /*can_retry*/ = nullptr) override; + + private: + friend class DB; + + Status TryCatchUpWithLeader(); + void PeriodicRefresh(); + + std::unique_ptr env_guard_; + std::unique_ptr catch_up_thread_; + std::atomic stop_requested_; + std::string src_path_; + port::Mutex mu_; + port::CondVar cv_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 5801cf7f9e..124cee3f3b 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -277,6 +277,10 @@ class DBImplSecondary : public DBImpl { return false; } + std::unique_ptr manifest_reader_; + std::unique_ptr manifest_reporter_; + std::unique_ptr manifest_reader_status_; + private: friend class DB; @@ -305,10 +309,6 @@ class DBImplSecondary : public DBImpl { const CompactionServiceInput& input, CompactionServiceResult* result); - std::unique_ptr manifest_reader_; - std::unique_ptr manifest_reporter_; - std::unique_ptr manifest_reader_status_; - // Cache log readers for each log number, used for continue WAL replay // after recovery std::map> log_readers_; diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7c3bdd850f..8247333b0b 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -3555,6 +3555,121 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) { iter->Reset(); } +TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + // After SV is obtained for the first CF, flush for the second CF + ASSERT_OK(Flush(1)); + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions read_options; + std::vector iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val_new"); + } + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SVs are no longer obsolete nor in use + for (int i = 0; i < 3; ++i) { + auto* cfd = + static_cast_with_check(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + +TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) { + Options options = GetDefaultOptions(); + options.atomic_flush = true; + CreateAndReopenWithCF({"cf_1", "cf_2"}, options); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + // After SV is obtained for the first CF, do the atomic flush() + ASSERT_OK(Flush()); + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Explicit snapshot wouldn't force reloading all svs. We should expect old + // values + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions read_options; + read_options.snapshot = snapshot; + std::vector iters; + ASSERT_OK(db_->NewIterators(read_options, handles_, &iters)); + + for (int i = 0; i < 3; ++i) { + auto iter = iters[i]; + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" + + std::to_string(i) + "_val"); + } + db_->ReleaseSnapshot(snapshot); + for (auto* iter : iters) { + delete iter; + } + + // Thread-local SV for cf_0 is obsolete (atomic flush happened after the first + // SV Ref) + auto* cfd0 = + static_cast_with_check(handles_[0])->cfd(); + ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + + // Rest are not InUse nor Obsolete + for (int i = 1; i < 3; ++i) { + auto* cfd = + static_cast_with_check(handles_[i])->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 385ccb43c7..5c8b6db2ba 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -313,6 +313,10 @@ TEST_F(DBMemTableTest, InsertWithHint) { ASSERT_EQ("foo_v3", Get("foo_k3")); ASSERT_EQ("bar_v1", Get("bar_k1")); ASSERT_EQ("bar_v2", Get("bar_k2")); + ASSERT_OK(db_->DeleteRange(WriteOptions(), "foo_k1", "foo_k4")); + ASSERT_EQ(hint_bar, rep->last_hint_in()); + ASSERT_EQ(hint_bar, rep->last_hint_out()); + ASSERT_EQ(5, rep->num_insert_with_hint()); ASSERT_EQ("vvv", Get("NotInPrefixDomain")); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index f464a3036b..abb34da1f0 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -795,7 +795,7 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { std::thread threads[10]; for (int t = 0; t < 10; t++) { threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix, - no_wal_value, this] { + no_wal_value, &options, this] { for (int i = 0; i < 10; i++) { ROCKSDB_NAMESPACE::WriteOptions write_option_disable; write_option_disable.disableWAL = true; @@ -806,7 +806,10 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { std::string wal_key = wal_key_prefix + std::to_string(i) + "_" + std::to_string(i); ASSERT_OK(this->Put(wal_key, wal_value, write_option_default)); - ASSERT_OK(dbfull()->SyncWAL()); + ASSERT_OK(dbfull()->SyncWAL()) + << "options.env: " << options.env << ", env_: " << env_ + << ", env_->is_wal_sync_thread_safe_: " + << env_->is_wal_sync_thread_safe_.load(); } return; }); diff --git a/db/memtable.cc b/db/memtable.cc index 60037c9ba4..c3a6433dc1 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -765,8 +765,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type, Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_); if (!allow_concurrent) { - // Extract prefix for insert with hint. - if (insert_with_hint_prefix_extractor_ != nullptr && + // Extract prefix for insert with hint. Hints are for point key table + // (`table_`) only, not `range_del_table_`. + if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr && insert_with_hint_prefix_extractor_->InDomain(key_slice)) { Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]); diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc index 67ced9b219..f3094f358e 100644 --- a/db/multi_cf_iterator_test.cc +++ b/db/multi_cf_iterator_test.cc @@ -399,6 +399,113 @@ TEST_F(CoalescingIteratorTest, LowerAndUpperBounds) { } } +TEST_F(CoalescingIteratorTest, ConsistentViewExplicitSnapshot) { + Options options = GetDefaultOptions(); + options.atomic_flush = true; + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + ASSERT_OK(Flush()); + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + ReadOptions read_options; + const Snapshot* snapshot = db_->GetSnapshot(); + read_options.snapshot = snapshot; + // Verify Seek() + { + std::unique_ptr iter = + db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3); + iter->Seek(""); + ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val"); + } + // Verify SeekForPrev() + { + std::unique_ptr iter = + db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("cf2_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val"); + } + db_->ReleaseSnapshot(snapshot); +} + +TEST_F(CoalescingIteratorTest, ConsistentViewImplicitSnapshot) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkFlush:done", + "DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}}); + + bool flushed = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) { + if (!flushed) { + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val_new")); + } + ASSERT_OK(Flush(1)); + flushed = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfhs_order_0_1_2_3 = { + handles_[0], handles_[1], handles_[2], handles_[3]}; + // Verify Seek() + { + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->Seek("cf2_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val_new"); + iter->Next(); + ASSERT_EQ(IterStatus(iter.get()), "cf3_key->cf3_val_new"); + } + // Verify SeekForPrev() + { + std::unique_ptr iter = + db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3); + iter->SeekForPrev(""); + ASSERT_EQ(IterStatus(iter.get()), "(invalid)"); + iter->SeekForPrev("cf1_key"); + ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val_new"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val_new"); + } +} + TEST_F(CoalescingIteratorTest, EmptyCfs) { Options options = GetDefaultOptions(); { diff --git a/env/fs_on_demand.cc b/env/fs_on_demand.cc new file mode 100644 index 0000000000..bac424264a --- /dev/null +++ b/env/fs_on_demand.cc @@ -0,0 +1,330 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "env/fs_on_demand.h" + +#include + +#include "file/filename.h" +#include "port/port.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { +// Check if the input path is under orig (typically the local directory), and if +// so, change it to the equivalent path under replace (typically the remote +// directory). For example, if orig is "/data/follower", replace is +// "/data/leader", and the given path is "/data/follower/000010.sst", on return +// the path would be changed to +// "/data/leader/000010.sst". +// Return value is true if the path was modified, false otherwise +bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig, + const std::string& replace, + std::string& path) { + size_t pos = path.find(orig); + if (pos > 0) { + return false; + } + path.replace(pos, orig.length(), replace); + return true; +} + +bool OnDemandFileSystem::LookupFileType(const std::string& name, + FileType* type) { + std::size_t found = name.find_last_of('/'); + std::string file_name = name.substr(found); + uint64_t number = 0; + return ParseFileName(file_name, &number, type); +} + +// RocksDB opens non-SST files for reading in sequential file mode. This +// includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them +// in place in the source directory. For files that are appendable or +// can be renamed, which is MANIFEST and CURRENT files, we wrap the +// underlying FSSequentialFile with another class that checks when EOF +// has been reached and re-opens the file to see the latest data. On some +// distributed file systems, this is necessary. +IOStatus OnDemandFileSystem::NewSequentialFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + static std::unordered_set valid_types( + {kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile}); + if (!LookupFileType(fname, &type) || + (valid_types.find(type) == valid_types.end())) { + return IOStatus::NotSupported(); + } + + IOStatus s; + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + std::unique_ptr inner_file; + s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg); + if (s.ok() && type == kDescriptorFile) { + result->reset(new OnDemandSequentialFile(std::move(inner_file), this, + file_opts, rname)); + } else { + *result = std::move(inner_file); + } + } else { + s = target()->NewSequentialFile(fname, file_opts, result, dbg); + } + return s; +} + +// This is only supported for SST files. If the file is present locally, +// i.e in the destination dir, we just open it and return. If its in the +// remote, i.e source dir, we link it locally and open the link. +// TODO: Add support for blob files belonging to the new BlobDB +IOStatus OnDemandFileSystem::NewRandomAccessFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kTableFile) { + return IOStatus::NotSupported(); + } + + IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr); + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + s = target()->LinkFile(rname, fname, IOOptions(), nullptr); + if (!s.ok()) { + return s; + } + } + } + + return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg) + : s; +} + +// We don't expect to create any writable file other than info LOG files. +IOStatus OnDemandFileSystem::NewWritableFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + FileType type; + if (!LookupFileType(fname, &type) || type != kInfoLogFile) { + return IOStatus::NotSupported(); + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg); + if (s.ok()) { + return IOStatus::InvalidArgument( + "Writing to a file present in the remote directory not supoprted"); + } + } + + return target()->NewWritableFile(fname, file_opts, result, dbg); +} + +// Currently not supported, as there's no need for RocksDB to create a +// directory object for a DB in read-only mode. +IOStatus OnDemandFileSystem::NewDirectory( + const std::string& /*name*/, const IOOptions& /*io_opts*/, + std::unique_ptr* /*result*/, IODebugContext* /*dbg*/) { + return IOStatus::NotSupported(); +} + +// Check if the given file exists, either locally or remote. If the file is an +// SST file, then link it locally. We assume if the file existence is being +// checked, its for verification purposes, for example while replaying the +// MANIFEST. The file will be opened for reading some time in the future. +IOStatus OnDemandFileSystem::FileExists(const std::string& fname, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus s = target()->FileExists(fname, options, dbg); + if (!s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + } else { + s = target()->FileExists(rname, options, dbg); + } + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildren(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildren(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildren(rdir, options, &rchildren, dbg); + if (s.ok()) { + std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, name); + }); + std::sort(result->begin(), result->end()); + std::sort(rchildren.begin(), rchildren.end()); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(result->begin(), result->end(), rchildren.begin(), + rchildren.end(), std::back_inserter(output)); + *result = std::move(output); + } + return s; +} + +// Doa listing of both the local and remote directories and merge the two. +IOStatus OnDemandFileSystem::GetChildrenFileAttributes( + const std::string& dir, const IOOptions& options, + std::vector* result, IODebugContext* dbg) { + std::string rdir = dir; + IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg); + if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) { + return s; + } + + std::vector rchildren; + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rdir); + s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg); + if (s.ok()) { + struct FileAttributeSorter { + bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) { + return lhs.name < rhs.name; + } + } file_attr_sorter; + + std::for_each( + rchildren.begin(), rchildren.end(), [&](FileAttributes& file) { + // Adjust name + (void)CheckPathAndAdjust(remote_path_, local_path_, file.name); + }); + std::sort(result->begin(), result->end(), file_attr_sorter); + std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter); + + std::vector output; + output.reserve(result->size() + rchildren.size()); + std::set_union(rchildren.begin(), rchildren.end(), result->begin(), + result->end(), std::back_inserter(output), file_attr_sorter); + *result = std::move(output); + } + return s; +} + +IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname, + const IOOptions& options, + uint64_t* file_size, + IODebugContext* dbg) { + uint64_t local_size = 0; + IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg); + if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) { + return s; + } + + if (s.IsNotFound() || s.IsPathNotFound()) { + std::string rname = fname; + if (CheckPathAndAdjust(local_path_, remote_path_, rname)) { + // First clear any local directory cache as it may be out of date + target()->DiscardCacheForDirectory(rname); + + FileType type; + if (LookupFileType(fname, &type) && type == kTableFile) { + s = target()->LinkFile(rname, fname, options, dbg); + if (s.ok()) { + s = target()->GetFileSize(fname, options, &local_size, dbg); + } + } else { + s = target()->GetFileSize(rname, options, &local_size, dbg); + } + } + } + *file_size = local_size; + return s; +} + +// An implementation of Read that tracks whether we've reached EOF. If so, +// re-open the file to try to read past the previous EOF offset. After +// re-opening, positing it back to the last read offset. +IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) { + IOStatus s; + if (eof_) { + // Reopen the file. With some distributed file systems, this is required + // in order to get the new size + file_.reset(); + s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg); + if (!s.ok()) { + return IOStatus::IOError("While opening file after relinking, got error ", + s.ToString()); + } + s = file_->Skip(offset_); + if (!s.ok()) { + return IOStatus::IOError( + "While seeking to offset" + std::to_string(offset_) + "got error", + s.ToString()); + } + eof_ = false; + } + + s = file_->Read(n, options, result, scratch, dbg); + if (s.ok()) { + offset_ += result->size(); + if (result->size() < n) { + // We reached EOF. Mark it so we know to relink next time + eof_ = true; + } + } + return s; +} + +IOStatus OnDemandSequentialFile::Skip(uint64_t n) { + IOStatus s = file_->Skip(n); + if (s.ok()) { + offset_ += n; + } + return s; +} + +bool OnDemandSequentialFile::use_direct_io() const { + return file_->use_direct_io(); +} + +size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const { + return file_->GetRequiredBufferAlignment(); +} + +Temperature OnDemandSequentialFile::GetTemperature() const { + return file_->GetTemperature(); +} + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string src_path, + std::string dest_path) { + return std::make_shared(fs, src_path, dest_path); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/env/fs_on_demand.h b/env/fs_on_demand.h new file mode 100644 index 0000000000..e313103d6c --- /dev/null +++ b/env/fs_on_demand.h @@ -0,0 +1,139 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include "rocksdb/file_system.h" + +namespace ROCKSDB_NAMESPACE { +// A FileSystem that links files to a local (destination) directory from a +// corresponding remote (source) directory on demand. The decision to link +// depends on the file type, with appendable or rename-able files, such as, +// descriptors, logs, CURRENT, being read in place in the remote directory, +// and SST files being linked. In the future, files read in place may be +// mirrored to the local directory, so the local dir has a complete database +// for troubleshooting purposes. + +class OnDemandFileSystem : public FileSystemWrapper { + public: + OnDemandFileSystem(const std::shared_ptr& target, + const std::string& remote_path, + const std::string& local_path) + : FileSystemWrapper(target), + remote_path_(remote_path), + local_path_(local_path) {} + + const char* Name() const override { return "OnDemandFileSystem"; } + + IOStatus NewSequentialFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus ReuseWritableFile(const std::string& /*fname*/, + const std::string& /*old_fname*/, + const FileOptions& /*fopts*/, + std::unique_ptr* /*result*/, + IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("ReuseWritableFile"); + } + + IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + + IOStatus FileExists(const std::string& fname, const IOOptions& options, + IODebugContext* dbg) override; + + IOStatus GetChildren(const std::string& dir, const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetChildrenFileAttributes(const std::string& dir, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) override; + + IOStatus GetFileSize(const std::string& fname, const IOOptions& options, + uint64_t* file_size, IODebugContext* dbg) override; + + private: + bool CheckPathAndAdjust(const std::string& orig, const std::string& replace, + std::string& path); + bool LookupFileType(const std::string& name, FileType* type); + + const std::string remote_path_; + const std::string local_path_; +}; + +// A wrapper class around an FSSequentialFile object. Its mainly +// intended to be used for appendable files like MANIFEST and logs. +// Beneath the covers, it tracks when EOF is reached, and reopens +// the file in order to read the latest appended data. This is +// necessary on some distributed file systems as they may have +// stale metadata about the file. +// TODO: Mirror the data read to a local file for troubleshooting +// purposes, as well as recovery in case the source dir is +// deleted. +class OnDemandSequentialFile : public FSSequentialFile { + public: + OnDemandSequentialFile(std::unique_ptr&& file, + OnDemandFileSystem* fs, const FileOptions& file_opts, + const std::string& path) + : file_(std::move(file)), + fs_(fs), + file_opts_(file_opts), + path_(path), + eof_(false), + offset_(0) {} + + virtual ~OnDemandSequentialFile() {} + + IOStatus Read(size_t n, const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) override; + + IOStatus Skip(uint64_t n) override; + + bool use_direct_io() const override; + + size_t GetRequiredBufferAlignment() const override; + + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::NotSupported("InvalidateCache not supported."); + } + + IOStatus PositionedRead(uint64_t /*offset*/, size_t /*n*/, + const IOOptions& /*options*/, Slice* /*result*/, + char* /*scratch*/, IODebugContext* /*dbg*/) override { + return IOStatus::NotSupported("PositionedRead"); + } + + Temperature GetTemperature() const override; + + private: + std::unique_ptr file_; + OnDemandFileSystem* fs_; + const FileOptions file_opts_; + const std::string path_; + bool eof_; + uint64_t offset_; +}; + +std::shared_ptr NewOnDemandFileSystem( + const std::shared_ptr& fs, std::string remote_path, + std::string local_path); + +} // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 4fadf1d71a..4758a2f0aa 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -205,15 +205,17 @@ IOStatus WritableFileWriter::Pad(const IOOptions& opts, assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); - size_t pad_start = buf_.CurrentSize(); + const size_t pad_start = buf_.CurrentSize(); // Assume pad_bytes is small compared to buf_ capacity. So we always // use buf_ rather than write directly to file in certain cases like // Append() does. + size_t actual_pad_bytes = 0; while (left) { size_t append_bytes = std::min(cap, left); buf_.PadWith(append_bytes, 0); left -= append_bytes; + actual_pad_bytes += append_bytes; if (left > 0) { IOStatus s = Flush(io_options); if (!s.ok()) { @@ -226,10 +228,13 @@ IOStatus WritableFileWriter::Pad(const IOOptions& opts, pending_sync_ = true; uint64_t cur_size = filesize_.load(std::memory_order_acquire); filesize_.store(cur_size + pad_bytes, std::memory_order_release); + + Slice data(buf_.BufferStart() + pad_start, actual_pad_bytes); + UpdateFileChecksum(data); if (perform_data_verification_) { buffered_data_crc32c_checksum_ = crc32c::Extend(buffered_data_crc32c_checksum_, - buf_.BufferStart() + pad_start, pad_bytes); + buf_.BufferStart() + pad_start, actual_pad_bytes); } return IOStatus::OK(); } diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index fdf2af058d..80933b1497 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -229,11 +229,18 @@ struct AdvancedColumnFamilyOptions { // if it is not explicitly set by the user. Otherwise, the default is 0. int64_t max_write_buffer_size_to_maintain = 0; - // Allows thread-safe inplace updates. If this is true, there is no way to + // Allows thread-safe inplace updates. + // + // If this is true, there is no way to // achieve point-in-time consistency using snapshot or iterator (assuming // concurrent updates). Hence iterator and multi-get will return results // which are not consistent as of any point-in-time. + // // Backward iteration on memtables will not work either. + // + // It is intended to work or be compatible with a limited set of features: + // (1) Non-snapshot Get() + // // If inplace_callback function is not set, // Put(key, new_value) will update inplace the existing_value iff // * key exists in current memtable diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 54343236e9..e413fa044d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -298,6 +298,29 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + // EXPERIMENTAL + + // Open a database as a follower. The difference between this and opening + // as secondary is that the follower database has its own directory with + // links to the actual files, and can tolarate obsolete file deletions by + // the leader to its own database. Another difference is the follower + // tries to keep up with the leader by periodically tailing the leader's + // MANIFEST, and (in the future) memtable updates, rather than relying on + // the user to manually call TryCatchupWithPrimary(). + + // Open as a follower with the default column family + static Status OpenAsFollower(const Options& options, const std::string& name, + const std::string& leader_path, + std::unique_ptr* dbptr); + + // Open as a follower with multiple column families + static Status OpenAsFollower( + const DBOptions& db_options, const std::string& name, + const std::string& leader_path, + const std::vector& column_families, + std::vector* handles, std::unique_ptr* dbptr); + // End EXPERIMENTAL + // Open DB and run the compaction. // It's a read-only operation, the result won't be installed to the DB, it // will be output to the `output_directory`. The API should only be used with diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index d56eed1adc..8d21c91946 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -702,6 +702,16 @@ class FileSystem : public Customizable { return IOStatus::OK(); } + // EXPERIMENTAL + // Discard any directory metadata cached in memory for the specified + // directory and its descendants. Useful for distributed file systems + // where the local cache may be out of sync with the actual directory state. + // + // The implementation is not required to be thread safe. Its the caller's + // responsibility to ensure that no directory operations happen + // concurrently. + virtual void DiscardCacheForDirectory(const std::string& /*path*/) {} + // Indicates to upper layers which FileSystem operations mentioned in // FSSupportedOps are supported by underlying FileSystem. Each bit in // supported_ops argument represent corresponding FSSupportedOps operation. @@ -1624,6 +1634,10 @@ class FileSystemWrapper : public FileSystem { return target_->AbortIO(io_handles); } + void DiscardCacheForDirectory(const std::string& path) override { + target_->DiscardCacheForDirectory(path); + } + void SupportedOps(int64_t& supported_ops) override { return target_->SupportedOps(supported_ops); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b10dc01526..a233d9611e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1487,6 +1487,30 @@ struct DBOptions { // use "0:00-23:59". To make an entire day have no offpeak period, leave // this field blank. Default: Empty string (no offpeak). std::string daily_offpeak_time_utc = ""; + + // EXPERIMENTAL + + // When a RocksDB database is opened in follower mode, this option + // is set by the user to request the frequency of the follower + // attempting to refresh its view of the leader. RocksDB may choose to + // trigger catch ups more frequently if it detects any changes in the + // database state. + // Default every 10s. + uint64_t follower_refresh_catchup_period_ms = 10000; + + // For a given catch up attempt, this option specifies the number of times + // to tail the MANIFEST and try to install a new, consistent version before + // giving up. Though it should be extremely rare, the catch up may fail if + // the leader is mutating the LSM at a very high rate and the follower is + // unable to get a consistent view. + // Default to 10 attempts + uint64_t follower_catchup_retry_count = 10; + + // Time to wait between consecutive catch up attempts + // Default 100ms + uint64_t follower_catchup_retry_wait_ms = 100; + + // End EXPERIMENTAL }; // Options to control the behavior of a database (passed to DB::Open) @@ -1911,20 +1935,29 @@ Status CreateLoggerFromOptions(const std::string& dbname, // CompactionOptions are used in CompactFiles() call. struct CompactionOptions { + // DEPRECATED: this option is unsafe because it allows the user to set any + // `CompressionType` while always using `CompressionOptions` from the + // `ColumnFamilyOptions`. As a result the `CompressionType` and + // `CompressionOptions` can easily be inconsistent. + // // Compaction output compression type - // Default: snappy + // + // Default: `kDisableCompressionOption` + // // If set to `kDisableCompressionOption`, RocksDB will choose compression type - // according to the `ColumnFamilyOptions`, taking into account the output - // level if `compression_per_level` is specified. + // according to the `ColumnFamilyOptions`. RocksDB takes into account the + // output level in case the `ColumnFamilyOptions` has level-specific settings. CompressionType compression; + // Compaction will create files of size `output_file_size_limit`. // Default: MAX, which means that compaction will create a single file uint64_t output_file_size_limit; + // If > 0, it will replace the option in the DBOptions for this compaction. uint32_t max_subcompactions; CompactionOptions() - : compression(kSnappyCompression), + : compression(kDisableCompressionOption), output_file_size_limit(std::numeric_limits::max()), max_subcompactions(0) {} }; diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 80792131cf..42452be151 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -280,7 +280,7 @@ struct PerfContextBase { struct PerfContext : public PerfContextBase { ~PerfContext(); - PerfContext() {} + PerfContext() { Reset(); } PerfContext(const PerfContext&); PerfContext& operator=(const PerfContext&); diff --git a/include/rocksdb/utilities/env_mirror.h b/include/rocksdb/utilities/env_mirror.h index 01293f0d0c..0172b20a7e 100644 --- a/include/rocksdb/utilities/env_mirror.h +++ b/include/rocksdb/utilities/env_mirror.h @@ -83,7 +83,7 @@ class EnvMirror : public EnvWrapper { std::sort(ar.begin(), ar.end()); std::sort(br.begin(), br.end()); if (!as.ok() || ar != br) { - assert(0 == "getchildren results don't match"); + assert(nullptr == "getchildren results don't match"); } *r = ar; return as; diff --git a/java/src/test/java/org/rocksdb/CompactionOptionsTest.java b/java/src/test/java/org/rocksdb/CompactionOptionsTest.java index 9b7d796945..b3820b8d17 100644 --- a/java/src/test/java/org/rocksdb/CompactionOptionsTest.java +++ b/java/src/test/java/org/rocksdb/CompactionOptionsTest.java @@ -20,10 +20,9 @@ public class CompactionOptionsTest { public void compression() { try (final CompactionOptions compactionOptions = new CompactionOptions()) { assertThat(compactionOptions.compression()) - .isEqualTo(CompressionType.SNAPPY_COMPRESSION); - compactionOptions.setCompression(CompressionType.NO_COMPRESSION); - assertThat(compactionOptions.compression()) - .isEqualTo(CompressionType.NO_COMPRESSION); + .isEqualTo(CompressionType.DISABLE_COMPRESSION_OPTION); + compactionOptions.setCompression(CompressionType.SNAPPY_COMPRESSION); + assertThat(compactionOptions.compression()).isEqualTo(CompressionType.SNAPPY_COMPRESSION); } } diff --git a/options/db_options.cc b/options/db_options.cc index cdc97cd1f7..6da12a1215 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -559,6 +559,19 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enforce_single_del_contracts), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"follower_refresh_catchup_period_ms", + {offsetof(struct ImmutableDBOptions, + follower_refresh_catchup_period_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_count", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_count), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"follower_catchup_retry_wait_ms", + {offsetof(struct ImmutableDBOptions, follower_catchup_retry_wait_ms), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; @@ -756,7 +769,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) checksum_handoff_file_types(options.checksum_handoff_file_types), lowest_used_cache_tier(options.lowest_used_cache_tier), compaction_service(options.compaction_service), - enforce_single_del_contracts(options.enforce_single_del_contracts) { + enforce_single_del_contracts(options.enforce_single_del_contracts), + follower_refresh_catchup_period_ms( + options.follower_refresh_catchup_period_ms), + follower_catchup_retry_count(options.follower_catchup_retry_count), + follower_catchup_retry_wait_ms(options.follower_catchup_retry_wait_ms) { fs = env->GetFileSystem(); clock = env->GetSystemClock().get(); logger = info_log.get(); diff --git a/options/db_options.h b/options/db_options.h index b0432df81f..ff7ddc880f 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -104,6 +104,9 @@ struct ImmutableDBOptions { Logger* logger; std::shared_ptr compaction_service; bool enforce_single_del_contracts; + uint64_t follower_refresh_catchup_period_ms; + uint64_t follower_catchup_retry_count; + uint64_t follower_catchup_retry_wait_ms; bool IsWalDirSameAsDBPath() const; bool IsWalDirSameAsDBPath(const std::string& path) const; diff --git a/src.mk b/src.mk index 365f5ba777..23cf348e1e 100644 --- a/src.mk +++ b/src.mk @@ -53,6 +53,7 @@ LIB_SOURCES = \ db/db_impl/db_impl_debug.cc \ db/db_impl/db_impl_experimental.cc \ db/db_impl/db_impl_files.cc \ + db/db_impl/db_impl_follower.cc \ db/db_impl/db_impl_open.cc \ db/db_impl/db_impl_readonly.cc \ db/db_impl/db_impl_secondary.cc \ @@ -109,6 +110,7 @@ LIB_SOURCES = \ env/env_encryption.cc \ env/env_posix.cc \ env/file_system.cc \ + env/fs_on_demand.cc \ env/fs_posix.cc \ env/fs_remap.cc \ env/file_system_tracer.cc \ @@ -474,6 +476,7 @@ TEST_MAIN_SOURCES = \ db/db_dynamic_level_test.cc \ db/db_encryption_test.cc \ db/db_flush_test.cc \ + db/db_follower_test.cc \ db/db_readonly_with_timestamp_test.cc \ db/db_with_timestamp_basic_test.cc \ db/import_column_family_test.cc \ diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index eb2ae4ddba..060a541e39 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -626,6 +626,14 @@ struct BlockBasedTableBuilder::Rep { } else { base_context_checksum = 0; } + + if (alignment > 0 && compression_type != kNoCompression) { + // With better sanitization in `CompactionPicker::CompactFiles()`, we + // would not need to handle this case here and could change it to an + // assertion instead. + SetStatus(Status::InvalidArgument( + "Enable block_align, but compression enabled")); + } } Rep(const Rep&) = delete; diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index 6da594c10d..34081621a4 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -619,6 +619,21 @@ Status BlockBasedTableFactory::ValidateOptions( "Enable block_align, but compression " "enabled"); } + if (table_options_.block_align && + cf_opts.bottommost_compression != kDisableCompressionOption && + cf_opts.bottommost_compression != kNoCompression) { + return Status::InvalidArgument( + "Enable block_align, but bottommost_compression enabled"); + } + if (table_options_.block_align) { + for (auto level_compression : cf_opts.compression_per_level) { + if (level_compression != kDisableCompressionOption && + level_compression != kNoCompression) { + return Status::InvalidArgument( + "Enable block_align, but compression_per_level enabled"); + } + } + } if (table_options_.block_align && (table_options_.block_size & (table_options_.block_size - 1))) { return Status::InvalidArgument( diff --git a/table/table_test.cc b/table/table_test.cc index 02a8d899d7..6b9843a6c1 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3188,6 +3188,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { Options options; BlockBasedTableOptions table_options = GetBlockBasedTableOptions(); options.create_if_missing = true; + options.compression = kNoCompression; options.statistics = CreateDBStatistics(); table_options.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; @@ -3196,6 +3197,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) { table_options.filter_policy.reset(NewBloomFilterPolicy(10, true)); table_options.block_align = true; options.table_factory.reset(new BlockBasedTableFactory(table_options)); + ASSERT_OK(options.table_factory->ValidateOptions( + DBOptions(options), ColumnFamilyOptions(options))); TableConstructor c(BytewiseComparator()); GenerateKVMap(&c); @@ -3326,6 +3329,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { std::unique_ptr env( new CompositeEnvWrapper(c.env_, FileSystem::Default())); options.env = env.get(); + options.compression = kNoCompression; options.statistics = CreateDBStatistics(); c.env_ = env.get(); @@ -3338,6 +3342,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) { table_options.filter_policy.reset(NewBloomFilterPolicy(10, true)); table_options.block_align = true; options.table_factory.reset(new BlockBasedTableFactory(table_options)); + ASSERT_OK(options.table_factory->ValidateOptions( + DBOptions(options), ColumnFamilyOptions(options))); GenerateKVMap(&c); @@ -5425,6 +5431,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { Options options; options.compression = kNoCompression; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + ASSERT_OK(options.table_factory->ValidateOptions( + DBOptions(options), ColumnFamilyOptions(options))); const ImmutableOptions ioptions(options); const MutableCFOptions moptions(options); InternalKeyComparator ikc(options.comparator); @@ -5475,7 +5483,10 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { std::unique_ptr table_reader; bbto.block_align = false; Options options2; + options2.compression = kNoCompression; options2.table_factory.reset(NewBlockBasedTableFactory(bbto)); + ASSERT_OK(options2.table_factory->ValidateOptions( + DBOptions(options2), ColumnFamilyOptions(options2))); ImmutableOptions ioptions2(options2); const MutableCFOptions moptions2(options2); @@ -5505,6 +5516,31 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { table_reader.reset(); } +TEST_P(BlockBasedTableTest, FixBlockAlignMismatchedFileChecksums) { + Options options; + options.create_if_missing = true; + options.compression = kNoCompression; + options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + BlockBasedTableOptions bbto; + bbto.block_align = true; + bbto.block_size = 1024; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + ASSERT_OK(options.table_factory->ValidateOptions( + DBOptions(options), ColumnFamilyOptions(options))); + const std::string kDBPath = + test::PerThreadDBPath("block_align_padded_bytes_verify_file_checksums"); + ASSERT_OK(DestroyDB(kDBPath, options)); + DB* db; + ASSERT_OK(DB::Open(options, kDBPath, &db)); + ASSERT_OK(db->Put(WriteOptions(), "k1", "v1")); + ASSERT_OK(db->Flush(FlushOptions())); + // Before the fix, VerifyFileChecksums() will fail as padded bytes from + // aligning blocks are used to generate the checksum to compare against the + // one not generated by padded bytes + ASSERT_OK(db->VerifyFileChecksums(ReadOptions())); + delete db; +} + TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_align = true; @@ -5516,6 +5552,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { Options options; options.compression = kNoCompression; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + ASSERT_OK(options.table_factory->ValidateOptions( + DBOptions(options), ColumnFamilyOptions(options))); const ImmutableOptions ioptions(options); const MutableCFOptions moptions(options); @@ -5822,6 +5860,7 @@ TEST_P(BlockBasedTableTest, SeekMetaBlocks) { TEST_P(BlockBasedTableTest, BadOptions) { ROCKSDB_NAMESPACE::Options options; options.compression = kNoCompression; + options.create_if_missing = true; BlockBasedTableOptions bbto = GetBlockBasedTableOptions(); bbto.block_size = 4000; bbto.block_align = true; @@ -5830,13 +5869,29 @@ TEST_P(BlockBasedTableTest, BadOptions) { test::PerThreadDBPath("block_based_table_bad_options_test"); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); ASSERT_OK(DestroyDB(kDBPath, options)); - ROCKSDB_NAMESPACE::DB* db; - ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db)); - bbto.block_size = 4096; - options.compression = kSnappyCompression; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db)); + std::unique_ptr db; + { + ROCKSDB_NAMESPACE::DB* _db; + ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db)); + + bbto.block_size = 4096; + options.compression = kSnappyCompression; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db)); + + options.compression = kNoCompression; + options.bottommost_compression = kSnappyCompression; + ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db)); + + options.bottommost_compression = kNoCompression; + options.compression_per_level.emplace_back(kSnappyCompression); + ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db)); + + options.compression_per_level.clear(); + ASSERT_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db)); + db.reset(_db); + } } TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index d47ffb5385..ce14bce5da 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1135,6 +1135,11 @@ DEFINE_int32(secondary_update_interval, 5, "Secondary instance attempts to catch up with the primary every " "secondary_update_interval seconds."); +DEFINE_bool(open_as_follower, false, + "Open a RocksDB DB as a follower. The leader instance can be " + "running in another db_bench process."); + +DEFINE_string(leader_path, "", "Path to the directory of the leader DB"); DEFINE_bool(report_bg_io_stats, false, "Measure times spents on I/Os while in compactions. "); @@ -4979,6 +4984,12 @@ class Benchmark { }, FLAGS_secondary_update_interval, db)); } + } else if (FLAGS_open_as_follower) { + std::unique_ptr dbptr; + s = DB::OpenAsFollower(options, db_name, FLAGS_leader_path, &dbptr); + if (s.ok()) { + db->db = dbptr.release(); + } } else { s = DB::Open(options, db_name, &db->db); } diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 82e9af9300..8ec39c3910 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -74,7 +74,16 @@ default_params = { "destroy_db_initially": 0, "enable_pipelined_write": lambda: random.randint(0, 1), "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]), - "inplace_update_support": lambda: random.randint(0, 1), + # `inplace_update_support` is incompatible with DB that has delete + # range data in memtables. + # Such data can result from any of the previous db stress runs + # using delete range. + # Since there is no easy way to keep track of whether delete range + # is used in any of the previous runs, + # to simpify our testing, we set `inplace_update_support` across + # runs and to disable delete range accordingly + # (see below `finalize_and_sanitize`). + "inplace_update_support": random.choice([0] * 9 + [1]), "expected_values_dir": lambda: setup_expected_values_dir(), "fail_if_options_file_error": lambda: random.randint(0, 1), "flush_one_in": lambda: random.choice([1000, 1000000]), @@ -283,8 +292,7 @@ default_params = { "hard_pending_compaction_bytes_limit" : lambda: random.choice([2 * 1024 * 1024] + [256 * 1073741824] * 4), "enable_sst_partitioner_factory": lambda: random.choice([0, 1]), "enable_do_not_compress_roles": lambda: random.choice([0, 1]), - # TODO(hx235): enable `block_align` after fixing the surfaced corruption issue - "block_align": 0, + "block_align": lambda: random.choice([0, 1]), "lowest_used_cache_tier": lambda: random.choice([0, 1, 2]), "enable_custom_split_merge": lambda: random.choice([0, 1]), "adm_policy": lambda: random.choice([0, 1, 2, 3]), @@ -479,6 +487,8 @@ txn_params = { "create_timestamped_snapshot_one_in": random.choice([0, 20]), # PutEntity in transactions is not yet implemented "use_put_entity_one_in": 0, + # Should not be used with TransactionDB which uses snapshot. + "inplace_update_support": 0, } # For optimistic transaction db @@ -490,6 +500,8 @@ optimistic_txn_params = { "occ_lock_bucket_count": lambda: random.choice([10, 100, 500]), # PutEntity in transactions is not yet implemented "use_put_entity_one_in": 0, + # Should not be used with OptimisticTransactionDB which uses snapshot. + "inplace_update_support": 0, } best_efforts_recovery_params = { @@ -599,6 +611,8 @@ multiops_txn_default_params = { "use_multi_get_entity": 0, # `MultiOpsTxnsStressTest::TestIterateAgainstExpected()` is not implemented. "verify_iterator_with_expected_state_one_in": 0, + # This test uses snapshot heavily which is incompatible with this option. + "inplace_update_support": 0, } multiops_wc_txn_params = { @@ -672,6 +686,11 @@ def finalize_and_sanitize(src_params): ): dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delrangepercent"] = 0 + if dest_params["inplace_update_support"] == 1: + dest_params["delpercent"] += dest_params["delrangepercent"] + dest_params["delrangepercent"] = 0 + dest_params["readpercent"] += dest_params["prefixpercent"] + dest_params["prefixpercent"] = 0 if ( dest_params.get("disable_wal") == 1 or dest_params.get("sync_fault_injection") == 1 @@ -689,6 +708,11 @@ def finalize_and_sanitize(src_params): # files, which would be problematic when unsynced data can be lost in # crash recoveries. dest_params["enable_compaction_filter"] = 0 + # TODO(hx235): re-enable "reopen" after supporting unsynced data loss + # verification upon reopen. Currently reopen does not restore expected state + # with potential data loss in mind like start of each `./db_stress` run. + # Therefore it always expects no data loss. + dest_params["reopen"] = 0 # Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb if dest_params.get("unordered_write", 0) == 1: dest_params["txn_write_policy"] = 1 @@ -813,6 +837,7 @@ def finalize_and_sanitize(src_params): # Enabling block_align with compression is not supported if dest_params.get("block_align") == 1: dest_params["compression_type"] = "none" + dest_params["bottommost_compression_type"] = "none" # If periodic_compaction_seconds is not set, daily_offpeak_time_utc doesn't do anything if dest_params.get("periodic_compaction_seconds") == 0: dest_params["daily_offpeak_time_utc"] = "" diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 355e26fa7c..1f0e8b4ea5 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -214,6 +214,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == GetEntityCommand::Name()) { return new GetEntityCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); + } else if (parsed_params.cmd == MultiGetEntityCommand::Name()) { + return new MultiGetEntityCommand(parsed_params.cmd_params, + parsed_params.option_map, + parsed_params.flags); } else if (parsed_params.cmd == PutCommand::Name()) { return new PutCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); @@ -2938,7 +2942,7 @@ void MultiGetCommand::DoCommand() { fprintf(stderr, "Status for key %s: %s\n", (is_key_hex_ ? StringToHex(keys_[i]) : keys_[i]).c_str(), statuses[i].ToString().c_str()); - failed = false; + failed = true; } } if (failed) { @@ -2998,6 +3002,73 @@ void GetEntityCommand::DoCommand() { // ---------------------------------------------------------------------------- +MultiGetEntityCommand::MultiGetEntityCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags) + : LDBCommand(options, flags, true /* is_read_only */, + BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) { + if (params.size() < 1) { + exec_state_ = LDBCommandExecuteResult::Failed( + "At least one must be specified for the multi_get_entity " + "command"); + } else { + for (size_t i = 0; i < params.size(); i++) { + std::string key = params.at(i); + keys_.emplace_back(is_key_hex_ ? HexToString(key) : key); + } + } +} + +void MultiGetEntityCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(MultiGetEntityCommand::Name()); + ret.append(" ..."); + ret.append("\n"); +} + +void MultiGetEntityCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + + size_t num_keys = keys_.size(); + std::vector key_slices; + std::vector results(num_keys); + std::vector statuses(num_keys); + for (const std::string& key : keys_) { + key_slices.emplace_back(key); + } + + db_->MultiGetEntity(ReadOptions(), GetCfHandle(), num_keys, key_slices.data(), + results.data(), statuses.data()); + + bool failed = false; + for (size_t i = 0; i < num_keys; ++i) { + std::string key = is_key_hex_ ? StringToHex(keys_[i]) : keys_[i]; + if (statuses[i].ok()) { + std::ostringstream oss; + oss << key << DELIM; + WideColumnsHelper::DumpWideColumns(results[i].columns(), oss, + is_value_hex_); + fprintf(stdout, "%s\n", oss.str().c_str()); + } else if (statuses[i].IsNotFound()) { + fprintf(stdout, "Key not found: %s\n", key.c_str()); + } else { + fprintf(stderr, "Status for key %s: %s\n", key.c_str(), + statuses[i].ToString().c_str()); + failed = true; + } + } + if (failed) { + exec_state_ = + LDBCommandExecuteResult::Failed("one or more keys had non-okay status"); + } +} + +// ---------------------------------------------------------------------------- + ApproxSizeCommand::ApproxSizeCommand( const std::vector& /*params*/, const std::map& options, @@ -3473,7 +3544,7 @@ PutEntityCommand::PutEntityCommand( void PutEntityCommand::Help(std::string& ret) { ret.append(" "); - ret.append(PutCommand::Name()); + ret.append(PutEntityCommand::Name()); ret.append( " : : " "<...>"); diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index e5d0d1fe91..15ff571b21 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -435,6 +435,22 @@ class GetEntityCommand : public LDBCommand { std::string key_; }; +class MultiGetEntityCommand : public LDBCommand { + public: + static std::string Name() { return "multi_get_entity"; } + + MultiGetEntityCommand(const std::vector& params, + const std::map& options, + const std::vector& flags); + + void DoCommand() override; + + static void Help(std::string& ret); + + private: + std::vector keys_; +}; + class ApproxSizeCommand : public LDBCommand { public: static std::string Name() { return "approxsize"; } diff --git a/tools/ldb_tool.cc b/tools/ldb_tool.cc index 381f70c4d1..86fd37ef4f 100644 --- a/tools/ldb_tool.cc +++ b/tools/ldb_tool.cc @@ -89,8 +89,11 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options, ret.append("\n\n"); ret.append("Data Access Commands:\n"); PutCommand::Help(ret); + PutEntityCommand::Help(ret); GetCommand::Help(ret); + GetEntityCommand::Help(ret); MultiGetCommand::Help(ret); + MultiGetEntityCommand::Help(ret); BatchPutCommand::Help(ret); ScanCommand::Help(ret); DeleteCommand::Help(ret); diff --git a/unreleased_history/behavior_changes/block_align_compression_incompat.md b/unreleased_history/behavior_changes/block_align_compression_incompat.md new file mode 100644 index 0000000000..7068a42679 --- /dev/null +++ b/unreleased_history/behavior_changes/block_align_compression_incompat.md @@ -0,0 +1 @@ +* Enabling `BlockBasedTableOptions::block_align` is now incompatible (i.e., APIs will return `Status::InvalidArgument`) with more ways of enabling compression: `CompactionOptions::compression`, `ColumnFamilyOptions::compression_per_level`, and `ColumnFamilyOptions::bottommost_compression`. diff --git a/unreleased_history/behavior_changes/default_compaction_options_compression.md b/unreleased_history/behavior_changes/default_compaction_options_compression.md new file mode 100644 index 0000000000..2e4e87ff18 --- /dev/null +++ b/unreleased_history/behavior_changes/default_compaction_options_compression.md @@ -0,0 +1 @@ +* Changed the default value of `CompactionOptions::compression` to `kDisableCompressionOption`, which means the compression type is determined by the `ColumnFamilyOptions`. diff --git a/unreleased_history/bug_fixes/block_align_checksum_mismatch.md b/unreleased_history/bug_fixes/block_align_checksum_mismatch.md new file mode 100644 index 0000000000..b784e32c5c --- /dev/null +++ b/unreleased_history/bug_fixes/block_align_checksum_mismatch.md @@ -0,0 +1 @@ +Fix a bug causing `VerifyFileChecksums()` to return false-positive corruption under `BlockBasedTableOptions::block_align=true` diff --git a/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md new file mode 100644 index 0000000000..f537e474aa --- /dev/null +++ b/unreleased_history/bug_fixes/consistent_view_for_new_iterators_api.md @@ -0,0 +1 @@ +Provide consistent view of the database across the column families for `NewIterators()` API. diff --git a/unreleased_history/bug_fixes/del_range_and_memtable_insert_with_hint_prefix_extractor.md b/unreleased_history/bug_fixes/del_range_and_memtable_insert_with_hint_prefix_extractor.md new file mode 100644 index 0000000000..7283fb95ac --- /dev/null +++ b/unreleased_history/bug_fixes/del_range_and_memtable_insert_with_hint_prefix_extractor.md @@ -0,0 +1 @@ +* Fixed feature interaction bug for `DeleteRange()` together with `ColumnFamilyOptions::memtable_insert_with_hint_prefix_extractor`. The impact of this bug would likely be corruption or crashing. diff --git a/unreleased_history/bug_fixes/disable_manual_compaction_hang.md b/unreleased_history/bug_fixes/disable_manual_compaction_hang.md new file mode 100644 index 0000000000..c140874a1f --- /dev/null +++ b/unreleased_history/bug_fixes/disable_manual_compaction_hang.md @@ -0,0 +1 @@ +* Fixed hang in `DisableManualCompactions()` where compactions waiting to be scheduled due to conflicts would not be canceled promptly diff --git a/unreleased_history/public_api_changes/deprecate_compaction_options_compression.md b/unreleased_history/public_api_changes/deprecate_compaction_options_compression.md new file mode 100644 index 0000000000..65dea56517 --- /dev/null +++ b/unreleased_history/public_api_changes/deprecate_compaction_options_compression.md @@ -0,0 +1 @@ +* Deprecated `CompactionOptions::compression` since `CompactionOptions`'s API for configuring compression was incomplete, unsafe, and likely unnecessary diff --git a/util/xxhash.h b/util/xxhash.h index 43203b0634..a772a7ccc8 100644 --- a/util/xxhash.h +++ b/util/xxhash.h @@ -5394,7 +5394,7 @@ static XXH_MALLOCF void* XXH_alignedMalloc(size_t s, size_t align) */ static void XXH_alignedFree(void* p) { - if (p != NULL) { + if (p != nullptr) { xxh_u8* ptr = (xxh_u8*)p; /* Get the offset byte we added in XXH_malloc. */ xxh_u8 offset = ptr[-1]; @@ -5407,7 +5407,7 @@ static void XXH_alignedFree(void* p) XXH_PUBLIC_API XXH3_state_t* XXH3_createState(void) { XXH3_state_t* const state = (XXH3_state_t*)XXH_alignedMalloc(sizeof(XXH3_state_t), 64); - if (state==NULL) return NULL; + if (state==nullptr) return nullptr; XXH3_INITSTATE(state); return state; } @@ -5457,7 +5457,7 @@ XXH3_reset_internal(XXH3_state_t* statePtr, XXH_PUBLIC_API XXH_errorcode XXH3_64bits_reset(XXH_NOESCAPE XXH3_state_t* statePtr) { - if (statePtr == NULL) return XXH_ERROR; + if (statePtr == nullptr) return XXH_ERROR; XXH3_reset_internal(statePtr, 0, XXH3_kSecret, XXH_SECRET_DEFAULT_SIZE); return XXH_OK; } @@ -5466,9 +5466,9 @@ XXH3_64bits_reset(XXH_NOESCAPE XXH3_state_t* statePtr) XXH_PUBLIC_API XXH_errorcode XXH3_64bits_reset_withSecret(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE const void* secret, size_t secretSize) { - if (statePtr == NULL) return XXH_ERROR; + if (statePtr == nullptr) return XXH_ERROR; XXH3_reset_internal(statePtr, 0, secret, secretSize); - if (secret == NULL) return XXH_ERROR; + if (secret == nullptr) return XXH_ERROR; if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR; return XXH_OK; } @@ -5477,11 +5477,11 @@ XXH3_64bits_reset_withSecret(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE c XXH_PUBLIC_API XXH_errorcode XXH3_64bits_reset_withSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH64_hash_t seed) { - if (statePtr == NULL) return XXH_ERROR; + if (statePtr == nullptr) return XXH_ERROR; if (seed==0) return XXH3_64bits_reset(statePtr); - if ((seed != statePtr->seed) || (statePtr->extSecret != NULL)) + if ((seed != statePtr->seed) || (statePtr->extSecret != nullptr)) XXH3_initCustomSecret(statePtr->customSecret, seed); - XXH3_reset_internal(statePtr, seed, NULL, XXH_SECRET_DEFAULT_SIZE); + XXH3_reset_internal(statePtr, seed, nullptr, XXH_SECRET_DEFAULT_SIZE); return XXH_OK; } @@ -5489,8 +5489,8 @@ XXH3_64bits_reset_withSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH64_hash_t see XXH_PUBLIC_API XXH_errorcode XXH3_64bits_reset_withSecretandSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE const void* secret, size_t secretSize, XXH64_hash_t seed64) { - if (statePtr == NULL) return XXH_ERROR; - if (secret == NULL) return XXH_ERROR; + if (statePtr == nullptr) return XXH_ERROR; + if (secret == nullptr) return XXH_ERROR; if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR; XXH3_reset_internal(statePtr, seed64, secret, secretSize); statePtr->useSeed = 1; /* always, even if seed64==0 */ @@ -5538,14 +5538,14 @@ XXH3_update(XXH3_state_t* XXH_RESTRICT const state, XXH3_f_accumulate f_acc, XXH3_f_scrambleAcc f_scramble) { - if (input==NULL) { + if (input==nullptr) { XXH_ASSERT(len == 0) return XXH_OK; } XXH_ASSERT(state != NULL) { const xxh_u8* const bEnd = input + len; - const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret; + const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret; #if defined(XXH3_STREAM_USE_STACK) && XXH3_STREAM_USE_STACK >= 1 /* For some reason, gcc and MSVC seem to suffer greatly * when operating accumulators directly into state. @@ -5693,7 +5693,7 @@ XXH3_digest_long (XXH64_hash_t* acc, /*! @ingroup XXH3_family */ XXH_PUBLIC_API XXH64_hash_t XXH3_64bits_digest (XXH_NOESCAPE const XXH3_state_t* state) { - const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret; + const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret; if (state->totalLen > XXH3_MIDSIZE_MAX) { XXH_ALIGN(XXH_ACC_ALIGN) XXH64_hash_t acc[XXH_ACC_NB]; XXH3_digest_long(acc, state, secret); @@ -6131,7 +6131,7 @@ XXH_PUBLIC_API XXH128_hash_t XXH3_128bits_withSecretandSeed(XXH_NOESCAPE const void* input, size_t len, XXH_NOESCAPE const void* secret, size_t secretSize, XXH64_hash_t seed) { if (len <= XXH3_MIDSIZE_MAX) - return XXH3_128bits_internal(input, len, seed, XXH3_kSecret, sizeof(XXH3_kSecret), NULL); + return XXH3_128bits_internal(input, len, seed, XXH3_kSecret, sizeof(XXH3_kSecret), nullptr); return XXH3_hashLong_128b_withSecret(input, len, seed, secret, secretSize); } @@ -6189,7 +6189,7 @@ XXH3_128bits_update(XXH_NOESCAPE XXH3_state_t* state, XXH_NOESCAPE const void* i /*! @ingroup XXH3_family */ XXH_PUBLIC_API XXH128_hash_t XXH3_128bits_digest (XXH_NOESCAPE const XXH3_state_t* state) { - const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret; + const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret; if (state->totalLen > XXH3_MIDSIZE_MAX) { XXH_ALIGN(XXH_ACC_ALIGN) XXH64_hash_t acc[XXH_ACC_NB]; XXH3_digest_long(acc, state, secret); @@ -6287,7 +6287,7 @@ XXH3_generateSecret(XXH_NOESCAPE void* secretBuffer, size_t secretSize, XXH_NOES XXH_ASSERT(secretSize >= XXH3_SECRET_SIZE_MIN) #else /* production mode, assert() are disabled */ - if (secretBuffer == NULL) return XXH_ERROR; + if (secretBuffer == nullptr) return XXH_ERROR; if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR; #endif @@ -6298,7 +6298,7 @@ XXH3_generateSecret(XXH_NOESCAPE void* secretBuffer, size_t secretSize, XXH_NOES #if (XXH_DEBUGLEVEL >= 1) XXH_ASSERT(customSeed != NULL) #else - if (customSeed == NULL) return XXH_ERROR; + if (customSeed == nullptr) return XXH_ERROR; #endif /* Fill secretBuffer with a copy of customSeed - repeat as needed */