Load latest options from OPTIONS file in Remote host (#13025)

Summary:
We've been serializing and deserializing DBOptions and CFOptions (and other CF into) as part of `CompactionServiceInput`. These are all readily available in the OPTIONS file and the remote worker can read the OPTIONS file to obtain the same information. This helps reducing the size of payload significantly.

In a very rare scenario if the OPTIONS file is purged due to options change by primary host at the same time while the remote host is loading the latest options, it may fail. In this case, we just retry once.

This also solves the problem where we had to open the default CF with the CFOption from another CF if the remote compaction is for a non-default column family. (TODO comment in /db_impl_secondary.cc)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13025

Test Plan:
Unit Tests
```
./compaction_service_test
```
```
./compaction_job_test
```

Also tested with Meta's internal Offload Infra

Reviewed By: anand1976, cbi42

Differential Revision: D63100109

Pulled By: jaykorean

fbshipit-source-id: b7162695e31e2c5a920daa7f432842163a5b156d
This commit is contained in:
Jay Huh 2024-09-20 13:26:02 -07:00 committed by Facebook GitHub Bot
parent 6549b11714
commit 5f4a8c3da4
5 changed files with 93 additions and 112 deletions

View File

@ -377,9 +377,7 @@ class CompactionJob {
// doesn't contain the LSM tree information, which is passed though MANIFEST // doesn't contain the LSM tree information, which is passed though MANIFEST
// file. // file.
struct CompactionServiceInput { struct CompactionServiceInput {
ColumnFamilyDescriptor column_family; std::string cf_name;
DBOptions db_options;
std::vector<SequenceNumber> snapshots; std::vector<SequenceNumber> snapshots;
@ -402,9 +400,6 @@ struct CompactionServiceInput {
static Status Read(const std::string& data_str, CompactionServiceInput* obj); static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output); Status Write(std::string* output);
// Initialize a dummy ColumnFamilyDescriptor
CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
#ifndef NDEBUG #ifndef NDEBUG
bool TEST_Equals(CompactionServiceInput* other); bool TEST_Equals(CompactionServiceInput* other);
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch); bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);

View File

@ -1568,17 +1568,7 @@ TEST_F(CompactionJobTest, InputSerialization) {
const int kStrMaxLen = 1000; const int kStrMaxLen = 1000;
Random rnd(static_cast<uint32_t>(time(nullptr))); Random rnd(static_cast<uint32_t>(time(nullptr)));
Random64 rnd64(time(nullptr)); Random64 rnd64(time(nullptr));
input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen)); input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
input.column_family.options.comparator = ReverseBytewiseComparator();
input.column_family.options.max_bytes_for_level_base =
rnd64.Uniform(UINT64_MAX);
input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
input.column_family.options.compression = kZSTD;
input.column_family.options.compression_opts.level = 4;
input.db_options.max_background_flushes = 10;
input.db_options.paranoid_checks = rnd.OneIn(2);
input.db_options.statistics = CreateDBStatistics();
input.db_options.env = env_;
while (!rnd.OneIn(10)) { while (!rnd.OneIn(10)) {
input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX)); input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
} }
@ -1606,10 +1596,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
ASSERT_TRUE(deserialized1.TEST_Equals(&input)); ASSERT_TRUE(deserialized1.TEST_Equals(&input));
// Test mismatch // Test mismatch
deserialized1.db_options.max_background_flushes += 10; deserialized1.output_level += 10;
std::string mismatch; std::string mismatch;
ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch)); ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
ASSERT_EQ(mismatch, "db_options.max_background_flushes"); ASSERT_EQ(mismatch, "output_level");
// Test unknown field // Test unknown field
CompactionServiceInput deserialized2; CompactionServiceInput deserialized2;

View File

@ -39,12 +39,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
MakeTableFileName(file->fd.GetNumber())); MakeTableFileName(file->fd.GetNumber()));
} }
} }
compaction_input.column_family.name =
compaction->column_family_data()->GetName(); compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.column_family.options =
compaction->column_family_data()->GetLatestCFOptions();
compaction_input.db_options =
BuildDBOptions(db_options_, mutable_db_options_copy_);
compaction_input.snapshots = existing_snapshots_; compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start.has_value(); compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin = compaction_input.begin =
@ -70,7 +66,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO( ROCKS_LOG_INFO(
db_options_.info_log, db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s", "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_, compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str()); compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_); GetCompactionId(sub_compact), thread_pri_);
@ -84,13 +80,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"CompactionService failed to schedule a remote compaction job."); "CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.", "[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_); compaction->column_family_data()->GetName().c_str(),
job_id_);
return response.status; return response.status;
case CompactionServiceJobStatus::kUseLocal: case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO( ROCKS_LOG_INFO(
db_options_.info_log, db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_); compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status; return response.status;
default: default:
assert(false); // unknown status assert(false); // unknown status
@ -99,7 +96,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...", "[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_); compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary; std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status = CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id, db_options_.compaction_service->Wait(response.scheduled_job_id,
@ -109,7 +106,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO( ROCKS_LOG_INFO(
db_options_.info_log, db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_); compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status; return compaction_status;
} }
@ -134,9 +131,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"result is returned)."); "result is returned).");
compaction_result.status.PermitUncheckedError(); compaction_result.status.PermitUncheckedError();
} }
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(
"[%s] [JOB %d] Remote compaction failed.", db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_); compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status; return compaction_status;
} }
@ -162,7 +159,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
db_options_.info_log, db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: " "[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s", "%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_, compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str()); compaction_result.output_path.c_str(), output_files_oss.str().c_str());
// Installation Starts // Installation Starts
@ -264,8 +261,8 @@ Status CompactionServiceCompactionJob::Run() {
const VersionStorageInfo* storage_info = c->input_version()->storage_info(); const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info); assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level()); write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level(); bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin; Slice begin = compaction_input_.begin;
@ -404,42 +401,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
}; };
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = { static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"column_family", {"cf_name",
OptionTypeInfo::Struct( {offsetof(struct CompactionServiceInput, cf_name),
"column_family", &cfd_type_info, OptionType::kEncodedString}},
offsetof(struct CompactionServiceInput, column_family),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"db_options",
{offsetof(struct CompactionServiceInput, db_options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto options = static_cast<DBOptions*>(addr);
return GetDBOptionsFromString(opts, DBOptions(), value, options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto options = static_cast<const DBOptions*>(addr);
std::string result;
auto status = GetStringFromDBOptions(opts, *options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const DBOptions*>(addr1);
const auto that_one = static_cast<const DBOptions*>(addr2);
auto this_conf = DBOptionsAsConfigurable(*this_one);
auto that_conf = DBOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>( {"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots), offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone, OptionVerificationType::kNormal, OptionTypeFlags::kNone,

View File

@ -12,7 +12,8 @@
#include "logging/auto_roll_logger.h" #include "logging/auto_roll_logger.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h" #include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/write_batch_util.h" #include "util/write_batch_util.h"
@ -938,69 +939,103 @@ Status DB::OpenAndCompact(
const std::string& output_directory, const std::string& input, const std::string& output_directory, const std::string& input,
std::string* output, std::string* output,
const CompactionServiceOptionsOverride& override_options) { const CompactionServiceOptionsOverride& override_options) {
// Check for cancellation
if (options.canceled && options.canceled->load(std::memory_order_acquire)) { if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused); return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
} }
// 1. Deserialize Compaction Input
CompactionServiceInput compaction_input; CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input); Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
compaction_input.db_options.max_open_files = -1; // 2. Load the options from latest OPTIONS file
compaction_input.db_options.compaction_service = nullptr; DBOptions db_options;
if (compaction_input.db_options.statistics) { ConfigOptions config_options;
compaction_input.db_options.statistics.reset(); config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
s = LoadLatestOptions(config_options, name, &db_options,
&all_column_families);
// In a very rare scenario, loading options may fail if the options changed by
// the primary host at the same time. Just retry once for now.
if (!s.ok()) {
s = LoadLatestOptions(config_options, name, &db_options,
&all_column_families);
if (!s.ok()) {
return s;
}
} }
compaction_input.db_options.env = override_options.env;
compaction_input.db_options.file_checksum_gen_factory = // 3. Override pointer configurations in DBOptions with
// CompactionServiceOptionsOverride
db_options.env = override_options.env;
db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory; override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics; db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator = db_options.listeners = override_options.listeners;
override_options.comparator; db_options.compaction_service = nullptr;
compaction_input.column_family.options.merge_operator = // We will close the DB after the compaction anyway.
override_options.merge_operator; // Open as many files as needed for the compaction.
compaction_input.column_family.options.compaction_filter = db_options.max_open_files = -1;
override_options.compaction_filter;
compaction_input.column_family.options.compaction_filter_factory =
override_options.compaction_filter_factory;
compaction_input.column_family.options.prefix_extractor =
override_options.prefix_extractor;
compaction_input.column_family.options.table_factory =
override_options.table_factory;
compaction_input.column_family.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
compaction_input.column_family.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
compaction_input.db_options.listeners = override_options.listeners;
// 4. Filter CFs that are needed for OpenAndCompact()
// We do not need to open all column families for the remote compaction.
// Only open default CF + target CF. If target CF == default CF, we will open
// just the default CF (Due to current limitation, DB cannot open without the
// default CF)
std::vector<ColumnFamilyDescriptor> column_families; std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(compaction_input.column_family); for (auto& cf : all_column_families) {
// TODO: we have to open default CF, because of an implementation limitation, if (cf.name == compaction_input.cf_name) {
// currently we just use the same CF option from input, which is not collect cf.options.comparator = override_options.comparator;
// and open may fail. cf.options.merge_operator = override_options.merge_operator;
if (compaction_input.column_family.name != kDefaultColumnFamilyName) { cf.options.compaction_filter = override_options.compaction_filter;
column_families.emplace_back(kDefaultColumnFamilyName, cf.options.compaction_filter_factory =
compaction_input.column_family.options); override_options.compaction_filter_factory;
cf.options.prefix_extractor = override_options.prefix_extractor;
cf.options.table_factory = override_options.table_factory;
cf.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
cf.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
column_families.emplace_back(cf);
} else if (cf.name == kDefaultColumnFamilyName) {
column_families.emplace_back(cf);
}
} }
// 5. Open db As Secondary
DB* db; DB* db;
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory, &handles, &db);
column_families, &handles, &db);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
assert(db);
// 6. Find the handle of the Column Family that this will compact
ColumnFamilyHandle* cfh = nullptr;
for (auto* handle : handles) {
if (compaction_input.cf_name == handle->GetName()) {
cfh = handle;
break;
}
}
assert(cfh);
// 7. Run the compaction without installation.
// Output will be stored in the directory specified by output_directory
CompactionServiceResult compaction_result; CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db); DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
assert(handles.size() > 0); s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
s = db_secondary->CompactWithoutInstallation( &compaction_result);
options, handles[0], compaction_input, &compaction_result);
// 8. Serialize the result
Status serialization_status = compaction_result.Write(output); Status serialization_status = compaction_result.Write(output);
// 9. Close the db and return
for (auto& handle : handles) { for (auto& handle : handles) {
delete handle; delete handle;
} }

View File

@ -2295,9 +2295,6 @@ struct SizeApproximationOptions {
}; };
struct CompactionServiceOptionsOverride { struct CompactionServiceOptionsOverride {
// Currently pointer configurations are not passed to compaction service
// compaction so the user needs to set it. It will be removed once pointer
// configuration passing is supported.
Env* env = Env::Default(); Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr; std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;