diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 224f4e46f3..dd3b537373 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -377,9 +377,7 @@ class CompactionJob {
 // doesn't contain the LSM tree information, which is passed though MANIFEST
 // file.
 struct CompactionServiceInput {
-  ColumnFamilyDescriptor column_family;
-
-  DBOptions db_options;
+  std::string cf_name;

   std::vector<SequenceNumber> snapshots;

@@ -402,9 +400,6 @@ struct CompactionServiceInput {
   static Status Read(const std::string& data_str, CompactionServiceInput* obj);
   Status Write(std::string* output);

-  // Initialize a dummy ColumnFamilyDescriptor
-  CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
-
 #ifndef NDEBUG
   bool TEST_Equals(CompactionServiceInput* other);
   bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index 8e85a9f96f..e286817e6f 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -1568,17 +1568,7 @@ TEST_F(CompactionJobTest, InputSerialization) {
   const int kStrMaxLen = 1000;
   Random rnd(static_cast<uint32_t>(time(nullptr)));
   Random64 rnd64(time(nullptr));
-  input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
-  input.column_family.options.comparator = ReverseBytewiseComparator();
-  input.column_family.options.max_bytes_for_level_base =
-      rnd64.Uniform(UINT64_MAX);
-  input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
-  input.column_family.options.compression = kZSTD;
-  input.column_family.options.compression_opts.level = 4;
-  input.db_options.max_background_flushes = 10;
-  input.db_options.paranoid_checks = rnd.OneIn(2);
-  input.db_options.statistics = CreateDBStatistics();
-  input.db_options.env = env_;
+  input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
   while (!rnd.OneIn(10)) {
     input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
   }
@@ -1606,10 +1596,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
   ASSERT_TRUE(deserialized1.TEST_Equals(&input));

   // Test mismatch
-  deserialized1.db_options.max_background_flushes += 10;
+  deserialized1.output_level += 10;
   std::string mismatch;
   ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
-  ASSERT_EQ(mismatch, "db_options.max_background_flushes");
+  ASSERT_EQ(mismatch, "output_level");

   // Test unknown field
   CompactionServiceInput deserialized2;
diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index a923e4fcc4..8a8db33627 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -39,12 +39,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           MakeTableFileName(file->fd.GetNumber()));
     }
   }
-  compaction_input.column_family.name =
-      compaction->column_family_data()->GetName();
-  compaction_input.column_family.options =
-      compaction->column_family_data()->GetLatestCFOptions();
-  compaction_input.db_options =
-      BuildDBOptions(db_options_, mutable_db_options_copy_);
+
+  compaction_input.cf_name = compaction->column_family_data()->GetName();
   compaction_input.snapshots = existing_snapshots_;
   compaction_input.has_begin = sub_compact->start.has_value();
   compaction_input.begin =
@@ -70,7 +66,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   ROCKS_LOG_INFO(
       db_options_.info_log,
       "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
-      compaction_input.column_family.name.c_str(), job_id_,
+      compaction->column_family_data()->GetName().c_str(), job_id_,
       compaction_input.output_level, input_files_oss.str().c_str());
   CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
                                 GetCompactionId(sub_compact), thread_pri_);
@@ -84,13 +80,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           "CompactionService failed to schedule a remote compaction job.");
       ROCKS_LOG_WARN(db_options_.info_log,
                      "[%s] [JOB %d] Remote compaction failed to start.",
-                     compaction_input.column_family.name.c_str(), job_id_);
+                     compaction->column_family_data()->GetName().c_str(),
+                     job_id_);
       return response.status;
     case CompactionServiceJobStatus::kUseLocal:
       ROCKS_LOG_INFO(
           db_options_.info_log,
           "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
-          compaction_input.column_family.name.c_str(), job_id_);
+          compaction->column_family_data()->GetName().c_str(), job_id_);
       return response.status;
     default:
       assert(false);  // unknown status
@@ -99,7 +96,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

   ROCKS_LOG_INFO(db_options_.info_log,
                  "[%s] [JOB %d] Waiting for remote compaction...",
-                 compaction_input.column_family.name.c_str(), job_id_);
+                 compaction->column_family_data()->GetName().c_str(), job_id_);
   std::string compaction_result_binary;
   CompactionServiceJobStatus compaction_status =
       db_options_.compaction_service->Wait(response.scheduled_job_id,
@@ -109,7 +106,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
     ROCKS_LOG_INFO(
         db_options_.info_log,
         "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
-        compaction_input.column_family.name.c_str(), job_id_);
+        compaction->column_family_data()->GetName().c_str(), job_id_);
     return compaction_status;
   }

@@ -134,9 +131,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
           "result is returned).");
       compaction_result.status.PermitUncheckedError();
     }
-    ROCKS_LOG_WARN(db_options_.info_log,
-                   "[%s] [JOB %d] Remote compaction failed.",
-                   compaction_input.column_family.name.c_str(), job_id_);
+    ROCKS_LOG_WARN(
+        db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
+        compaction->column_family_data()->GetName().c_str(), job_id_);
     return compaction_status;
   }

@@ -162,7 +159,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
       db_options_.info_log,
       "[%s] [JOB %d] Received remote compaction result, output path: "
       "%s, files: %s",
-      compaction_input.column_family.name.c_str(), job_id_,
+      compaction->column_family_data()->GetName().c_str(), job_id_,
       compaction_result.output_path.c_str(), output_files_oss.str().c_str());

   // Installation Starts
@@ -264,8 +261,8 @@ Status CompactionServiceCompactionJob::Run() {
   const VersionStorageInfo* storage_info = c->input_version()->storage_info();
   assert(storage_info);
   assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
-  write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
+
   bottommost_level_ = c->bottommost_level();

   Slice begin = compaction_input_.begin;
@@ -404,42 +401,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
 };

 static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
-    {"column_family",
-     OptionTypeInfo::Struct(
-         "column_family", &cfd_type_info,
-         offsetof(struct CompactionServiceInput, column_family),
-         OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
-    {"db_options",
-     {offsetof(struct CompactionServiceInput, db_options),
-      OptionType::kConfigurable, OptionVerificationType::kNormal,
-      OptionTypeFlags::kNone,
-      [](const ConfigOptions& opts, const std::string& /*name*/,
-         const std::string& value, void* addr) {
-        auto options = static_cast<DBOptions*>(addr);
-        return GetDBOptionsFromString(opts, DBOptions(), value, options);
-      },
-      [](const ConfigOptions& opts, const std::string& /*name*/,
-         const void* addr, std::string* value) {
-        const auto options = static_cast<const DBOptions*>(addr);
-        std::string result;
-        auto status = GetStringFromDBOptions(opts, *options, &result);
-        *value = "{" + result + "}";
-        return status;
-      },
-      [](const ConfigOptions& opts, const std::string& name, const void* addr1,
-         const void* addr2, std::string* mismatch) {
-        const auto this_one = static_cast<const DBOptions*>(addr1);
-        const auto that_one = static_cast<const DBOptions*>(addr2);
-        auto this_conf = DBOptionsAsConfigurable(*this_one);
-        auto that_conf = DBOptionsAsConfigurable(*that_one);
-        std::string mismatch_opt;
-        bool result =
-            this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
-        if (!result) {
-          *mismatch = name + "." + mismatch_opt;
-        }
-        return result;
-      }}},
+    {"cf_name",
+     {offsetof(struct CompactionServiceInput, cf_name),
+      OptionType::kEncodedString}},
     {"snapshots", OptionTypeInfo::Vector<uint64_t>(
                       offsetof(struct CompactionServiceInput, snapshots),
                       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index 92944d1181..fb7ea1110e 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -12,7 +12,8 @@
 #include "logging/auto_roll_logger.h"
 #include "logging/logging.h"
 #include "monitoring/perf_context_imp.h"
-#include "rocksdb/configurable.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/utilities/options_util.h"
 #include "util/cast_util.h"
 #include "util/write_batch_util.h"

@@ -938,69 +939,103 @@ Status DB::OpenAndCompact(
     const std::string& output_directory, const std::string& input,
     std::string* output,
     const CompactionServiceOptionsOverride& override_options) {
+  // Check for cancellation
   if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
+
+  // 1. Deserialize Compaction Input
   CompactionServiceInput compaction_input;
   Status s = CompactionServiceInput::Read(input, &compaction_input);
   if (!s.ok()) {
     return s;
   }

-  compaction_input.db_options.max_open_files = -1;
-  compaction_input.db_options.compaction_service = nullptr;
-  if (compaction_input.db_options.statistics) {
-    compaction_input.db_options.statistics.reset();
+  // 2. Load the options from the latest OPTIONS file
+  DBOptions db_options;
+  ConfigOptions config_options;
+  config_options.env = override_options.env;
+  std::vector<ColumnFamilyDescriptor> all_column_families;
+  s = LoadLatestOptions(config_options, name, &db_options,
+                        &all_column_families);
+  // In a very rare scenario, loading options may fail if the options are
+  // changed by the primary host at the same time. Just retry once for now.
+  if (!s.ok()) {
+    s = LoadLatestOptions(config_options, name, &db_options,
+                          &all_column_families);
+    if (!s.ok()) {
+      return s;
+    }
   }
-  compaction_input.db_options.env = override_options.env;
-  compaction_input.db_options.file_checksum_gen_factory =
+
+  // 3. Override pointer configurations in DBOptions with
+  // CompactionServiceOptionsOverride
+  db_options.env = override_options.env;
+  db_options.file_checksum_gen_factory =
       override_options.file_checksum_gen_factory;
-  compaction_input.db_options.statistics = override_options.statistics;
-  compaction_input.column_family.options.comparator =
-      override_options.comparator;
-  compaction_input.column_family.options.merge_operator =
-      override_options.merge_operator;
-  compaction_input.column_family.options.compaction_filter =
-      override_options.compaction_filter;
-  compaction_input.column_family.options.compaction_filter_factory =
-      override_options.compaction_filter_factory;
-  compaction_input.column_family.options.prefix_extractor =
-      override_options.prefix_extractor;
-  compaction_input.column_family.options.table_factory =
-      override_options.table_factory;
-  compaction_input.column_family.options.sst_partitioner_factory =
-      override_options.sst_partitioner_factory;
-  compaction_input.column_family.options.table_properties_collector_factories =
-      override_options.table_properties_collector_factories;
-  compaction_input.db_options.listeners = override_options.listeners;
+  db_options.statistics = override_options.statistics;
+  db_options.listeners = override_options.listeners;
+  db_options.compaction_service = nullptr;
+  // We will close the DB after the compaction anyway.
+  // Open as many files as needed for the compaction.
+  db_options.max_open_files = -1;

+  // 4. Filter CFs that are needed for OpenAndCompact()
+  // We do not need to open all column families for the remote compaction.
+  // Only open default CF + target CF. If target CF == default CF, we will open
+  // just the default CF (due to a current limitation, the DB cannot open
+  // without the default CF).
   std::vector<ColumnFamilyDescriptor> column_families;
-  column_families.push_back(compaction_input.column_family);
-  // TODO: we have to open default CF, because of an implementation limitation,
-  // currently we just use the same CF option from input, which is not collect
-  // and open may fail.
-  if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
-    column_families.emplace_back(kDefaultColumnFamilyName,
-                                 compaction_input.column_family.options);
+  for (auto& cf : all_column_families) {
+    if (cf.name == compaction_input.cf_name) {
+      cf.options.comparator = override_options.comparator;
+      cf.options.merge_operator = override_options.merge_operator;
+      cf.options.compaction_filter = override_options.compaction_filter;
+      cf.options.compaction_filter_factory =
+          override_options.compaction_filter_factory;
+      cf.options.prefix_extractor = override_options.prefix_extractor;
+      cf.options.table_factory = override_options.table_factory;
+      cf.options.sst_partitioner_factory =
+          override_options.sst_partitioner_factory;
+      cf.options.table_properties_collector_factories =
+          override_options.table_properties_collector_factories;
+      column_families.emplace_back(cf);
+    } else if (cf.name == kDefaultColumnFamilyName) {
+      column_families.emplace_back(cf);
+    }
   }

+  // 5. Open the DB as secondary
   DB* db;
   std::vector<ColumnFamilyHandle*> handles;
-
-  s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
-                          column_families, &handles, &db);
+  s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
+                          &handles, &db);
   if (!s.ok()) {
     return s;
   }
+  assert(db);

+  // 6. Find the handle of the column family to compact
+  ColumnFamilyHandle* cfh = nullptr;
+  for (auto* handle : handles) {
+    if (compaction_input.cf_name == handle->GetName()) {
+      cfh = handle;
+      break;
+    }
+  }
+  assert(cfh);
+
+  // 7. Run the compaction without installation.
+  // Output will be stored in the directory specified by output_directory.
   CompactionServiceResult compaction_result;
   DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
-  assert(handles.size() > 0);
-  s = db_secondary->CompactWithoutInstallation(
-      options, handles[0], compaction_input, &compaction_result);
+  s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
+                                               &compaction_result);

+  // 8. Serialize the result
   Status serialization_status = compaction_result.Write(output);

+  // 9. Close the DB and return
   for (auto& handle : handles) {
     delete handle;
   }
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 507f9bab80..27feadb804 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -2295,9 +2295,6 @@ struct SizeApproximationOptions {
 };

 struct CompactionServiceOptionsOverride {
-  // Currently pointer configurations are not passed to compaction service
-  // compaction so the user needs to set it. It will be removed once pointer
-  // configuration passing is supported.
   Env* env = Env::Default();
   std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;
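Note on usage (illustrative, not part of the patch): after this change the serialized CompactionServiceInput carries only the column family name, and DB::OpenAndCompact() loads DBOptions and ColumnFamilyOptions from the primary's latest OPTIONS file, so the remote worker must be able to read the primary's DB directory. Pointer-typed options are still supplied through CompactionServiceOptionsOverride. A minimal worker-side sketch follows, assuming shared storage; the function name ExecuteRemoteCompaction and its parameters are hypothetical and not part of the RocksDB API:

    #include <string>

    #include "rocksdb/db.h"
    #include "rocksdb/env.h"
    #include "rocksdb/options.h"
    #include "rocksdb/statistics.h"
    #include "rocksdb/table.h"

    // Runs one remote compaction job on the worker host. `serialized_input` is
    // the CompactionServiceInput blob handed to CompactionService::Schedule()
    // on the primary; `serialized_result` is what the primary's Wait() returns.
    rocksdb::Status ExecuteRemoteCompaction(const std::string& db_path,
                                            const std::string& scratch_dir,
                                            const std::string& serialized_input,
                                            std::string* serialized_result) {
      rocksdb::CompactionServiceOptionsOverride override_options;
      // Pointer-typed options are not persisted in the OPTIONS file, so the
      // worker still supplies them here; they should match the primary's
      // configuration.
      override_options.env = rocksdb::Env::Default();
      override_options.statistics = rocksdb::CreateDBStatistics();
      override_options.table_factory.reset(rocksdb::NewBlockBasedTableFactory());
      // comparator, merge_operator, compaction_filter(_factory),
      // prefix_extractor, sst_partitioner_factory, etc. would also be set here
      // if the primary uses non-default ones.
      return rocksdb::DB::OpenAndCompact(db_path, scratch_dir, serialized_input,
                                         serialized_result, override_options);
    }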