diff --git a/HISTORY.md b/HISTORY.md index a38c1fbc7c..5d07ebaad3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ * Added two new RateLimiter IOPriorities: `Env::IO_USER`,`Env::IO_MID`. `Env::IO_USER` will have superior priority over all other RateLimiter IOPriorities without being subject to fair scheduling constraint. * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet. * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp. +* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 7c3a98e32f..3e2ba08e76 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -985,7 +985,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.column_family.name.c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, - GetCompactionId(sub_compact)); + GetCompactionId(sub_compact), thread_pri_); CompactionServiceJobStatus compaction_status = db_options_.compaction_service->StartV2(info, compaction_input_binary); if (compaction_status != CompactionServiceJobStatus::kSuccess) { diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index e2c326be91..4555dba39a 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -123,8 +123,8 @@ class MyTestCompactionService : public CompactionService, : db_path_(std::move(db_path)), options_(options), statistics_(statistics), - start_info_("na", "na", "na", 0), - wait_info_("na", "na", "na", 0) {} + start_info_("na", "na", "na", 0, Env::TOTAL), + wait_info_("na", "na", "na", 0, Env::TOTAL) {} static const char* kClassName() { return "MyTestCompactionService"; } @@ -575,36 +575,87 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) { } TEST_P(CompactionServiceTest, CompactionInfo) { - Options options = CurrentOptions(); - options.disable_auto_compactions = true; - ReopenWithCompactionService(&options); - GenerateTestData(); - - auto my_cs = GetCompactionService(); - - std::string start_str = Key(15); - std::string end_str = Key(45); - Slice start(start_str); - Slice end(end_str); - uint64_t comp_num = my_cs->GetCompactionNum(); - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); - ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); - VerifyTestData(); // only test compaction info for new compaction service interface - if (GetParam() == MyTestCompactionServiceType) { - auto cs = static_cast_with_check(my_cs); - CompactionServiceJobInfo info = cs->GetCompactionInfoForStart(); - ASSERT_EQ(dbname_, info.db_name); - std::string db_id, db_session_id; - ASSERT_OK(db_->GetDbIdentity(db_id)); - ASSERT_EQ(db_id, info.db_id); - ASSERT_OK(db_->GetDbSessionId(db_session_id)); - ASSERT_EQ(db_session_id, info.db_session_id); - info = cs->GetCompactionInfoForWait(); - ASSERT_EQ(dbname_, info.db_name); - ASSERT_EQ(db_id, info.db_id); - ASSERT_EQ(db_session_id, info.db_session_id); + if (GetParam() != MyTestCompactionServiceType) { + return; } + + Options options = CurrentOptions(); + ReopenWithCompactionService(&options); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + auto my_cs = + static_cast_with_check(GetCompactionService()); + uint64_t comp_num = my_cs->GetCompactionNum(); + ASSERT_GE(comp_num, 1); + + CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(dbname_, info.db_name); + std::string db_id, db_session_id; + ASSERT_OK(db_->GetDbIdentity(db_id)); + ASSERT_EQ(db_id, info.db_id); + ASSERT_OK(db_->GetDbSessionId(db_session_id)); + ASSERT_EQ(db_session_id, info.db_session_id); + ASSERT_EQ(Env::LOW, info.priority); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(dbname_, info.db_name); + ASSERT_EQ(db_id, info.db_id); + ASSERT_EQ(db_session_id, info.db_session_id); + ASSERT_EQ(Env::LOW, info.priority); + + // Test priority USER + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + SstFileMetaData file = meta.levels[1].files[0]; + ASSERT_OK(db_->CompactFiles(CompactionOptions(), + {file.db_path + "/" + file.name}, 2)); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(Env::USER, info.priority); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(Env::USER, info.priority); + + // Test priority BOTTOM + env_->SetBackgroundThreads(1, Env::BOTTOM); + options.num_levels = 2; + ReopenWithCompactionService(&options); + my_cs = + static_cast_with_check(GetCompactionService()); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + info = my_cs->GetCompactionInfoForStart(); + ASSERT_EQ(Env::BOTTOM, info.priority); + info = my_cs->GetCompactionInfoForWait(); + ASSERT_EQ(Env::BOTTOM, info.priority); } INSTANTIATE_TEST_CASE_P( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3160fec7f7..0550608323 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -384,13 +384,16 @@ struct CompactionServiceJobInfo { // `db_session_id` could help you build unique id across // different DBs and sessions. - // TODO: Add priority information + Env::Priority priority; + CompactionServiceJobInfo(std::string db_name_, std::string db_id_, - std::string db_session_id_, uint64_t job_id_) + std::string db_session_id_, uint64_t job_id_, + Env::Priority priority_) : db_name(std::move(db_name_)), db_id(std::move(db_id_)), db_session_id(std::move(db_session_id_)), - job_id(job_id_) {} + job_id(job_id_), + priority(priority_) {} }; class CompactionService : public Customizable {