Make OffpeakTimeInfo available in VersionSet (#12018)

Summary:
As mentioned in  https://github.com/facebook/rocksdb/issues/11893, we are going to use the offpeak time information to pre-process TTL-based compactions. To do so, we need to access `daily_offpeak_time_utc` in `VersionStorageInfo::ComputeCompactionScore()` where we pick the files to compact. This PR is to make the offpeak time information available at the time of compaction-scoring. We are not changing any compaction scoring logic just yet. Will follow up in a separate PR.

There were two ways to achieve what we want.
1.  Make `MutableDBOptions` available in `ColumnFamilyData` and `ComputeCompactionScore()` take `MutableDBOptions` along with `ImmutableOptions` and `MutableCFOptions`.
2. Make `daily_offpeak_time_utc` and `IsNowOffpeak()` available in `VersionStorageInfo`.

We chose the latter as it involves smaller changes.

This change includes the following
- Introduction of `OffpeakTimeInfo` and `IsNowOffpeak()` has been moved from `MutableDBOptions`
- `OffpeakTimeInfo` added to `VersionSet` and it can be set during construction and by `ChangeOffpeakTimeInfo()`
- During `SetDBOptions()`, if offpeak time info needs to change, it calls `MaybeScheduleFlushOrCompaction()` to re-compute compaction scores and process compactions as needed

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12018

Test Plan:
- `DBOptionsTest::OffpeakTimes` changed to include checks for `MaybeScheduleFlushOrCompaction()` calls and `VersionSet`'s OffpeakTimeInfo value change during `SetDBOptions()`.
- `VersionSetTest::OffpeakTimeInfoTest` added to test `ChangeOffpeakTimeInfo()`. `IsNowOffpeak()` tests moved from `DBOptionsTest::OffpeakTimes`

Reviewed By: pdillinger

Differential Revision: D50723881

Pulled By: jaykorean

fbshipit-source-id: 3cff0291936f3729c0e9c7750834b9378fb435f6
This commit is contained in:
Jay Huh 2023-10-27 15:56:48 -07:00 committed by Facebook GitHub Bot
parent 526f36b483
commit e230e4d248
25 changed files with 355 additions and 249 deletions

View File

@ -772,6 +772,7 @@ set(SOURCES
options/configurable.cc
options/customizable.cc
options/db_options.cc
options/offpeak_time_info.cc
options/options.cc
options/options_helper.cc
options/options_parser.cc

View File

@ -163,6 +163,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"options/configurable.cc",
"options/customizable.cc",
"options/db_options.cc",
"options/offpeak_time_info.cc",
"options/options.cc",
"options/options_helper.cc",
"options/options_parser.cc",

View File

@ -215,7 +215,8 @@ class CompactionJobTestBase : public testing::Test {
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")),
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "",
/*daily_offpeak_time_utc*/ "")),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
@ -540,11 +541,11 @@ class CompactionJobTestBase : public testing::Test {
ASSERT_OK(s);
db_options_.info_log = info_log;
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(env_, dbname_));

View File

@ -85,7 +85,8 @@ class CompactionPickerTestBase : public testing::Test {
vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, style, nullptr, false,
EpochNumberRequirement::kMustPresent, ioptions_.clock,
options_.bottommost_file_compaction_delay));
options_.bottommost_file_compaction_delay,
OffpeakTimeInfo(mutable_db_options_.daily_offpeak_time_utc)));
vstorage_->PrepareForVersionAppend(ioptions_, mutable_cf_options_);
}
@ -95,7 +96,8 @@ class CompactionPickerTestBase : public testing::Test {
temp_vstorage_.reset(new VersionStorageInfo(
&icmp_, ucmp_, options_.num_levels, ioptions_.compaction_style,
vstorage_.get(), false, EpochNumberRequirement::kMustPresent,
ioptions_.clock, options_.bottommost_file_compaction_delay));
ioptions_.clock, options_.bottommost_file_compaction_delay,
OffpeakTimeInfo(mutable_db_options_.daily_offpeak_time_utc)));
}
void DeleteVersionStorage() {

View File

@ -276,10 +276,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
this->RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
});
versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_,
&write_controller_, &block_cache_tracer_,
io_tracer_, db_id_, db_session_id_));
versions_.reset(new VersionSet(
dbname_, &immutable_db_options_, file_options_, table_cache_.get(),
write_buffer_manager_, &write_controller_, &block_cache_tracer_,
io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -1328,17 +1328,24 @@ Status DBImpl::SetDBOptions(
const bool max_compactions_increased =
new_bg_job_limits.max_compactions >
current_bg_job_limits.max_compactions;
const bool offpeak_time_changed =
versions_->offpeak_time_info().daily_offpeak_time_utc !=
new_db_options.daily_offpeak_time_utc;
if (max_flushes_increased || max_compactions_increased) {
if (max_flushes_increased || max_compactions_increased ||
offpeak_time_changed) {
if (max_flushes_increased) {
env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
Env::Priority::HIGH);
}
if (max_compactions_increased) {
env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
Env::Priority::LOW);
}
if (offpeak_time_changed) {
versions_->ChangeOffpeakTimeInfo(
new_db_options.daily_offpeak_time_utc);
}
MaybeScheduleFlushOrCompaction();
}

View File

@ -2783,6 +2783,7 @@ void DBImpl::EnableManualCompaction() {
void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld();
TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Start");
if (!opened_successfully_) {
// Compaction may introduce data race to DB open
return;

View File

@ -1099,7 +1099,7 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) {
ASSERT_EQ(fifo_temp_opt[1].age, 30000);
}
TEST_F(DBOptionsTest, OffPeakTimes) {
TEST_F(DBOptionsTest, OffpeakTimes) {
Options options;
options.create_if_missing = true;
Random rnd(test::RandomSeed());
@ -1164,8 +1164,8 @@ TEST_F(DBOptionsTest, OffPeakTimes) {
now_utc_minute * 60 + now_utc_second);
Status s = DBImpl::TEST_ValidateOptions(options);
ASSERT_OK(s);
auto db_options = MutableDBOptions(options);
ASSERT_EQ(expected, db_options.IsNowOffPeak(mock_clock.get()));
auto offpeak_info = OffpeakTimeInfo(options.daily_offpeak_time_utc);
ASSERT_EQ(expected, offpeak_info.IsNowOffpeak(mock_clock.get()));
};
options.daily_offpeak_time_utc = "";
@ -1194,100 +1194,53 @@ TEST_F(DBOptionsTest, OffPeakTimes) {
verify_is_now_offpeak(true, 23, 59, 1);
verify_is_now_offpeak(true, 23, 59, 59);
// Open the db and test by Get/SetDBOptions
options.daily_offpeak_time_utc = "";
DestroyAndReopen(options);
ASSERT_EQ("", dbfull()->GetDBOptions().daily_offpeak_time_utc);
int may_schedule_compaction_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MaybeScheduleFlushOrCompaction:Start",
[&](void*) { may_schedule_compaction_called++; });
SyncPoint::GetInstance()->EnableProcessing();
// Make sure calling SetDBOptions with invalid option does not set the value
// nor call MaybeScheduleFlushOrCompaction()
for (std::string invalid_case : invalid_cases) {
ASSERT_NOK(
dbfull()->SetDBOptions({{"daily_offpeak_time_utc", invalid_case}}));
ASSERT_EQ(
"",
dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc);
}
ASSERT_EQ(0, may_schedule_compaction_called);
// Changing to new valid values should call MaybeScheduleFlushOrCompaction()
// and sets the offpeak_time_info in VersionSet
int expected_count = 0;
for (std::string valid_case : valid_cases) {
if (dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc !=
valid_case) {
expected_count++;
}
ASSERT_OK(dbfull()->SetDBOptions({{"daily_offpeak_time_utc", valid_case}}));
ASSERT_EQ(valid_case, dbfull()->GetDBOptions().daily_offpeak_time_utc);
ASSERT_EQ(
valid_case,
dbfull()->GetVersionSet()->offpeak_time_info().daily_offpeak_time_utc);
}
Close();
ASSERT_EQ(expected_count, may_schedule_compaction_called);
// Sets off-peak time from 11:30PM to 4:30AM next day.
// Starting at 1:30PM, use mock sleep to make time pass
// and see if IsNowOffPeak() returns correctly per time changes
int now_hour = 13;
int now_minute = 30;
options.daily_offpeak_time_utc = "23:30-04:30";
auto mock_clock = std::make_shared<MockSystemClock>(env_->GetSystemClock());
auto mock_env = std::make_unique<CompositeEnvWrapper>(env_, mock_clock);
// Add some extra random days to current time
int days = rnd.Uniform(100);
mock_clock->SetCurrentTime(days * 86400 + now_hour * 3600 + now_minute * 60);
options.env = mock_env.get();
// Starting at 1:30PM. It's not off-peak
DestroyAndReopen(options);
ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Now it's at 4:30PM. Still not off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Now it's at 11:30PM. It's off-peak
mock_clock->MockSleepForSeconds(7 * 3600);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Now it's at 2:30AM next day. It's still off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Now it's at 4:30AM. It's still off-peak
mock_clock->MockSleepForSeconds(2 * 3600);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Sleep for one more minute. It's at 4:31AM It's no longer off-peak
mock_clock->MockSleepForSeconds(60);
ASSERT_FALSE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
Close();
// Entire day offpeak
options.daily_offpeak_time_utc = "00:00-23:59";
DestroyAndReopen(options);
// It doesn't matter what time it is. It should be just offpeak.
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Mock Sleep for 3 hours. It's still off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Mock Sleep for 20 hours. It's still off-peak
mock_clock->MockSleepForSeconds(20 * 3600);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Mock Sleep for 59 minutes. It's still off-peak
mock_clock->MockSleepForSeconds(59 * 60);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Mock Sleep for 59 seconds. It's still off-peak
mock_clock->MockSleepForSeconds(59);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Mock Sleep for 1 second (exactly 24h passed). It's still off-peak
mock_clock->MockSleepForSeconds(1);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Another second for sanity check
mock_clock->MockSleepForSeconds(1);
ASSERT_TRUE(MutableDBOptions(dbfull()->GetDBOptions())
.IsNowOffPeak(mock_clock.get()));
// Changing to the same value should not call MaybeScheduleFlushOrCompaction()
ASSERT_OK(
dbfull()->SetDBOptions({{"daily_offpeak_time_utc", "06:30-11:30"}}));
may_schedule_compaction_called = 0;
ASSERT_OK(
dbfull()->SetDBOptions({{"daily_offpeak_time_utc", "06:30-11:30"}}));
ASSERT_EQ(0, may_schedule_compaction_called);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}

View File

@ -1541,7 +1541,8 @@ class RecoveryTestHelper {
test->dbname_, &db_options, file_options, table_cache.get(),
&write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ ""));
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "",
options.daily_offpeak_time_utc));
wal_manager.reset(
new WalManager(db_options, file_options, /*io_tracer=*/nullptr));

View File

@ -127,11 +127,11 @@ class FlushJobTestBase : public testing::Test {
column_families.emplace_back(cf_name, cf_options_);
}
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
EXPECT_OK(versions_->Recover(column_families, false));
}

View File

@ -186,7 +186,8 @@ Status ImportColumnFamilyJob::Run() {
cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock,
cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay);
cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay,
cfd_->current()->version_set()->offpeak_time_info());
Status s;
for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {

View File

@ -107,7 +107,7 @@ class MemTableListTest : public testing::Test {
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "",
/*db_session_id*/ "");
/*db_session_id*/ "", /*daily_offpeak_time_utc*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
@ -158,7 +158,7 @@ class MemTableListTest : public testing::Test {
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "",
/*db_session_id*/ "");
/*db_session_id*/ "", /*daily_offpeak_time_utc*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());

View File

@ -122,7 +122,7 @@ class Repairer {
vset_(dbname_, &immutable_db_options_, file_options_,
raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id=*/"", db_session_id_),
/*db_id=*/"", db_session_id_, db_options.daily_offpeak_time_utc),
next_file_number_(1),
db_lock_(nullptr),
closed_(false) {
@ -694,7 +694,8 @@ class Repairer {
cfd->NumberLevels(), cfd->ioptions()->compaction_style,
nullptr /* src_vstorage */, cfd->ioptions()->force_consistency_checks,
EpochNumberRequirement::kMightMissing, cfd->ioptions()->clock,
/*bottommost_file_compaction_delay=*/0);
/*bottommost_file_compaction_delay=*/0,
cfd->current()->version_set()->offpeak_time_info());
Status s;
VersionEdit dummy_edit;
for (const auto* table : cf_id_and_tables.second) {

View File

@ -38,7 +38,8 @@ class VersionBuilderTest : public testing::Test {
mutable_cf_options_(options_),
vstorage_(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
nullptr, false, EpochNumberRequirement::kMustPresent,
ioptions_.clock, options_.bottommost_file_compaction_delay),
ioptions_.clock, options_.bottommost_file_compaction_delay,
OffpeakTimeInfo(options_.daily_offpeak_time_utc)),
file_num_(1) {
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
size_being_compacted_.resize(options_.num_levels);
@ -202,7 +203,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
@ -253,7 +255,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
@ -308,7 +311,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
@ -365,7 +369,8 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
@ -388,7 +393,8 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr, false,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
VersionEdit version_edit;
version_edit.AddFile(
@ -557,7 +563,7 @@ TEST_F(VersionBuilderTest, ApplyFileDeletionAndAddition) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -702,7 +708,7 @@ TEST_F(VersionBuilderTest, ApplyFileAdditionAndDeletion) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -747,7 +753,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileAddition) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -887,7 +893,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileInBase) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -961,7 +967,7 @@ TEST_F(VersionBuilderTest, ApplyBlobFileGarbageFileAdditionApplied) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -1142,7 +1148,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -1191,7 +1197,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesTo) {
VersionStorageInfo newer_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &new_vstorage,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(second_builder.SaveTo(&newer_vstorage));
@ -1278,7 +1284,7 @@ TEST_F(VersionBuilderTest, SaveBlobFilesToConcurrentJobs) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -1382,7 +1388,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFiles) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -1422,7 +1428,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesInconsistentLinks) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
const Status s = builder.SaveTo(&new_vstorage);
ASSERT_TRUE(s.IsCorruption());
@ -1464,7 +1470,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbage) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
const Status s = builder.SaveTo(&new_vstorage);
ASSERT_TRUE(s.IsCorruption());
@ -1514,7 +1520,7 @@ TEST_F(VersionBuilderTest, CheckConsistencyForBlobFilesAllGarbageLinkedSsts) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
const Status s = builder.SaveTo(&new_vstorage);
ASSERT_TRUE(s.IsCorruption());
@ -1678,7 +1684,7 @@ TEST_F(VersionBuilderTest, MaintainLinkedSstsForBlobFiles) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, &vstorage_,
force_consistency_checks, EpochNumberRequirement::kMightMissing, nullptr,
0);
0, OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(builder.SaveTo(&new_vstorage));
@ -1730,7 +1736,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
VersionStorageInfo new_vstorage(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr,
true /* force_consistency_checks */,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder.Apply(&version_edit));
ASSERT_OK(version_builder.SaveTo(&new_vstorage));
@ -1741,7 +1748,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForFileDeletedTwice) {
VersionStorageInfo new_vstorage2(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, nullptr,
true /* force_consistency_checks */,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_NOK(version_builder2.Apply(&version_edit));
UnrefFilesInVersion(&new_vstorage);
@ -1780,7 +1788,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) {
VersionStorageInfo new_vstorage_1(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
nullptr /* src_vstorage */, true /* force_consistency_checks */,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder_1.Apply(&version_edit_1));
s = version_builder_1.SaveTo(&new_vstorage_1);
@ -1818,7 +1827,8 @@ TEST_F(VersionBuilderTest, CheckConsistencyForL0FilesSortedByEpochNumber) {
VersionStorageInfo new_vstorage_2(
&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel,
nullptr /* src_vstorage */, true /* force_consistency_checks */,
EpochNumberRequirement::kMightMissing, nullptr, 0);
EpochNumberRequirement::kMightMissing, nullptr, 0,
OffpeakTimeInfo(options_.daily_offpeak_time_utc));
ASSERT_OK(version_builder_2.Apply(&version_edit_2));
s = version_builder_2.SaveTo(&new_vstorage_2);

View File

@ -2124,7 +2124,8 @@ VersionStorageInfo::VersionStorageInfo(
CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
bool _force_consistency_checks,
EpochNumberRequirement epoch_number_requirement, SystemClock* clock,
uint32_t bottommost_file_compaction_delay)
uint32_t bottommost_file_compaction_delay,
OffpeakTimeInfo offpeak_time_info)
: internal_comparator_(internal_comparator),
user_comparator_(user_comparator),
// cfd is nullptr if Version is dummy
@ -2156,7 +2157,8 @@ VersionStorageInfo::VersionStorageInfo(
bottommost_file_compaction_delay_(bottommost_file_compaction_delay),
finalized_(false),
force_consistency_checks_(_force_consistency_checks),
epoch_number_requirement_(epoch_number_requirement) {
epoch_number_requirement_(epoch_number_requirement),
offpeak_time_info_(offpeak_time_info) {
if (ref_vstorage != nullptr) {
accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
@ -2200,9 +2202,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks,
epoch_number_requirement,
cfd_ == nullptr ? nullptr : cfd_->ioptions()->clock,
cfd_ == nullptr
? 0
: mutable_cf_options.bottommost_file_compaction_delay),
cfd_ == nullptr ? 0
: mutable_cf_options.bottommost_file_compaction_delay,
vset->offpeak_time_info()),
vset_(vset),
next_(this),
prev_(this),
@ -5043,15 +5045,13 @@ void AtomicGroupReadBuffer::Clear() {
replay_buffer_.clear();
}
VersionSet::VersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id,
const std::string& db_session_id)
VersionSet::VersionSet(
const std::string& dbname, const ImmutableDBOptions* _db_options,
const FileOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager, WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
const std::string& db_session_id, const std::string& daily_offpeak_time_utc)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer, io_tracer,
@ -5076,7 +5076,8 @@ VersionSet::VersionSet(const std::string& dbname,
file_options_(storage_options),
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer),
db_session_id_(db_session_id) {}
db_session_id_(db_session_id),
offpeak_time_info_(OffpeakTimeInfo(daily_offpeak_time_utc)) {}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
@ -6201,7 +6202,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
/*db_id*/ "",
/*db_session_id*/ "");
/*db_session_id*/ "", options->daily_offpeak_time_utc);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -7242,7 +7243,8 @@ ReactiveVersionSet::ReactiveVersionSet(
: VersionSet(dbname, _db_options, _file_options, table_cache,
write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "",
/*db_session_id*/ "") {}
/*db_session_id*/ "",
/*daily_offpeak_time_utc*/ "") {}
ReactiveVersionSet::~ReactiveVersionSet() {}

View File

@ -53,6 +53,7 @@
#endif
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "options/offpeak_time_info.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
@ -134,7 +135,8 @@ class VersionStorageInfo {
bool _force_consistency_checks,
EpochNumberRequirement epoch_number_requirement,
SystemClock* clock,
uint32_t bottommost_file_compaction_delay);
uint32_t bottommost_file_compaction_delay,
OffpeakTimeInfo offpeak_time_info);
// No copying allowed
VersionStorageInfo(const VersionStorageInfo&) = delete;
void operator=(const VersionStorageInfo&) = delete;
@ -751,7 +753,8 @@ class VersionStorageInfo {
// target sizes.
uint64_t estimated_compaction_needed_bytes_;
// Used for computing bottommost files marked for compaction.
// Used for computing bottommost files marked for compaction and checking for
// offpeak time.
SystemClock* clock_;
uint32_t bottommost_file_compaction_delay_;
@ -763,6 +766,8 @@ class VersionStorageInfo {
EpochNumberRequirement epoch_number_requirement_;
OffpeakTimeInfo offpeak_time_info_;
friend class Version;
friend class VersionSet;
};
@ -1146,7 +1151,8 @@ class VersionSet {
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id);
const std::string& db_id, const std::string& db_session_id,
const std::string& daily_offpeak_time_utc);
// No copying allowed
VersionSet(const VersionSet&) = delete;
void operator=(const VersionSet&) = delete;
@ -1501,6 +1507,12 @@ class VersionSet {
new_options.writable_file_max_buffer_size;
}
// TODO - Consider updating together when file options change in SetDBOptions
const OffpeakTimeInfo& offpeak_time_info() { return offpeak_time_info_; }
void ChangeOffpeakTimeInfo(const std::string& daily_offpeak_time_utc) {
offpeak_time_info_.daily_offpeak_time_utc = daily_offpeak_time_utc;
}
const ImmutableDBOptions* db_options() const { return db_options_; }
static uint64_t GetNumLiveVersions(Version* dummy_versions);
@ -1651,6 +1663,9 @@ class VersionSet {
std::string db_session_id_;
// Off-peak time information used for compaction scoring
OffpeakTimeInfo offpeak_time_info_;
private:
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,

View File

@ -21,6 +21,7 @@
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/unique_id_impl.h"
#include "test_util/mock_time_env.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
@ -132,7 +133,8 @@ class VersionStorageInfoTestBase : public testing::Test {
/*src_vstorage=*/nullptr,
/*_force_consistency_checks=*/false,
EpochNumberRequirement::kMustPresent, ioptions_.clock,
mutable_cf_options_.bottommost_file_compaction_delay) {}
mutable_cf_options_.bottommost_file_compaction_delay,
OffpeakTimeInfo()) {}
~VersionStorageInfoTestBase() override {
for (int i = 0; i < vstorage_.num_levels(); ++i) {
@ -1199,11 +1201,11 @@ class VersionSetTestBase {
immutable_options_.fs = fs_;
immutable_options_.clock = env_->GetSystemClock().get();
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, nullptr);
@ -1303,11 +1305,11 @@ class VersionSetTestBase {
}
void ReopenDB() {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
EXPECT_OK(versions_->Recover(column_families_, false));
}
@ -1815,11 +1817,11 @@ TEST_F(VersionSetTest, WalAddition) {
// Recover a new VersionSet.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1882,11 +1884,11 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) {
// Recover a new VersionSet.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
@ -1935,11 +1937,11 @@ TEST_F(VersionSetTest, WalDeletion) {
// Recover a new VersionSet, only the non-closed WAL should show up.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1973,11 +1975,11 @@ TEST_F(VersionSetTest, WalDeletion) {
// Recover from the new MANIFEST, only the non-closed WAL should show up.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -2093,11 +2095,11 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
// Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -2129,11 +2131,11 @@ TEST_F(VersionSetTest, DeleteAllWals) {
// Recover a new VersionSet, all WALs are deleted.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 0);
@ -2171,11 +2173,11 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
// Recover a new VersionSet, the min log number and the last WAL should be
// kept.
{
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> new_versions(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
std::string db_id;
ASSERT_OK(
new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
@ -2190,6 +2192,73 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
}
}
TEST_F(VersionSetTest, OffpeakTimeInfoTest) {
Random rnd(test::RandomSeed());
// Sets off-peak time from 11:30PM to 4:30AM next day.
// Starting at 1:30PM, use mock sleep to make time pass
// and see if IsNowOffpeak() returns correctly per time changes
int now_hour = 13;
int now_minute = 30;
versions_->ChangeOffpeakTimeInfo("23:30-04:30");
auto mock_clock = std::make_shared<MockSystemClock>(env_->GetSystemClock());
// Add some extra random days to current time
int days = rnd.Uniform(100);
mock_clock->SetCurrentTime(days * 86400 + now_hour * 3600 + now_minute * 60);
// Starting at 1:30PM. It's not off-peak
ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Now it's at 4:30PM. Still not off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Now it's at 11:30PM. It's off-peak
mock_clock->MockSleepForSeconds(7 * 3600);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Now it's at 2:30AM next day. It's still off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Now it's at 4:30AM. It's still off-peak
mock_clock->MockSleepForSeconds(2 * 3600);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Sleep for one more minute. It's at 4:31AM It's no longer off-peak
mock_clock->MockSleepForSeconds(60);
ASSERT_FALSE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Entire day offpeak
versions_->ChangeOffpeakTimeInfo("00:00-23:59");
// It doesn't matter what time it is. It should be just offpeak.
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Mock Sleep for 3 hours. It's still off-peak
mock_clock->MockSleepForSeconds(3 * 3600);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Mock Sleep for 20 hours. It's still off-peak
mock_clock->MockSleepForSeconds(20 * 3600);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Mock Sleep for 59 minutes. It's still off-peak
mock_clock->MockSleepForSeconds(59 * 60);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Mock Sleep for 59 seconds. It's still off-peak
mock_clock->MockSleepForSeconds(59);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Mock Sleep for 1 second (exactly 24h passed). It's still off-peak
mock_clock->MockSleepForSeconds(1);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
// Another second for sanity check
mock_clock->MockSleepForSeconds(1);
ASSERT_TRUE(versions_->offpeak_time_info().IsNowOffpeak(mock_clock.get()));
}
TEST_F(VersionStorageInfoTest, AddRangeDeletionCompensatedFileSize) {
// Tests that compensated range deletion size is added to compensated file
// size.
@ -2236,11 +2305,11 @@ class VersionSetWithTimestampTest : public VersionSetTest {
}
void VerifyFullHistoryTsLow(uint64_t expected_ts_low) {
std::unique_ptr<VersionSet> vset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
std::unique_ptr<VersionSet> vset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
/*db_id=*/nullptr));
for (auto* cfd : *(vset->GetColumnFamilySet())) {

View File

@ -25,7 +25,8 @@ class OfflineManifestWriter {
options.table_cache_numshardbits)),
versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "") {}
/*db_id*/ "", /*db_session_id*/ "",
options.daily_offpeak_time_utc) {}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) {
return versions_.Recover(column_families, /*read_only*/ false,

View File

@ -50,11 +50,11 @@ class WalManagerTest : public testing::Test {
db_options_.fs = env_->GetFileSystem();
db_options_.clock = env_->GetSystemClock().get();
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ ""));
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
@ -333,4 +333,3 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1066,38 +1066,6 @@ void MutableDBOptions::Dump(Logger* log) const {
daily_offpeak_time_utc.c_str());
}
bool MutableDBOptions::IsNowOffPeak(SystemClock* clock) const {
if (daily_offpeak_time_utc.empty()) {
return false;
}
int64_t now;
if (clock->GetCurrentTime(&now).ok()) {
constexpr int kSecondsPerDay = 86400;
constexpr int kSecondsPerMinute = 60;
int seconds_since_midnight_to_nearest_minute =
(static_cast<int>(now % kSecondsPerDay) / kSecondsPerMinute) *
kSecondsPerMinute;
int start_time = 0, end_time = 0;
bool success =
TryParseTimeRangeString(daily_offpeak_time_utc, start_time, end_time);
assert(success);
assert(start_time != end_time);
if (!success) {
// If the validation was done properly, we should never reach here
return false;
}
// if the offpeak duration spans overnight (i.e. 23:30 - 4:30 next day)
if (start_time > end_time) {
return start_time <= seconds_since_midnight_to_nearest_minute ||
seconds_since_midnight_to_nearest_minute <= end_time;
} else {
return start_time <= seconds_since_midnight_to_nearest_minute &&
seconds_since_midnight_to_nearest_minute <= end_time;
}
}
return false;
}
Status GetMutableDBOptionsFromStrings(
const MutableDBOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,

View File

@ -136,9 +136,7 @@ struct MutableDBOptions {
bool strict_bytes_per_sync;
size_t compaction_readahead_size;
int max_background_flushes;
std::string daily_offpeak_time_utc;
bool IsNowOffPeak(SystemClock* clock) const;
};
Status GetStringFromMutableDBOptions(const ConfigOptions& config_options,

View File

@ -0,0 +1,48 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "options/offpeak_time_info.h"
#include "rocksdb/system_clock.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
OffpeakTimeInfo::OffpeakTimeInfo() : daily_offpeak_time_utc("") {}
OffpeakTimeInfo::OffpeakTimeInfo(const std::string& offpeak_time)
: daily_offpeak_time_utc(offpeak_time) {}
bool OffpeakTimeInfo::IsNowOffpeak(SystemClock* clock) const {
if (daily_offpeak_time_utc.empty()) {
return false;
}
int64_t now;
if (clock->GetCurrentTime(&now).ok()) {
constexpr int kSecondsPerDay = 86400;
constexpr int kSecondsPerMinute = 60;
int seconds_since_midnight_to_nearest_minute =
(static_cast<int>(now % kSecondsPerDay) / kSecondsPerMinute) *
kSecondsPerMinute;
int start_time = 0, end_time = 0;
bool success =
TryParseTimeRangeString(daily_offpeak_time_utc, start_time, end_time);
assert(success);
assert(start_time != end_time);
if (!success) {
// If the validation was done properly, we should never reach here
return false;
}
// if the offpeak duration spans overnight (i.e. 23:30 - 4:30 next day)
if (start_time > end_time) {
return start_time <= seconds_since_midnight_to_nearest_minute ||
seconds_since_midnight_to_nearest_minute <= end_time;
} else {
return start_time <= seconds_since_midnight_to_nearest_minute &&
seconds_since_midnight_to_nearest_minute <= end_time;
}
}
return false;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,22 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
struct OffpeakTimeInfo {
OffpeakTimeInfo();
explicit OffpeakTimeInfo(const std::string& offpeak_time);
std::string daily_offpeak_time_utc;
bool IsNowOffpeak(SystemClock* clock) const;
};
} // namespace ROCKSDB_NAMESPACE

1
src.mk
View File

@ -156,6 +156,7 @@ LIB_SOURCES = \
options/configurable.cc \
options/customizable.cc \
options/db_options.cc \
options/offpeak_time_info.cc \
options/options.cc \
options/options_helper.cc \
options/options_parser.cc \

View File

@ -1364,7 +1364,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex,
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "");
/*db_id*/ "", /*db_session_id*/ "",
options.daily_offpeak_time_utc);
Status s = versions.DumpManifest(options, file, verbose, hex, json, cf_descs);
if (!s.ok()) {
fprintf(stderr, "Error in processing file %s %s\n", file.c_str(),
@ -1507,7 +1508,8 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options,
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "");
/*db_id*/ "", /*db_session_id*/ "",
options.daily_offpeak_time_utc);
std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path,
immutable_db_options.fs.get());
@ -2328,7 +2330,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int* levels) {
WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "");
/*db_id*/ "", /*db_session_id*/ "",
opt.daily_offpeak_time_utc);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt));

View File

@ -207,7 +207,8 @@ class FileChecksumTestHelper {
WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_);
VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,
&wc, nullptr, nullptr, "", "");
&wc, nullptr, nullptr, "", "",
options_.daily_offpeak_time_utc);
std::vector<std::string> cf_name_list;
Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_,