diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index 442eaf8ea7..2411c27aac 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       compaction_input.output_level, input_files_oss.str().c_str());
   CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
                                 GetCompactionId(sub_compact), thread_pri_);
-  CompactionServiceJobStatus compaction_status =
-      db_options_.compaction_service->StartV2(info, compaction_input_binary);
-  switch (compaction_status) {
+  CompactionServiceScheduleResponse response =
+      db_options_.compaction_service->Schedule(info, compaction_input_binary);
+  switch (response.status) {
     case CompactionServiceJobStatus::kSuccess:
       break;
     case CompactionServiceJobStatus::kFailure:
       sub_compact->status = Status::Incomplete(
-          "CompactionService failed to start compaction job.");
+          "CompactionService failed to schedule a remote compaction job.");
       ROCKS_LOG_WARN(db_options_.info_log,
                      "[%s] [JOB %d] Remote compaction failed to start.",
                      compaction_input.column_family.name.c_str(), job_id_);
-      return compaction_status;
+      return response.status;
     case CompactionServiceJobStatus::kUseLocal:
       ROCKS_LOG_INFO(
           db_options_.info_log,
-          "[%s] [JOB %d] Remote compaction fallback to local by API Start.",
+          "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
          compaction_input.column_family.name.c_str(), job_id_);
-      return compaction_status;
+      return response.status;
     default:
       assert(false);  // unknown status
       break;
@@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
                  "[%s] [JOB %d] Waiting for remote compaction...",
                  compaction_input.column_family.name.c_str(), job_id_);
   std::string compaction_result_binary;
-  compaction_status = db_options_.compaction_service->WaitForCompleteV2(
-      info, &compaction_result_binary);
+  CompactionServiceJobStatus compaction_status =
+      db_options_.compaction_service->Wait(response.scheduled_job_id,
+                                           &compaction_result_binary);
 
   if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
-    ROCKS_LOG_INFO(db_options_.info_log,
-                   "[%s] [JOB %d] Remote compaction fallback to local by API "
-                   "WaitForComplete.",
-                   compaction_input.column_family.name.c_str(), job_id_);
+    ROCKS_LOG_INFO(
+        db_options_.info_log,
+        "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
+        compaction_input.column_family.name.c_str(), job_id_);
     return compaction_status;
   }
 
@@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
 }
 #endif  // NDEBUG
 }  // namespace ROCKSDB_NAMESPACE
-
diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc
index 7c87f88d1b..3fd6ad83bc 100644
--- a/db/compaction/compaction_service_test.cc
+++ b/db/compaction/compaction_service_test.cc
@@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService {
 
   const char* Name() const override { return kClassName(); }
 
-  CompactionServiceJobStatus StartV2(
+  CompactionServiceScheduleResponse Schedule(
       const CompactionServiceJobInfo& info,
       const std::string& compaction_service_input) override {
     InstrumentedMutexLock l(&mutex_);
     start_info_ = info;
     assert(info.db_name == db_path_);
-    jobs_.emplace(info.job_id, compaction_service_input);
-    CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
-    if (is_override_start_status_) {
-      return override_start_status_;
-    }
-    return s;
+    std::string unique_id = Env::Default()->GenerateUniqueId();
+    jobs_.emplace(unique_id, compaction_service_input);
+    infos_.emplace(unique_id, info);
+    CompactionServiceScheduleResponse response(
+        unique_id, is_override_start_status_
+                       ? override_start_status_
+                       : CompactionServiceJobStatus::kSuccess);
+    return response;
   }
 
-  CompactionServiceJobStatus WaitForCompleteV2(
-      const CompactionServiceJobInfo& info,
-      std::string* compaction_service_result) override {
+  CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
+                                  std::string* result) override {
     std::string compaction_input;
-    assert(info.db_name == db_path_);
     {
       InstrumentedMutexLock l(&mutex_);
-      wait_info_ = info;
-      auto i = jobs_.find(info.job_id);
-      if (i == jobs_.end()) {
+      auto job_index = jobs_.find(scheduled_job_id);
+      if (job_index == jobs_.end()) {
         return CompactionServiceJobStatus::kFailure;
       }
-      compaction_input = std::move(i->second);
-      jobs_.erase(i);
-    }
+      compaction_input = std::move(job_index->second);
+      jobs_.erase(job_index);
+      auto info_index = infos_.find(scheduled_job_id);
+      if (info_index == infos_.end()) {
+        return CompactionServiceJobStatus::kFailure;
+      }
+      wait_info_ = std::move(info_index->second);
+      infos_.erase(info_index);
+    }
 
     if (is_override_wait_status_) {
       return override_wait_status_;
     }
-
     CompactionServiceOptionsOverride options_override;
     options_override.env = options_.env;
     options_override.file_checksum_gen_factory =
@@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService {
 
     OpenAndCompactOptions options;
     options.canceled = &canceled_;
-    Status s = DB::OpenAndCompact(
-        options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
-        compaction_input, compaction_service_result, options_override);
+    Status s =
+        DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
+                           compaction_input, result, options_override);
     if (is_override_wait_result_) {
-      *compaction_service_result = override_wait_result_;
+      *result = override_wait_result_;
     }
     compaction_num_.fetch_add(1);
     if (s.ok()) {
@@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService {
  private:
   InstrumentedMutex mutex_;
   std::atomic_int compaction_num_{0};
-  std::map<uint64_t, std::string> jobs_;
+  std::map<std::string, std::string> jobs_;
+  std::map<std::string, CompactionServiceJobInfo> infos_;
   const std::string db_path_;
   Options options_;
   std::shared_ptr<Statistics> statistics_;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 8932075de0..5a8d8a9eef 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -428,6 +428,17 @@ struct CompactionServiceJobInfo {
         priority(priority_) {}
 };
 
+struct CompactionServiceScheduleResponse {
+  std::string scheduled_job_id;  // Generated outside of primary host, unique
+                                 // across different DBs and sessions
+  CompactionServiceJobStatus status;
+  CompactionServiceScheduleResponse(std::string scheduled_job_id_,
+                                    CompactionServiceJobStatus status_)
+      : scheduled_job_id(scheduled_job_id_), status(status_) {}
+  explicit CompactionServiceScheduleResponse(CompactionServiceJobStatus status_)
+      : status(status_) {}
+};
+
 // Exceptions MUST NOT propagate out of overridden functions into RocksDB,
 // because RocksDB is not exception-safe. This could cause undefined behavior
 // including data loss, unreported corruption, deadlocks, and more.
@@ -438,6 +449,24 @@ class CompactionService : public Customizable {
   // Returns the name of this compaction service.
   const char* Name() const override = 0;
 
+  // Schedule compaction to be processed remotely.
+  virtual CompactionServiceScheduleResponse Schedule(
+      const CompactionServiceJobInfo& /*info*/,
+      const std::string& /*compaction_service_input*/) {
+    CompactionServiceScheduleResponse response(
+        CompactionServiceJobStatus::kUseLocal);
+    return response;
+  }
+
+  // Wait for the scheduled compaction to finish on the remote worker.
+  virtual CompactionServiceJobStatus Wait(
+      const std::string& /*scheduled_job_id*/, std::string* /*result*/) {
+    return CompactionServiceJobStatus::kUseLocal;
+  }
+
+  // Deprecated. Please implement the Schedule() and Wait() APIs to handle
+  // remote compaction.
+
   // Start the remote compaction with `compaction_service_input`, which can be
   // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
   // information the user might want to know, which includes `job_id`.
diff --git a/unreleased_history/public_api_changes/new_remote_compaction_api.md b/unreleased_history/public_api_changes/new_remote_compaction_api.md
new file mode 100644
index 0000000000..eac106c9de
--- /dev/null
+++ b/unreleased_history/public_api_changes/new_remote_compaction_api.md
@@ -0,0 +1 @@
+Deprecate the experimental Remote Compaction APIs StartV2() and WaitForCompleteV2(), and introduce Schedule() and Wait(). The new APIs do essentially the same thing as the old ones, but they take an externally generated unique id that is used to wait for the remote compaction to complete.
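
For reference, the sketch below shows how a user-defined CompactionService might adopt the new Schedule()/Wait() pair. It mirrors the flow of MyTestCompactionService above, but it is illustrative only and not part of this patch: the class name, member names, and the in-process call to DB::OpenAndCompact() are assumptions made for brevity (a real service would hand the serialized input to a remote worker), and depending on the RocksDB version the deprecated StartV2()/WaitForCompleteV2() overrides may still need to be provided.

// Illustrative sketch only -- not part of this patch.
#include <map>
#include <mutex>
#include <string>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

class SimpleCompactionService : public CompactionService {
 public:
  SimpleCompactionService(std::string db_path, Options db_options)
      : db_path_(std::move(db_path)), db_options_(std::move(db_options)) {}

  const char* Name() const override { return "SimpleCompactionService"; }

  CompactionServiceScheduleResponse Schedule(
      const CompactionServiceJobInfo& /*info*/,
      const std::string& compaction_service_input) override {
    // Generate the job id on this side and remember the serialized input so
    // that Wait() can hand it to the worker later.
    std::string id = Env::Default()->GenerateUniqueId();
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.emplace(id, compaction_service_input);
    return CompactionServiceScheduleResponse(
        id, CompactionServiceJobStatus::kSuccess);
  }

  CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
                                  std::string* result) override {
    std::string input;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = pending_.find(scheduled_job_id);
      if (it == pending_.end()) {
        return CompactionServiceJobStatus::kFailure;
      }
      input = std::move(it->second);
      pending_.erase(it);
    }
    // A real service would block here until a remote worker reports back;
    // the compaction is run in-process to keep the sketch self-contained.
    CompactionServiceOptionsOverride options_override;
    options_override.env = db_options_.env;
    options_override.table_factory = db_options_.table_factory;
    Status s = DB::OpenAndCompact(OpenAndCompactOptions(), db_path_,
                                  db_path_ + "/" + scheduled_job_id, input,
                                  result, options_override);
    return s.ok() ? CompactionServiceJobStatus::kSuccess
                  : CompactionServiceJobStatus::kFailure;
  }

 private:
  std::mutex mutex_;
  std::map<std::string, std::string> pending_;  // scheduled_job_id -> input
  const std::string db_path_;
  const Options db_options_;
};

On the primary side nothing else changes: the service is still plugged in through db_options.compaction_service, and the compaction job now calls Schedule() followed by Wait() with the returned scheduled_job_id, as shown in the compaction_service_job.cc hunks above. Because the id is generated outside the primary host, it can stay unique across different DBs and sessions, which is the point of the new response struct.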