mirror of https://github.com/facebook/rocksdb.git
Delay bottommost level single file compactions (#11701)
Summary: For leveled compaction, RocksDB has a special kind of compaction with reason "kBottommostFiles" that compacts bottommost level files to clear data held by snapshots (more detail in https://github.com/facebook/rocksdb/issues/3009). Such compactions can happen soon after a relevant snapshot is released. For some use cases, a bottommost file may contain only a small number of keys that can be cleared, so compacting such a file has a high write amp. In addition, these bottommost files may be compacted in compactions with reasons other than "kBottommostFiles" if we wait for some time (so that enough data is ingested to trigger such a compaction). This PR introduces an option `bottommost_file_compaction_delay` to specify the delay of these bottommost level single file compactions. * The main change is in `VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction()` where we only add a file to `bottommost_files_marked_for_compaction_` if oldest_snapshot is larger than its non-zero largest_seqno **and** the file is old enough. Note that if a file is not old enough but its largest_seqno is less than oldest_snapshot, we exclude it from the calculation of `bottommost_files_mark_threshold_`. This makes the change simpler, but such a file's eligibility for compaction will only be checked the next time `ComputeBottommostFilesMarkedForCompaction()` is called. This happens when a new Version is created (compaction, flush, SetOptions()...), a new enough snapshot is released (`VersionStorageInfo::UpdateOldestSnapshot()`) or when a compaction is picked and compaction score has to be re-calculated. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11701 Test Plan: * Add two unit tests to test when bottommost_file_compaction_delay > 0. * Ran crash test with the new option. Reviewed By: jaykorean, ajkr Differential Revision: D48331564 Pulled By: cbi42 fbshipit-source-id: c584f3dc5f6354fce3ed65f4c6366dc450b15ba8
This commit is contained in:
parent
0b6ee88d51
commit
d1ff401472
|
@ -84,7 +84,8 @@ class CompactionPickerTestBase : public testing::Test {
|
|||
options_.num_levels = num_levels;
|
||||
vstorage_.reset(new VersionStorageInfo(
|
||||
&icmp_, ucmp_, options_.num_levels, style, nullptr, false,
|
||||
EpochNumberRequirement::kMustPresent));
|
||||
EpochNumberRequirement::kMustPresent, ioptions_.clock,
|
||||
options_.bottommost_file_compaction_delay));
|
||||
vstorage_->PrepareForVersionAppend(ioptions_, mutable_cf_options_);
|
||||
}
|
||||
|
||||
|
@ -93,7 +94,8 @@ class CompactionPickerTestBase : public testing::Test {
|
|||
void AddVersionStorage() {
|
||||
temp_vstorage_.reset(new VersionStorageInfo(
|
||||
&icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
|
||||
vstorage_.get(), false, EpochNumberRequirement::kMustPresent));
|
||||
vstorage_.get(), false, EpochNumberRequirement::kMustPresent,
|
||||
ioptions_.clock, options_.bottommost_file_compaction_delay));
|
||||
}
|
||||
|
||||
void DeleteVersionStorage() {
|
||||
|
|
|
@ -4126,11 +4126,6 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
|
|||
// files does not need to be preserved in case of a future snapshot.
|
||||
ASSERT_OK(Put(Key(0), "val"));
|
||||
ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
|
||||
// release snapshot and wait for compactions to finish. Single-file
|
||||
// compactions should be triggered, which reduce the size of each bottom-level
|
||||
// file without changing file count.
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
|
@ -4138,6 +4133,11 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
|
|||
CompactionReason::kBottommostFiles);
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
// release snapshot and wait for compactions to finish. Single-file
|
||||
// compactions should be triggered, which reduce the size of each bottom-level
|
||||
// file without changing file count.
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
db_->GetLiveFilesMetaData(&post_release_metadata);
|
||||
ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
|
||||
|
@ -4154,6 +4154,78 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
|
|||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, DelayCompactBottomLevelFilesWithDeletions) {
|
||||
// bottom-level files may contain deletions due to snapshots protecting the
|
||||
// deleted keys. Once the snapshot is released and the files are old enough,
|
||||
// we should see them undergo single-file compactions.
// With bottommost_file_compaction_delay = 3600, releasing the snapshot alone
// must not trigger a compaction; exactly one is expected only after the mock
// clock has advanced 3600 seconds and a subsequent flush has happened.
|
||||
Options options = CurrentOptions();
|
||||
// NOTE(review): presumably switches env_ to mock sleeping so that
// MockSleepForSeconds() below advances time without blocking -- confirm.
env_->SetMockSleep();
|
||||
options.bottommost_file_compaction_delay = 3600;
|
||||
DestroyAndReopen(options);
|
||||
// NOTE(review): column family "one" is created here but is not referenced
// again by name in this test.
CreateColumnFamilies({"one"}, options);
|
||||
const int kNumKey = 100;
|
||||
const int kValLen = 100;
|
||||
|
||||
Random rnd(301);
|
||||
// Write kNumKey keys, take a snapshot, then delete every other key so the
// deletions happen while the snapshot is still held.
for (int i = 0; i < kNumKey; ++i) {
|
||||
ASSERT_OK(Put(Key(i), rnd.RandomString(kValLen)));
|
||||
}
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
for (int i = 0; i < kNumKey; i += 2) {
|
||||
ASSERT_OK(Delete(Key(i)));
|
||||
}
|
||||
// Flush and move the result to L1; exactly one file is expected there.
ASSERT_OK(Flush());
|
||||
MoveFilesToLevel(1);
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(1));
|
||||
|
||||
// Record the live file metadata before the snapshot is released so the file
// size can be compared after the delayed compaction.
std::vector<LiveFileMetaData> pre_release_metadata;
|
||||
db_->GetLiveFilesMetaData(&pre_release_metadata);
|
||||
ASSERT_EQ(1, pre_release_metadata.size());
|
||||
// Incremented by the sync point callback below each time it fires; the
// callback also asserts that the picked compaction has reason
// kBottommostFiles.
std::atomic_int compaction_count = 0;
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
ASSERT_TRUE(compaction->compaction_reason() ==
|
||||
CompactionReason::kBottommostFiles);
|
||||
compaction_count++;
|
||||
});
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
// just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
|
||||
// files does not need to be preserved in case of a future snapshot.
|
||||
ASSERT_OK(Put(Key(0), "val"));
|
||||
ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
|
||||
// release snapshot will not trigger compaction.
// The threshold is still expected to read kMaxSequenceNumber afterwards: per
// the commit description, a file that is too new is left out of the
// threshold calculation as well as out of the marked-for-compaction list.
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
ASSERT_EQ(0, compaction_count);
|
||||
// Now the file is old enough for compaction.
|
||||
env_->MockSleepForSeconds(3600);
|
||||
// Another flush will trigger re-computation of the compaction score
|
||||
// to find out that the file is qualified for compaction.
|
||||
ASSERT_OK(Flush());
|
||||
// The flush is expected to leave exactly one file in L0.
ASSERT_EQ(1, NumTableFilesAtLevel(0));
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
ASSERT_EQ(1, compaction_count);
|
||||
|
||||
// Two live files are expected now: the new L0 file and the L1 file.
std::vector<LiveFileMetaData> post_release_metadata;
|
||||
db_->GetLiveFilesMetaData(&post_release_metadata);
|
||||
ASSERT_EQ(2, post_release_metadata.size());
|
||||
|
||||
const auto& pre_file = pre_release_metadata[0];
|
||||
// Get the L1 (bottommost level) file.
|
||||
const auto& post_file = post_release_metadata[0].level == 0
|
||||
? post_release_metadata[1]
|
||||
: post_release_metadata[0];
|
||||
|
||||
ASSERT_EQ(1, pre_file.level);
|
||||
ASSERT_EQ(1, post_file.level);
|
||||
// the file is smaller than it was before as it was rewritten without
|
||||
// deletion markers/deleted keys.
|
||||
ASSERT_LT(post_file.size, pre_file.size);
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) {
|
||||
// bottom-level files may contain deletions due to snapshots protecting the
|
||||
// deleted keys. Once the snapshot is released, we should see files with many
|
||||
|
|
|
@ -1580,12 +1580,14 @@ class DBImpl : public DB {
|
|||
friend class ForwardIterator;
|
||||
friend struct SuperVersion;
|
||||
friend class CompactedDBImpl;
|
||||
#ifndef NDEBUG
|
||||
friend class DBTest_ConcurrentFlushWAL_Test;
|
||||
friend class DBTest_MixedSlowdownOptionsStop_Test;
|
||||
friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test;
|
||||
friend class DBCompactionTest_CompactionDuringShutdown_Test;
|
||||
friend class DBCompactionTest_DelayCompactBottomLevelFilesWithDeletions_Test;
|
||||
friend class DBCompactionTest_DisableCompactBottomLevelFiles_Test;
|
||||
friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test;
|
||||
#ifndef NDEBUG
|
||||
friend class DBTest2_ReadCallbackTest_Test;
|
||||
friend class WriteCallbackPTest_WriteWithCallbackTest_Test;
|
||||
friend class XFTransactionWriteHandler;
|
||||
|
|
|
@ -185,7 +185,8 @@ Status ImportColumnFamilyJob::Run() {
|
|||
&cfd_->internal_comparator(), cfd_->user_comparator(),
|
||||
cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
|
||||
nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
|
||||
EpochNumberRequirement::kMightMissing);
|
||||
EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock,
|
||||
cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay);
|
||||
Status s;
|
||||
|
||||
for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
|
||||
|
|
|
@ -692,7 +692,8 @@ class Repairer {
|
|||
&cfd->internal_comparator(), cfd->user_comparator(),
|
||||
cfd->NumberLevels(), cfd->ioptions()->compaction_style,
|
||||
nullptr /* src_vstorage */, cfd->ioptions()->force_consistency_checks,
|
||||
EpochNumberRequirement::kMightMissing);
|
||||
EpochNumberRequirement::kMightMissing, cfd->ioptions()->clock,
|
||||
/*bottommost_file_compaction_delay=*/0);
|
||||
Status s;
|
||||
VersionEdit dummy_edit;
|
||||
for (const auto* table : cf_id_and_tables.second) {
|
||||
|
|
|
@ -37,7 +37,8 @@ class VersionBuilderTest : public testing::Test {
|
|||
ioptions_(options_),
|
||||
mutable_cf_options_(options_),
|
||||
vstorage_(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
|
||||
nullptr, false),
|
||||
nullptr, false, EpochNumberRequirement::kMustPresent,
|
||||
ioptions_.clock, options_.bottommost_file_compaction_delay),
|
||||
file_num_(1) {
|
||||
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
|
||||
size_being_compacted_.resize(options_.num_levels);
|
||||
|
@ -199,8 +200,9 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
|
|||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr, false);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -249,8 +251,9 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
|
|||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr, false);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -303,8 +306,9 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
|
|||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr, false);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -359,8 +363,9 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
|
|||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr, false);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -381,8 +386,9 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
|
|||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr, false);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
|
||||
VersionEdit version_edit;
|
||||
version_edit.AddFile(
|
||||
|
@ -548,9 +554,10 @@ TEST_F(VersionBuilderTest, ApplyFileDeletionAndAddition) {
|
|||
ASSERT_OK(builder.Apply(&addition));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -692,9 +699,10 @@ TEST_F(VersionBuilderTest, ApplyFileAdditionAndDeletion) {
|
|||
ASSERT_OK(builder.Apply(&deletion));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -736,9 +744,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAddition) {
|
|||
ASSERT_OK(builder.Apply(&edit));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -875,9 +884,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) {
|
|||
ASSERT_OK(builder.Apply(&edit));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -948,9 +958,10 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) {
|
|||
ASSERT_OK(builder.Apply(&garbage));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1128,9 +1139,10 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
|
|||
ASSERT_OK(builder.Apply(&edit));
|
||||
|
||||
constexpr bool force_consistency_checks = false;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1176,9 +1188,10 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
|
|||
|
||||
ASSERT_OK(second_builder.Apply(&second_edit));
|
||||
|
||||
VersionStorageInfo newer_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &new_vstorage,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo newer_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &new_vstorage,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(second_builder.SaveTo(&newer_vstorage));
|
||||
|
||||
|
@ -1262,9 +1275,10 @@ TEST_F(VersionBuilderTest, SaveBlobFilesToConcurrentJobs) {
|
|||
ASSERT_OK(builder.Apply(&edit));
|
||||
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1365,9 +1379,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) {
|
|||
|
||||
// Save to a new version in order to trigger consistency checks.
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1404,9 +1419,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesInconsistentLinks) {
|
|||
|
||||
// Save to a new version in order to trigger consistency checks.
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
const Status s = builder.SaveTo(&new_vstorage);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
|
@ -1445,9 +1461,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
|
|||
|
||||
// Save to a new version in order to trigger consistency checks.
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
const Status s = builder.SaveTo(&new_vstorage);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
|
@ -1494,9 +1511,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbageLinkedSsts) {
|
|||
|
||||
// Save to a new version in order to trigger consistency checks.
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
const Status s = builder.SaveTo(&new_vstorage);
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
|
@ -1657,9 +1675,10 @@ TEST_F(VersionBuilderTest, MaintainLinkedSstsForBlobFiles) {
|
|||
ASSERT_OK(builder.Apply(&edit2));
|
||||
|
||||
constexpr bool force_consistency_checks = true;
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
|
||||
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
|
||||
0);
|
||||
|
||||
ASSERT_OK(builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1708,9 +1727,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
|
|||
|
||||
VersionBuilder version_builder(env_options, &ioptions_, table_cache,
|
||||
&vstorage_, version_set);
|
||||
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */);
|
||||
VersionStorageInfo new_vstorage(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_OK(version_builder.Apply(&version_edit));
|
||||
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
|
||||
|
||||
|
@ -1718,9 +1738,10 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
|
|||
|
||||
VersionBuilder version_builder2(env_options, &ioptions_, table_cache,
|
||||
&new_vstorage, version_set);
|
||||
VersionStorageInfo new_vstorage2(&icmp_, ucmp_, options_.num_levels,
|
||||
kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */);
|
||||
VersionStorageInfo new_vstorage2(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr,
|
||||
true /* force_consistency_checks */,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
ASSERT_NOK(version_builder2.Apply(&version_edit));
|
||||
|
||||
UnrefFilesInVersion(&new_vstorage);
|
||||
|
@ -1758,7 +1779,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) {
|
|||
nullptr /* file_metadata_cache_res_mgr */);
|
||||
VersionStorageInfo new_vstorage_1(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
|
||||
nullptr /* src_vstorage */, true /* force_consistency_checks */);
|
||||
nullptr /* src_vstorage */, true /* force_consistency_checks */,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
|
||||
ASSERT_OK(version_builder_1.Apply(&version_edit_1));
|
||||
s = version_builder_1.SaveTo(&new_vstorage_1);
|
||||
|
@ -1795,7 +1817,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) {
|
|||
nullptr /* file_metadata_cache_res_mgr */);
|
||||
VersionStorageInfo new_vstorage_2(
|
||||
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
|
||||
nullptr /* src_vstorage */, true /* force_consistency_checks */);
|
||||
nullptr /* src_vstorage */, true /* force_consistency_checks */,
|
||||
EpochNumberRequirement::kMightMissing, nullptr, 0);
|
||||
|
||||
ASSERT_OK(version_builder_2.Apply(&version_edit_2));
|
||||
s = version_builder_2.SaveTo(&new_vstorage_2);
|
||||
|
|
|
@ -2114,7 +2114,8 @@ VersionStorageInfo::VersionStorageInfo(
|
|||
const Comparator* user_comparator, int levels,
|
||||
CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
|
||||
bool _force_consistency_checks,
|
||||
EpochNumberRequirement epoch_number_requirement)
|
||||
EpochNumberRequirement epoch_number_requirement, SystemClock* clock,
|
||||
uint32_t bottommost_file_compaction_delay)
|
||||
: internal_comparator_(internal_comparator),
|
||||
user_comparator_(user_comparator),
|
||||
// cfd is nullptr if Version is dummy
|
||||
|
@ -2142,6 +2143,8 @@ VersionStorageInfo::VersionStorageInfo(
|
|||
current_num_deletions_(0),
|
||||
current_num_samples_(0),
|
||||
estimated_compaction_needed_bytes_(0),
|
||||
clock_(clock),
|
||||
bottommost_file_compaction_delay_(bottommost_file_compaction_delay),
|
||||
finalized_(false),
|
||||
force_consistency_checks_(_force_consistency_checks),
|
||||
epoch_number_requirement_(epoch_number_requirement) {
|
||||
|
@ -2186,7 +2189,11 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
|
|||
? nullptr
|
||||
: cfd_->current()->storage_info(),
|
||||
cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks,
|
||||
epoch_number_requirement),
|
||||
epoch_number_requirement,
|
||||
cfd_ == nullptr ? nullptr : cfd_->ioptions()->clock,
|
||||
cfd_ == nullptr
|
||||
? 0
|
||||
: mutable_cf_options.bottommost_file_compaction_delay),
|
||||
vset_(vset),
|
||||
next_(this),
|
||||
prev_(this),
|
||||
|
@ -4178,14 +4185,48 @@ void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
|
|||
void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
|
||||
bottommost_files_marked_for_compaction_.clear();
|
||||
bottommost_files_mark_threshold_ = kMaxSequenceNumber;
|
||||
// If a file's creation time is larger than creation_time_ub,
|
||||
// it is too new to be marked for compaction.
|
||||
int64_t creation_time_ub = 0;
|
||||
bool needs_delay = bottommost_file_compaction_delay_ > 0;
|
||||
if (needs_delay) {
|
||||
int64_t current_time = 0;
|
||||
clock_->GetCurrentTime(&current_time).PermitUncheckedError();
|
||||
// Note that if GetCurrentTime() fails, current_time will be 0.
|
||||
// We will treat it as is and treat all files as too new.
|
||||
// The subtraction will not underflow since
|
||||
// bottommost_file_compaction_delay_ is of type uint32_t.
|
||||
creation_time_ub =
|
||||
current_time - static_cast<int64_t>(bottommost_file_compaction_delay_);
|
||||
}
|
||||
|
||||
for (auto& level_and_file : bottommost_files_) {
|
||||
if (!level_and_file.second->being_compacted &&
|
||||
level_and_file.second->fd.largest_seqno != 0) {
|
||||
// largest_seqno might be nonzero due to containing the final key in an
|
||||
// earlier compaction, whose seqnum we didn't zero out. Multiple deletions
|
||||
// ensures the file really contains deleted or overwritten keys.
|
||||
// earlier compaction, whose seqnum we didn't zero out.
|
||||
if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
|
||||
bottommost_files_marked_for_compaction_.push_back(level_and_file);
|
||||
if (!needs_delay) {
|
||||
bottommost_files_marked_for_compaction_.push_back(level_and_file);
|
||||
} else if (creation_time_ub > 0) {
|
||||
int64_t creation_time = static_cast<int64_t>(
|
||||
level_and_file.second->TryGetFileCreationTime());
|
||||
if (creation_time == kUnknownFileCreationTime ||
|
||||
creation_time <= creation_time_ub) {
|
||||
bottommost_files_marked_for_compaction_.push_back(level_and_file);
|
||||
} else {
|
||||
// Just ignore this file for both
|
||||
// bottommost_files_marked_for_compaction_ and
|
||||
// bottommost_files_mark_threshold_. The next time
|
||||
// this method is called, it will try this file again. The method
|
||||
// is called after a new Version creation (compaction, flush, etc.),
|
||||
// after a compaction is picked, and after a snapshot newer than
|
||||
// bottommost_files_mark_threshold_ is released.
|
||||
}
|
||||
} else {
|
||||
// creation_time_ub <= 0, all files are too new to be marked for
|
||||
// compaction.
|
||||
}
|
||||
} else {
|
||||
bottommost_files_mark_threshold_ =
|
||||
std::min(bottommost_files_mark_threshold_,
|
||||
|
|
|
@ -132,8 +132,9 @@ class VersionStorageInfo {
|
|||
CompactionStyle compaction_style,
|
||||
VersionStorageInfo* src_vstorage,
|
||||
bool _force_consistency_checks,
|
||||
EpochNumberRequirement epoch_number_requirement =
|
||||
EpochNumberRequirement::kMustPresent);
|
||||
EpochNumberRequirement epoch_number_requirement,
|
||||
SystemClock* clock,
|
||||
uint32_t bottommost_file_compaction_delay);
|
||||
// No copying allowed
|
||||
VersionStorageInfo(const VersionStorageInfo&) = delete;
|
||||
void operator=(const VersionStorageInfo&) = delete;
|
||||
|
@ -748,6 +749,10 @@ class VersionStorageInfo {
|
|||
// target sizes.
|
||||
uint64_t estimated_compaction_needed_bytes_;
|
||||
|
||||
// Used for computing bottommost files marked for compaction.
|
||||
SystemClock* clock_;
|
||||
uint32_t bottommost_file_compaction_delay_;
|
||||
|
||||
bool finalized_;
|
||||
|
||||
// If set to true, we will run consistency checks even if RocksDB
|
||||
|
|
|
@ -130,7 +130,9 @@ class VersionStorageInfoTestBase : public testing::Test {
|
|||
mutable_cf_options_(options_),
|
||||
vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
|
||||
/*src_vstorage=*/nullptr,
|
||||
/*_force_consistency_checks=*/false) {}
|
||||
/*_force_consistency_checks=*/false,
|
||||
EpochNumberRequirement::kMustPresent, ioptions_.clock,
|
||||
mutable_cf_options_.bottommost_file_compaction_delay) {}
|
||||
|
||||
~VersionStorageInfoTestBase() override {
|
||||
for (int i = 0; i < vstorage_.num_levels(); ++i) {
|
||||
|
|
|
@ -330,6 +330,8 @@ DECLARE_bool(enable_thread_tracking);
|
|||
|
||||
DECLARE_uint32(memtable_max_range_deletions);
|
||||
|
||||
DECLARE_uint32(bottommost_file_compaction_delay);
|
||||
|
||||
// Tiered storage
|
||||
DECLARE_bool(enable_tiered_storage); // set last_level_temperature
|
||||
DECLARE_int64(preclude_last_level_data_seconds);
|
||||
|
|
|
@ -1113,4 +1113,8 @@ DEFINE_uint32(memtable_max_range_deletions, 0,
|
|||
"If nonzero, RocksDB will try to flush the current memtable"
|
||||
"after the number of range deletions is >= this limit");
|
||||
|
||||
// Command-line flag that InitializeOptionsFromFlags() copies into
// options.bottommost_file_compaction_delay. The value is in seconds; the
// default of 0 matches the option's default.
// The first help literal ends with a space because adjacent string literals
// are concatenated verbatim; without it the help text would read
// "seconds.See more".
DEFINE_uint32(bottommost_file_compaction_delay, 0,
|
||||
"Delay kBottommostFiles compaction by this amount of seconds. "
|
||||
"See more in option comment.");
|
||||
|
||||
#endif // GFLAGS
|
||||
|
|
|
@ -3331,6 +3331,9 @@ void InitializeOptionsFromFlags(
|
|||
options.enable_thread_tracking = FLAGS_enable_thread_tracking;
|
||||
|
||||
options.memtable_max_range_deletions = FLAGS_memtable_max_range_deletions;
|
||||
|
||||
options.bottommost_file_compaction_delay =
|
||||
FLAGS_bottommost_file_compaction_delay;
|
||||
}
|
||||
|
||||
void InitializeOptionsGeneral(
|
||||
|
|
|
@ -1136,6 +1136,7 @@ struct AdvancedColumnFamilyOptions {
|
|||
//
|
||||
// Default: 0 (no protection)
|
||||
// Supported values: 0, 1, 2, 4, 8.
|
||||
// Dynamically changeable through the SetOptions() API.
|
||||
uint32_t memtable_protection_bytes_per_key = 0;
|
||||
|
||||
// UNDER CONSTRUCTION -- DO NOT USE
|
||||
|
@ -1199,8 +1200,21 @@ struct AdvancedColumnFamilyOptions {
|
|||
//
|
||||
// Default: 0 (no protection)
|
||||
// Supported values: 0, 1, 2, 4, 8.
|
||||
// Dynamically changeable through the SetOptions() API.
|
||||
uint8_t block_protection_bytes_per_key = 0;
|
||||
|
||||
// For leveled compaction, RocksDB may compact a file at the bottommost level
|
||||
// if it can compact away data that were protected by some snapshot.
|
||||
// The compaction reason in LOG for this kind of compaction is
|
||||
// "BottommostFiles". Usually such compaction can happen as soon as a
|
||||
// relevant snapshot is released. This option allows users to delay
|
||||
// such compactions. A file qualifies for "BottommostFiles" compaction
|
||||
// if it is at least "bottommost_file_compaction_delay" seconds old.
|
||||
//
|
||||
// Default: 0 (no delay)
|
||||
// Dynamically changeable through the SetOptions() API.
|
||||
uint32_t bottommost_file_compaction_delay = 0;
|
||||
|
||||
// Create ColumnFamilyOptions with default values for all fields
|
||||
AdvancedColumnFamilyOptions();
|
||||
// Create ColumnFamilyOptions from Options
|
||||
|
|
|
@ -507,6 +507,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
|
|||
{offsetof(struct MutableCFOptions, memtable_protection_bytes_per_key),
|
||||
OptionType::kUInt32T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kMutable}},
|
||||
{"bottommost_file_compaction_delay",
|
||||
{offsetof(struct MutableCFOptions, bottommost_file_compaction_delay),
|
||||
OptionType::kUInt32T, OptionVerificationType::kNormal,
|
||||
OptionTypeFlags::kMutable}},
|
||||
{"block_protection_bytes_per_key",
|
||||
{offsetof(struct MutableCFOptions, block_protection_bytes_per_key),
|
||||
OptionType::kUInt8T, OptionVerificationType::kNormal,
|
||||
|
@ -1117,6 +1121,8 @@ void MutableCFOptions::Dump(Logger* log) const {
|
|||
ROCKS_LOG_INFO(log,
|
||||
" experimental_mempurge_threshold: %f",
|
||||
experimental_mempurge_threshold);
|
||||
ROCKS_LOG_INFO(log, " bottommost_file_compaction_delay: %" PRIu32,
|
||||
bottommost_file_compaction_delay);
|
||||
|
||||
// Universal Compaction Options
|
||||
ROCKS_LOG_INFO(log, "compaction_options_universal.size_ratio : %d",
|
||||
|
|
|
@ -176,7 +176,9 @@ struct MutableCFOptions {
|
|||
sample_for_compression(
|
||||
options.sample_for_compression), // TODO: is 0 fine here?
|
||||
compression_per_level(options.compression_per_level),
|
||||
memtable_max_range_deletions(options.memtable_max_range_deletions) {
|
||||
memtable_max_range_deletions(options.memtable_max_range_deletions),
|
||||
bottommost_file_compaction_delay(
|
||||
options.bottommost_file_compaction_delay) {
|
||||
RefreshDerivedOptions(options.num_levels, options.compaction_style);
|
||||
}
|
||||
|
||||
|
@ -321,6 +323,7 @@ struct MutableCFOptions {
|
|||
uint64_t sample_for_compression;
|
||||
std::vector<CompressionType> compression_per_level;
|
||||
uint32_t memtable_max_range_deletions;
|
||||
uint32_t bottommost_file_compaction_delay;
|
||||
|
||||
// Derived options
|
||||
// Per-level target file size.
|
||||
|
|
|
@ -210,6 +210,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
|
|||
moptions.memtable_protection_bytes_per_key;
|
||||
cf_opts->block_protection_bytes_per_key =
|
||||
moptions.block_protection_bytes_per_key;
|
||||
cf_opts->bottommost_file_compaction_delay =
|
||||
moptions.bottommost_file_compaction_delay;
|
||||
|
||||
// Compaction related options
|
||||
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
|
||||
|
|
|
@ -560,7 +560,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
|||
"memtable_protection_bytes_per_key=2;"
|
||||
"persist_user_defined_timestamps=true;"
|
||||
"block_protection_bytes_per_key=1;"
|
||||
"memtable_max_range_deletions=999999;",
|
||||
"memtable_max_range_deletions=999999;"
|
||||
"bottommost_file_compaction_delay=7200;",
|
||||
new_options));
|
||||
|
||||
ASSERT_NE(new_options->blob_cache.get(), nullptr);
|
||||
|
|
|
@ -213,6 +213,8 @@ default_params = {
|
|||
"min_write_buffer_number_to_merge": lambda: random.choice([1, 2]),
|
||||
"preserve_internal_time_seconds": lambda: random.choice([0, 60, 3600, 36000]),
|
||||
"memtable_max_range_deletions": lambda: random.choice([0] * 6 + [100, 1000]),
|
||||
# 0 (disabled) is the default and the more commonly used value.
|
||||
"bottommost_file_compaction_delay": lambda: random.choice([0, 0, 0, 600, 3600, 86400]),
|
||||
}
|
||||
|
||||
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add a CF option `bottommost_file_compaction_delay` to allow specifying the delay of bottommost level single-file compactions.
|
Loading…
Reference in New Issue