Update APIs to support generic unique identifier format (#12384)

Summary:
The current design proposes using a combination of `job_id`, `db_id`, and `db_session_id` to create a unique identifier for remote compaction jobs. However, this approach may not be suitable for users who prefer a different format for the unique identifier.

At Meta, we are utilizing generic compute offload to offload compaction tasks to remote workers. The compute offload client generates a UUID for each task, which requires an update to the current RocksDB API for onboarding purposes.

Users still have the option to create the unique identifier by combining `job_id`, `db_id`, and `db_session_id` if they prefer.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12384

Test Plan:
```
$> ./compaction_service_test                                                                                                                             13:29:35
[==========] Running 14 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 14 tests from CompactionServiceTest
[ RUN      ] CompactionServiceTest.BasicCompactions
[       OK ] CompactionServiceTest.BasicCompactions (2642 ms)
[ RUN      ] CompactionServiceTest.ManualCompaction
[       OK ] CompactionServiceTest.ManualCompaction (454 ms)
[ RUN      ] CompactionServiceTest.CancelCompactionOnRemoteSide
[       OK ] CompactionServiceTest.CancelCompactionOnRemoteSide (1643 ms)
[ RUN      ] CompactionServiceTest.FailedToStart
[       OK ] CompactionServiceTest.FailedToStart (1332 ms)
[ RUN      ] CompactionServiceTest.InvalidResult
[       OK ] CompactionServiceTest.InvalidResult (1516 ms)
[ RUN      ] CompactionServiceTest.SubCompaction
[       OK ] CompactionServiceTest.SubCompaction (551 ms)
[ RUN      ] CompactionServiceTest.CompactionFilter
[       OK ] CompactionServiceTest.CompactionFilter (563 ms)
[ RUN      ] CompactionServiceTest.Snapshot
[       OK ] CompactionServiceTest.Snapshot (124 ms)
[ RUN      ] CompactionServiceTest.ConcurrentCompaction
[       OK ] CompactionServiceTest.ConcurrentCompaction (660 ms)
[ RUN      ] CompactionServiceTest.CompactionInfo
[       OK ] CompactionServiceTest.CompactionInfo (984 ms)
[ RUN      ] CompactionServiceTest.FallbackLocalAuto
[       OK ] CompactionServiceTest.FallbackLocalAuto (343 ms)
[ RUN      ] CompactionServiceTest.FallbackLocalManual
[       OK ] CompactionServiceTest.FallbackLocalManual (380 ms)
[ RUN      ] CompactionServiceTest.RemoteEventListener
[       OK ] CompactionServiceTest.RemoteEventListener (491 ms)
[ RUN      ] CompactionServiceTest.TablePropertiesCollector
[       OK ] CompactionServiceTest.TablePropertiesCollector (169 ms)
[----------] 14 tests from CompactionServiceTest (11854 ms total)

[----------] Global test environment tear-down
[==========] 14 tests from 1 test case ran. (11855 ms total)
[  PASSED  ] 14 tests.
```

Reviewed By: hx235

Differential Revision: D54220339

Pulled By: jaykorean

fbshipit-source-id: 5a9054f31933d1996adca02082eb37b6d5353224
This commit is contained in:
Jay Huh 2024-03-01 09:55:30 -08:00 committed by Facebook GitHub Bot
parent 4aed229fa7
commit 5bcc184975
4 changed files with 72 additions and 37 deletions

View File

@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
default:
assert(false); // unknown status
break;
@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);
if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}
@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

View File

@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService {
const char* Name() const override { return kClassName(); }
CompactionServiceJobStatus StartV2(
CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) override {
InstrumentedMutexLock l(&mutex_);
start_info_ = info;
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status_) {
return override_start_status_;
}
return s;
std::string unique_id = Env::Default()->GenerateUniqueId();
jobs_.emplace(unique_id, compaction_service_input);
infos_.emplace(unique_id, info);
CompactionServiceScheduleResponse response(
unique_id, is_override_start_status_
? override_start_status_
: CompactionServiceJobStatus::kSuccess);
return response;
}
CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) override {
CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
std::string* result) override {
std::string compaction_input;
assert(info.db_name == db_path_);
{
InstrumentedMutexLock l(&mutex_);
wait_info_ = info;
auto i = jobs_.find(info.job_id);
if (i == jobs_.end()) {
auto job_index = jobs_.find(scheduled_job_id);
if (job_index == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}
compaction_input = std::move(job_index->second);
jobs_.erase(job_index);
auto info_index = infos_.find(scheduled_job_id);
if (info_index == infos_.end()) {
return CompactionServiceJobStatus::kFailure;
}
wait_info_ = std::move(info_index->second);
infos_.erase(info_index);
}
if (is_override_wait_status_) {
return override_wait_status_;
}
CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService {
OpenAndCompactOptions options;
options.canceled = &canceled_;
Status s = DB::OpenAndCompact(
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
compaction_input, compaction_service_result, options_override);
Status s =
DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
compaction_input, result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
*result = override_wait_result_;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService {
private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
std::map<std::string, std::string> jobs_;
std::map<std::string, CompactionServiceJobInfo> infos_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;

View File

@ -428,6 +428,17 @@ struct CompactionServiceJobInfo {
priority(priority_) {}
};
struct CompactionServiceScheduleResponse {
std::string scheduled_job_id; // Generated outside of primary host, unique
// across different DBs and sessions
CompactionServiceJobStatus status;
CompactionServiceScheduleResponse(std::string scheduled_job_id_,
CompactionServiceJobStatus status_)
: scheduled_job_id(scheduled_job_id_), status(status_) {}
explicit CompactionServiceScheduleResponse(CompactionServiceJobStatus status_)
: status(status_) {}
};
// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
@ -438,6 +449,24 @@ class CompactionService : public Customizable {
// Returns the name of this compaction service.
const char* Name() const override = 0;
// Schedule compaction to be processed remotely.
virtual CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& /*info*/,
const std::string& /*compaction_service_input*/) {
CompactionServiceScheduleResponse response(
CompactionServiceJobStatus::kUseLocal);
return response;
}
// Wait for the scheduled compaction to finish from the remote worker
virtual CompactionServiceJobStatus Wait(
const std::string& /*scheduled_job_id*/, std::string* /*result*/) {
return CompactionServiceJobStatus::kUseLocal;
}
// Deprecated. Please implement Schedule() and Wait() API to handle remote
// compaction
// Start the remote compaction with `compaction_service_input`, which can be
// passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
// information the user might want to know, which includes `job_id`.

View File

@ -0,0 +1 @@
Deprecate experimental Remote Compaction APIs - StartV2() and WaitForCompleteV2() and introduce Schedule() and Wait(). The new APIs essentially does the same thing as the old APIs. They allow taking externally generated unique id to wait for remote compaction to complete.