diff --git a/CMakeLists.txt b/CMakeLists.txt index b475a2224f..95ecf79175 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -772,6 +772,7 @@ set(SOURCES options/configurable.cc options/customizable.cc options/db_options.cc + options/offpeak_time_info.cc options/options.cc options/options_helper.cc options/options_parser.cc diff --git a/TARGETS b/TARGETS index 6f0efa4007..e8aaf325d4 100644 --- a/TARGETS +++ b/TARGETS @@ -163,6 +163,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "options/configurable.cc", "options/customizable.cc", "options/db_options.cc", + "options/offpeak_time_info.cc", "options/options.cc", "options/options_helper.cc", "options/options_parser.cc", diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 8bf3132a1e..eccd57701a 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -215,7 +215,8 @@ class CompactionJobTestBase : public testing::Test { dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")), + /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "", + /*daily_offpeak_time_utc*/ "")), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()), error_handler_(nullptr, db_options_, &mutex_), @@ -540,11 +541,11 @@ class CompactionJobTestBase : public testing::Test { ASSERT_OK(s); db_options_.info_log = info_log; - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + versions_.reset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); compaction_job_stats_.Reset(); ASSERT_OK(SetIdentityFile(env_, dbname_)); diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 64326a95c9..a5184c956c 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -85,7 +85,8 @@ class CompactionPickerTestBase : public testing::Test { vstorage_.reset(new VersionStorageInfo( &icmp_, ucmp_, options_.num_levels, style, nullptr, false, EpochNumberRequirement::kMustPresent, ioptions_.clock, - options_.bottommost_file_compaction_delay)); + options_.bottommost_file_compaction_delay, + OffpeakTimeInfo(mutable_db_options_.daily_offpeak_time_utc))); vstorage_->PrepareForVersionAppend(ioptions_, mutable_cf_options_); } @@ -95,7 +96,8 @@ class CompactionPickerTestBase : public testing::Test { temp_vstorage_.reset(new VersionStorageInfo( &icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style, vstorage_.get(), false, EpochNumberRequirement::kMustPresent, - ioptions_.clock, options_.bottommost_file_compaction_delay)); + ioptions_.clock, options_.bottommost_file_compaction_delay, + OffpeakTimeInfo(mutable_db_options_.daily_offpeak_time_utc))); } void DeleteVersionStorage() { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2d4456485c..4a62791485 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -276,10 +276,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, this->RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0); }); - versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, - table_cache_.get(), write_buffer_manager_, - &write_controller_, &block_cache_tracer_, - io_tracer_, db_id_, db_session_id_)); + versions_.reset(new VersionSet( + dbname_, &immutable_db_options_, file_options_, table_cache_.get(), + write_buffer_manager_, &write_controller_, &block_cache_tracer_, + io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -1328,17 +1328,24 @@ Status DBImpl::SetDBOptions( const bool max_compactions_increased = new_bg_job_limits.max_compactions > current_bg_job_limits.max_compactions; + const bool offpeak_time_changed = + versions_->offpeak_time_info().daily_offpeak_time_utc != + new_db_options.daily_offpeak_time_utc; - if (max_flushes_increased || max_compactions_increased) { + if (max_flushes_increased || max_compactions_increased || + offpeak_time_changed) { if (max_flushes_increased) { env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes, Env::Priority::HIGH); } - if (max_compactions_increased) { env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions, Env::Priority::LOW); } + if (offpeak_time_changed) { + versions_->ChangeOffpeakTimeInfo( + new_db_options.daily_offpeak_time_utc); + } MaybeScheduleFlushOrCompaction(); } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 5af305d310..08812a35bd 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2783,6 +2783,7 @@ void DBImpl::EnableManualCompaction() { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Start"); if (!opened_successfully_) { // Compaction may introduce data race to DB open return; diff --git a/db/db_options_test.cc b/db/db_options_test.cc index f52982bbcb..fa7f52d297 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -1099,7 +1099,7 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) { ASSERT_EQ(fifo_temp_opt[1].age, 30000); } -TEST_F(DBOptionsTest, OffPeakTimes) { +TEST_F(DBOptionsTest, OffpeakTimes) { Options options; options.create_if_missing = true; Random rnd(test::RandomSeed()); @@ -1164,8 +1164,8 @@ TEST_F(DBOptionsTest, OffPeakTimes) { now_utc_minute * 60 + now_utc_second); Status s = DBImpl::TEST_ValidateOptions(options); ASSERT_OK(s); - auto db_options = MutableDBOptions(options); - ASSERT_EQ(expected, db_options.IsNowOffPeak(mock_clock.get())); + auto offpeak_info = OffpeakTimeInfo(options.daily_offpeak_time_utc); + ASSERT_EQ(expected, offpeak_info.IsNowOffpeak(mock_clock.get())); }; options.daily_offpeak_time_utc = ""; @@ -1194,100 +1194,53 @@ TEST_F(DBOptionsTest, OffPeakTimes) { verify_is_now_offpeak(true, 23, 59, 1); verify_is_now_offpeak(true, 23, 59, 59); - // Open the db and test by Get/SetDBOptions options.daily_offpeak_time_utc = ""; DestroyAndReopen(options); ASSERT_EQ("", dbfull()->GetDBOptions().daily_offpeak_time_utc); + + int may_schedule_compaction_called = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MaybeScheduleFlushOrCompaction:Start", + [&](void*) { may_schedule_compaction_called++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Make sure calling SetDBOptions with invalid option does not set the value + // nor call MaybeScheduleFlushOrCompaction() for (std::string invalid_case : invalid_cases) { ASSERT_NOK( dbfull()->SetDBOptions({{"daily_offpeak_time_utc", invalid_case}})); + ASSERT_EQ( + "", + dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc); } + ASSERT_EQ(0, may_schedule_compaction_called); + + // Changing to new valid values should call MaybeScheduleFlushOrCompaction() + // and sets the offpeak_time_info in VersionSet + int expected_count = 0; for (std::string valid_case : valid_cases) { + if (dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc != + valid_case) { + expected_count++; + } ASSERT_OK(dbfull()->SetDBOptions({{"daily_offpeak_time_utc", valid_case}})); ASSERT_EQ(valid_case, dbfull()->GetDBOptions().daily_offpeak_time_utc); + ASSERT_EQ( + valid_case, + dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc); } - Close(); + ASSERT_EQ(expected_count, may_schedule_compaction_called); - // Sets off-peak time from 11:30PM to 4:30AM next day. - // Starting at 1:30PM, use mock sleep to make time pass - // and see if IsNowOffPeak() returns correctly per time changes - int now_hour = 13; - int now_minute = 30; - options.daily_offpeak_time_utc = "23:30-04:30"; - auto mock_clock = std::make_shared(env_->GetSystemClock()); - auto mock_env = std::make_unique(env_, mock_clock); - // Add some extra random days to current time - int days = rnd.Uniform(100); - mock_clock->SetCurrentTime(days * 86400 + now_hour * 3600 + now_minute * 60); - options.env = mock_env.get(); - - // Starting at 1:30PM. It's not off-peak - DestroyAndReopen(options); - ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Now it's at 4:30PM. Still not off-peak - mock_clock->MockSleepForSeconds(3 * 3600); - ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Now it's at 11:30PM. It's off-peak - mock_clock->MockSleepForSeconds(7 * 3600); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Now it's at 2:30AM next day. It's still off-peak - mock_clock->MockSleepForSeconds(3 * 3600); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Now it's at 4:30AM. It's still off-peak - mock_clock->MockSleepForSeconds(2 * 3600); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Sleep for one more minute. It's at 4:31AM It's no longer off-peak - mock_clock->MockSleepForSeconds(60); - ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - Close(); - - // Entire day offpeak - options.daily_offpeak_time_utc = "00:00-23:59"; - DestroyAndReopen(options); - // It doesn't matter what time it is. It should be just offpeak. - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Mock Sleep for 3 hours. It's still off-peak - mock_clock->MockSleepForSeconds(3 * 3600); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Mock Sleep for 20 hours. It's still off-peak - mock_clock->MockSleepForSeconds(20 * 3600); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Mock Sleep for 59 minutes. It's still off-peak - mock_clock->MockSleepForSeconds(59 * 60); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Mock Sleep for 59 seconds. It's still off-peak - mock_clock->MockSleepForSeconds(59); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - - // Mock Sleep for 1 second (exactly 24h passed). It's still off-peak - mock_clock->MockSleepForSeconds(1); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); - // Another second for sanity check - mock_clock->MockSleepForSeconds(1); - ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions()) - .IsNowOffPeak(mock_clock.get())); + // Changing to the same value should not call MaybeScheduleFlushOrCompaction() + ASSERT_OK( + dbfull()->SetDBOptions({{"daily_offpeak_time_utc", "06:30-11:30"}})); + may_schedule_compaction_called = 0; + ASSERT_OK( + dbfull()->SetDBOptions({{"daily_offpeak_time_utc", "06:30-11:30"}})); + ASSERT_EQ(0, may_schedule_compaction_called); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); Close(); } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index edc2ebf2ee..7e7a89cdfa 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1541,7 +1541,8 @@ class RecoveryTestHelper { test->dbname_, &db_options, file_options, table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")); + /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "", + options.daily_offpeak_time_utc)); wal_manager.reset( new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 0f78717096..9a626eac89 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -127,11 +127,11 @@ class FlushJobTestBase : public testing::Test { column_families.emplace_back(cf_name, cf_options_); } - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + versions_.reset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); EXPECT_OK(versions_->Recover(column_families, false)); } diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index 32bc4eead4..3ad8b7b610 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -186,7 +186,8 @@ Status ImportColumnFamilyJob::Run() { cfd_->NumberLevels(), cfd_->ioptions()->compaction_style, nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks, EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock, - cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay); + cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay, + cfd_->current()->version_set()->offpeak_time_info()); Status s; for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) { @@ -429,4 +430,4 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo( return status; } -} // namespace ROCKSDB_NAMESPACE \ No newline at end of file +} // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 3203c7a00e..6292f46e41 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -107,7 +107,7 @@ class MemTableListTest : public testing::Test { table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id*/ "", - /*db_session_id*/ ""); + /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -158,7 +158,7 @@ class MemTableListTest : public testing::Test { table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id*/ "", - /*db_session_id*/ ""); + /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index e303eae643..dee5d6c7ec 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -122,7 +122,7 @@ class Repairer { vset_(dbname_, &immutable_db_options_, file_options_, raw_table_cache_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id=*/"", db_session_id_), + /*db_id=*/"", db_session_id_, db_options.daily_offpeak_time_utc), next_file_number_(1), db_lock_(nullptr), closed_(false) { @@ -694,7 +694,8 @@ class Repairer { cfd->NumberLevels(), cfd->ioptions()->compaction_style, nullptr /* src_vstorage */, cfd->ioptions()->force_consistency_checks, EpochNumberRequirement::kMightMissing, cfd->ioptions()->clock, - /*bottommost_file_compaction_delay=*/0); + /*bottommost_file_compaction_delay=*/0, + cfd->current()->version_set()->offpeak_time_info()); Status s; VersionEdit dummy_edit; for (const auto* table : cf_id_and_tables.second) { diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 34db9aba85..00b4a810a1 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -38,7 +38,8 @@ class VersionBuilderTest : public testing::Test { mutable_cf_options_(options_), vstorage_(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, EpochNumberRequirement::kMustPresent, - ioptions_.clock, options_.bottommost_file_compaction_delay), + ioptions_.clock, options_.bottommost_file_compaction_delay, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)), file_num_(1) { mutable_cf_options_.RefreshDerivedOptions(ioptions_); size_being_compacted_.resize(options_.num_levels); @@ -202,7 +203,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder.Apply(&version_edit)); ASSERT_OK(version_builder.SaveTo(&new_vstorage)); @@ -253,7 +255,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder.Apply(&version_edit)); ASSERT_OK(version_builder.SaveTo(&new_vstorage)); @@ -308,7 +311,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder.Apply(&version_edit)); ASSERT_OK(version_builder.SaveTo(&new_vstorage)); @@ -365,7 +369,8 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder.Apply(&version_edit)); ASSERT_OK(version_builder.SaveTo(&new_vstorage)); @@ -388,7 +393,8 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); VersionEdit version_edit; version_edit.AddFile( @@ -557,7 +563,7 @@ TEST_F(VersionBuilderTest, ApplyFileDeletionAndAddition) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -702,7 +708,7 @@ TEST_F(VersionBuilderTest, ApplyFileAdditionAndDeletion) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -747,7 +753,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAddition) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -887,7 +893,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -961,7 +967,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -1142,7 +1148,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -1191,7 +1197,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) { VersionStorageInfo newer_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &new_vstorage, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(second_builder.SaveTo(&newer_vstorage)); @@ -1278,7 +1284,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesToConcurrentJobs) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -1382,7 +1388,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -1422,7 +1428,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesInconsistentLinks) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); const Status s = builder.SaveTo(&new_vstorage); ASSERT_TRUE(s.IsCorruption()); @@ -1464,7 +1470,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); const Status s = builder.SaveTo(&new_vstorage); ASSERT_TRUE(s.IsCorruption()); @@ -1514,7 +1520,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbageLinkedSsts) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); const Status s = builder.SaveTo(&new_vstorage); ASSERT_TRUE(s.IsCorruption()); @@ -1678,7 +1684,7 @@ TEST_F(VersionBuilderTest, MaintainLinkedSstsForBlobFiles) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_, force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr, - 0); + 0, OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(builder.SaveTo(&new_vstorage)); @@ -1730,7 +1736,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) { VersionStorageInfo new_vstorage( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, true /* force_consistency_checks */, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder.Apply(&version_edit)); ASSERT_OK(version_builder.SaveTo(&new_vstorage)); @@ -1741,7 +1748,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) { VersionStorageInfo new_vstorage2( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, true /* force_consistency_checks */, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_NOK(version_builder2.Apply(&version_edit)); UnrefFilesInVersion(&new_vstorage); @@ -1780,7 +1788,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) { VersionStorageInfo new_vstorage_1( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr /* src_vstorage */, true /* force_consistency_checks */, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder_1.Apply(&version_edit_1)); s = version_builder_1.SaveTo(&new_vstorage_1); @@ -1818,7 +1827,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) { VersionStorageInfo new_vstorage_2( &icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr /* src_vstorage */, true /* force_consistency_checks */, - EpochNumberRequirement::kMightMissing, nullptr, 0); + EpochNumberRequirement::kMightMissing, nullptr, 0, + OffpeakTimeInfo(options_.daily_offpeak_time_utc)); ASSERT_OK(version_builder_2.Apply(&version_edit_2)); s = version_builder_2.SaveTo(&new_vstorage_2); diff --git a/db/version_set.cc b/db/version_set.cc index 482b4c90dd..17c430575d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2124,7 +2124,8 @@ VersionStorageInfo::VersionStorageInfo( CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage, bool _force_consistency_checks, EpochNumberRequirement epoch_number_requirement, SystemClock* clock, - uint32_t bottommost_file_compaction_delay) + uint32_t bottommost_file_compaction_delay, + OffpeakTimeInfo offpeak_time_info) : internal_comparator_(internal_comparator), user_comparator_(user_comparator), // cfd is nullptr if Version is dummy @@ -2156,7 +2157,8 @@ VersionStorageInfo::VersionStorageInfo( bottommost_file_compaction_delay_(bottommost_file_compaction_delay), finalized_(false), force_consistency_checks_(_force_consistency_checks), - epoch_number_requirement_(epoch_number_requirement) { + epoch_number_requirement_(epoch_number_requirement), + offpeak_time_info_(offpeak_time_info) { if (ref_vstorage != nullptr) { accumulated_file_size_ = ref_vstorage->accumulated_file_size_; accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_; @@ -2200,9 +2202,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks, epoch_number_requirement, cfd_ == nullptr ? nullptr : cfd_->ioptions()->clock, - cfd_ == nullptr - ? 0 - : mutable_cf_options.bottommost_file_compaction_delay), + cfd_ == nullptr ? 0 + : mutable_cf_options.bottommost_file_compaction_delay, + vset->offpeak_time_info()), vset_(vset), next_(this), prev_(this), @@ -5043,15 +5045,13 @@ void AtomicGroupReadBuffer::Clear() { replay_buffer_.clear(); } -VersionSet::VersionSet(const std::string& dbname, - const ImmutableDBOptions* _db_options, - const FileOptions& storage_options, Cache* table_cache, - WriteBufferManager* write_buffer_manager, - WriteController* write_controller, - BlockCacheTracer* const block_cache_tracer, - const std::shared_ptr& io_tracer, - const std::string& db_id, - const std::string& db_session_id) +VersionSet::VersionSet( + const std::string& dbname, const ImmutableDBOptions* _db_options, + const FileOptions& storage_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, WriteController* write_controller, + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr& io_tracer, const std::string& db_id, + const std::string& db_session_id, const std::string& daily_offpeak_time_utc) : column_family_set_(new ColumnFamilySet( dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller, block_cache_tracer, io_tracer, @@ -5076,7 +5076,8 @@ VersionSet::VersionSet(const std::string& dbname, file_options_(storage_options), block_cache_tracer_(block_cache_tracer), io_tracer_(io_tracer), - db_session_id_(db_session_id) {} + db_session_id_(db_session_id), + offpeak_time_info_(OffpeakTimeInfo(daily_offpeak_time_utc)) {} VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on @@ -6201,7 +6202,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, /*db_id*/ "", - /*db_session_id*/ ""); + /*db_session_id*/ "", options->daily_offpeak_time_utc); Status status; std::vector dummy; @@ -7242,7 +7243,8 @@ ReactiveVersionSet::ReactiveVersionSet( : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, /*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "", - /*db_session_id*/ "") {} + /*db_session_id*/ "", + /*daily_offpeak_time_utc*/ "") {} ReactiveVersionSet::~ReactiveVersionSet() {} diff --git a/db/version_set.h b/db/version_set.h index 55bce41e9d..a6bfc5aa67 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -53,6 +53,7 @@ #endif #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" +#include "options/offpeak_time_info.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/file_checksum.h" @@ -134,7 +135,8 @@ class VersionStorageInfo { bool _force_consistency_checks, EpochNumberRequirement epoch_number_requirement, SystemClock* clock, - uint32_t bottommost_file_compaction_delay); + uint32_t bottommost_file_compaction_delay, + OffpeakTimeInfo offpeak_time_info); // No copying allowed VersionStorageInfo(const VersionStorageInfo&) = delete; void operator=(const VersionStorageInfo&) = delete; @@ -751,7 +753,8 @@ class VersionStorageInfo { // target sizes. uint64_t estimated_compaction_needed_bytes_; - // Used for computing bottommost files marked for compaction. + // Used for computing bottommost files marked for compaction and checking for + // offpeak time. SystemClock* clock_; uint32_t bottommost_file_compaction_delay_; @@ -763,6 +766,8 @@ class VersionStorageInfo { EpochNumberRequirement epoch_number_requirement_; + OffpeakTimeInfo offpeak_time_info_; + friend class Version; friend class VersionSet; }; @@ -1146,7 +1151,8 @@ class VersionSet { WriteController* write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, - const std::string& db_id, const std::string& db_session_id); + const std::string& db_id, const std::string& db_session_id, + const std::string& daily_offpeak_time_utc); // No copying allowed VersionSet(const VersionSet&) = delete; void operator=(const VersionSet&) = delete; @@ -1501,6 +1507,12 @@ class VersionSet { new_options.writable_file_max_buffer_size; } + // TODO - Consider updating together when file options change in SetDBOptions + const OffpeakTimeInfo& offpeak_time_info() { return offpeak_time_info_; } + void ChangeOffpeakTimeInfo(const std::string& daily_offpeak_time_utc) { + offpeak_time_info_.daily_offpeak_time_utc = daily_offpeak_time_utc; + } + const ImmutableDBOptions* db_options() const { return db_options_; } static uint64_t GetNumLiveVersions(Version* dummy_versions); @@ -1651,6 +1663,9 @@ class VersionSet { std::string db_session_id_; + // Off-peak time information used for compaction scoring + OffpeakTimeInfo offpeak_time_info_; + private: // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 2526e752f4..43d4036a3a 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -21,6 +21,7 @@ #include "table/block_based/block_based_table_factory.h" #include "table/mock_table.h" #include "table/unique_id_impl.h" +#include "test_util/mock_time_env.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/string_util.h" @@ -132,7 +133,8 @@ class VersionStorageInfoTestBase : public testing::Test { /*src_vstorage=*/nullptr, /*_force_consistency_checks=*/false, EpochNumberRequirement::kMustPresent, ioptions_.clock, - mutable_cf_options_.bottommost_file_compaction_delay) {} + mutable_cf_options_.bottommost_file_compaction_delay, + OffpeakTimeInfo()) {} ~VersionStorageInfoTestBase() override { for (int i = 0; i < vstorage_.num_levels(); ++i) { @@ -1199,11 +1201,11 @@ class VersionSetTestBase { immutable_options_.fs = fs_; immutable_options_.clock = env_->GetSystemClock().get(); - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + versions_.reset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); reactive_versions_ = std::make_shared( dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, nullptr); @@ -1303,11 +1305,11 @@ class VersionSetTestBase { } void ReopenDB() { - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + versions_.reset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); EXPECT_OK(versions_->Recover(column_families_, false)); } @@ -1815,11 +1817,11 @@ TEST_F(VersionSetTest, WalAddition) { // Recover a new VersionSet. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -1882,11 +1884,11 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) { // Recover a new VersionSet. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 2); @@ -1935,11 +1937,11 @@ TEST_F(VersionSetTest, WalDeletion) { // Recover a new VersionSet, only the non-closed WAL should show up. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -1973,11 +1975,11 @@ TEST_F(VersionSetTest, WalDeletion) { // Recover from the new MANIFEST, only the non-closed WAL should show up. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2093,11 +2095,11 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { // Recover a new VersionSet, WAL0 is deleted, WAL1 is not. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2129,11 +2131,11 @@ TEST_F(VersionSetTest, DeleteAllWals) { // Recover a new VersionSet, all WALs are deleted. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 0); @@ -2171,11 +2173,11 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { // Recover a new VersionSet, the min log number and the last WAL should be // kept. { - std::unique_ptr new_versions( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr new_versions(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); std::string db_id; ASSERT_OK( new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); @@ -2190,6 +2192,73 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { } } +TEST_F(VersionSetTest, OffpeakTimeInfoTest) { + Random rnd(test::RandomSeed()); + + // Sets off-peak time from 11:30PM to 4:30AM next day. + // Starting at 1:30PM, use mock sleep to make time pass + // and see if IsNowOffpeak() returns correctly per time changes + int now_hour = 13; + int now_minute = 30; + versions_->ChangeOffpeakTimeInfo("23:30-04:30"); + + auto mock_clock = std::make_shared(env_->GetSystemClock()); + // Add some extra random days to current time + int days = rnd.Uniform(100); + mock_clock->SetCurrentTime(days * 86400 + now_hour * 3600 + now_minute * 60); + + // Starting at 1:30PM. It's not off-peak + ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Now it's at 4:30PM. Still not off-peak + mock_clock->MockSleepForSeconds(3 * 3600); + ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Now it's at 11:30PM. It's off-peak + mock_clock->MockSleepForSeconds(7 * 3600); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Now it's at 2:30AM next day. It's still off-peak + mock_clock->MockSleepForSeconds(3 * 3600); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Now it's at 4:30AM. It's still off-peak + mock_clock->MockSleepForSeconds(2 * 3600); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Sleep for one more minute. It's at 4:31AM It's no longer off-peak + mock_clock->MockSleepForSeconds(60); + ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Entire day offpeak + versions_->ChangeOffpeakTimeInfo("00:00-23:59"); + // It doesn't matter what time it is. It should be just offpeak. + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Mock Sleep for 3 hours. It's still off-peak + mock_clock->MockSleepForSeconds(3 * 3600); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Mock Sleep for 20 hours. It's still off-peak + mock_clock->MockSleepForSeconds(20 * 3600); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Mock Sleep for 59 minutes. It's still off-peak + mock_clock->MockSleepForSeconds(59 * 60); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Mock Sleep for 59 seconds. It's still off-peak + mock_clock->MockSleepForSeconds(59); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + + // Mock Sleep for 1 second (exactly 24h passed). It's still off-peak + mock_clock->MockSleepForSeconds(1); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); + // Another second for sanity check + mock_clock->MockSleepForSeconds(1); + ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get())); +} + TEST_F(VersionStorageInfoTest, AddRangeDeletionCompensatedFileSize) { // Tests that compensated range deletion size is added to compensated file // size. @@ -2236,11 +2305,11 @@ class VersionSetWithTimestampTest : public VersionSetTest { } void VerifyFullHistoryTsLow(uint64_t expected_ts_low) { - std::unique_ptr vset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + std::unique_ptr vset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, /*db_id=*/nullptr)); for (auto* cfd : *(vset->GetColumnFamilySet())) { diff --git a/db/version_util.h b/db/version_util.h index e39f255719..f6042fa03f 100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -25,7 +25,8 @@ class OfflineManifestWriter { options.table_cache_numshardbits)), versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "") {} + /*db_id*/ "", /*db_session_id*/ "", + options.daily_offpeak_time_utc) {} Status Recover(const std::vector& column_families) { return versions_.Recover(column_families, /*read_only*/ false, diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 0144e18468..abd7cd7eff 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -50,11 +50,11 @@ class WalManagerTest : public testing::Test { db_options_.fs = env_->GetFileSystem(); db_options_.clock = env_->GetSystemClock().get(); - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ "")); + versions_.reset(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, + /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "")); wal_manager_.reset( new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); @@ -333,4 +333,3 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } - diff --git a/options/db_options.cc b/options/db_options.cc index b26d18e75b..ca72404dd2 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -1066,38 +1066,6 @@ void MutableDBOptions::Dump(Logger* log) const { daily_offpeak_time_utc.c_str()); } -bool MutableDBOptions::IsNowOffPeak(SystemClock* clock) const { - if (daily_offpeak_time_utc.empty()) { - return false; - } - int64_t now; - if (clock->GetCurrentTime(&now).ok()) { - constexpr int kSecondsPerDay = 86400; - constexpr int kSecondsPerMinute = 60; - int seconds_since_midnight_to_nearest_minute = - (static_cast(now % kSecondsPerDay) / kSecondsPerMinute) * - kSecondsPerMinute; - int start_time = 0, end_time = 0; - bool success = - TryParseTimeRangeString(daily_offpeak_time_utc, start_time, end_time); - assert(success); - assert(start_time != end_time); - if (!success) { - // If the validation was done properly, we should never reach here - return false; - } - // if the offpeak duration spans overnight (i.e. 23:30 - 4:30 next day) - if (start_time > end_time) { - return start_time <= seconds_since_midnight_to_nearest_minute || - seconds_since_midnight_to_nearest_minute <= end_time; - } else { - return start_time <= seconds_since_midnight_to_nearest_minute && - seconds_since_midnight_to_nearest_minute <= end_time; - } - } - return false; -} - Status GetMutableDBOptionsFromStrings( const MutableDBOptions& base_options, const std::unordered_map& options_map, diff --git a/options/db_options.h b/options/db_options.h index 85a4d949b9..701a83febb 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -136,9 +136,7 @@ struct MutableDBOptions { bool strict_bytes_per_sync; size_t compaction_readahead_size; int max_background_flushes; - std::string daily_offpeak_time_utc; - bool IsNowOffPeak(SystemClock* clock) const; }; Status GetStringFromMutableDBOptions(const ConfigOptions& config_options, diff --git a/options/offpeak_time_info.cc b/options/offpeak_time_info.cc new file mode 100644 index 0000000000..678d112f1a --- /dev/null +++ b/options/offpeak_time_info.cc @@ -0,0 +1,48 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "options/offpeak_time_info.h" + +#include "rocksdb/system_clock.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +OffpeakTimeInfo::OffpeakTimeInfo() : daily_offpeak_time_utc("") {} +OffpeakTimeInfo::OffpeakTimeInfo(const std::string& offpeak_time) + : daily_offpeak_time_utc(offpeak_time) {} + +bool OffpeakTimeInfo::IsNowOffpeak(SystemClock* clock) const { + if (daily_offpeak_time_utc.empty()) { + return false; + } + int64_t now; + if (clock->GetCurrentTime(&now).ok()) { + constexpr int kSecondsPerDay = 86400; + constexpr int kSecondsPerMinute = 60; + int seconds_since_midnight_to_nearest_minute = + (static_cast(now % kSecondsPerDay) / kSecondsPerMinute) * + kSecondsPerMinute; + int start_time = 0, end_time = 0; + bool success = + TryParseTimeRangeString(daily_offpeak_time_utc, start_time, end_time); + assert(success); + assert(start_time != end_time); + if (!success) { + // If the validation was done properly, we should never reach here + return false; + } + // if the offpeak duration spans overnight (i.e. 23:30 - 4:30 next day) + if (start_time > end_time) { + return start_time <= seconds_since_midnight_to_nearest_minute || + seconds_since_midnight_to_nearest_minute <= end_time; + } else { + return start_time <= seconds_since_midnight_to_nearest_minute && + seconds_since_midnight_to_nearest_minute <= end_time; + } + } + return false; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/options/offpeak_time_info.h b/options/offpeak_time_info.h new file mode 100644 index 0000000000..74b456d3c1 --- /dev/null +++ b/options/offpeak_time_info.h @@ -0,0 +1,22 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { +class SystemClock; + +struct OffpeakTimeInfo { + OffpeakTimeInfo(); + explicit OffpeakTimeInfo(const std::string& offpeak_time); + std::string daily_offpeak_time_utc; + bool IsNowOffpeak(SystemClock* clock) const; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index a019205ae9..160ffdb2e0 100644 --- a/src.mk +++ b/src.mk @@ -156,6 +156,7 @@ LIB_SOURCES = \ options/configurable.cc \ options/customizable.cc \ options/db_options.cc \ + options/offpeak_time_info.cc \ options/options.cc \ options/options_helper.cc \ options/options_parser.cc \ diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 6edf0637f4..b467ab6d35 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1364,7 +1364,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ "", + options.daily_offpeak_time_utc); Status s = versions.DumpManifest(options, file, verbose, hex, json, cf_descs); if (!s.ok()) { fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), @@ -1507,7 +1508,8 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options, ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ "", + options.daily_offpeak_time_utc); std::vector cf_name_list; s = versions.ListColumnFamilies(&cf_name_list, db_path, immutable_db_options.fs.get()); @@ -2328,7 +2330,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int* levels) { WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_id*/ "", /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ "", + opt.daily_offpeak_time_utc); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index c5b4115d14..05d433dbeb 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -207,7 +207,8 @@ class FileChecksumTestHelper { WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, - &wc, nullptr, nullptr, "", ""); + &wc, nullptr, nullptr, "", "", + options_.daily_offpeak_time_utc); std::vector cf_name_list; Status s; s = versions.ListColumnFamilies(&cf_name_list, dbname_,