Merge branch 'main' into fix-some-potential-security-issues-in-compaction_job

ZhangHuiGui 2024-04-29 13:58:19 +08:00 committed by GitHub
commit 84146ed35e
51 changed files with 1818 additions and 214 deletions


@ -96,3 +96,13 @@ jobs:
steps:
- uses: actions/checkout@v4.1.0
- uses: "./.github/actions/windows-build-steps"
build-linux-arm-test-full:
if: ${{ github.repository_owner == 'facebook' }}
runs-on:
labels: 4-core-ubuntu-arm
steps:
- uses: actions/checkout@v4.1.0
- uses: "./.github/actions/pre-steps"
- run: sudo apt-get update && sudo apt-get install -y build-essential libgflags-dev
- run: make V=1 J=4 -j4 check
- uses: "./.github/actions/post-steps"


@ -607,3 +607,13 @@ jobs:
with:
name: maven-site
path: "${{ github.workspace }}/java/target/site"
build-linux-arm:
if: ${{ github.repository_owner == 'facebook' }}
runs-on:
labels: 4-core-ubuntu-arm
steps:
- uses: actions/checkout@v4.1.0
- uses: "./.github/actions/pre-steps"
- run: sudo apt-get update && sudo apt-get install -y build-essential
- run: ROCKSDBTESTS_PLATFORM_DEPENDENT=only make V=1 J=4 -j4 all_but_some_tests check_some
- uses: "./.github/actions/post-steps"


@ -1037,8 +1037,10 @@ endif()
else()
list(APPEND SOURCES
db/db_impl/db_impl_follower.cc
port/port_posix.cc
env/env_posix.cc
env/fs_on_demand.cc
env/fs_posix.cc
env/io_posix.cc)
endif()
@ -1363,6 +1365,7 @@ if(WITH_TESTS)
db/file_indexer_test.cc
db/filename_test.cc
db/flush_job_test.cc
db/db_follower_test.cc
db/import_column_family_test.cc
db/listener_test.cc
db/log_test.cc


@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $(
db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)


@ -60,6 +60,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_follower.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
@ -117,6 +118,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"env/env_posix.cc",
"env/file_system.cc",
"env/file_system_tracer.cc",
"env/fs_on_demand.cc",
"env/fs_posix.cc",
"env/fs_remap.cc",
"env/io_posix.cc",
@ -4795,6 +4797,12 @@ cpp_unittest_wrapper(name="db_flush_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_follower_test",
srcs=["db/db_follower_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_inplace_update_test",
srcs=["db/db_inplace_update_test.cc"],
deps=[":rocksdb_test_lib"],


@ -113,7 +113,7 @@ CLANG_LIB="$CLANG_BASE/lib"
CLANG_SRC="$CLANG_BASE/../../src"
CLANG_ANALYZER="$CLANG_BIN/clang++"
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/tools/clang/tools/scan-build/bin/scan-build"
CLANG_SCAN_BUILD="$CLANG_BIN/scan-build"
if [ -z "$USE_CLANG" ]; then
# gcc


@ -110,7 +110,7 @@ CLANG_LIB="$CLANG_BASE/lib"
CLANG_SRC="$CLANG_BASE/../../src"
CLANG_ANALYZER="$CLANG_BIN/clang++"
CLANG_SCAN_BUILD="$CLANG_SRC/llvm/clang/tools/scan-build/bin/scan-build"
CLANG_SCAN_BUILD="$CLANG_BIN/scan-build"
if [ -z "$USE_CLANG" ]; then
# gcc


@ -441,6 +441,50 @@ TEST_F(CompactFilesTest, SentinelCompressionType) {
}
}
TEST_F(CompactFilesTest, CompressionWithBlockAlign) {
Options options;
options.compression = CompressionType::kNoCompression;
options.create_if_missing = true;
options.disable_auto_compactions = true;
std::shared_ptr<FlushedFileCollector> collector =
std::make_shared<FlushedFileCollector>();
options.listeners.push_back(collector);
{
BlockBasedTableOptions bbto;
bbto.block_align = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
}
std::unique_ptr<DB> db;
{
DB* _db = nullptr;
ASSERT_OK(DB::Open(options, db_name_, &_db));
db.reset(_db);
}
ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
ASSERT_OK(db->Flush(FlushOptions()));
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(
static_cast_with_check<DBImpl>(db.get())->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles();
ASSERT_EQ(1, l0_files.size());
// We can run this test even without Snappy support because we expect
// `CompactFiles()` to fail before actually invoking Snappy compression.
CompactionOptions compaction_opts;
compaction_opts.compression = CompressionType::kSnappyCompression;
ASSERT_TRUE(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */)
.IsInvalidArgument());
compaction_opts.compression = CompressionType::kDisableCompressionOption;
ASSERT_OK(db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */));
}
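The behavior pinned down by the test above pairs with the deprecation warning added to DBImpl::CompactFiles() later in this commit. A minimal sketch (not part of the diff) of the now-preferred call, assuming `db` and `l0_files` are set up as in the test:

// Sketch: leave compression at kDisableCompressionOption (the default) so
// CompactFiles() uses the column family's own compression settings; an
// explicit incompatible type, such as Snappy with block_align, is rejected
// as InvalidArgument.
CompactionOptions compaction_opts;
compaction_opts.compression = CompressionType::kDisableCompressionOption;
Status s = db->CompactFiles(compaction_opts, l0_files, 1 /* output_level */);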
TEST_F(CompactFilesTest, GetCompactionJobInfo) {
Options options;
options.create_if_missing = true;


@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
// is forced to repeat the process
@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
int retries = 0;
bool last_try = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; });
"DBImpl::MultiCFSnapshot::LastTry",
[&](void* /*arg*/) { last_try = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (last_try) {
return;
}
@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::MultiGet::AfterLastTryRefSV",
{"DBImpl::MultiCFSnapshot::AfterLastTryRefSV",
"DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"},
{"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV",
"DBImpl::MultiGet::BeforeLastTryUnRefSV"},
"DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Flush(i));


@ -3997,7 +3997,7 @@ TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
TEST_F(DBCompactionTest, CancelCompactionWaitingOnRunningConflict) {
// This test verifies cancellation of a compaction waiting to be scheduled due
// to conflict with a running compaction.
//
@ -4036,7 +4036,7 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
// Make sure the manual compaction has seen the conflict before being canceled
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyData::CompactRange:Return",
"DBCompactionTest::CancelCompactionWaitingOnConflict:"
"DBCompactionTest::CancelCompactionWaitingOnRunningConflict:"
"PreDisableManualCompaction"}});
auto manual_compaction_thread = port::Thread([this]() {
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
@ -4047,12 +4047,73 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) {
// despite finding a conflict with an automatic compaction that is still
// running
TEST_SYNC_POINT(
"DBCompactionTest::CancelCompactionWaitingOnConflict:"
"DBCompactionTest::CancelCompactionWaitingOnRunningConflict:"
"PreDisableManualCompaction");
db_->DisableManualCompaction();
manual_compaction_thread.join();
}
TEST_F(DBCompactionTest, CancelCompactionWaitingOnScheduledConflict) {
// This test verifies cancellation of a compaction waiting to be scheduled due
// to conflict with a scheduled (but not running) compaction.
//
// A `CompactRange()` in universal compaction compacts all files, waiting for files to
// become available if they are locked for another compaction. This test
// blocks the compaction thread pool and then calls `CompactRange()` twice.
// The first call to `CompactRange()` schedules a compaction that is queued
// in the thread pool. The second call to `CompactRange()` blocks on the first
// call due to the conflict in file picking. The test verifies that
// `DisableManualCompaction()` can cancel both while the thread pool remains
// blocked.
const int kNumSortedRuns = 4;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.disable_auto_compactions = true;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
Reopen(options);
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
// Fill overlapping files in L0
Random rnd(301);
for (int i = 0; i < kNumSortedRuns; ++i) {
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx, false /* nowait */);
}
std::atomic<int> num_compact_range_calls{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ColumnFamilyData::CompactRange:Return",
[&](void* /* arg */) { num_compact_range_calls++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
const int kNumManualCompactions = 2;
port::Thread manual_compaction_threads[kNumManualCompactions];
for (int i = 0; i < kNumManualCompactions; i++) {
manual_compaction_threads[i] = port::Thread([this]() {
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
.IsIncomplete());
});
}
while (num_compact_range_calls < kNumManualCompactions) {
}
// Cancel it. Threads should be joinable, i.e., both the scheduled and blocked
// manual compactions were canceled even though no compaction could ever run.
db_->DisableManualCompaction();
for (int i = 0; i < kNumManualCompactions; i++) {
manual_compaction_threads[i].join();
}
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
}
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
// Deletions can be dropped when compacted to non-last level if they fall
// outside the lower-level files' key-ranges.

db/db_follower_test.cc (new file, 63 lines)

@ -0,0 +1,63 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
#ifdef OS_LINUX
class DBFollowerTest : public DBTestBase {
public:
// Create directories for leader and follower
// Create the leader DB object
DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) {
follower_name_ = dbname_ + "/follower";
Close();
Destroy(CurrentOptions());
EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
dbname_ = dbname_ + "/leader";
Reopen(CurrentOptions());
}
~DBFollowerTest() {
follower_.reset();
EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
}
protected:
Status OpenAsFollower() {
return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_,
&follower_);
}
DB* follower() { return follower_.get(); }
private:
std::string follower_name_;
std::unique_ptr<DB> follower_;
};
TEST_F(DBFollowerTest, Basic) {
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(OpenAsFollower());
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
}
#endif
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}


@ -2517,12 +2517,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
return s;
}
template <class T>
Status DBImpl::MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local) {
template <class T, typename IterDerefFuncType>
Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
bool extra_sv_ref, SequenceNumber* snapshot,
bool* sv_from_thread_local) {
PERF_TIMER_GUARD(get_snapshot_time);
assert(sv_from_thread_local);
@ -2539,7 +2539,7 @@ Status DBImpl::MultiCFSnapshot(
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
if (*sv_from_thread_local) {
if (*sv_from_thread_local && !extra_sv_ref) {
ReturnAndCleanupSuperVersion(cfd, super_version);
} else {
CleanupSuperVersion(super_version);
@ -2555,7 +2555,11 @@ Status DBImpl::MultiCFSnapshot(
// super version
auto cf_iter = cf_list->begin();
auto node = iter_deref_func(cf_iter);
node->super_version = GetAndRefSuperVersion(node->cfd);
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
node->super_version = GetAndRefSuperVersion(node->cfd);
}
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@ -2602,7 +2606,7 @@ Status DBImpl::MultiCFSnapshot(
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
@ -2617,11 +2621,15 @@ Status DBImpl::MultiCFSnapshot(
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
node->super_version = GetAndRefSuperVersion(node->cfd);
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
node->super_version = GetAndRefSuperVersion(node->cfd);
}
} else {
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV");
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@ -2635,6 +2643,7 @@ Status DBImpl::MultiCFSnapshot(
break;
}
}
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot");
if (read_options.snapshot != nullptr || last_try) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
@ -2658,7 +2667,7 @@ Status DBImpl::MultiCFSnapshot(
if (!retry) {
if (last_try) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
@ -2770,37 +2779,37 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
multiget_cf_data;
autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
key_range_per_cf;
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
for (size_t i = 0; i < num_keys; ++i) {
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr);
key_range_per_cf.emplace_back(cf_start, i - cf_start);
cf_sv_pairs.emplace_back(cf, nullptr);
cf_start = i;
cf = key_ctx->column_family;
}
}
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
cf_sv_pairs.emplace_back(cf, nullptr);
std::function<MultiGetColumnFamilyData*(
autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
iter_deref_lambda =
[](autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
};
SequenceNumber consistent_seqnum;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum, &sv_from_thread_local);
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local = false;
Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cf_sv_pairs,
/* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
@ -2818,31 +2827,40 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}
auto cf_iter = multiget_cf_data.begin();
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
&sorted_keys, cf_iter->super_version, consistent_seqnum,
assert(key_range_per_cf.size() == cf_sv_pairs.size());
auto key_range_per_cf_iter = key_range_per_cf.begin();
auto cf_sv_pair_iter = cf_sv_pairs.begin();
while (key_range_per_cf_iter != key_range_per_cf.end() &&
cf_sv_pair_iter != cf_sv_pairs.end()) {
s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
key_range_per_cf_iter->num_keys, &sorted_keys,
cf_sv_pair_iter->super_version, consistent_seqnum,
read_callback);
if (!s.ok()) {
break;
}
++key_range_per_cf_iter;
++cf_sv_pair_iter;
}
if (!s.ok()) {
assert(s.IsTimedOut() || s.IsAborted());
for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
for (++key_range_per_cf_iter;
key_range_per_cf_iter != key_range_per_cf.end();
++key_range_per_cf_iter) {
for (size_t i = key_range_per_cf_iter->start;
i < key_range_per_cf_iter->start + key_range_per_cf_iter->num_keys;
++i) {
*sorted_keys[i]->s = s;
}
}
}
for (const auto& iter : multiget_cf_data) {
for (const auto& cf_sv_pair : cf_sv_pairs) {
if (sv_from_thread_local) {
ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version);
} else {
TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV");
CleanupSuperVersion(iter.super_version);
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV");
CleanupSuperVersion(cf_sv_pair.super_version);
}
}
}
@ -2974,21 +2992,18 @@ void DBImpl::MultiGetWithCallbackImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
std::function<MultiGetColumnFamilyData*(
std::array<MultiGetColumnFamilyData, 1>::iterator&)>
iter_deref_lambda =
[](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
return &(*cf_iter);
};
std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs;
cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr);
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
read_options, callback, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum, &sv_from_thread_local);
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local = false;
Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>(
read_options, callback,
[](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) {
return &(*cf_iter);
},
&cf_sv_pairs,
/* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return;
}
@ -3027,11 +3042,11 @@ void DBImpl::MultiGetWithCallbackImpl(
}
s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
cf_sv_pairs[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd,
cf_sv_pairs[0].super_version);
}
// The actual implementation of batched MultiGet. Parameters -
@ -3813,69 +3828,62 @@ Status DBImpl::NewIterators(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfCfHasTs(cf);
if (!s.ok()) {
return s;
}
}
}
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;
Status s;
for (auto* cf : column_families) {
assert(cf);
if (read_options.timestamp) {
s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
} else {
s = FailIfCfHasTs(cf);
}
if (!s.ok()) {
return s;
}
cf_sv_pairs.emplace_back(cf, nullptr);
}
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfh_to_sv.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfh, sv] : cfh_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cfh));
}
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterators is overridden
// in WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/));
}
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local = false;
s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr /* read_callback*/,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cf_sv_pairs,
/* extra_sv_ref */ true, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return s;
}
assert(cf_sv_pairs.size() == column_families.size());
if (read_options.tailing) {
for (const auto& cf_sv_pair : cf_sv_pairs) {
auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd,
cf_sv_pair.super_version,
/* allow_unprepared_value */ true);
iterators->push_back(
NewDBIterator(env_, read_options, *cf_sv_pair.cfd->ioptions(),
cf_sv_pair.super_version->mutable_cf_options,
cf_sv_pair.cfd->user_comparator(), iter,
cf_sv_pair.super_version->current, kMaxSequenceNumber,
cf_sv_pair.super_version->mutable_cf_options
.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cf_sv_pair.cfh));
}
} else {
for (const auto& cf_sv_pair : cf_sv_pairs) {
iterators->push_back(NewIteratorImpl(
read_options, cf_sv_pair.cfh, cf_sv_pair.super_version,
consistent_seqnum, nullptr /*read_callback*/));
}
}
return Status::OK();
}
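For context, a caller-side sketch (not part of the diff) of the guarantee the rewritten NewIterators() provides, as exercised by the new db_iterator_test cases later in this commit; `db` and `handles` stand for an open DB and its column family handles:

// All iterators returned by one NewIterators() call now share the single
// sequence number obtained via MultiCFSnapshot, so they observe one
// consistent view even if a flush races with iterator creation.
std::vector<Iterator*> iters;
Status s = db->NewIterators(ReadOptions(), handles, &iters);
if (s.ok()) {
  for (Iterator* iter : iters) {
    iter->SeekToFirst();
    // ... read a consistent cross-CF view ...
    delete iter;
  }
}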


@ -1535,7 +1535,7 @@ class DBImpl : public DB {
Status WriteRecoverableState();
// Actual implementation of Close()
Status CloseImpl();
virtual Status CloseImpl();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
@ -2337,10 +2337,7 @@ class DBImpl : public DB {
// A structure to hold the information required to process MultiGet of keys
// belonging to one column family. For a multi column family MultiGet, there
// will be a container of these objects.
struct MultiGetColumnFamilyData {
ColumnFamilyHandle* cf;
ColumnFamilyData* cfd;
struct MultiGetKeyRangePerCf {
// For the batched MultiGet which relies on sorted keys, start specifies
// the index of first key belonging to this column family in the sorted
// list.
@ -2350,31 +2347,33 @@ class DBImpl : public DB {
// belonging to this column family in the sorted list
size_t num_keys;
MultiGetKeyRangePerCf() : start(0), num_keys(0) {}
MultiGetKeyRangePerCf(size_t first, size_t count)
: start(first), num_keys(count) {}
};
// A structure to contain ColumnFamilyData and the SuperVersion obtained for
// the consistent view of DB
struct ColumnFamilySuperVersionPair {
ColumnFamilyHandleImpl* cfh;
ColumnFamilyData* cfd;
// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(0),
num_keys(0),
ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
cfd(cfh->cfd()),
super_version(sv) {}
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first,
size_t count, SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(first),
num_keys(count),
super_version(sv) {}
MultiGetColumnFamilyData() = default;
ColumnFamilySuperVersionPair() = default;
};
// A common function to obtain a consistent snapshot, which can be implicit
// if the user doesn't specify a snapshot in read_options, across
// multiple column families for MultiGet. It will attempt to get an implicit
// multiple column families. It will attempt to get an implicit
// snapshot without acquiring the db_mutex, but will give up after a few
// tries and acquire the mutex if a memtable flush happens. The template
// allows both the batched and non-batched MultiGet to call this with
@ -2383,18 +2382,26 @@ class DBImpl : public DB {
// If callback is non-null, the callback is refreshed with the snapshot
// sequence number
//
// `extra_sv_ref` is used to indicate whether thread-local SuperVersion
// should be obtained with an extra ref (by GetReferencedSuperVersion()) or
// not (by GetAndRefSuperVersion()). For instance, a point lookup like
// MultiGet does not require the SuperVersion to be re-acquired during the
// entire invocation (no extra ref needed), while MultiCfIterators may need the
// SuperVersion to be updated during Refresh() (requires extra ref).
//
// `sv_from_thread_local` set to false indicates that the SuperVersions were
// obtained directly from the ColumnFamilyData, whereas true indicates they
// are thread-local.
//
// A non-OK status will be returned if, for a column family that enables the
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attempts to read collapsed history.
template <class T>
Status MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local);
template <class T, typename IterDerefFuncType>
Status MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
bool extra_sv_ref, SequenceNumber* snapshot,
bool* sv_from_thread_local);
// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number

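As a side note on the signature change above: replacing the `std::function` parameter with the deduced `IterDerefFuncType` lets each call site pass its lambda directly, without type erasure. A self-contained illustration with hypothetical names (not RocksDB code):

#include <functional>

// Pre-change pattern: the callable is wrapped in a std::function, adding
// an indirection (and possibly an allocation) at the call site.
int CallErased(const std::function<int(int)>& f) { return f(1); }

// Post-change pattern, as in MultiCFSnapshot: the lambda's concrete type
// is deduced, so the call can be inlined.
template <typename Func>
int CallDeduced(Func f) { return f(1); }

int main() {
  auto twice = [](int x) { return 2 * x; };
  return CallErased(twice) + CallDeduced(twice);  // both invocations return 2
}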

@ -1420,6 +1420,14 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
if (compact_options.compression !=
CompressionType::kDisableCompressionOption) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] [JOB %d] Found use of deprecated option "
"`CompactionOptions::compression`",
cfd->GetName().c_str(), job_context.job_id);
}
// Perform CompactFiles
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
TEST_SYNC_POINT_CALLBACK("TestCompactFiles:PausingManualCompaction:3",
@ -2164,16 +2172,6 @@ Status DBImpl::RunManualCompaction(
manual.begin, manual.end, &manual.manual_end, &manual_conflict,
max_file_num_to_ignore, trim_ts)) == nullptr &&
manual_conflict))) {
if (!scheduled) {
// There is a conflicting compaction
if (manual_compaction_paused_ > 0 || manual.canceled == true) {
// Stop waiting since it was canceled. Pretend the error came from
// compaction so the below cleanup/error handling code can process it.
manual.done = true;
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
}
if (!manual.done) {
bg_cv_.Wait();
}
@ -2248,6 +2246,17 @@ Status DBImpl::RunManualCompaction(
*final_output_level = compaction->output_level();
}
}
if (!scheduled) {
// There is nothing scheduled to wait on, so any cancellation can end the
// manual compaction now.
if (manual_compaction_paused_ > 0 || manual.canceled == true) {
// Stop waiting since it was canceled. Pretend the error came from
// compaction so the below cleanup/error handling code can process it.
manual.done = true;
manual.status =
Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
}
}
log_buffer.FlushBufferToLog();


@ -0,0 +1,310 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_impl/db_impl_follower.h"
#include <cinttypes>
#include "db/arena_wrapped_db_iter.h"
#include "db/merge_context.h"
#include "env/composite_env_wrapper.h"
#include "env/fs_on_demand.h"
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "rocksdb/db.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {
DBImplFollower::DBImplFollower(const DBOptions& db_options,
std::unique_ptr<Env>&& env,
const std::string& dbname, std::string src_path)
: DBImplSecondary(db_options, dbname, ""),
env_guard_(std::move(env)),
stop_requested_(false),
src_path_(std::move(src_path)),
cv_(&mu_) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Opening the db in follower mode");
LogFlush(immutable_db_options_.info_log);
}
DBImplFollower::~DBImplFollower() {
Status s = Close();
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s",
s.ToString().c_str());
}
}
// Recover a follower DB instance by reading the MANIFEST. The verification
// as part of the MANIFEST replay will ensure that local links to the
// leader's files are created, thus ensuring we can continue reading them
// even if the leader deletes those files due to compaction.
// TODO:
// 1. Devise a mechanism to prevent misconfiguration by, for example,
// keeping a local copy of the IDENTITY file and cross checking
// 2. Make the recovery more robust by retrying if the first attempt
// fails.
Status DBImplFollower::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
mutex_.AssertHeld();
JobContext job_context(0);
Status s;
s = static_cast<ReactiveVersionSet*>(versions_.get())
->Recover(column_families, &manifest_reader_, &manifest_reporter_,
&manifest_reader_status_);
if (!s.ok()) {
if (manifest_reader_status_) {
manifest_reader_status_->PermitUncheckedError();
}
return s;
}
if (immutable_db_options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
if (s.ok()) {
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
// Start the periodic catch-up thread
// TODO: See if it makes sense to have a threadpool, rather than a thread
// per follower DB instance
catch_up_thread_.reset(
new port::Thread(&DBImplFollower::PeriodicRefresh, this));
}
return s;
}
// Try to catch up by tailing the MANIFEST.
// TODO:
// 1. Cleanup obsolete files afterward
// 2. Add some error notifications and statistics
Status DBImplFollower::TryCatchUpWithLeader() {
assert(versions_.get() != nullptr);
assert(manifest_reader_.get() != nullptr);
Status s;
// read the manifest and apply new changes to the follower instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
{
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_,
manifest_reader_status_.get(), &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
if (cfd->mem()->GetEarliestSequenceNumber() <
versions_->LastSequence()) {
// Construct a new memtable with earliest sequence number set to the
// last sequence number in the VersionSet. This matters when
// DBImpl::MultiCFSnapshot tries to get consistent references
// to super versions in a lock free manner, it checks the earliest
// sequence number to detect if there was a change in version in
// the meantime.
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
MemTable* new_mem = cfd->ConstructNewMemtable(
mutable_cf_options, versions_->LastSequence());
cfd->mem()->SetNextLogNumber(cfd->GetLogNumber());
cfd->mem()->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(cfd->mem(), &job_context.memtables_to_free);
new_mem->Ref();
cfd->SetMemtable(new_mem);
}
// This will check if the old memtable is still referenced
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
}
}
job_context.Clean();
return s;
}
void DBImplFollower::PeriodicRefresh() {
while (!stop_requested_.load()) {
MutexLock l(&mu_);
int64_t wait_until =
immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_refresh_catchup_period_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
if (stop_requested_.load()) {
break;
}
Status s;
for (uint64_t i = 0;
i < immutable_db_options_.follower_catchup_retry_count &&
!stop_requested_.load();
++i) {
s = TryCatchUpWithLeader();
if (s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Successful catch up on attempt %llu",
static_cast<unsigned long long>(i));
break;
}
wait_until = immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_catchup_retry_wait_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
}
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful");
}
}
}
Status DBImplFollower::Close() {
if (catch_up_thread_) {
stop_requested_.store(true);
{
MutexLock l(&mu_);
cv_.SignalAll();
}
catch_up_thread_->join();
catch_up_thread_.reset();
}
return DBImpl::Close();
}
Status DB::OpenAsFollower(const Options& options, const std::string& dbname,
const std::string& leader_path,
std::unique_ptr<DB>* dbptr) {
dbptr->reset();
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
std::vector<ColumnFamilyHandle*> handles;
Status s = DB::OpenAsFollower(db_options, dbname, leader_path,
column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
delete handles[0];
}
return s;
}
Status DB::OpenAsFollower(
const DBOptions& db_options, const std::string& dbname,
const std::string& src_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr) {
dbptr->reset();
FileSystem* fs = db_options.env->GetFileSystem().get();
{
IOStatus io_s;
if (db_options.create_if_missing) {
io_s = fs->CreateDirIfMissing(dbname, IOOptions(), nullptr);
} else {
io_s = fs->FileExists(dbname, IOOptions(), nullptr);
}
if (!io_s.ok()) {
return static_cast<Status>(io_s);
}
}
std::unique_ptr<Env> new_env(new CompositeEnvWrapper(
db_options.env, NewOnDemandFileSystem(db_options.env->GetFileSystem(),
src_path, dbname)));
DBOptions tmp_opts(db_options);
Status s;
tmp_opts.env = new_env.get();
if (nullptr == tmp_opts.info_log) {
s = CreateLoggerFromOptions(dbname, tmp_opts, &tmp_opts.info_log);
if (!s.ok()) {
tmp_opts.info_log = nullptr;
return s;
}
}
handles->clear();
DBImplFollower* impl =
new DBImplFollower(tmp_opts, std::move(new_env), dbname, src_path);
impl->versions_.reset(new ReactiveVersionSet(
dbname, &impl->immutable_db_options_, impl->file_options_,
impl->table_cache_.get(), impl->write_buffer_manager_,
&impl->write_controller_, impl->io_tracer_));
impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock();
s = impl->Recover(column_families, /*read_only=*/true,
/*error_if_wal_file_exists=*/false,
/*error_if_data_exists_in_wals=*/false);
if (s.ok()) {
for (const auto& cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (nullptr == cfd) {
s = Status::InvalidArgument("Column family not found", cf.name);
break;
}
handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
}
SuperVersionContext sv_context(false /* create_superversion */);
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
sv_context.NewSuperVersion();
cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
}
}
impl->mutex_.Unlock();
sv_context.Clean();
if (s.ok()) {
dbptr->reset(impl);
for (auto h : *handles) {
impl->NewThreadStatusCfInfo(
static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
}
} else {
for (auto h : *handles) {
delete h;
}
handles->clear();
delete impl;
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
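Putting the pieces together, a minimal open-and-read sketch for the new follower API, mirroring DBFollowerTest::Basic earlier in this commit; the two paths are hypothetical placeholders and the leader DB is assumed to already contain "k1":

// Hedged usage sketch of DB::OpenAsFollower().
Options options;
options.create_if_missing = true;
std::unique_ptr<DB> follower;
Status s = DB::OpenAsFollower(options, "/path/to/follower" /* dbname */,
                              "/path/to/leader" /* leader_path */, &follower);
if (s.ok()) {
  std::string val;
  s = follower->Get(ReadOptions(), "k1", &val);  // reads the leader's data
}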


@ -0,0 +1,53 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "db/db_impl/db_impl_secondary.h"
#include "logging/logging.h"
#include "port/port.h"
namespace ROCKSDB_NAMESPACE {
class DBImplFollower : public DBImplSecondary {
public:
DBImplFollower(const DBOptions& db_options, std::unique_ptr<Env>&& env,
const std::string& dbname, std::string src_path);
~DBImplFollower();
Status Close() override;
protected:
bool OwnTablesAndLogs() const override {
// TODO: Change this to true once we've properly implemented file
// deletion for the read scaling case
return false;
}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/,
bool /*is_retry*/ = false, uint64_t* = nullptr,
RecoveryContext* /*recovery_ctx*/ = nullptr,
bool* /*can_retry*/ = nullptr) override;
private:
friend class DB;
Status TryCatchUpWithLeader();
void PeriodicRefresh();
std::unique_ptr<Env> env_guard_;
std::unique_ptr<port::Thread> catch_up_thread_;
std::atomic<bool> stop_requested_;
std::string src_path_;
port::Mutex mu_;
port::CondVar cv_;
};
} // namespace ROCKSDB_NAMESPACE


@ -277,6 +277,10 @@ class DBImplSecondary : public DBImpl {
return false;
}
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
private:
friend class DB;
@ -305,10 +309,6 @@ class DBImplSecondary : public DBImpl {
const CompactionServiceInput& input,
CompactionServiceResult* result);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
// Cache log readers for each log number, used for continue WAL replay
// after recovery
std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;


@ -3555,6 +3555,121 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) {
iter->Reset();
}
TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
// After SV is obtained for the first CF, flush for the second CF
ASSERT_OK(Flush(1));
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ReadOptions read_options;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val_new");
}
for (auto* iter : iters) {
delete iter;
}
// Thread-local SVs are neither obsolete nor in use
for (int i = 0; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) {
Options options = GetDefaultOptions();
options.atomic_flush = true;
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
// After SV is obtained for the first CF, do the atomic flush()
ASSERT_OK(Flush());
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// An explicit snapshot wouldn't force reloading all SVs. We should expect
// old values.
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions read_options;
read_options.snapshot = snapshot;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val");
}
db_->ReleaseSnapshot(snapshot);
for (auto* iter : iters) {
delete iter;
}
// Thread-local SV for cf_0 is obsolete (atomic flush happened after the first
// SV Ref)
auto* cfd0 =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd();
ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
// The rest are neither InUse nor Obsolete
for (int i = 1; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {


@ -313,6 +313,10 @@ TEST_F(DBMemTableTest, InsertWithHint) {
ASSERT_EQ("foo_v3", Get("foo_k3"));
ASSERT_EQ("bar_v1", Get("bar_k1"));
ASSERT_EQ("bar_v2", Get("bar_k2"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), "foo_k1", "foo_k4"));
ASSERT_EQ(hint_bar, rep->last_hint_in());
ASSERT_EQ(hint_bar, rep->last_hint_out());
ASSERT_EQ(5, rep->num_insert_with_hint());
ASSERT_EQ("vvv", Get("NotInPrefixDomain"));
}


@ -795,7 +795,7 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
std::thread threads[10];
for (int t = 0; t < 10; t++) {
threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
no_wal_value, this] {
no_wal_value, &options, this] {
for (int i = 0; i < 10; i++) {
ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
write_option_disable.disableWAL = true;
@ -806,7 +806,10 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
std::string wal_key =
wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
ASSERT_OK(dbfull()->SyncWAL());
ASSERT_OK(dbfull()->SyncWAL())
<< "options.env: " << options.env << ", env_: " << env_
<< ", env_->is_wal_sync_thread_safe_: "
<< env_->is_wal_sync_thread_safe_.load();
}
return;
});


@ -765,8 +765,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_);
if (!allow_concurrent) {
// Extract prefix for insert with hint.
if (insert_with_hint_prefix_extractor_ != nullptr &&
// Extract prefix for insert with hint. Hints are for point key table
// (`table_`) only, not `range_del_table_`.
if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr &&
insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);


@ -399,6 +399,113 @@ TEST_F(CoalescingIteratorTest, LowerAndUpperBounds) {
}
}
TEST_F(CoalescingIteratorTest, ConsistentViewExplicitSnapshot) {
Options options = GetDefaultOptions();
options.atomic_flush = true;
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
ASSERT_OK(Flush());
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
ReadOptions read_options;
const Snapshot* snapshot = db_->GetSnapshot();
read_options.snapshot = snapshot;
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("cf2_key");
ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
}
db_->ReleaseSnapshot(snapshot);
}
TEST_F(CoalescingIteratorTest, ConsistentViewImplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
ASSERT_OK(Flush(1));
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->Seek("cf2_key");
ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val_new");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "cf3_key->cf3_val_new");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("cf1_key");
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val_new");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val_new");
}
}
TEST_F(CoalescingIteratorTest, EmptyCfs) {
Options options = GetDefaultOptions();
{

env/fs_on_demand.cc (new file, 330 lines)

@ -0,0 +1,330 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "env/fs_on_demand.h"
#include <set>
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/types.h"
namespace ROCKSDB_NAMESPACE {
// Check if the input path is under orig (typically the local directory), and if
// so, change it to the equivalent path under replace (typically the remote
// directory). For example, if orig is "/data/follower", replace is
// "/data/leader", and the given path is "/data/follower/000010.sst", on return
// the path would be changed to
// "/data/leader/000010.sst".
// Return value is true if the path was modified, false otherwise
bool OnDemandFileSystem::CheckPathAndAdjust(const std::string& orig,
const std::string& replace,
std::string& path) {
size_t pos = path.find(orig);
if (pos > 0) {
return false;
}
path.replace(pos, orig.length(), replace);
return true;
}
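A standalone restatement of the helper above makes the npos subtlety explicit (illustration only; the real method is private to OnDemandFileSystem):

#include <string>

// std::string::find() returns npos when `orig` is absent, and npos > 0, so
// the single `pos > 0` check rejects both "not found" and "found past the
// start"; only a true prefix match falls through to the replace.
bool CheckPathAndAdjustSketch(const std::string& orig,
                              const std::string& replace,
                              std::string& path) {
  std::size_t pos = path.find(orig);
  if (pos > 0) {
    return false;
  }
  path.replace(pos, orig.length(), replace);
  return true;
}
// e.g. a path of "/data/follower/000010.sst" with orig "/data/follower" and
// replace "/data/leader" becomes "/data/leader/000010.sst".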
bool OnDemandFileSystem::LookupFileType(const std::string& name,
FileType* type) {
std::size_t found = name.find_last_of('/');
std::string file_name = name.substr(found);
uint64_t number = 0;
return ParseFileName(file_name, &number, type);
}
// RocksDB opens non-SST files for reading in sequential file mode. This
// includes CURRENT, OPTIONS, MANIFEST etc. For these files, we open them
// in place in the source directory. For files that are appendable or
// can be renamed, which is MANIFEST and CURRENT files, we wrap the
// underlying FSSequentialFile with another class that checks when EOF
// has been reached and re-opens the file to see the latest data. On some
// distributed file systems, this is necessary.
IOStatus OnDemandFileSystem::NewSequentialFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
FileType type;
static std::unordered_set<FileType> valid_types(
{kWalFile, kDescriptorFile, kCurrentFile, kIdentityFile, kOptionsFile});
if (!LookupFileType(fname, &type) ||
(valid_types.find(type) == valid_types.end())) {
return IOStatus::NotSupported();
}
IOStatus s;
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
std::unique_ptr<FSSequentialFile> inner_file;
s = target()->NewSequentialFile(rname, file_opts, &inner_file, dbg);
if (s.ok() && type == kDescriptorFile) {
result->reset(new OnDemandSequentialFile(std::move(inner_file), this,
file_opts, rname));
} else {
*result = std::move(inner_file);
}
} else {
s = target()->NewSequentialFile(fname, file_opts, result, dbg);
}
return s;
}
// This is only supported for SST files. If the file is present locally,
// i.e. in the destination dir, we just open it and return. If it's in the
// remote, i.e. source dir, we link it locally and open the link.
// TODO: Add support for blob files belonging to the new BlobDB
IOStatus OnDemandFileSystem::NewRandomAccessFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
FileType type;
if (!LookupFileType(fname, &type) || type != kTableFile) {
return IOStatus::NotSupported();
}
IOStatus s = target()->FileExists(fname, file_opts.io_options, nullptr);
if (s.IsNotFound() || s.IsPathNotFound()) {
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
s = target()->LinkFile(rname, fname, IOOptions(), nullptr);
if (!s.ok()) {
return s;
}
}
}
return s.ok() ? target()->NewRandomAccessFile(fname, file_opts, result, dbg)
: s;
}
// We don't expect to create any writable file other than info LOG files.
IOStatus OnDemandFileSystem::NewWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
FileType type;
if (!LookupFileType(fname, &type) || type != kInfoLogFile) {
return IOStatus::NotSupported();
}
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
IOStatus s = target()->FileExists(rname, file_opts.io_options, dbg);
if (s.ok()) {
return IOStatus::InvalidArgument(
"Writing to a file present in the remote directory not supoprted");
}
}
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
// Currently not supported, as there's no need for RocksDB to create a
// directory object for a DB in read-only mode.
IOStatus OnDemandFileSystem::NewDirectory(
const std::string& /*name*/, const IOOptions& /*io_opts*/,
std::unique_ptr<FSDirectory>* /*result*/, IODebugContext* /*dbg*/) {
return IOStatus::NotSupported();
}
// Check if the given file exists, either locally or remote. If the file is an
// SST file, then link it locally. We assume if the file existence is being
checked, it's for verification purposes, for example while replaying the
// MANIFEST. The file will be opened for reading some time in the future.
IOStatus OnDemandFileSystem::FileExists(const std::string& fname,
const IOOptions& options,
IODebugContext* dbg) {
IOStatus s = target()->FileExists(fname, options, dbg);
if (!s.IsNotFound() && !s.IsPathNotFound()) {
return s;
}
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
FileType type;
if (LookupFileType(fname, &type) && type == kTableFile) {
s = target()->LinkFile(rname, fname, options, dbg);
} else {
s = target()->FileExists(rname, options, dbg);
}
}
return s;
}
// Do a listing of both the local and remote directories and merge the two.
IOStatus OnDemandFileSystem::GetChildren(const std::string& dir,
const IOOptions& options,
std::vector<std::string>* result,
IODebugContext* dbg) {
std::string rdir = dir;
IOStatus s = target()->GetChildren(dir, options, result, dbg);
if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
return s;
}
std::vector<std::string> rchildren;
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rdir);
s = target()->GetChildren(rdir, options, &rchildren, dbg);
if (s.ok()) {
std::for_each(rchildren.begin(), rchildren.end(), [&](std::string& name) {
// Adjust name
(void)CheckPathAndAdjust(remote_path_, local_path_, name);
});
std::sort(result->begin(), result->end());
std::sort(rchildren.begin(), rchildren.end());
std::vector<std::string> output;
output.reserve(result->size() + rchildren.size());
std::set_union(result->begin(), result->end(), rchildren.begin(),
rchildren.end(), std::back_inserter(output));
*result = std::move(output);
}
return s;
}
// Do a listing of both the local and remote directories and merge the two.
IOStatus OnDemandFileSystem::GetChildrenFileAttributes(
const std::string& dir, const IOOptions& options,
std::vector<FileAttributes>* result, IODebugContext* dbg) {
std::string rdir = dir;
IOStatus s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
if (!s.ok() || !CheckPathAndAdjust(local_path_, remote_path_, rdir)) {
return s;
}
std::vector<FileAttributes> rchildren;
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rdir);
s = target()->GetChildrenFileAttributes(rdir, options, &rchildren, dbg);
if (s.ok()) {
struct FileAttributeSorter {
bool operator()(const FileAttributes& lhs, const FileAttributes& rhs) {
return lhs.name < rhs.name;
}
} file_attr_sorter;
std::for_each(
rchildren.begin(), rchildren.end(), [&](FileAttributes& file) {
// Adjust name
(void)CheckPathAndAdjust(remote_path_, local_path_, file.name);
});
std::sort(result->begin(), result->end(), file_attr_sorter);
std::sort(rchildren.begin(), rchildren.end(), file_attr_sorter);
std::vector<FileAttributes> output;
output.reserve(result->size() + rchildren.size());
std::set_union(rchildren.begin(), rchildren.end(), result->begin(),
result->end(), std::back_inserter(output), file_attr_sorter);
*result = std::move(output);
}
return s;
}
IOStatus OnDemandFileSystem::GetFileSize(const std::string& fname,
const IOOptions& options,
uint64_t* file_size,
IODebugContext* dbg) {
uint64_t local_size = 0;
IOStatus s = target()->GetFileSize(fname, options, &local_size, dbg);
if (!s.ok() && !s.IsNotFound() && !s.IsPathNotFound()) {
return s;
}
if (s.IsNotFound() || s.IsPathNotFound()) {
std::string rname = fname;
if (CheckPathAndAdjust(local_path_, remote_path_, rname)) {
// First clear any local directory cache as it may be out of date
target()->DiscardCacheForDirectory(rname);
FileType type;
if (LookupFileType(fname, &type) && type == kTableFile) {
s = target()->LinkFile(rname, fname, options, dbg);
if (s.ok()) {
s = target()->GetFileSize(fname, options, &local_size, dbg);
}
} else {
s = target()->GetFileSize(rname, options, &local_size, dbg);
}
}
}
*file_size = local_size;
return s;
}
// An implementation of Read that tracks whether we've reached EOF. If so,
// re-open the file to try to read past the previous EOF offset. After
// re-opening, position it back to the last read offset.
IOStatus OnDemandSequentialFile::Read(size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
IOStatus s;
if (eof_) {
// Reopen the file. With some distributed file systems, this is required
// in order to get the new size
file_.reset();
s = fs_->NewSequentialFile(path_, file_opts_, &file_, dbg);
if (!s.ok()) {
return IOStatus::IOError("While opening file after relinking, got error ",
s.ToString());
}
s = file_->Skip(offset_);
if (!s.ok()) {
return IOStatus::IOError(
"While seeking to offset" + std::to_string(offset_) + "got error",
s.ToString());
}
eof_ = false;
}
s = file_->Read(n, options, result, scratch, dbg);
if (s.ok()) {
offset_ += result->size();
if (result->size() < n) {
// We reached EOF. Mark it so we know to relink next time
eof_ = true;
}
}
return s;
}
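The reopen-on-EOF logic above is what makes tailing a growing file (such as the leader's MANIFEST) possible. A hedged sketch of the calling pattern, with an illustrative function and polling interval that are not part of this patch:

#include <chrono>
#include <thread>

// Illustrative polling loop over an OnDemandSequentialFile. A short read
// marks EOF for now; the next Read() transparently reopens the file and
// skips back to the saved offset, picking up newly appended data.
void TailSketch(ROCKSDB_NAMESPACE::FSSequentialFile* file) {
  char scratch[4096];
  ROCKSDB_NAMESPACE::Slice result;
  while (true) {
    ROCKSDB_NAMESPACE::IOStatus s =
        file->Read(sizeof(scratch), ROCKSDB_NAMESPACE::IOOptions(), &result,
                   scratch, /*dbg=*/nullptr);
    if (!s.ok()) {
      break;  // Give up on a real error
    }
    if (result.empty()) {
      // At EOF; poll again after a short wait
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      continue;
    }
    // Consume result.data() / result.size() here.
  }
}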
IOStatus OnDemandSequentialFile::Skip(uint64_t n) {
IOStatus s = file_->Skip(n);
if (s.ok()) {
offset_ += n;
}
return s;
}
bool OnDemandSequentialFile::use_direct_io() const {
return file_->use_direct_io();
}
size_t OnDemandSequentialFile::GetRequiredBufferAlignment() const {
return file_->GetRequiredBufferAlignment();
}
Temperature OnDemandSequentialFile::GetTemperature() const {
return file_->GetTemperature();
}
std::shared_ptr<FileSystem> NewOnDemandFileSystem(
const std::shared_ptr<FileSystem>& fs, std::string src_path,
std::string dest_path) {
return std::make_shared<OnDemandFileSystem>(fs, src_path, dest_path);
}
} // namespace ROCKSDB_NAMESPACE
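A hedged usage sketch of the factory above, with hypothetical paths: wrap the default FileSystem so a follower directory is backed on demand by the leader's, then build an Env from it the same way the table tests further down use CompositeEnvWrapper.

// Sketch only: "/leader/db" is the remote (source) dir and "/follower/db"
// the local (destination) dir. SST files get hard-linked locally on first
// access; MANIFEST, CURRENT and OPTIONS files are read in place remotely.
std::shared_ptr<ROCKSDB_NAMESPACE::FileSystem> on_demand_fs =
    ROCKSDB_NAMESPACE::NewOnDemandFileSystem(
        ROCKSDB_NAMESPACE::FileSystem::Default(), "/leader/db",
        "/follower/db");
std::unique_ptr<ROCKSDB_NAMESPACE::Env> env(
    new ROCKSDB_NAMESPACE::CompositeEnvWrapper(
        ROCKSDB_NAMESPACE::Env::Default(), on_demand_fs));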

env/fs_on_demand.h vendored Normal file
View File

@ -0,0 +1,139 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include "rocksdb/file_system.h"
namespace ROCKSDB_NAMESPACE {
// A FileSystem that links files to a local (destination) directory from a
// corresponding remote (source) directory on demand. The decision to link
// depends on the file type, with appendable or rename-able files, such as
// descriptors, logs, and CURRENT, being read in place in the remote directory,
// and SST files being linked. In the future, files read in place may be
// mirrored to the local directory, so the local dir has a complete database
// for troubleshooting purposes.
class OnDemandFileSystem : public FileSystemWrapper {
public:
OnDemandFileSystem(const std::shared_ptr<FileSystem>& target,
const std::string& remote_path,
const std::string& local_path)
: FileSystemWrapper(target),
remote_path_(remote_path),
local_path_(local_path) {}
const char* Name() const override { return "OnDemandFileSystem"; }
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg) override;
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override;
IOStatus NewWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override;
IOStatus ReuseWritableFile(const std::string& /*fname*/,
const std::string& /*old_fname*/,
const FileOptions& /*fopts*/,
std::unique_ptr<FSWritableFile>* /*result*/,
IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported("ReuseWritableFile");
}
IOStatus NewDirectory(const std::string& name, const IOOptions& io_opts,
std::unique_ptr<FSDirectory>* result,
IODebugContext* dbg) override;
IOStatus FileExists(const std::string& fname, const IOOptions& options,
IODebugContext* dbg) override;
IOStatus GetChildren(const std::string& dir, const IOOptions& options,
std::vector<std::string>* result,
IODebugContext* dbg) override;
IOStatus GetChildrenFileAttributes(const std::string& dir,
const IOOptions& options,
std::vector<FileAttributes>* result,
IODebugContext* dbg) override;
IOStatus GetFileSize(const std::string& fname, const IOOptions& options,
uint64_t* file_size, IODebugContext* dbg) override;
private:
bool CheckPathAndAdjust(const std::string& orig, const std::string& replace,
std::string& path);
bool LookupFileType(const std::string& name, FileType* type);
const std::string remote_path_;
const std::string local_path_;
};
// A wrapper class around an FSSequentialFile object. It's mainly
// intended to be used for appendable files like MANIFEST and logs.
// Beneath the covers, it tracks when EOF is reached, and reopens
// the file in order to read the latest appended data. This is
// necessary on some distributed file systems as they may have
// stale metadata about the file.
// TODO: Mirror the data read to a local file for troubleshooting
// purposes, as well as recovery in case the source dir is
// deleted.
class OnDemandSequentialFile : public FSSequentialFile {
public:
OnDemandSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
OnDemandFileSystem* fs, const FileOptions& file_opts,
const std::string& path)
: file_(std::move(file)),
fs_(fs),
file_opts_(file_opts),
path_(path),
eof_(false),
offset_(0) {}
virtual ~OnDemandSequentialFile() {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override;
IOStatus Skip(uint64_t n) override;
bool use_direct_io() const override;
size_t GetRequiredBufferAlignment() const override;
IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
return IOStatus::NotSupported("InvalidateCache not supported.");
}
IOStatus PositionedRead(uint64_t /*offset*/, size_t /*n*/,
const IOOptions& /*options*/, Slice* /*result*/,
char* /*scratch*/, IODebugContext* /*dbg*/) override {
return IOStatus::NotSupported("PositionedRead");
}
Temperature GetTemperature() const override;
private:
std::unique_ptr<FSSequentialFile> file_;
OnDemandFileSystem* fs_;
const FileOptions file_opts_;
const std::string path_;
bool eof_;
uint64_t offset_;
};
std::shared_ptr<FileSystem> NewOnDemandFileSystem(
const std::shared_ptr<FileSystem>& fs, std::string remote_path,
std::string local_path);
} // namespace ROCKSDB_NAMESPACE

View File

@ -205,15 +205,17 @@ IOStatus WritableFileWriter::Pad(const IOOptions& opts,
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
size_t pad_start = buf_.CurrentSize();
const size_t pad_start = buf_.CurrentSize();
// Assume pad_bytes is small compared to buf_ capacity, so we always
// go through buf_ rather than writing directly to the file, as
// Append() does in certain cases.
size_t actual_pad_bytes = 0;
while (left) {
size_t append_bytes = std::min(cap, left);
buf_.PadWith(append_bytes, 0);
left -= append_bytes;
actual_pad_bytes += append_bytes;
if (left > 0) {
IOStatus s = Flush(io_options);
if (!s.ok()) {
@ -226,10 +228,13 @@ IOStatus WritableFileWriter::Pad(const IOOptions& opts,
pending_sync_ = true;
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + pad_bytes, std::memory_order_release);
Slice data(buf_.BufferStart() + pad_start, actual_pad_bytes);
UpdateFileChecksum(data);
if (perform_data_verification_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_,
buf_.BufferStart() + pad_start, pad_bytes);
buf_.BufferStart() + pad_start, actual_pad_bytes);
}
return IOStatus::OK();
}

View File

@ -229,11 +229,18 @@ struct AdvancedColumnFamilyOptions {
// if it is not explicitly set by the user. Otherwise, the default is 0.
int64_t max_write_buffer_size_to_maintain = 0;
// Allows thread-safe inplace updates. If this is true, there is no way to
// Allows thread-safe inplace updates.
//
// If this is true, there is no way to
// achieve point-in-time consistency using snapshot or iterator (assuming
// concurrent updates). Hence iterator and multi-get will return results
// which are not consistent as of any point-in-time.
//
// Backward iteration on memtables will not work either.
//
// It is intended to work or be compatible with a limited set of features:
// (1) Non-snapshot Get()
//
// If inplace_callback function is not set,
// Put(key, new_value) will update inplace the existing_value iff
// * key exists in current memtable

View File

@ -298,6 +298,29 @@ class DB {
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
// EXPERIMENTAL
// Open a database as a follower. The difference between this and opening
// as secondary is that the follower database has its own directory with
// links to the actual files, and can tolerate obsolete file deletions by
// the leader to its own database. Another difference is that the follower
// tries to keep up with the leader by periodically tailing the leader's
// MANIFEST, and (in the future) memtable updates, rather than relying on
// the user to manually call TryCatchupWithPrimary().
// Open as a follower with the default column family
static Status OpenAsFollower(const Options& options, const std::string& name,
const std::string& leader_path,
std::unique_ptr<DB>* dbptr);
// Open as a follower with multiple column families
static Status OpenAsFollower(
const DBOptions& db_options, const std::string& name,
const std::string& leader_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr);
// End EXPERIMENTAL
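A hedged sketch of calling the single column family variant, with hypothetical directory paths:

ROCKSDB_NAMESPACE::Options options;
options.create_if_missing = true;
std::unique_ptr<ROCKSDB_NAMESPACE::DB> follower;
// "/follower/db" is this instance's own directory; "/leader/db" is the
// directory of the leader DB whose MANIFEST is tailed periodically.
ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::DB::OpenAsFollower(
    options, "/follower/db", "/leader/db", &follower);
if (s.ok()) {
  std::string value;
  // Reads reflect the leader's state as of the last successful catch-up.
  s = follower->Get(ROCKSDB_NAMESPACE::ReadOptions(), "key", &value);
}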
// Open DB and run the compaction.
// It's a read-only operation, the result won't be installed to the DB, it
// will be output to the `output_directory`. The API should only be used with

View File

@ -702,6 +702,16 @@ class FileSystem : public Customizable {
return IOStatus::OK();
}
// EXPERIMENTAL
// Discard any directory metadata cached in memory for the specified
// directory and its descendants. Useful for distributed file systems
// where the local cache may be out of sync with the actual directory state.
//
// The implementation is not required to be thread safe. It's the caller's
// responsibility to ensure that no directory operations happen
// concurrently.
virtual void DiscardCacheForDirectory(const std::string& /*path*/) {}
// Indicates to upper layers which FileSystem operations mentioned in
// FSSupportedOps are supported by underlying FileSystem. Each bit in
// supported_ops argument represent corresponding FSSupportedOps operation.
@ -1624,6 +1634,10 @@ class FileSystemWrapper : public FileSystem {
return target_->AbortIO(io_handles);
}
void DiscardCacheForDirectory(const std::string& path) override {
target_->DiscardCacheForDirectory(path);
}
void SupportedOps(int64_t& supported_ops) override {
return target_->SupportedOps(supported_ops);
}
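To illustrate why wrappers must forward this call, here is a hedged sketch of a hypothetical FileSystemWrapper that memoizes directory listings and uses DiscardCacheForDirectory() as its invalidation hook; the class is illustrative only and not part of this patch.

#include <map>
#include <string>
#include <vector>

#include "rocksdb/file_system.h"

namespace ROCKSDB_NAMESPACE {
// Hypothetical: caches GetChildren() results per directory and drops them
// when the caller signals the listing may be stale. For brevity this only
// invalidates the exact path, not its descendants as the contract asks.
class CachingListingFS : public FileSystemWrapper {
 public:
  explicit CachingListingFS(const std::shared_ptr<FileSystem>& t)
      : FileSystemWrapper(t) {}
  const char* Name() const override { return "CachingListingFS"; }

  IOStatus GetChildren(const std::string& dir, const IOOptions& options,
                       std::vector<std::string>* result,
                       IODebugContext* dbg) override {
    auto it = cache_.find(dir);
    if (it != cache_.end()) {
      *result = it->second;  // Serve the cached (possibly stale) listing
      return IOStatus::OK();
    }
    IOStatus s = target()->GetChildren(dir, options, result, dbg);
    if (s.ok()) {
      cache_[dir] = *result;
    }
    return s;
  }

  void DiscardCacheForDirectory(const std::string& path) override {
    cache_.erase(path);
    FileSystemWrapper::DiscardCacheForDirectory(path);  // Forward to target
  }

 private:
  std::map<std::string, std::vector<std::string>> cache_;
};
}  // namespace ROCKSDB_NAMESPACE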

View File

@ -1487,6 +1487,30 @@ struct DBOptions {
// use "0:00-23:59". To make an entire day have no offpeak period, leave
// this field blank. Default: Empty string (no offpeak).
std::string daily_offpeak_time_utc = "";
// EXPERIMENTAL
// When a RocksDB database is opened in follower mode, this option
// is set by the user to request the frequency of the follower
// attempting to refresh its view of the leader. RocksDB may choose to
// trigger catch ups more frequently if it detects any changes in the
// database state.
// Default every 10s.
uint64_t follower_refresh_catchup_period_ms = 10000;
// For a given catch up attempt, this option specifies the number of times
// to tail the MANIFEST and try to install a new, consistent version before
// giving up. Though it should be extremely rare, the catch up may fail if
// the leader is mutating the LSM at a very high rate and the follower is
// unable to get a consistent view.
// Default to 10 attempts
uint64_t follower_catchup_retry_count = 10;
// Time to wait between consecutive catch up attempts
// Default 100ms
uint64_t follower_catchup_retry_wait_ms = 100;
// End EXPERIMENTAL
};
// Options to control the behavior of a database (passed to DB::Open)
@ -1911,20 +1935,29 @@ Status CreateLoggerFromOptions(const std::string& dbname,
// CompactionOptions are used in CompactFiles() call.
struct CompactionOptions {
// DEPRECATED: this option is unsafe because it allows the user to set any
// `CompressionType` while always using `CompressionOptions` from the
// `ColumnFamilyOptions`. As a result the `CompressionType` and
// `CompressionOptions` can easily be inconsistent.
//
// Compaction output compression type
// Default: snappy
//
// Default: `kDisableCompressionOption`
//
// If set to `kDisableCompressionOption`, RocksDB will choose compression type
// according to the `ColumnFamilyOptions`, taking into account the output
// level if `compression_per_level` is specified.
// according to the `ColumnFamilyOptions`. RocksDB takes into account the
// output level in case the `ColumnFamilyOptions` has level-specific settings.
CompressionType compression;
// Compaction will create files of size `output_file_size_limit`.
// Default: MAX, which means that compaction will create a single file
uint64_t output_file_size_limit;
// If > 0, it will replace the option in the DBOptions for this compaction.
uint32_t max_subcompactions;
CompactionOptions()
: compression(kSnappyCompression),
: compression(kDisableCompressionOption),
output_file_size_limit(std::numeric_limits<uint64_t>::max()),
max_subcompactions(0) {}
};
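A hedged sketch of the effect of the new default, assuming an already open `DB* db` and hypothetical input file names: leaving `compression` untouched now defers to the column family's compression configuration instead of forcing snappy.

ROCKSDB_NAMESPACE::CompactionOptions copts;
// copts.compression defaults to kDisableCompressionOption, so output
// compression follows ColumnFamilyOptions (including level-specific
// settings) rather than being forced to snappy.
copts.output_file_size_limit = 64u << 20;  // optional: cap outputs at ~64MB
ROCKSDB_NAMESPACE::Status s = db->CompactFiles(
    copts, {"000012.sst", "000015.sst"}, /*output_level=*/1);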

View File

@ -280,7 +280,7 @@ struct PerfContextBase {
struct PerfContext : public PerfContextBase {
~PerfContext();
PerfContext() {}
PerfContext() { Reset(); }
PerfContext(const PerfContext&);
PerfContext& operator=(const PerfContext&);

View File

@ -83,7 +83,7 @@ class EnvMirror : public EnvWrapper {
std::sort(ar.begin(), ar.end());
std::sort(br.begin(), br.end());
if (!as.ok() || ar != br) {
assert(0 == "getchildren results don't match");
assert(nullptr == "getchildren results don't match");
}
*r = ar;
return as;

View File

@ -20,10 +20,9 @@ public class CompactionOptionsTest {
public void compression() {
try (final CompactionOptions compactionOptions = new CompactionOptions()) {
assertThat(compactionOptions.compression())
.isEqualTo(CompressionType.SNAPPY_COMPRESSION);
compactionOptions.setCompression(CompressionType.NO_COMPRESSION);
assertThat(compactionOptions.compression())
.isEqualTo(CompressionType.NO_COMPRESSION);
.isEqualTo(CompressionType.DISABLE_COMPRESSION_OPTION);
compactionOptions.setCompression(CompressionType.SNAPPY_COMPRESSION);
assertThat(compactionOptions.compression()).isEqualTo(CompressionType.SNAPPY_COMPRESSION);
}
}

View File

@ -559,6 +559,19 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, enforce_single_del_contracts),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_refresh_catchup_period_ms",
{offsetof(struct ImmutableDBOptions,
follower_refresh_catchup_period_ms),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_catchup_retry_count",
{offsetof(struct ImmutableDBOptions, follower_catchup_retry_count),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"follower_catchup_retry_wait_ms",
{offsetof(struct ImmutableDBOptions, follower_catchup_retry_wait_ms),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";
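Since the new fields are registered in the DBOptions type map, they should also be settable from an options string. A hedged sketch using the string-based options API declared in rocksdb/convenience.h (the exact overload is assumed):

#include "rocksdb/convenience.h"

ROCKSDB_NAMESPACE::ConfigOptions config;
ROCKSDB_NAMESPACE::DBOptions base, parsed;
// Parse the three follower options out of a semicolon-separated string.
ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::GetDBOptionsFromString(
    config, base,
    "follower_refresh_catchup_period_ms=5000;"
    "follower_catchup_retry_count=20;"
    "follower_catchup_retry_wait_ms=50",
    &parsed);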
@ -756,7 +769,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
checksum_handoff_file_types(options.checksum_handoff_file_types),
lowest_used_cache_tier(options.lowest_used_cache_tier),
compaction_service(options.compaction_service),
enforce_single_del_contracts(options.enforce_single_del_contracts) {
enforce_single_del_contracts(options.enforce_single_del_contracts),
follower_refresh_catchup_period_ms(
options.follower_refresh_catchup_period_ms),
follower_catchup_retry_count(options.follower_catchup_retry_count),
follower_catchup_retry_wait_ms(options.follower_catchup_retry_wait_ms) {
fs = env->GetFileSystem();
clock = env->GetSystemClock().get();
logger = info_log.get();

View File

@ -104,6 +104,9 @@ struct ImmutableDBOptions {
Logger* logger;
std::shared_ptr<CompactionService> compaction_service;
bool enforce_single_del_contracts;
uint64_t follower_refresh_catchup_period_ms;
uint64_t follower_catchup_retry_count;
uint64_t follower_catchup_retry_wait_ms;
bool IsWalDirSameAsDBPath() const;
bool IsWalDirSameAsDBPath(const std::string& path) const;

src.mk
View File

@ -53,6 +53,7 @@ LIB_SOURCES = \
db/db_impl/db_impl_debug.cc \
db/db_impl/db_impl_experimental.cc \
db/db_impl/db_impl_files.cc \
db/db_impl/db_impl_follower.cc \
db/db_impl/db_impl_open.cc \
db/db_impl/db_impl_readonly.cc \
db/db_impl/db_impl_secondary.cc \
@ -109,6 +110,7 @@ LIB_SOURCES = \
env/env_encryption.cc \
env/env_posix.cc \
env/file_system.cc \
env/fs_on_demand.cc \
env/fs_posix.cc \
env/fs_remap.cc \
env/file_system_tracer.cc \
@ -474,6 +476,7 @@ TEST_MAIN_SOURCES = \
db/db_dynamic_level_test.cc \
db/db_encryption_test.cc \
db/db_flush_test.cc \
db/db_follower_test.cc \
db/db_readonly_with_timestamp_test.cc \
db/db_with_timestamp_basic_test.cc \
db/import_column_family_test.cc \

View File

@ -626,6 +626,14 @@ struct BlockBasedTableBuilder::Rep {
} else {
base_context_checksum = 0;
}
if (alignment > 0 && compression_type != kNoCompression) {
// With better sanitization in `CompactionPicker::CompactFiles()`, we
// would not need to handle this case here and could change it to an
// assertion instead.
SetStatus(Status::InvalidArgument(
"Enable block_align, but compression enabled"));
}
}
Rep(const Rep&) = delete;

View File

@ -619,6 +619,21 @@ Status BlockBasedTableFactory::ValidateOptions(
"Enable block_align, but compression "
"enabled");
}
if (table_options_.block_align &&
cf_opts.bottommost_compression != kDisableCompressionOption &&
cf_opts.bottommost_compression != kNoCompression) {
return Status::InvalidArgument(
"Enable block_align, but bottommost_compression enabled");
}
if (table_options_.block_align) {
for (auto level_compression : cf_opts.compression_per_level) {
if (level_compression != kDisableCompressionOption &&
level_compression != kNoCompression) {
return Status::InvalidArgument(
"Enable block_align, but compression_per_level enabled");
}
}
}
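A hedged sketch summarizing what the extended validation accepts and rejects; the snippet is illustrative and mirrors the ValidateOptions() calls added to the tests below.

ROCKSDB_NAMESPACE::BlockBasedTableOptions bbto;
bbto.block_align = true;
bbto.block_size = 4096;  // must remain a power of two

ROCKSDB_NAMESPACE::Options options;
options.compression = ROCKSDB_NAMESPACE::kNoCompression;  // OK
options.table_factory.reset(
    ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(bbto));

// Any one of these would now make ValidateOptions() return InvalidArgument:
// options.compression = kSnappyCompression;
// options.bottommost_compression = kSnappyCompression;
// options.compression_per_level = {kSnappyCompression};

ROCKSDB_NAMESPACE::Status s = options.table_factory->ValidateOptions(
    ROCKSDB_NAMESPACE::DBOptions(options),
    ROCKSDB_NAMESPACE::ColumnFamilyOptions(options));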
if (table_options_.block_align &&
(table_options_.block_size & (table_options_.block_size - 1))) {
return Status::InvalidArgument(

View File

@ -3188,6 +3188,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
Options options;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
options.create_if_missing = true;
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
@ -3196,6 +3197,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupSeqScans) {
table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
table_options.block_align = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
ASSERT_OK(options.table_factory->ValidateOptions(
DBOptions(options), ColumnFamilyOptions(options)));
TableConstructor c(BytewiseComparator());
GenerateKVMap(&c);
@ -3326,6 +3329,7 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
std::unique_ptr<Env> env(
new CompositeEnvWrapper(c.env_, FileSystem::Default()));
options.env = env.get();
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
c.env_ = env.get();
@ -3338,6 +3342,8 @@ TEST_P(BlockBasedTableTest, BlockCacheLookupAsyncScansSeek) {
table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
table_options.block_align = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
ASSERT_OK(options.table_factory->ValidateOptions(
DBOptions(options), ColumnFamilyOptions(options)));
GenerateKVMap(&c);
@ -5425,6 +5431,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
Options options;
options.compression = kNoCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(options.table_factory->ValidateOptions(
DBOptions(options), ColumnFamilyOptions(options)));
const ImmutableOptions ioptions(options);
const MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
@ -5475,7 +5483,10 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
std::unique_ptr<TableReader> table_reader;
bbto.block_align = false;
Options options2;
options2.compression = kNoCompression;
options2.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(options2.table_factory->ValidateOptions(
DBOptions(options2), ColumnFamilyOptions(options2)));
ImmutableOptions ioptions2(options2);
const MutableCFOptions moptions2(options2);
@ -5505,6 +5516,31 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
table_reader.reset();
}
TEST_P(BlockBasedTableTest, FixBlockAlignMismatchedFileChecksums) {
Options options;
options.create_if_missing = true;
options.compression = kNoCompression;
options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
BlockBasedTableOptions bbto;
bbto.block_align = true;
bbto.block_size = 1024;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(options.table_factory->ValidateOptions(
DBOptions(options), ColumnFamilyOptions(options)));
const std::string kDBPath =
test::PerThreadDBPath("block_align_padded_bytes_verify_file_checksums");
ASSERT_OK(DestroyDB(kDBPath, options));
DB* db;
ASSERT_OK(DB::Open(options, kDBPath, &db));
ASSERT_OK(db->Put(WriteOptions(), "k1", "v1"));
ASSERT_OK(db->Flush(FlushOptions()));
  // Before the fix, VerifyFileChecksums() would fail because the checksum
  // stored at write time did not cover the padding bytes added to align
  // blocks, while verification computes the checksum over the whole file
ASSERT_OK(db->VerifyFileChecksums(ReadOptions()));
delete db;
}
TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
bbto.block_align = true;
@ -5516,6 +5552,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
Options options;
options.compression = kNoCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(options.table_factory->ValidateOptions(
DBOptions(options), ColumnFamilyOptions(options)));
const ImmutableOptions ioptions(options);
const MutableCFOptions moptions(options);
@ -5822,6 +5860,7 @@ TEST_P(BlockBasedTableTest, SeekMetaBlocks) {
TEST_P(BlockBasedTableTest, BadOptions) {
ROCKSDB_NAMESPACE::Options options;
options.compression = kNoCompression;
options.create_if_missing = true;
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
bbto.block_size = 4000;
bbto.block_align = true;
@ -5830,13 +5869,29 @@ TEST_P(BlockBasedTableTest, BadOptions) {
test::PerThreadDBPath("block_based_table_bad_options_test");
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_OK(DestroyDB(kDBPath, options));
ROCKSDB_NAMESPACE::DB* db;
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db));
bbto.block_size = 4096;
options.compression = kSnappyCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &db));
std::unique_ptr<DB> db;
{
ROCKSDB_NAMESPACE::DB* _db;
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db));
bbto.block_size = 4096;
options.compression = kSnappyCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db));
options.compression = kNoCompression;
options.bottommost_compression = kSnappyCompression;
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db));
options.bottommost_compression = kNoCompression;
options.compression_per_level.emplace_back(kSnappyCompression);
ASSERT_NOK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db));
options.compression_per_level.clear();
ASSERT_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDBPath, &_db));
db.reset(_db);
}
}
TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) {

View File

@ -1135,6 +1135,11 @@ DEFINE_int32(secondary_update_interval, 5,
"Secondary instance attempts to catch up with the primary every "
"secondary_update_interval seconds.");
DEFINE_bool(open_as_follower, false,
"Open a RocksDB DB as a follower. The leader instance can be "
"running in another db_bench process.");
DEFINE_string(leader_path, "", "Path to the directory of the leader DB");
DEFINE_bool(report_bg_io_stats, false,
"Measure times spents on I/Os while in compactions. ");
@ -4979,6 +4984,12 @@ class Benchmark {
},
FLAGS_secondary_update_interval, db));
}
} else if (FLAGS_open_as_follower) {
std::unique_ptr<DB> dbptr;
s = DB::OpenAsFollower(options, db_name, FLAGS_leader_path, &dbptr);
if (s.ok()) {
db->db = dbptr.release();
}
} else {
s = DB::Open(options, db_name, &db->db);
}

View File

@ -74,7 +74,16 @@ default_params = {
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
"enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
"inplace_update_support": lambda: random.randint(0, 1),
    # `inplace_update_support` is incompatible with a DB that has delete
    # range data in memtables. Such data can result from any previous
    # db_stress run that used delete range. Since there is no easy way to
    # keep track of whether delete range was used in any of the previous
    # runs, to simplify our testing we set `inplace_update_support`
    # consistently across runs and disable delete range accordingly
    # (see `finalize_and_sanitize` below).
"inplace_update_support": random.choice([0] * 9 + [1]),
"expected_values_dir": lambda: setup_expected_values_dir(),
"fail_if_options_file_error": lambda: random.randint(0, 1),
"flush_one_in": lambda: random.choice([1000, 1000000]),
@ -283,8 +292,7 @@ default_params = {
"hard_pending_compaction_bytes_limit" : lambda: random.choice([2 * 1024 * 1024] + [256 * 1073741824] * 4),
"enable_sst_partitioner_factory": lambda: random.choice([0, 1]),
"enable_do_not_compress_roles": lambda: random.choice([0, 1]),
# TODO(hx235): enable `block_align` after fixing the surfaced corruption issue
"block_align": 0,
"block_align": lambda: random.choice([0, 1]),
"lowest_used_cache_tier": lambda: random.choice([0, 1, 2]),
"enable_custom_split_merge": lambda: random.choice([0, 1]),
"adm_policy": lambda: random.choice([0, 1, 2, 3]),
@ -479,6 +487,8 @@ txn_params = {
"create_timestamped_snapshot_one_in": random.choice([0, 20]),
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in": 0,
# Should not be used with TransactionDB which uses snapshot.
"inplace_update_support": 0,
}
# For optimistic transaction db
@ -490,6 +500,8 @@ optimistic_txn_params = {
"occ_lock_bucket_count": lambda: random.choice([10, 100, 500]),
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in": 0,
# Should not be used with OptimisticTransactionDB which uses snapshot.
"inplace_update_support": 0,
}
best_efforts_recovery_params = {
@ -599,6 +611,8 @@ multiops_txn_default_params = {
"use_multi_get_entity": 0,
# `MultiOpsTxnsStressTest::TestIterateAgainstExpected()` is not implemented.
"verify_iterator_with_expected_state_one_in": 0,
    # This test uses snapshots heavily, which is incompatible with this option.
"inplace_update_support": 0,
}
multiops_wc_txn_params = {
@ -672,6 +686,11 @@ def finalize_and_sanitize(src_params):
):
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
if dest_params["inplace_update_support"] == 1:
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
dest_params["readpercent"] += dest_params["prefixpercent"]
dest_params["prefixpercent"] = 0
if (
dest_params.get("disable_wal") == 1
or dest_params.get("sync_fault_injection") == 1
@ -689,6 +708,11 @@ def finalize_and_sanitize(src_params):
# files, which would be problematic when unsynced data can be lost in
# crash recoveries.
dest_params["enable_compaction_filter"] = 0
# TODO(hx235): re-enable "reopen" after supporting unsynced data loss
        # verification upon reopen. Currently, reopen does not restore the
        # expected state with potential data loss in mind the way the start of
        # each `./db_stress` run does; it always expects no data loss.
dest_params["reopen"] = 0
    # Only under WritePrepared txns, unordered_write would provide the same guarantees as vanilla rocksdb
if dest_params.get("unordered_write", 0) == 1:
dest_params["txn_write_policy"] = 1
@ -813,6 +837,7 @@ def finalize_and_sanitize(src_params):
# Enabling block_align with compression is not supported
if dest_params.get("block_align") == 1:
dest_params["compression_type"] = "none"
dest_params["bottommost_compression_type"] = "none"
# If periodic_compaction_seconds is not set, daily_offpeak_time_utc doesn't do anything
if dest_params.get("periodic_compaction_seconds") == 0:
dest_params["daily_offpeak_time_utc"] = ""

View File

@ -214,6 +214,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
} else if (parsed_params.cmd == GetEntityCommand::Name()) {
return new GetEntityCommand(parsed_params.cmd_params,
parsed_params.option_map, parsed_params.flags);
} else if (parsed_params.cmd == MultiGetEntityCommand::Name()) {
return new MultiGetEntityCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
} else if (parsed_params.cmd == PutCommand::Name()) {
return new PutCommand(parsed_params.cmd_params, parsed_params.option_map,
parsed_params.flags);
@ -2938,7 +2942,7 @@ void MultiGetCommand::DoCommand() {
fprintf(stderr, "Status for key %s: %s\n",
(is_key_hex_ ? StringToHex(keys_[i]) : keys_[i]).c_str(),
statuses[i].ToString().c_str());
failed = false;
failed = true;
}
}
if (failed) {
@ -2998,6 +3002,73 @@ void GetEntityCommand::DoCommand() {
// ----------------------------------------------------------------------------
MultiGetEntityCommand::MultiGetEntityCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, true /* is_read_only */,
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX})) {
if (params.size() < 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"At least one <key> must be specified for the multi_get_entity "
"command");
} else {
for (size_t i = 0; i < params.size(); i++) {
std::string key = params.at(i);
keys_.emplace_back(is_key_hex_ ? HexToString(key) : key);
}
}
}
void MultiGetEntityCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(MultiGetEntityCommand::Name());
ret.append(" <key_1> <key_2> <key_3> ...");
ret.append("\n");
}
void MultiGetEntityCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
size_t num_keys = keys_.size();
std::vector<Slice> key_slices;
std::vector<PinnableWideColumns> results(num_keys);
std::vector<Status> statuses(num_keys);
for (const std::string& key : keys_) {
key_slices.emplace_back(key);
}
db_->MultiGetEntity(ReadOptions(), GetCfHandle(), num_keys, key_slices.data(),
results.data(), statuses.data());
bool failed = false;
for (size_t i = 0; i < num_keys; ++i) {
std::string key = is_key_hex_ ? StringToHex(keys_[i]) : keys_[i];
if (statuses[i].ok()) {
std::ostringstream oss;
oss << key << DELIM;
WideColumnsHelper::DumpWideColumns(results[i].columns(), oss,
is_value_hex_);
fprintf(stdout, "%s\n", oss.str().c_str());
} else if (statuses[i].IsNotFound()) {
fprintf(stdout, "Key not found: %s\n", key.c_str());
} else {
fprintf(stderr, "Status for key %s: %s\n", key.c_str(),
statuses[i].ToString().c_str());
failed = true;
}
}
if (failed) {
exec_state_ =
LDBCommandExecuteResult::Failed("one or more keys had non-okay status");
}
}
// ----------------------------------------------------------------------------
ApproxSizeCommand::ApproxSizeCommand(
const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
@ -3473,7 +3544,7 @@ PutEntityCommand::PutEntityCommand(
void PutEntityCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(PutCommand::Name());
ret.append(PutEntityCommand::Name());
ret.append(
" <key> <column1_name>:<column1_value> <column2_name>:<column2_value> "
"<...>");

View File

@ -435,6 +435,22 @@ class GetEntityCommand : public LDBCommand {
std::string key_;
};
class MultiGetEntityCommand : public LDBCommand {
public:
static std::string Name() { return "multi_get_entity"; }
MultiGetEntityCommand(const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
void DoCommand() override;
static void Help(std::string& ret);
private:
std::vector<std::string> keys_;
};
class ApproxSizeCommand : public LDBCommand {
public:
static std::string Name() { return "approxsize"; }

View File

@ -89,8 +89,11 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
ret.append("\n\n");
ret.append("Data Access Commands:\n");
PutCommand::Help(ret);
PutEntityCommand::Help(ret);
GetCommand::Help(ret);
GetEntityCommand::Help(ret);
MultiGetCommand::Help(ret);
MultiGetEntityCommand::Help(ret);
BatchPutCommand::Help(ret);
ScanCommand::Help(ret);
DeleteCommand::Help(ret);

View File

@ -0,0 +1 @@
* Enabling `BlockBasedTableOptions::block_align` is now incompatible (i.e., APIs will return `Status::InvalidArgument`) with more ways of enabling compression: `CompactionOptions::compression`, `ColumnFamilyOptions::compression_per_level`, and `ColumnFamilyOptions::bottommost_compression`.

View File

@ -0,0 +1 @@
* Changed the default value of `CompactionOptions::compression` to `kDisableCompressionOption`, which means the compression type is determined by the `ColumnFamilyOptions`.

View File

@ -0,0 +1 @@
Fix a bug causing `VerifyFileChecksums()` to return false-positive corruption under `BlockBasedTableOptions::block_align=true`

View File

@ -0,0 +1 @@
Provide consistent view of the database across the column families for `NewIterators()` API.

View File

@ -0,0 +1 @@
* Fixed feature interaction bug for `DeleteRange()` together with `ColumnFamilyOptions::memtable_insert_with_hint_prefix_extractor`. The impact of this bug would likely be corruption or crashing.

View File

@ -0,0 +1 @@
* Fixed hang in `DisableManualCompactions()` where compactions waiting to be scheduled due to conflicts would not be canceled promptly

View File

@ -0,0 +1 @@
* Deprecated `CompactionOptions::compression` since `CompactionOptions`'s API for configuring compression was incomplete, unsafe, and likely unnecessary

View File

@ -5394,7 +5394,7 @@ static XXH_MALLOCF void* XXH_alignedMalloc(size_t s, size_t align)
*/
static void XXH_alignedFree(void* p)
{
if (p != NULL) {
if (p != nullptr) {
xxh_u8* ptr = (xxh_u8*)p;
/* Get the offset byte we added in XXH_malloc. */
xxh_u8 offset = ptr[-1];
@ -5407,7 +5407,7 @@ static void XXH_alignedFree(void* p)
XXH_PUBLIC_API XXH3_state_t* XXH3_createState(void)
{
XXH3_state_t* const state = (XXH3_state_t*)XXH_alignedMalloc(sizeof(XXH3_state_t), 64);
if (state==NULL) return NULL;
if (state==nullptr) return nullptr;
XXH3_INITSTATE(state);
return state;
}
@ -5457,7 +5457,7 @@ XXH3_reset_internal(XXH3_state_t* statePtr,
XXH_PUBLIC_API XXH_errorcode
XXH3_64bits_reset(XXH_NOESCAPE XXH3_state_t* statePtr)
{
if (statePtr == NULL) return XXH_ERROR;
if (statePtr == nullptr) return XXH_ERROR;
XXH3_reset_internal(statePtr, 0, XXH3_kSecret, XXH_SECRET_DEFAULT_SIZE);
return XXH_OK;
}
@ -5466,9 +5466,9 @@ XXH3_64bits_reset(XXH_NOESCAPE XXH3_state_t* statePtr)
XXH_PUBLIC_API XXH_errorcode
XXH3_64bits_reset_withSecret(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE const void* secret, size_t secretSize)
{
if (statePtr == NULL) return XXH_ERROR;
if (statePtr == nullptr) return XXH_ERROR;
XXH3_reset_internal(statePtr, 0, secret, secretSize);
if (secret == NULL) return XXH_ERROR;
if (secret == nullptr) return XXH_ERROR;
if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR;
return XXH_OK;
}
@ -5477,11 +5477,11 @@ XXH3_64bits_reset_withSecret(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE c
XXH_PUBLIC_API XXH_errorcode
XXH3_64bits_reset_withSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH64_hash_t seed)
{
if (statePtr == NULL) return XXH_ERROR;
if (statePtr == nullptr) return XXH_ERROR;
if (seed==0) return XXH3_64bits_reset(statePtr);
if ((seed != statePtr->seed) || (statePtr->extSecret != NULL))
if ((seed != statePtr->seed) || (statePtr->extSecret != nullptr))
XXH3_initCustomSecret(statePtr->customSecret, seed);
XXH3_reset_internal(statePtr, seed, NULL, XXH_SECRET_DEFAULT_SIZE);
XXH3_reset_internal(statePtr, seed, nullptr, XXH_SECRET_DEFAULT_SIZE);
return XXH_OK;
}
@ -5489,8 +5489,8 @@ XXH3_64bits_reset_withSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH64_hash_t see
XXH_PUBLIC_API XXH_errorcode
XXH3_64bits_reset_withSecretandSeed(XXH_NOESCAPE XXH3_state_t* statePtr, XXH_NOESCAPE const void* secret, size_t secretSize, XXH64_hash_t seed64)
{
if (statePtr == NULL) return XXH_ERROR;
if (secret == NULL) return XXH_ERROR;
if (statePtr == nullptr) return XXH_ERROR;
if (secret == nullptr) return XXH_ERROR;
if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR;
XXH3_reset_internal(statePtr, seed64, secret, secretSize);
statePtr->useSeed = 1; /* always, even if seed64==0 */
@ -5538,14 +5538,14 @@ XXH3_update(XXH3_state_t* XXH_RESTRICT const state,
XXH3_f_accumulate f_acc,
XXH3_f_scrambleAcc f_scramble)
{
if (input==NULL) {
if (input==nullptr) {
XXH_ASSERT(len == 0)
return XXH_OK;
}
XXH_ASSERT(state != NULL)
{ const xxh_u8* const bEnd = input + len;
const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret;
const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret;
#if defined(XXH3_STREAM_USE_STACK) && XXH3_STREAM_USE_STACK >= 1
/* For some reason, gcc and MSVC seem to suffer greatly
* when operating accumulators directly into state.
@ -5693,7 +5693,7 @@ XXH3_digest_long (XXH64_hash_t* acc,
/*! @ingroup XXH3_family */
XXH_PUBLIC_API XXH64_hash_t XXH3_64bits_digest (XXH_NOESCAPE const XXH3_state_t* state)
{
const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret;
const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret;
if (state->totalLen > XXH3_MIDSIZE_MAX) {
XXH_ALIGN(XXH_ACC_ALIGN) XXH64_hash_t acc[XXH_ACC_NB];
XXH3_digest_long(acc, state, secret);
@ -6131,7 +6131,7 @@ XXH_PUBLIC_API XXH128_hash_t
XXH3_128bits_withSecretandSeed(XXH_NOESCAPE const void* input, size_t len, XXH_NOESCAPE const void* secret, size_t secretSize, XXH64_hash_t seed)
{
if (len <= XXH3_MIDSIZE_MAX)
return XXH3_128bits_internal(input, len, seed, XXH3_kSecret, sizeof(XXH3_kSecret), NULL);
return XXH3_128bits_internal(input, len, seed, XXH3_kSecret, sizeof(XXH3_kSecret), nullptr);
return XXH3_hashLong_128b_withSecret(input, len, seed, secret, secretSize);
}
@ -6189,7 +6189,7 @@ XXH3_128bits_update(XXH_NOESCAPE XXH3_state_t* state, XXH_NOESCAPE const void* i
/*! @ingroup XXH3_family */
XXH_PUBLIC_API XXH128_hash_t XXH3_128bits_digest (XXH_NOESCAPE const XXH3_state_t* state)
{
const unsigned char* const secret = (state->extSecret == NULL) ? state->customSecret : state->extSecret;
const unsigned char* const secret = (state->extSecret == nullptr) ? state->customSecret : state->extSecret;
if (state->totalLen > XXH3_MIDSIZE_MAX) {
XXH_ALIGN(XXH_ACC_ALIGN) XXH64_hash_t acc[XXH_ACC_NB];
XXH3_digest_long(acc, state, secret);
@ -6287,7 +6287,7 @@ XXH3_generateSecret(XXH_NOESCAPE void* secretBuffer, size_t secretSize, XXH_NOES
XXH_ASSERT(secretSize >= XXH3_SECRET_SIZE_MIN)
#else
/* production mode, assert() are disabled */
if (secretBuffer == NULL) return XXH_ERROR;
if (secretBuffer == nullptr) return XXH_ERROR;
if (secretSize < XXH3_SECRET_SIZE_MIN) return XXH_ERROR;
#endif
@ -6298,7 +6298,7 @@ XXH3_generateSecret(XXH_NOESCAPE void* secretBuffer, size_t secretSize, XXH_NOES
#if (XXH_DEBUGLEVEL >= 1)
XXH_ASSERT(customSeed != NULL)
#else
if (customSeed == NULL) return XXH_ERROR;
if (customSeed == nullptr) return XXH_ERROR;
#endif
/* Fill secretBuffer with a copy of customSeed - repeat as needed */