diff --git a/HISTORY.md b/HISTORY.md index 6aacb132ed..32628a65ca 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,9 @@ * Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the file_creation_time of the oldest SST file in the DB. +### New Features +* Universal compaction to support options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold. + ## 6.5.1 (10/16/2019) ### Bug Fixes * Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strange results when reseek happens with a different iterator upper bound. diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index a76f3b450b..c3c9263971 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -499,6 +499,120 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) { ASSERT_TRUE(compaction->is_trivial_move()); } +TEST_F(CompactionPickerTest, UniversalPeriodicCompaction1) { + // The case where universal periodic compaction can be picked + // with some newer files being compacted. 
+ const uint64_t kFileSize = 100000; + + mutable_cf_options_.periodic_compaction_seconds = 1000; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(0, 2U, "201", "250", kFileSize, 0, 401, 450); + Add(0, 4U, "260", "300", kFileSize, 0, 260, 300); + Add(3, 5U, "010", "080", kFileSize, 0, 200, 251); + Add(4, 3U, "301", "350", kFileSize, 0, 101, 150); + Add(4, 6U, "501", "750", kFileSize, 0, 101, 150); + + file_map_[2].first->being_compacted = true; + UpdateVersionStorageInfo(); + vstorage_->TEST_AddFileMarkedForPeriodicCompaction(4, file_map_[3].first); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(0, compaction->start_level()); + ASSERT_EQ(1U, compaction->num_input_files(0)); +} + +TEST_F(CompactionPickerTest, UniversalPeriodicCompaction2) { + // The case where universal periodic compaction does not + // pick up only level to compact if it doesn't cover + // any file marked as periodic compaction. 
+ const uint64_t kFileSize = 100000; + + mutable_cf_options_.periodic_compaction_seconds = 1000; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(3, 5U, "010", "080", kFileSize, 0, 200, 251); + Add(4, 3U, "301", "350", kFileSize, 0, 101, 150); + Add(4, 6U, "501", "750", kFileSize, 0, 101, 150); + + file_map_[5].first->being_compacted = true; + UpdateVersionStorageInfo(); + vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[1].first); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + + ASSERT_FALSE(compaction); +} + +TEST_F(CompactionPickerTest, UniversalPeriodicCompaction3) { + // The case where universal periodic compaction does not + // pick up only the last sorted run which is an L0 file if it isn't + // marked as periodic compaction. + const uint64_t kFileSize = 100000; + + mutable_cf_options_.periodic_compaction_seconds = 1000; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(0, 5U, "010", "080", kFileSize, 0, 200, 251); + Add(0, 6U, "501", "750", kFileSize, 0, 101, 150); + + file_map_[5].first->being_compacted = true; + UpdateVersionStorageInfo(); + vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[1].first); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + + ASSERT_FALSE(compaction); +} + +TEST_F(CompactionPickerTest, UniversalPeriodicCompaction4) { + // The case where universal periodic compaction couldn't form + // a compaction that includes any file marked for periodic compaction. + // Right now we form the compaction anyway if it is more than one + // sorted run. 
Just put the case here to validate that it doesn't + // crash. + const uint64_t kFileSize = 100000; + + mutable_cf_options_.periodic_compaction_seconds = 1000; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(2, 2U, "010", "080", kFileSize, 0, 200, 251); + Add(3, 5U, "010", "080", kFileSize, 0, 200, 251); + Add(4, 3U, "301", "350", kFileSize, 0, 101, 150); + Add(4, 6U, "501", "750", kFileSize, 0, 101, 150); + + file_map_[2].first->being_compacted = true; + UpdateVersionStorageInfo(); + vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[2].first); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(!compaction || + compaction->start_level() != compaction->output_level()); +} + TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { NewVersionStorage(1, kCompactionStyleFIFO); const int kFileCount = diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 7eddfa2b83..4acc5c893b 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -90,6 +90,19 @@ class UniversalCompactionBuilder { Compaction* PickDeleteTriggeredCompaction(); + // Form a compaction from the sorted run indicated by start_index to the + // oldest sorted run. + // The caller is responsible for making sure that those files are not in + // compaction. + Compaction* PickCompactionToOldest(size_t start_index, + CompactionReason compaction_reason); + + // Try to pick periodic compaction. The caller should only call it + // if there is at least one file marked for periodic compaction. + // null will be returned if no such a compaction can be formed + // because some files are being compacted. 
+ Compaction* PickPeriodicCompaction(); + // Used in universal compaction when the enabled_trivial_move // option is set. Checks whether there are any overlapping files // in the input. Returns true if the input files are non @@ -253,6 +266,9 @@ bool UniversalCompactionPicker::NeedsCompaction( if (vstorage->CompactionScore(kLevel0) >= 1) { return true; } + if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) { + return true; + } if (!vstorage->FilesMarkedForCompaction().empty()) { return true; } @@ -358,7 +374,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_); if (sorted_runs_.size() == 0 || - (vstorage_->FilesMarkedForCompaction().empty() && + (vstorage_->FilesMarkedForPeriodicCompaction().empty() && + vstorage_->FilesMarkedForCompaction().empty() && sorted_runs_.size() < (unsigned int)mutable_cf_options_ .level0_file_num_compaction_trigger)) { ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n", @@ -373,11 +390,19 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n", cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp)); - // Check for size amplification first. Compaction* c = nullptr; - if (sorted_runs_.size() >= - static_cast( - mutable_cf_options_.level0_file_num_compaction_trigger)) { + // Periodic compaction has higher priority than other type of compaction + // because it's a hard requirement. + if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) { + // Always need to do a full compaction for periodic compaction. + c = PickPeriodicCompaction(); + } + + // Check for size amplification. 
+ if (c == nullptr && + sorted_runs_.size() >= + static_cast( + mutable_cf_options_.level0_file_num_compaction_trigger)) { if ((c = PickCompactionToReduceSizeAmp()) != nullptr) { ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n", cf_name_.c_str()); @@ -441,7 +466,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() { } if (mutable_cf_options_.compaction_options_universal.allow_trivial_move == - true) { + true && + c->compaction_reason() != CompactionReason::kPeriodicCompaction) { c->set_is_trivial_move(IsInputFilesNonOverlapping(c)); } @@ -815,59 +841,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() { " earliest-file-size %" PRIu64, cf_name_.c_str(), candidate_size, earliest_file_size); } - assert(start_index < sorted_runs_.size() - 1); - - // Estimate total file size - uint64_t estimated_total_size = 0; - for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { - estimated_total_size += sorted_runs_[loop].size; - } - uint32_t path_id = - GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); - int start_level = sorted_runs_[start_index].level; - - std::vector inputs(vstorage_->num_levels()); - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i].level = start_level + static_cast(i); - } - // We always compact all the files, so always compress. 
- for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { - auto& picking_sr = sorted_runs_[loop]; - if (picking_sr.level == 0) { - FileMetaData* f = picking_sr.file; - inputs[0].files.push_back(f); - } else { - auto& files = inputs[picking_sr.level - start_level].files; - for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { - files.push_back(f); - } - } - char file_num_buf[256]; - picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); - ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: size amp picking %s", - cf_name_.c_str(), file_num_buf); - } - - // output files at the bottom most level, unless it's reserved - int output_level = vstorage_->num_levels() - 1; - // last level is reserved for the files ingested behind - if (ioptions_.allow_ingest_behind) { - assert(output_level > 1); - output_level--; - } - - return new Compaction( - vstorage_, ioptions_, mutable_cf_options_, std::move(inputs), - output_level, - MaxFileSizeForLevel(mutable_cf_options_, output_level, - kCompactionStyleUniversal), - /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, - output_level, 1), - GetCompressionOptions(ioptions_, vstorage_, output_level), - /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score_, false /* deletion_compaction */, - CompactionReason::kUniversalSizeAmplification); + return PickCompactionToOldest(start_index, + CompactionReason::kUniversalSizeAmplification); } // Pick files marked for compaction. 
Typically, files are marked by @@ -987,6 +962,142 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { score_, false /* deletion_compaction */, CompactionReason::kFilesMarkedForCompaction); } + +Compaction* UniversalCompactionBuilder::PickCompactionToOldest( + size_t start_index, CompactionReason compaction_reason) { + assert(start_index < sorted_runs_.size() - 1); + + // Estimate total file size + uint64_t estimated_total_size = 0; + for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { + estimated_total_size += sorted_runs_[loop].size; + } + uint32_t path_id = + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + int start_level = sorted_runs_[start_index].level; + + std::vector inputs(vstorage_->num_levels()); + for (size_t i = 0; i < inputs.size(); ++i) { + inputs[i].level = start_level + static_cast(i); + } + for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { + auto& picking_sr = sorted_runs_[loop]; + if (picking_sr.level == 0) { + FileMetaData* f = picking_sr.file; + inputs[0].files.push_back(f); + } else { + auto& files = inputs[picking_sr.level - start_level].files; + for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { + files.push_back(f); + } + } + std::string comp_reason_print_string; + if (compaction_reason == CompactionReason::kPeriodicCompaction) { + comp_reason_print_string = "periodic compaction"; + } else if (compaction_reason == + CompactionReason::kUniversalSizeAmplification) { + comp_reason_print_string = "size amp"; + } else { + assert(false); + } + + char file_num_buf[256]; + picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s", + cf_name_.c_str(), comp_reason_print_string.c_str(), + file_num_buf); + } + + // output files at the bottom most level, unless it's reserved + int output_level = vstorage_->num_levels() - 1; + // last level is reserved for the files ingested behind + if 
(ioptions_.allow_ingest_behind) { + assert(output_level > 1); + output_level--; + } + + // We never check size for + // compaction_options_universal.compression_size_percent, + // because we always compact all the files, so always compress. + return new Compaction( + vstorage_, ioptions_, mutable_cf_options_, std::move(inputs), + output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + LLONG_MAX, path_id, + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level, + 1, true /* enable_compression */), + GetCompressionOptions(ioptions_, vstorage_, start_level, + true /* enable_compression */), + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + score_, false /* deletion_compaction */, compaction_reason); +} + +Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction", + cf_name_.c_str()); + + // In universal compaction, sorted runs that contain older data are almost always + // generated earlier too. To simplify the problem, we just try to trigger + // a full compaction. We start from the oldest sorted run and include + // all sorted runs, until we hit a sorted run already being compacted. + // Since usually the largest (which is usually the oldest) sorted run is + // included anyway, doing a full compaction won't increase write + // amplification much. + + // Get some information from marked files to check whether a file is + // included in the compaction. + + size_t start_index = sorted_runs_.size(); + while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) { + start_index--; + } + if (start_index == sorted_runs_.size()) { + return nullptr; + } + + // There is a rare corner case where we can't pick up all the files + // because some files are being compacted and we end up with picking files + // but none of them need periodic compaction. 
Unless we simply recompact + // the last sorted run (either the last level or last L0 file), we would just + // execute the compaction, in order to simplify the logic. + if (start_index == sorted_runs_.size() - 1) { + bool included_file_marked = false; + int start_level = sorted_runs_[start_index].level; + FileMetaData* start_file = sorted_runs_[start_index].file; + for (const std::pair& level_file_pair : + vstorage_->FilesMarkedForPeriodicCompaction()) { + if (start_level != 0) { + // Last sorted run is a level + if (start_level == level_file_pair.first) { + included_file_marked = true; + break; + } + } else { + // Last sorted run is a L0 file. + if (start_file == level_file_pair.second) { + included_file_marked = true; + break; + } + } + } + if (!included_file_marked) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: Cannot form a compaction covering file " + "marked for periodic compaction", + cf_name_.c_str()); + return nullptr; + } + } + + Compaction* c = PickCompactionToOldest(start_index, + CompactionReason::kPeriodicCompaction); + + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionPicker::PickPeriodicCompaction:Return", c); + + return c; +} } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index fa2277bad2..4d42084e6d 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -41,10 +41,9 @@ class DBTestUniversalCompaction : public DBTestUniversalCompactionBase { DBTestUniversalCompactionBase("/db_universal_compaction_test") {} }; -class DBTestUniversalDeleteTrigCompaction : public DBTestBase { +class DBTestUniversalCompaction2 : public DBTestBase { public: - DBTestUniversalDeleteTrigCompaction() - : DBTestBase("/db_universal_compaction_test") {} + DBTestUniversalCompaction2() : DBTestBase("/db_universal_compaction_test2") {} }; namespace { @@ -1915,7 +1914,7 @@ INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, 
::testing::Combine(::testing::Values(1, 8), ::testing::Bool())); -TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) { +TEST_F(DBTestUniversalCompaction2, BasicL0toL1) { const int kNumKeys = 3000; const int kWindowSize = 100; const int kNumDelsTrigger = 90; @@ -1956,7 +1955,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) { ASSERT_GT(NumTableFilesAtLevel(6), 0); } -TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) { +TEST_F(DBTestUniversalCompaction2, SingleLevel) { const int kNumKeys = 3000; const int kWindowSize = 100; const int kNumDelsTrigger = 90; @@ -1995,7 +1994,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) { ASSERT_EQ(1, NumTableFilesAtLevel(0)); } -TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) { +TEST_F(DBTestUniversalCompaction2, MultipleLevels) { const int kWindowSize = 100; const int kNumDelsTrigger = 90; @@ -2067,7 +2066,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) { ASSERT_GT(NumTableFilesAtLevel(6), 0); } -TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) { +TEST_F(DBTestUniversalCompaction2, OverlappingL0) { const int kWindowSize = 100; const int kNumDelsTrigger = 90; @@ -2107,7 +2106,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) { ASSERT_GT(NumTableFilesAtLevel(6), 0); } -TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) { +TEST_F(DBTestUniversalCompaction2, IngestBehind) { const int kNumKeys = 3000; const int kWindowSize = 100; const int kNumDelsTrigger = 90; @@ -2150,6 +2149,72 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) { ASSERT_GT(NumTableFilesAtLevel(5), 0); } +TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) { + Options opts = CurrentOptions(); + opts.env = env_; + opts.compaction_style = kCompactionStyleUniversal; + opts.level0_file_num_compaction_trigger = 10; + opts.max_open_files = -1; + opts.compaction_options_universal.size_ratio = 10; + opts.compaction_options_universal.min_merge_width = 2; + 
opts.compaction_options_universal.max_size_amplification_percent = 200; + opts.periodic_compaction_seconds = 48 * 60 * 60; // 2 days + opts.num_levels = 5; + env_->addon_time_.store(0); + Reopen(opts); + + int periodic_compactions = 0; + int start_level = -1; + int output_level = -1; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "UniversalCompactionPicker::PickPeriodicCompaction:Return", + [&](void* arg) { + Compaction* compaction = reinterpret_cast(arg); + ASSERT_TRUE(arg != nullptr); + ASSERT_TRUE(compaction->compaction_reason() == + CompactionReason::kPeriodicCompaction); + start_level = compaction->start_level(); + output_level = compaction->output_level(); + periodic_compactions++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Case 1: Oldest flushed file exceeds periodic compaction threshold. + ASSERT_OK(Put("foo", "bar")); + Flush(); + ASSERT_EQ(0, periodic_compactions); + // Move clock forward so that the flushed file would qualify for periodic + // compaction. + env_->addon_time_.store(48 * 60 * 60 + 100); + + // Another flush would trigger compaction of the oldest file. 
+ ASSERT_OK(Put("foo", "bar2")); + Flush(); + dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(1, periodic_compactions); + ASSERT_EQ(0, start_level); + ASSERT_EQ(4, output_level); + + // Case 2: Oldest compacted file exceeds periodic compaction threshold + periodic_compactions = 0; + // A flush doesn't trigger a periodic compaction when threshold not hit + ASSERT_OK(Put("foo", "bar2")); + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(0, periodic_compactions); + + // After periodic compaction threshold hits, a flush will trigger + // a compaction + ASSERT_OK(Put("foo", "bar2")); + env_->addon_time_.fetch_add(48 * 60 * 60 + 100); + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(1, periodic_compactions); + ASSERT_EQ(0, start_level); + ASSERT_EQ(4, output_level); +} + } // namespace rocksdb #endif // !defined(ROCKSDB_LITE) diff --git a/db/version_set.h b/db/version_set.h index 6b9d71c374..758bd5e5d3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -312,6 +312,10 @@ class VersionStorageInfo { return files_marked_for_periodic_compaction_; } + void TEST_AddFileMarkedForPeriodicCompaction(int level, FileMetaData* f) { + files_marked_for_periodic_compaction_.emplace_back(level, f); + } + // REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: DB mutex held during access const autovector>&