Steps toward making preserve/preclude options mutable (#13124)

Summary:
Follow-up to https://github.com/facebook/rocksdb/issues/13114

This change makes the options mutable in testing only, through some internal hooks, so that we can keep the simpler mechanics and testing of making the options mutable separate from a more interesting and critical fix needed for the options to be *safely* mutable. See https://github.com/facebook/rocksdb/pull/9964/files#r1024449523 for background on the interesting remaining problem, which we've added a test for here with the failing piece commented out (because it puts the DB in a failure state): PrecludeLastLevelTest.RangeTombstoneSnapshotMigrateFromLast.
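
For concreteness, the testing-only hook is a debug-build flag that `SetOptions()` consults before rejecting immutable options; tests flip it temporarily, as in this sketch mirroring the diff below (`SaveAndRestore` resets the flag on scope exit; the option value is just an example):

```cpp
// FIXME: temporary while preserve/preclude options are not user mutable
SaveAndRestore<bool> m(&TEST_allowSetOptionsImmutableInMutable, true);
// Normally rejected as an immutable option; permitted while the flag is set,
// in debug builds only.
ASSERT_OK(db_->SetOptions({{"preclude_last_level_data_seconds", "10000"}}));
```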

The mechanics of making the options mutable turned out to be smaller than expected, because `RegisterRecordSeqnoTimeWorker()` and `RecordSeqnoToTimeMapping()` are already robust to things like frequent switching between preserve/preclude durations, e.g. with new and dropped column families, based on work from https://github.com/facebook/rocksdb/issues/11920, https://github.com/facebook/rocksdb/issues/11929, and https://github.com/facebook/rocksdb/issues/12253. Mostly, `options_mutex_` prevents races in applying the options changes, and smart capacity enforcement in `SeqnoToTimeMapping` means it doesn't really matter if the periodic task wakes up too often from being re-scheduled repeatedly.
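
For illustration, the shape of the re-registration logic under the DB mutex, condensed from the `RegisterRecordSeqnoTimeWorker()` hunks below (`cap` stands in for the capacity computation, which is elided from the hunk shown):

```cpp
// Sketch, not the verbatim implementation. min/max_preserve_seconds are
// aggregated across all column families' preserve/preclude settings.
size_t old_mapping_size = seqno_to_time_mapping_.Size();
if (min_preserve_seconds == std::numeric_limits<uint64_t>::max()) {
  // No CF preserves/precludes: stop tracking.
  seqno_to_time_mapping_.SetCapacity(0);
} else {
  // Capacity and max time span bound the mapping regardless of how often
  // the periodic task fires, so extra wake-ups are harmless.
  seqno_to_time_mapping_.SetCapacity(cap);  // cap derivation elided in hunk
  seqno_to_time_mapping_.SetMaxTimeSpan(max_preserve_seconds);
}
if (old_mapping_size != seqno_to_time_mapping_.Size()) {
  // Second functional change below: publish the updated mapping via new
  // SuperVersions (contexts cleaned up outside the DB mutex).
  InstallSeqnoToTimeMappingInSV(&sv_contexts);
}
```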

Functional changes needed beyond marking the options mutable:
* Update periodic task registration (as needed) from SetOptions, also recording a mapping entry at that time in case one is needed (see the sketch after this list).
* Install SuperVersion(s) with the updated mapping when the registration function itself updates the mapping.
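
Condensed from the `DBImpl::SetOptions()` hunk in this diff, the first change amounts to re-registering the worker only when a relevant option was touched:

```cpp
// In DBImpl::SetOptions(), after the new options are installed:
// re-register the seqno-to-time worker if either tiering-related option
// appeared in the change map. Registration itself records a mapping entry
// when one is needed.
if (s.ok() && (options_map.count("preserve_internal_time_seconds") > 0 ||
               options_map.count("preclude_last_level_data_seconds") > 0)) {
  s = RegisterRecordSeqnoTimeWorker(read_options, write_options,
                                    false /* is_new_db */);
}
```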

Possible follow-up (aside from what is already mentioned):
* Some FIXME code in RangeTombstoneSnapshotMigrateFromLast is present because Flush does not automatically include a seqno-to-time mapping entry that puts an upper bound on how new the flushed data is. Adding one could have a measurable CPU impact, so it needs to be done carefully; the current workaround is shown below.
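
The workaround at the FIXME site forces the periodic task to record a fresh entry before the flush (taken from the new test; the mock clock drives the task):

```cpp
// FIXME: ideally this wouldn't be necessary to get a seqno-to-time entry
// into a later compaction to get data into the last level.
dbfull()->TEST_WaitForPeriodicTaskRun(
    [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
ASSERT_OK(Flush());
```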

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13124

Test Plan:
Updated/refactored tests in tiered_compaction_test to parametrically use dynamic configuration changes (or DB restarts) when changing operating parameters such as these.
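
The refactored tests funnel option changes through an `ApplyConfigChange()` helper that either calls `SetOptions()`/`SetDBOptions()` or rebuilds the `Options` and reopens the DB, depending on the `UseDynamicConfig()` test parameter. A typical call from the updated tests:

```cpp
// Dynamic path: db_->SetOptions(...). Static path: parse the strings into
// `options` with GetColumnFamilyOptionsFromMap, then Reopen(options).
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"},
                             {"last_level_temperature", "kCold"}});
```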

The CheckInternalKeyRange test got some heavier refactoring in preparation for follow-up work, and it was manually verified that the test still fails when the relevant `if (!safe_to_penultimate_level) ...` code is disabled.
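
That refactoring also leans on a small `DBTestBase::CompactRange()` overload added in this diff, which takes `std::optional<Slice>` bounds so tests can express open ranges tersely:

```cpp
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(CompactRange(cro, Key(1), Key(2)));  // bounded range
ASSERT_OK(CompactRange({}, {}, {}));           // nullopt bounds = full range
```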

Reviewed By: jowlyzhang

Differential Revision: D65634146

Pulled By: pdillinger

fbshipit-source-id: 25c9d00fd5b7fd1b408b5f36d58dc48647970528
Peter Dillinger, 2024-11-18 19:34:01 -08:00, committed by Facebook GitHub Bot
parent f69a5fe8ee
commit 8a36543326
8 changed files with 394 additions and 160 deletions


@@ -1561,6 +1561,11 @@ Status ColumnFamilyData::SetOptions(
BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
ConfigOptions config_opts;
config_opts.mutable_options_only = true;
#ifndef NDEBUG
if (TEST_allowSetOptionsImmutableInMutable) {
config_opts.mutable_options_only = false;
}
#endif
Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
&cf_opts);
if (s.ok()) {


@@ -9,15 +9,26 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "options/cf_options.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/mock_time_env.h"
#include "util/defer.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
ConfigOptions GetStrictConfigOptions() {
ConfigOptions config_options;
config_options.ignore_unknown_options = false;
config_options.ignore_unsupported_options = false;
config_options.input_strings_escaped = false;
return config_options;
}
} // namespace
class TieredCompactionTest : public DBTestBase {
public:
@@ -1234,82 +1245,9 @@ TEST_F(TieredCompactionTest, RangeBasedTieredStorageLevel) {
db_->ReleaseSnapshot(temp_snap);
}
TEST_F(TieredCompactionTest, CheckInternalKeyRange) {
// When compacting keys from the last level to penultimate level,
// output to penultimate level should be within internal key range
// of input files from penultimate level.
// Set up:
// L5:
// File 1: DeleteRange[1, 3)@4, File 2: [3@5, 100@6]
// L6:
// File 3: [2@1, 3@2], File 4: [50@3]
//
// When File 1 and File 3 are being compacted,
// Key(3) cannot be compacted up, otherwise it causes
// inconsistency where File 3's Key(3) has a lower sequence number
// than File 2's Key(3).
const int kNumLevels = 7;
auto options = CurrentOptions();
SetColdTemperature(options);
options.level_compaction_dynamic_level_bytes = true;
options.num_levels = kNumLevels;
options.statistics = CreateDBStatistics();
options.max_subcompactions = 10;
options.preclude_last_level_data_seconds = 10000;
DestroyAndReopen(options);
auto cmp = options.comparator;
std::string hot_start = Key(0);
std::string hot_end = Key(0);
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
cmp->Compare(context->key, hot_start) >= 0 &&
cmp->Compare(context->key, hot_end) < 0;
});
SyncPoint::GetInstance()->EnableProcessing();
// File 1
ASSERT_OK(Put(Key(2), "val2"));
ASSERT_OK(Put(Key(3), "val3"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
// File 2
ASSERT_OK(Put(Key(50), "val50"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
const Snapshot* snapshot = db_->GetSnapshot();
hot_end = Key(100);
std::string start = Key(1);
std::string end = Key(3);
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
ASSERT_OK(Flush());
MoveFilesToLevel(5);
// File 3
ASSERT_OK(Put(Key(3), "vall"));
ASSERT_OK(Put(Key(100), "val100"));
ASSERT_OK(Flush());
MoveFilesToLevel(5);
// Try to compact keys up
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
start = Key(1);
end = Key(2);
Slice begin_slice(start);
Slice end_slice(end);
ASSERT_OK(db_->CompactRange(cro, &begin_slice, &end_slice));
// Without internal key range checking, we get the following error:
// Corruption: force_consistency_checks(DEBUG): VersionBuilder: L5 has
// overlapping ranges: file #18 largest key: '6B6579303030303033' seq:102,
// type:1 vs. file #15 smallest key: '6B6579303030303033' seq:104, type:1
db_->ReleaseSnapshot(snapshot);
}
class PrecludeLastLevelTest : public DBTestBase {
class PrecludeLastLevelTestBase : public DBTestBase {
public:
PrecludeLastLevelTest(std::string test_name = "preclude_last_level_test")
PrecludeLastLevelTestBase(std::string test_name = "preclude_last_level_test")
: DBTestBase(test_name, /*env_do_fsync=*/false) {
mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
mock_clock_->SetCurrentTime(kMockStartTime);
@@ -1334,9 +1272,52 @@ class PrecludeLastLevelTest : public DBTestBase {
});
mock_clock_->SetCurrentTime(kMockStartTime);
}
void ApplyConfigChangeImpl(
bool dynamic, Options* options,
const std::unordered_map<std::string, std::string>& config_change,
const std::unordered_map<std::string, std::string>& db_config_change) {
if (dynamic) {
if (config_change.size() > 0) {
// FIXME: temporary while preserve/preclude options are not user mutable
SaveAndRestore<bool> m(&TEST_allowSetOptionsImmutableInMutable, true);
ASSERT_OK(db_->SetOptions(config_change));
}
if (db_config_change.size() > 0) {
ASSERT_OK(db_->SetDBOptions(db_config_change));
}
} else {
if (config_change.size() > 0) {
ASSERT_OK(GetColumnFamilyOptionsFromMap(
GetStrictConfigOptions(), *options, config_change, options));
}
if (db_config_change.size() > 0) {
ASSERT_OK(GetDBOptionsFromMap(GetStrictConfigOptions(), *options,
db_config_change, options));
}
Reopen(*options);
}
}
};
TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
class PrecludeLastLevelTest : public PrecludeLastLevelTestBase,
public testing::WithParamInterface<bool> {
public:
using PrecludeLastLevelTestBase::PrecludeLastLevelTestBase;
bool UseDynamicConfig() const { return GetParam(); }
void ApplyConfigChange(
Options* options,
const std::unordered_map<std::string, std::string>& config_change,
const std::unordered_map<std::string, std::string>& db_config_change =
{}) {
ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
db_config_change);
}
};
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
@@ -1367,9 +1348,8 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// enable preclude feature
options.preclude_last_level_data_seconds = 10000;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"},
{"last_level_temperature", "kCold"}});
// all data is hot, even they're in the last level
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
@@ -1393,7 +1373,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
Close();
}
TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
@@ -1423,16 +1403,17 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
// all data is pushed to the last level
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// enable preclude feature
options.preclude_last_level_data_seconds = 10000;
options.last_level_temperature = Temperature::kCold;
// enable preclude feature, and...
// make sure it won't trigger Size Amp compaction, unlike normal Size Amp
// compaction which is typically a last level compaction, when tiered Storage
// ("preclude_last_level") is enabled, size amp won't include the last level.
// As the last level would be in cold tier and the size would not be a
// problem, which also avoid frequent hot to cold storage compaction.
options.compaction_options_universal.max_size_amplification_percent = 400;
Reopen(options);
ApplyConfigChange(
&options,
{{"preclude_last_level_data_seconds", "10000"},
{"last_level_temperature", "kCold"},
{"compaction_options_universal.max_size_amplification_percent", "400"}});
// all data is hot, even they're in the last level
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
@@ -1464,7 +1445,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
Close();
}
TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
TEST_P(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
@@ -1512,9 +1493,8 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
}
// enable preclude feature
options.preclude_last_level_data_seconds = 2000;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "2000"},
{"last_level_temperature", "kCold"}});
// Generate a sstable and trigger manual compaction
ASSERT_OK(Put(Key(10), "value"));
@@ -1532,7 +1512,7 @@ TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
Close();
}
TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) {
TEST_P(PrecludeLastLevelTest, SmallPrecludeTime) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
@@ -1585,13 +1565,203 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) {
Close();
}
// Test Param: protection_bytes_per_key for WriteBatch
TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) {
// When compacting keys from the last level to penultimate level,
// output to penultimate level should be within internal key range
// of input files from penultimate level.
// Set up:
// L5:
// File 1: DeleteRange[1, 3)@4, File 2: [3@5, 100@6]
// L6:
// File 3: [2@1, 3@2], File 4: [50@3]
//
// When File 1 and File 3 are being compacted,
// Key(3) cannot be compacted up, otherwise it causes
// inconsistency where File 3's Key(3) has a lower sequence number
// than File 2's Key(3).
const int kNumLevels = 7;
auto options = CurrentOptions();
options.env = mock_env_.get();
options.last_level_temperature = Temperature::kCold;
options.level_compaction_dynamic_level_bytes = true;
options.num_levels = kNumLevels;
options.statistics = CreateDBStatistics();
options.max_subcompactions = 10;
options.preserve_internal_time_seconds = 10000;
DestroyAndReopen(options);
// File 3
ASSERT_OK(Put(Key(2), "val2"));
ASSERT_OK(Put(Key(3), "val3"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
// File 4
ASSERT_OK(Put(Key(50), "val50"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}});
const Snapshot* snapshot = db_->GetSnapshot();
// File 1
std::string start = Key(1);
std::string end = Key(3);
ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), start, end));
ASSERT_OK(Flush());
MoveFilesToLevel(5);
// File 2
ASSERT_OK(Put(Key(3), "vall"));
ASSERT_OK(Put(Key(100), "val100"));
ASSERT_OK(Flush());
MoveFilesToLevel(5);
ASSERT_EQ("0,0,0,0,0,2,2", FilesPerLevel());
auto VerifyLogicalState = [&]() {
// First with snapshot
ASSERT_EQ("val2", Get(Key(2), snapshot));
ASSERT_EQ("val3", Get(Key(3), snapshot));
ASSERT_EQ("val50", Get(Key(50), snapshot));
ASSERT_EQ("NOT_FOUND", Get(Key(100), snapshot));
// Then without snapshot
ASSERT_EQ("NOT_FOUND", Get(Key(2)));
ASSERT_EQ("vall", Get(Key(3)));
ASSERT_EQ("val50", Get(Key(50)));
ASSERT_EQ("val100", Get(Key(100)));
};
VerifyLogicalState();
// Try to compact keys up
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
// Without internal key range checking, we get the following error:
// Corruption: force_consistency_checks(DEBUG): VersionBuilder: L5 has
// overlapping ranges: file #18 largest key: '6B6579303030303033' seq:102,
// type:1 vs. file #15 smallest key: '6B6579303030303033' seq:104, type:1
ASSERT_OK(CompactRange(cro, Key(1), Key(2)));
VerifyLogicalState();
db_->ReleaseSnapshot(snapshot);
Close();
}
TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) {
// Reproducer for issue originally described in
// https://github.com/facebook/rocksdb/pull/9964/files#r1024449523
if (!UseDynamicConfig()) {
// Depends on config change while holding a snapshot
return;
}
const int kNumLevels = 7;
auto options = CurrentOptions();
options.env = mock_env_.get();
options.last_level_temperature = Temperature::kCold;
options.level_compaction_dynamic_level_bytes = true;
options.num_levels = kNumLevels;
options.statistics = CreateDBStatistics();
options.max_subcompactions = 10;
options.preserve_internal_time_seconds = 30000;
DestroyAndReopen(options);
// Entries with much older write time
ASSERT_OK(Put(Key(2), "val2"));
ASSERT_OK(Put(Key(6), "val6"));
for (int i = 0; i < 10; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
}
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), Key(1), Key(5)));
ASSERT_OK(Put(Key(1), "val1"));
ASSERT_OK(Flush());
MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}});
// To exercise the WithinPenultimateLevelOutputRange feature, we want files
// around the middle file to be compacted on the penultimate level
ASSERT_OK(Put(Key(0), "val0"));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(3), "val3"));
ASSERT_OK(Flush());
ASSERT_OK(Put(Key(7), "val7"));
// FIXME: ideally this wouldn't be necessary to get a seqno to time entry
// into a later compaction to get data into the last level
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
ASSERT_OK(Flush());
MoveFilesToLevel(5);
ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
auto VerifyLogicalState = [&]() {
// First with snapshot
if (snapshot) {
ASSERT_EQ("NOT_FOUND", Get(Key(0), snapshot));
ASSERT_EQ("NOT_FOUND", Get(Key(1), snapshot));
ASSERT_EQ("val2", Get(Key(2), snapshot));
ASSERT_EQ("NOT_FOUND", Get(Key(3), snapshot));
ASSERT_EQ("val6", Get(Key(6), snapshot));
ASSERT_EQ("NOT_FOUND", Get(Key(7), snapshot));
}
// Then without snapshot
ASSERT_EQ("val0", Get(Key(0)));
ASSERT_EQ("val1", Get(Key(1)));
ASSERT_EQ("NOT_FOUND", Get(Key(2)));
ASSERT_EQ("val3", Get(Key(3)));
ASSERT_EQ("val6", Get(Key(6)));
ASSERT_EQ("val7", Get(Key(7)));
};
VerifyLogicalState();
// Try a limited range compaction
// FIXME: this currently hits the "Unsafe to store Seq later than snapshot"
// error. Needs to work safely for preclude option to be user mutable.
// ASSERT_OK(CompactRange({}, Key(3), Key(4)));
EXPECT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
VerifyLogicalState();
// Compact everything, but some data still goes to both penultimate and last
// levels. A full-range compaction should be safe to "migrate" data from the
// last level to penultimate (because of preclude setting change).
ASSERT_OK(CompactRange({}, {}, {}));
EXPECT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
VerifyLogicalState();
// Key1 should have been migrated out of the last level
auto& meta = *GetLevelFileMetadatas(6)[0];
ASSERT_LT(Key(1), meta.smallest.user_key().ToString());
// Make data eligible for last level
db_->ReleaseSnapshot(snapshot);
snapshot = nullptr;
mock_clock_->MockSleepForSeconds(static_cast<int>(10000));
ASSERT_OK(CompactRange({}, {}, {}));
EXPECT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
VerifyLogicalState();
Close();
}
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest,
::testing::Bool());
class TimedPutPrecludeLastLevelTest
: public PrecludeLastLevelTest,
: public PrecludeLastLevelTestBase,
public testing::WithParamInterface<size_t> {
public:
TimedPutPrecludeLastLevelTest()
: PrecludeLastLevelTest("timed_put_preclude_last_level_test") {}
: PrecludeLastLevelTestBase("timed_put_preclude_last_level_test") {}
size_t ProtectionBytesPerKey() const { return GetParam(); }
};
TEST_P(TimedPutPrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
@@ -1609,7 +1779,7 @@ TEST_P(TimedPutPrecludeLastLevelTest, FastTrackTimedPutToLastLevel) {
options.last_level_temperature = Temperature::kCold;
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();
wo.protection_bytes_per_key = ProtectionBytesPerKey();
Random rnd(301);
@@ -1663,7 +1833,7 @@ TEST_P(TimedPutPrecludeLastLevelTest, InterleavedTimedPutAndPut) {
options.default_write_temperature = Temperature::kHot;
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();
wo.protection_bytes_per_key = ProtectionBytesPerKey();
// Start time: kMockStartTime = 10000000;
ASSERT_OK(TimedPut(0, Key(0), "v0", kMockStartTime - 1 * 24 * 60 * 60, wo));
@@ -1689,7 +1859,7 @@ TEST_P(TimedPutPrecludeLastLevelTest, PreserveTimedPutOnPenultimateLevel) {
options.default_write_temperature = Temperature::kHot;
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();
wo.protection_bytes_per_key = ProtectionBytesPerKey();
// Creating a snapshot to manually control when preferred sequence number is
// swapped in. An entry's preferred seqno won't get swapped in until it's
@@ -1759,7 +1929,7 @@ TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) {
options.table_properties_collector_factories.push_back(factory);
DestroyAndReopen(options);
WriteOptions wo;
wo.protection_bytes_per_key = GetParam();
wo.protection_bytes_per_key = ProtectionBytesPerKey();
Random rnd(301);
@@ -1814,7 +1984,7 @@ TEST_P(TimedPutPrecludeLastLevelTest, AutoTriggerCompaction) {
INSTANTIATE_TEST_CASE_P(TimedPutPrecludeLastLevelTest,
TimedPutPrecludeLastLevelTest, ::testing::Values(0, 8));
TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
TEST_P(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
@@ -1845,9 +2015,8 @@ TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// enable preclude feature
options.preclude_last_level_data_seconds = 2000;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "2000"},
{"last_level_temperature", "kCold"}});
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
@@ -1878,21 +2047,30 @@ TEST_F(PrecludeLastLevelTest, LastLevelOnlyCompactionPartial) {
Close();
}
class PrecludeLastLevelTestWithParms
: public PrecludeLastLevelTest,
public testing::WithParamInterface<bool> {
class PrecludeLastLevelOptionalTest
: public PrecludeLastLevelTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PrecludeLastLevelTestWithParms() : PrecludeLastLevelTest() {}
bool UseDynamicConfig() const { return std::get<0>(GetParam()); }
void ApplyConfigChange(
Options* options,
const std::unordered_map<std::string, std::string>& config_change,
const std::unordered_map<std::string, std::string>& db_config_change =
{}) {
ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
db_config_change);
}
bool EnablePrecludeLastLevel() const { return std::get<1>(GetParam()); }
};
TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
TEST_P(PrecludeLastLevelOptionalTest, LastLevelOnlyCompactionNoPreclude) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kKeyPerSec = 10;
bool enable_preclude_last_level = GetParam();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preserve_internal_time_seconds = 2000;
@@ -1950,7 +2128,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
if (enable_preclude_last_level && is_manual_compaction_running) {
if (EnablePrecludeLastLevel() && is_manual_compaction_running) {
ASSERT_TRUE(compaction == nullptr);
verified_compaction_order = true;
} else {
@@ -1977,12 +2155,11 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
SyncPoint::GetInstance()->EnableProcessing();
// only enable if the Parameter is true
if (enable_preclude_last_level) {
options.preclude_last_level_data_seconds = 2000;
}
options.max_background_jobs = 8;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
ApplyConfigChange(&options,
{{"preclude_last_level_data_seconds",
EnablePrecludeLastLevel() ? "2000" : "0"},
{"last_level_temperature", "kCold"}},
{{"max_background_jobs", "8"}});
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
@@ -2011,7 +2188,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
if (enable_preclude_last_level) {
if (EnablePrecludeLastLevel()) {
ASSERT_NE("0,0,0,0,0,1,1", FilesPerLevel());
} else {
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
@@ -2025,7 +2202,7 @@ TEST_P(PrecludeLastLevelTestWithParms, LastLevelOnlyCompactionNoPreclude) {
Close();
}
TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
TEST_P(PrecludeLastLevelOptionalTest, PeriodicCompactionToPenultimateLevel) {
// Test the last level only periodic compaction should also be blocked by an
// ongoing compaction in penultimate level if tiered compaction is enabled
// otherwise, the periodic compaction should just run for the last level.
@@ -2035,8 +2212,6 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
const int kKeyPerSec = 1;
const int kNumKeys = 100;
bool enable_preclude_last_level = GetParam();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preserve_internal_time_seconds = 20000;
@@ -2063,12 +2238,11 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
// enable preclude feature
if (enable_preclude_last_level) {
options.preclude_last_level_data_seconds = 20000;
}
options.max_background_jobs = 8;
options.last_level_temperature = Temperature::kCold;
Reopen(options);
ApplyConfigChange(&options,
{{"preclude_last_level_data_seconds",
EnablePrecludeLastLevel() ? "2000" : "0"},
{"last_level_temperature", "kCold"}},
{{"max_background_jobs", "8"}});
std::atomic_bool is_size_ratio_compaction_running = false;
std::atomic_bool verified_last_level_compaction = false;
@@ -2093,7 +2267,7 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
auto compaction = static_cast<Compaction*>(arg);
if (is_size_ratio_compaction_running) {
if (enable_preclude_last_level) {
if (EnablePrecludeLastLevel()) {
ASSERT_TRUE(compaction == nullptr);
} else {
ASSERT_TRUE(compaction != nullptr);
@@ -2151,8 +2325,10 @@ TEST_P(PrecludeLastLevelTestWithParms, PeriodicCompactionToPenultimateLevel) {
Close();
}
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTestWithParms,
PrecludeLastLevelTestWithParms, testing::Bool());
INSTANTIATE_TEST_CASE_P(PrecludeLastLevelOptionalTest,
PrecludeLastLevelOptionalTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
// partition the SST into 3 ranges [0, 19] [20, 39] [40, ...]
class ThreeRangesPartitioner : public SstPartitioner {
@@ -2196,7 +2372,7 @@ class ThreeRangesPartitionerFactory : public SstPartitionerFactory {
}
};
TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
TEST_P(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kKeyPerSec = 10;
@@ -2207,6 +2383,7 @@ TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
options.statistics = CreateDBStatistics();
DestroyAndReopen(options);
Random rnd(301);
@@ -2244,11 +2421,10 @@ TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
// L6: [0, 299]
ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel());
ASSERT_OK(options.statistics->Reset());
// enable tiered storage feature
options.preclude_last_level_data_seconds = 10000;
options.last_level_temperature = Temperature::kCold;
options.statistics = CreateDBStatistics();
Reopen(options);
ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"},
{"last_level_temperature", "kCold"}});
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
@@ -2294,7 +2470,7 @@ TEST_F(PrecludeLastLevelTest, PartialPenultimateLevelCompaction) {
Close();
}
TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
TEST_P(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
const int kNumLevels = 7;
const int kSecondsPerKey = 10;
const int kNumFiles = 3;
@@ -2433,12 +2609,25 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) {
// Tests DBIter::GetProperty("rocksdb.iterator.write-time") return a data's
// approximate write unix time.
// Test Param:
// 1) use tailing iterator or regular iterator (when it applies)
class IteratorWriteTimeTest : public PrecludeLastLevelTest,
public testing::WithParamInterface<bool> {
class IteratorWriteTimeTest
: public PrecludeLastLevelTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
IteratorWriteTimeTest() : PrecludeLastLevelTest("iterator_write_time_test") {}
IteratorWriteTimeTest()
: PrecludeLastLevelTestBase("iterator_write_time_test") {}
bool UseTailingIterator() const { return std::get<0>(GetParam()); }
bool UseDynamicConfig() const { return std::get<1>(GetParam()); }
void ApplyConfigChange(
Options* options,
const std::unordered_map<std::string, std::string>& config_change,
const std::unordered_map<std::string, std::string>& db_config_change =
{}) {
ApplyConfigChangeImpl(UseDynamicConfig(), options, config_change,
db_config_change);
}
uint64_t VerifyKeyAndGetWriteTime(Iterator* iter,
const std::string& expected_key) {
@@ -2478,10 +2667,14 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
options.compaction_style = kCompactionStyleUniversal;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// While there are issues with tracking seqno 0
ASSERT_OK(Delete("something_to_bump_seqno"));
ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});
Random rnd(301);
for (int i = 0; i < kNumKeys; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
@@ -2495,7 +2688,7 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
}
ReadOptions ropts;
ropts.tailing = GetParam();
ropts.tailing = UseTailingIterator();
int i;
// Forward iteration
@@ -2535,10 +2728,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromMemtables) {
ASSERT_EQ(-1, i);
}
// Reopen the DB and disable the seqno to time recording, data with user
// specified write time can still get a write time before it's flushed.
options.preserve_internal_time_seconds = 0;
Reopen(options);
// Disable the seqno to time recording. Data with user specified write time
// can still get a write time before it's flushed.
ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "0"}});
ASSERT_OK(TimedPut(Key(kKeyWithWriteTime), rnd.RandomString(100),
kUserSpecifiedWriteTime));
{
@@ -2574,10 +2766,14 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
options.compaction_style = kCompactionStyleUniversal;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
// While there are issues with tracking seqno 0
ASSERT_OK(Delete("something_to_bump_seqno"));
ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});
Random rnd(301);
for (int i = 0; i < kNumKeys; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
@@ -2592,7 +2788,7 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
ASSERT_OK(Flush());
ReadOptions ropts;
ropts.tailing = GetParam();
ropts.tailing = UseTailingIterator();
std::string prop;
int i;
@@ -2638,10 +2834,9 @@ TEST_P(IteratorWriteTimeTest, ReadFromSstFile) {
ASSERT_EQ(-1, i);
}
// Reopen the DB and disable the seqno to time recording. Data retrieved from
// SST files still have write time available.
options.preserve_internal_time_seconds = 0;
Reopen(options);
// Disable the seqno to time recording. Data retrieved from SST files still
// have write time available.
ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "0"}});
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
@@ -2705,11 +2900,12 @@ TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) {
options.compaction_style = kCompactionStyleUniversal;
options.env = mock_env_.get();
options.level0_file_num_compaction_trigger = kNumTrigger;
options.preserve_internal_time_seconds = 10000;
options.num_levels = kNumLevels;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
ApplyConfigChange(&options, {{"preserve_internal_time_seconds", "10000"}});
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); });
ASSERT_OK(Put("foo", "fv1"));
@@ -2720,7 +2916,7 @@ TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) {
ASSERT_OK(Merge("foo", "bv1"));
ReadOptions ropts;
ropts.tailing = GetParam();
ropts.tailing = UseTailingIterator();
{
std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));
iter->SeekToFirst();
@@ -2738,7 +2934,7 @@ TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) {
}
INSTANTIATE_TEST_CASE_P(IteratorWriteTimeTest, IteratorWriteTimeTest,
testing::Bool());
testing::Combine(testing::Bool(), testing::Bool()));
} // namespace ROCKSDB_NAMESPACE


@@ -847,7 +847,7 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
bool mapping_was_empty = false;
std::vector<SuperVersionContext> sv_contexts;
{
InstrumentedMutexLock l(&mutex_);
@@ -862,6 +862,7 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
max_preserve_seconds = std::max(preserve_seconds, max_preserve_seconds);
}
}
size_t old_mapping_size = seqno_to_time_mapping_.Size();
if (min_preserve_seconds == std::numeric_limits<uint64_t>::max()) {
// Don't track
seqno_to_time_mapping_.SetCapacity(0);
@@ -873,9 +874,17 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
seqno_to_time_mapping_.SetCapacity(cap);
seqno_to_time_mapping_.SetMaxTimeSpan(max_preserve_seconds);
}
mapping_was_empty = seqno_to_time_mapping_.Empty();
if (old_mapping_size != seqno_to_time_mapping_.Size()) {
InstallSeqnoToTimeMappingInSV(&sv_contexts);
}
}
// clean up outside db mutex
for (SuperVersionContext& sv_context : sv_contexts) {
sv_context.Clean();
}
sv_contexts.clear();
uint64_t seqno_time_cadence = 0;
if (min_preserve_seconds != std::numeric_limits<uint64_t>::max()) {
// round up to 1 when the time_duration is smaller than
@@ -915,8 +924,6 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
assert(!is_new_db || last_seqno_zero);
if (is_new_db && last_seqno_zero) {
// Pre-allocate seqnos and pre-populate historical mapping
assert(mapping_was_empty);
// We can simply modify these, before writes are allowed
constexpr uint64_t kMax = kMaxSeqnoTimePairsPerSST;
versions_->SetLastAllocatedSequence(kMax);
@@ -941,9 +948,10 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
// Pre-populate mappings for reserved sequence numbers.
RecordSeqnoToTimeMapping(max_preserve_seconds);
} else if (mapping_was_empty) {
} else {
if (!last_seqno_zero) {
// Ensure at least one mapping (or log a warning)
// Ensure at least one mapping (or log a warning), and
// an updated entry whenever relevant SetOptions is called
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
} else {
// FIXME (see limitation described above)
@@ -1304,6 +1312,12 @@ Status DBImpl::SetOptions(
}
sv_context.Clean();
if (s.ok() && (options_map.count("preserve_internal_time_seconds") > 0 ||
options_map.count("preclude_last_level_data_seconds") > 0)) {
s = RegisterRecordSeqnoTimeWorker(read_options, write_options,
false /* is_new_db*/);
}
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
@@ -3479,6 +3493,8 @@ void DBImpl::MultiGetEntityWithCallback(
Status DBImpl::WrapUpCreateColumnFamilies(
const ReadOptions& read_options, const WriteOptions& write_options,
const std::vector<const ColumnFamilyOptions*>& cf_options) {
options_mutex_.AssertHeld();
// NOTE: this function is skipped for create_missing_column_families and
// DB::Open, so new functionality here might need to go into Open also.
bool register_worker = false;
@@ -3701,6 +3717,8 @@ Status DBImpl::DropColumnFamilies(
}
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
options_mutex_.AssertHeld();
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
const WriteOptions write_options;


@@ -934,6 +934,13 @@ Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
return s;
}
Status DBTestBase::CompactRange(const CompactRangeOptions& options,
std::optional<Slice> begin,
std::optional<Slice> end) {
return db_->CompactRange(options, begin ? &begin.value() : nullptr,
end ? &end.value() : nullptr);
}
uint64_t DBTestBase::GetNumSnapshots() {
uint64_t int_num;
EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.num-snapshots", &int_num));


@@ -1225,6 +1225,9 @@ class DBTestBase : public testing::Test {
const Snapshot* snapshot = nullptr,
const bool async = false);
Status CompactRange(const CompactRangeOptions& options,
std::optional<Slice> begin, std::optional<Slice> end);
uint64_t GetNumSnapshots();
uint64_t GetTimeOldestSnapshots();


@@ -29,6 +29,8 @@ struct Options;
// treat errors (e.g. ignore_unknown_objects), the format
// of the serialization (e.g. delimiter), and how to compare
// options (sanity_level).
// NOTE: members of this struct make it potentially problematic for
// static storage duration ("static initialization order fiasco")
struct ConfigOptions {
// Constructs a new ConfigOptions with a new object registry.
// This method should only be used when a DBOptions is not available,


@@ -1273,5 +1273,7 @@ std::vector<std::string> TEST_GetImmutableInMutableCFOptions() {
}
return result;
}
bool TEST_allowSetOptionsImmutableInMutable = false;
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE


@@ -360,6 +360,7 @@ Status GetMutableOptionsFromStrings(
#ifndef NDEBUG
std::vector<std::string> TEST_GetImmutableInMutableCFOptions();
extern bool TEST_allowSetOptionsImmutableInMutable;
#endif
} // namespace ROCKSDB_NAMESPACE