Add compaction priority information in RemoteCompaction (#8707)

Summary:
Add compaction priority information in RemoteCompaction, which
can be used to schedule high priority job first.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8707

Test Plan: unittest

Reviewed By: ajkr

Differential Revision: D30548401

Pulled By: jay-zhuang

fbshipit-source-id: b30446511fb31b4583c49edd8565d496cf013a34
This commit is contained in:
Jay Zhuang 2021-09-16 15:08:23 -07:00 committed by Facebook GitHub Bot
parent 64ca0d9b46
commit b97c53b629
4 changed files with 89 additions and 34 deletions

View File

@ -21,6 +21,7 @@
* Added two new RateLimiter IOPriorities: `Env::IO_USER`,`Env::IO_MID`. `Env::IO_USER` will have superior priority over all other RateLimiter IOPriorities without being subject to fair scheduling constraint.
* `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
* Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API

View File

@ -985,7 +985,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.column_family.name.c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact));
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
if (compaction_status != CompactionServiceJobStatus::kSuccess) {

View File

@ -123,8 +123,8 @@ class MyTestCompactionService : public CompactionService,
: db_path_(std::move(db_path)),
options_(options),
statistics_(statistics),
start_info_("na", "na", "na", 0),
wait_info_("na", "na", "na", 0) {}
start_info_("na", "na", "na", 0, Env::TOTAL),
wait_info_("na", "na", "na", 0, Env::TOTAL) {}
static const char* kClassName() { return "MyTestCompactionService"; }
@ -575,36 +575,87 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) {
}
TEST_P(CompactionServiceTest, CompactionInfo) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();
auto my_cs = GetCompactionService();
std::string start_str = Key(15);
std::string end_str = Key(45);
Slice start(start_str);
Slice end(end_str);
uint64_t comp_num = my_cs->GetCompactionNum();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
VerifyTestData();
// only test compaction info for new compaction service interface
if (GetParam() == MyTestCompactionServiceType) {
auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
ASSERT_EQ(dbname_, info.db_name);
std::string db_id, db_session_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
ASSERT_EQ(db_id, info.db_id);
ASSERT_OK(db_->GetDbSessionId(db_session_id));
ASSERT_EQ(db_session_id, info.db_session_id);
info = cs->GetCompactionInfoForWait();
ASSERT_EQ(dbname_, info.db_name);
ASSERT_EQ(db_id, info.db_id);
ASSERT_EQ(db_session_id, info.db_session_id);
if (GetParam() != MyTestCompactionServiceType) {
return;
}
Options options = CurrentOptions();
ReopenWithCompactionService(&options);
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
auto my_cs =
static_cast_with_check<MyTestCompactionService>(GetCompactionService());
uint64_t comp_num = my_cs->GetCompactionNum();
ASSERT_GE(comp_num, 1);
CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(dbname_, info.db_name);
std::string db_id, db_session_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
ASSERT_EQ(db_id, info.db_id);
ASSERT_OK(db_->GetDbSessionId(db_session_id));
ASSERT_EQ(db_session_id, info.db_session_id);
ASSERT_EQ(Env::LOW, info.priority);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(dbname_, info.db_name);
ASSERT_EQ(db_id, info.db_id);
ASSERT_EQ(db_session_id, info.db_session_id);
ASSERT_EQ(Env::LOW, info.priority);
// Test priority USER
ColumnFamilyMetaData meta;
db_->GetColumnFamilyMetaData(&meta);
SstFileMetaData file = meta.levels[1].files[0];
ASSERT_OK(db_->CompactFiles(CompactionOptions(),
{file.db_path + "/" + file.name}, 2));
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(Env::USER, info.priority);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(Env::USER, info.priority);
// Test priority BOTTOM
env_->SetBackgroundThreads(1, Env::BOTTOM);
options.num_levels = 2;
ReopenWithCompactionService(&options);
my_cs =
static_cast_with_check<MyTestCompactionService>(GetCompactionService());
for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j;
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 10; j++) {
int key_id = i * 20 + j * 2;
ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
info = my_cs->GetCompactionInfoForStart();
ASSERT_EQ(Env::BOTTOM, info.priority);
info = my_cs->GetCompactionInfoForWait();
ASSERT_EQ(Env::BOTTOM, info.priority);
}
INSTANTIATE_TEST_CASE_P(

View File

@ -384,13 +384,16 @@ struct CompactionServiceJobInfo {
// `db_session_id` could help you build unique id across
// different DBs and sessions.
// TODO: Add priority information
Env::Priority priority;
CompactionServiceJobInfo(std::string db_name_, std::string db_id_,
std::string db_session_id_, uint64_t job_id_)
std::string db_session_id_, uint64_t job_id_,
Env::Priority priority_)
: db_name(std::move(db_name_)),
db_id(std::move(db_id_)),
db_session_id(std::move(db_session_id_)),
job_id(job_id_) {}
job_id(job_id_),
priority(priority_) {}
};
class CompactionService : public Customizable {