mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-29 18:33:58 +00:00
dynamic disable_auto_compactions
Summary: Add more tests as well Test Plan: unit test Reviewers: igor, sdong, yhchiang Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D24747
This commit is contained in:
parent
dc50a1a593
commit
065a67c4f0
|
@ -2323,12 +2323,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||||
} else {
|
} else {
|
||||||
// no need to refcount in iteration since it's always under a mutex
|
// no need to refcount in iteration since it's always under a mutex
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
if (!cfd->options()->disable_auto_compactions) {
|
// Pick up latest mutable CF Options and use it throughout the
|
||||||
|
// compaction job
|
||||||
|
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
|
||||||
|
if (!mutable_cf_options->disable_auto_compactions) {
|
||||||
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
|
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
|
||||||
// compaction is not necessary. Need to make sure mutex is held
|
// compaction is not necessary. Need to make sure mutex is held
|
||||||
// until we make a copy in the following code
|
// until we make a copy in the following code
|
||||||
c.reset(cfd->PickCompaction(
|
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
|
||||||
*cfd->GetLatestMutableCFOptions(), log_buffer));
|
|
||||||
if (c != nullptr) {
|
if (c != nullptr) {
|
||||||
// update statistics
|
// update statistics
|
||||||
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
|
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
|
||||||
|
|
|
@ -8659,14 +8659,13 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||||
options.env = env_;
|
options.env = env_;
|
||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
options.compression = kNoCompression;
|
options.compression = kNoCompression;
|
||||||
options.max_background_compactions = 4;
|
|
||||||
options.hard_rate_limit = 1.1;
|
options.hard_rate_limit = 1.1;
|
||||||
options.write_buffer_size = k128KB;
|
options.write_buffer_size = k128KB;
|
||||||
options.max_write_buffer_number = 2;
|
options.max_write_buffer_number = 2;
|
||||||
// Compaction related options
|
// Compaction related options
|
||||||
options.level0_file_num_compaction_trigger = 3;
|
options.level0_file_num_compaction_trigger = 3;
|
||||||
options.level0_slowdown_writes_trigger = 10;
|
options.level0_slowdown_writes_trigger = 4;
|
||||||
options.level0_stop_writes_trigger = 20;
|
options.level0_stop_writes_trigger = 8;
|
||||||
options.max_grandparent_overlap_factor = 10;
|
options.max_grandparent_overlap_factor = 10;
|
||||||
options.expanded_compaction_factor = 25;
|
options.expanded_compaction_factor = 25;
|
||||||
options.source_compaction_factor = 1;
|
options.source_compaction_factor = 1;
|
||||||
|
@ -8674,6 +8673,10 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||||
options.target_file_size_multiplier = 1;
|
options.target_file_size_multiplier = 1;
|
||||||
options.max_bytes_for_level_base = k256KB;
|
options.max_bytes_for_level_base = k256KB;
|
||||||
options.max_bytes_for_level_multiplier = 4;
|
options.max_bytes_for_level_multiplier = 4;
|
||||||
|
|
||||||
|
// Block flush thread and disable compaction thread
|
||||||
|
env_->SetBackgroundThreads(1, Env::LOW);
|
||||||
|
env_->SetBackgroundThreads(1, Env::HIGH);
|
||||||
DestroyAndReopen(&options);
|
DestroyAndReopen(&options);
|
||||||
|
|
||||||
auto gen_l0_kb = [this](int start, int size, int stride) {
|
auto gen_l0_kb = [this](int start, int size, int stride) {
|
||||||
|
@ -8745,6 +8748,78 @@ TEST(DBTest, DynamicCompactionOptions) {
|
||||||
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
|
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
|
||||||
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
|
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
|
||||||
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
|
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
|
||||||
|
|
||||||
|
// Clean up memtable and L0
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
// Block compaction
|
||||||
|
SleepingBackgroundTask sleeping_task_low1;
|
||||||
|
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1,
|
||||||
|
Env::Priority::LOW);
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||||
|
int count = 0;
|
||||||
|
Random rnd(301);
|
||||||
|
WriteOptions wo;
|
||||||
|
wo.timeout_hint_us = 10000;
|
||||||
|
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
|
||||||
|
// Wait for compaction so that put won't timeout
|
||||||
|
dbfull()->TEST_FlushMemTable(true);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
ASSERT_EQ(count, 8);
|
||||||
|
// Unblock
|
||||||
|
sleeping_task_low1.WakeUp();
|
||||||
|
sleeping_task_low1.WaitUntilDone();
|
||||||
|
|
||||||
|
// Reduce stop trigger
|
||||||
|
ASSERT_TRUE(dbfull()->SetOptions({
|
||||||
|
{"level0_stop_writes_trigger", "6"}
|
||||||
|
}));
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||||
|
|
||||||
|
// Block compaction
|
||||||
|
SleepingBackgroundTask sleeping_task_low2;
|
||||||
|
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
|
||||||
|
Env::Priority::LOW);
|
||||||
|
count = 0;
|
||||||
|
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) {
|
||||||
|
// Wait for compaction so that put won't timeout
|
||||||
|
dbfull()->TEST_FlushMemTable(true);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
ASSERT_EQ(count, 6);
|
||||||
|
// Unblock
|
||||||
|
sleeping_task_low2.WakeUp();
|
||||||
|
sleeping_task_low2.WaitUntilDone();
|
||||||
|
|
||||||
|
// Test disable_auto_compactions
|
||||||
|
ASSERT_TRUE(dbfull()->SetOptions({
|
||||||
|
{"disable_auto_compactions", "true"}
|
||||||
|
}));
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
|
||||||
|
// Wait for compaction so that put won't timeout
|
||||||
|
dbfull()->TEST_FlushMemTable(true);
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
|
||||||
|
|
||||||
|
ASSERT_TRUE(dbfull()->SetOptions({
|
||||||
|
{"disable_auto_compactions", "false"}
|
||||||
|
}));
|
||||||
|
dbfull()->CompactRange(nullptr, nullptr);
|
||||||
|
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
|
||||||
|
// Wait for compaction so that put won't timeout
|
||||||
|
dbfull()->TEST_FlushMemTable(true);
|
||||||
|
}
|
||||||
|
dbfull()->TEST_WaitForCompact();
|
||||||
|
ASSERT_LT(NumTableFilesAtLevel(0), 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
|
@ -22,6 +22,7 @@ struct MutableCFOptions {
|
||||||
options.memtable_prefix_bloom_huge_page_tlb_size),
|
options.memtable_prefix_bloom_huge_page_tlb_size),
|
||||||
max_successive_merges(options.max_successive_merges),
|
max_successive_merges(options.max_successive_merges),
|
||||||
filter_deletes(options.filter_deletes),
|
filter_deletes(options.filter_deletes),
|
||||||
|
disable_auto_compactions(options.disable_auto_compactions),
|
||||||
level0_file_num_compaction_trigger(
|
level0_file_num_compaction_trigger(
|
||||||
options.level0_file_num_compaction_trigger),
|
options.level0_file_num_compaction_trigger),
|
||||||
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
|
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
|
||||||
|
@ -47,6 +48,7 @@ struct MutableCFOptions {
|
||||||
memtable_prefix_bloom_huge_page_tlb_size(0),
|
memtable_prefix_bloom_huge_page_tlb_size(0),
|
||||||
max_successive_merges(0),
|
max_successive_merges(0),
|
||||||
filter_deletes(false),
|
filter_deletes(false),
|
||||||
|
disable_auto_compactions(false),
|
||||||
level0_file_num_compaction_trigger(0),
|
level0_file_num_compaction_trigger(0),
|
||||||
level0_slowdown_writes_trigger(0),
|
level0_slowdown_writes_trigger(0),
|
||||||
level0_stop_writes_trigger(0),
|
level0_stop_writes_trigger(0),
|
||||||
|
@ -83,6 +85,7 @@ struct MutableCFOptions {
|
||||||
bool filter_deletes;
|
bool filter_deletes;
|
||||||
|
|
||||||
// Compaction related options
|
// Compaction related options
|
||||||
|
bool disable_auto_compactions;
|
||||||
int level0_file_num_compaction_trigger;
|
int level0_file_num_compaction_trigger;
|
||||||
int level0_slowdown_writes_trigger;
|
int level0_slowdown_writes_trigger;
|
||||||
int level0_stop_writes_trigger;
|
int level0_stop_writes_trigger;
|
||||||
|
|
|
@ -103,7 +103,9 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value,
|
||||||
template<typename OptionsType>
|
template<typename OptionsType>
|
||||||
bool ParseCompactionOptions(const std::string& name, const std::string& value,
|
bool ParseCompactionOptions(const std::string& name, const std::string& value,
|
||||||
OptionsType* new_options) {
|
OptionsType* new_options) {
|
||||||
if (name == "level0_file_num_compaction_trigger") {
|
if (name == "disable_auto_compactions") {
|
||||||
|
new_options->disable_auto_compactions = ParseBoolean(name, value);
|
||||||
|
} else if (name == "level0_file_num_compaction_trigger") {
|
||||||
new_options->level0_file_num_compaction_trigger = ParseInt(value);
|
new_options->level0_file_num_compaction_trigger = ParseInt(value);
|
||||||
} else if (name == "level0_slowdown_writes_trigger") {
|
} else if (name == "level0_slowdown_writes_trigger") {
|
||||||
new_options->level0_slowdown_writes_trigger = ParseInt(value);
|
new_options->level0_slowdown_writes_trigger = ParseInt(value);
|
||||||
|
@ -270,8 +272,6 @@ bool GetColumnFamilyOptionsFromMap(
|
||||||
new_options->soft_rate_limit = ParseDouble(o.second);
|
new_options->soft_rate_limit = ParseDouble(o.second);
|
||||||
} else if (o.first == "hard_rate_limit") {
|
} else if (o.first == "hard_rate_limit") {
|
||||||
new_options->hard_rate_limit = ParseDouble(o.second);
|
new_options->hard_rate_limit = ParseDouble(o.second);
|
||||||
} else if (o.first == "disable_auto_compactions") {
|
|
||||||
new_options->disable_auto_compactions = ParseBoolean(o.first, o.second);
|
|
||||||
} else if (o.first == "purge_redundant_kvs_while_flush") {
|
} else if (o.first == "purge_redundant_kvs_while_flush") {
|
||||||
new_options->purge_redundant_kvs_while_flush =
|
new_options->purge_redundant_kvs_while_flush =
|
||||||
ParseBoolean(o.first, o.second);
|
ParseBoolean(o.first, o.second);
|
||||||
|
|
Loading…
Reference in a new issue