Preserve Options File (#13074)

Summary:
In https://github.com/facebook/rocksdb/issues/13025 , we made a change to load the latest options file in the remote worker instead of serializing the entire set of options.

That was done under assumption that OPTIONS file do not get purged often. While testing, we learned that this happens more often than we want it to be, so we want to prevent the OPTIONS file from getting purged anytime between when the remote compaction is scheduled and the option is loaded in the remote worker.

Like how we are protecting new SST files from getting purged using `min_pending_output`, we are doing the same by keeping track of `min_options_file_number`. Any OPTIONS file with number greater than `min_options_file_number` will be protected from getting purged. Just like `min_pending_output`, `min_options_file_number` gets bumped when the compaction is done. This is only applicable when `options.compaction_service` is set.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13074

Test Plan:
```
./compaction_service_test --gtest_filter="*PreservedOptionsLocalCompaction*"
./compaction_service_test --gtest_filter="*PreservedOptionsRemoteCompaction*"
```

Reviewed By: anand1976

Differential Revision: D64433795

Pulled By: jaykorean

fbshipit-source-id: 0d902773f0909d9481dec40abf0b4c54ce5e86b2
This commit is contained in:
Jay Huh 2024-10-16 09:22:51 -07:00 committed by Facebook GitHub Bot
parent 55de26580a
commit da5e11310b
10 changed files with 228 additions and 12 deletions

View File

@ -396,6 +396,8 @@ struct CompactionServiceInput {
bool has_end = false;
std::string end;
uint64_t options_file_number;
// serialization interface to read and write the object
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);

View File

@ -48,6 +48,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.has_end = sub_compact->end.has_value();
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
compaction_input.options_file_number =
sub_compact->compaction->input_version()
->version_set()
->options_file_number();
TEST_SYNC_POINT_CALLBACK(
"CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
&compaction_input);
std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
@ -438,6 +446,10 @@ static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options_file_number",
{offsetof(struct CompactionServiceInput, options_file_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>

View File

@ -3,9 +3,9 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/options_util.h"
#include "table/unique_id_impl.h"
namespace ROCKSDB_NAMESPACE {
@ -396,6 +396,137 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
ASSERT_TRUE(result.stats.is_remote_compaction);
}
TEST_F(CompactionServiceTest, PreservedOptionsLocalCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
Random rnd(301);
for (auto i = 0; i < 2; ++i) {
for (auto j = 0; j < 10; ++j) {
ASSERT_OK(
Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
}
ASSERT_OK(Flush());
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
std::string options_file_name = OptionsFileName(
dbname_,
compaction->input_version()->version_set()->options_file_number());
// Change option twice to make sure the very first OPTIONS file gets
// purged
ASSERT_OK(dbfull()->SetOptions(
{{"level0_file_num_compaction_trigger", "4"}}));
ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
ASSERT_OK(dbfull()->SetOptions(
{{"level0_file_num_compaction_trigger", "6"}}));
ASSERT_EQ(6, dbfull()->GetOptions().level0_file_num_compaction_trigger);
dbfull()->TEST_DeleteObsoleteFiles();
// For non-remote compactions, OPTIONS file can be deleted while
// using option at the start of the compaction
Status s = env_->FileExists(options_file_name);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsNotFound());
// Should be old value
ASSERT_EQ(2, compaction->mutable_cf_options()
->level0_file_num_compaction_trigger);
ASSERT_TRUE(dbfull()->min_options_file_numbers_.empty());
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_TRUE(s.ok());
}
TEST_F(CompactionServiceTest, PreservedOptionsRemoteCompaction) {
// For non-remote compaction do not preserve options file
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
options.disable_auto_compactions = true;
ReopenWithCompactionService(&options);
GenerateTestData();
auto my_cs = GetCompactionService();
Random rnd(301);
for (auto i = 0; i < 2; ++i) {
for (auto j = 0; j < 10; ++j) {
ASSERT_OK(
Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
}
ASSERT_OK(Flush());
}
bool is_primary_called = false;
// This will be called twice. One from primary and one from remote.
// Try changing the option when called from remote. Otherwise, the new option
// will be used
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", [&](void* /*arg*/) {
if (!is_primary_called) {
is_primary_called = true;
return;
}
// Change the option right before the compaction run
ASSERT_OK(dbfull()->SetOptions(
{{"level0_file_num_compaction_trigger", "4"}}));
ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
dbfull()->TEST_DeleteObsoleteFiles();
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
[&](void* arg) {
auto input = static_cast<CompactionServiceInput*>(arg);
std::string options_file_name =
OptionsFileName(dbname_, input->options_file_number);
ASSERT_OK(env_->FileExists(options_file_name));
ASSERT_FALSE(dbfull()->min_options_file_numbers_.empty());
ASSERT_EQ(dbfull()->min_options_file_numbers_.front(),
input->options_file_number);
DBOptions db_options;
ConfigOptions config_options;
std::vector<ColumnFamilyDescriptor> all_column_families;
config_options.env = env_;
ASSERT_OK(LoadOptionsFromFile(config_options, options_file_name,
&db_options, &all_column_families));
bool has_cf = false;
for (auto& cf : all_column_families) {
if (cf.name == input->cf_name) {
// Should be old value
ASSERT_EQ(2, cf.options.level0_file_num_compaction_trigger);
has_cf = true;
}
}
ASSERT_TRUE(has_cf);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
auto compaction = static_cast<Compaction*>(arg);
ASSERT_EQ(2, compaction->mutable_cf_options()
->level0_file_num_compaction_trigger);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_TRUE(s.ok());
CompactionServiceResult result;
my_cs->GetResult(&result);
ASSERT_OK(result.status);
ASSERT_TRUE(result.stats.is_manual_compaction);
ASSERT_TRUE(result.stats.is_remote_compaction);
}
TEST_F(CompactionServiceTest, CorruptedOutput) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;

View File

@ -4779,6 +4779,24 @@ void DBImpl::ReleaseFileNumberFromPendingOutputs(
}
}
std::list<uint64_t>::iterator DBImpl::CaptureOptionsFileNumber() {
// We need to remember the iterator of our insert, because after the
// compaction is done, we need to remove that element from
// min_options_file_numbers_.
min_options_file_numbers_.push_back(versions_->options_file_number());
auto min_options_file_numbers_inserted_elem = min_options_file_numbers_.end();
--min_options_file_numbers_inserted_elem;
return min_options_file_numbers_inserted_elem;
}
void DBImpl::ReleaseOptionsFileNumber(
std::unique_ptr<std::list<uint64_t>::iterator>& v) {
if (v.get() != nullptr) {
min_options_file_numbers_.erase(*v.get());
v.reset();
}
}
Status DBImpl::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {

View File

@ -853,6 +853,8 @@ class DBImpl : public DB {
uint64_t GetObsoleteSstFilesSize();
uint64_t MinOptionsFileNumberToKeep();
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
@ -1694,6 +1696,8 @@ class DBImpl : public DB {
friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class CompactionServiceTest_PreservedOptionsLocalCompaction_Test;
friend class CompactionServiceTest_PreservedOptionsRemoteCompaction_Test;
#endif
struct CompactionState;
@ -1965,6 +1969,12 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
// Similar to pending_outputs, preserve OPTIONS file. Used for remote
// compaction.
std::list<uint64_t>::iterator CaptureOptionsFileNumber();
void ReleaseOptionsFileNumber(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
// Sets bg error if there is an error writing to WAL.
IOStatus SyncClosedWals(const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
@ -2756,6 +2766,11 @@ class DBImpl : public DB {
// State is protected with db mutex.
std::list<uint64_t> pending_outputs_;
// Similar to pending_outputs_, FindObsoleteFiles()/PurgeObsoleteFiles() never
// deletes any OPTIONS file that has number bigger than any of the file number
// in min_options_file_numbers_.
std::list<uint64_t> min_options_file_numbers_;
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition

View File

@ -1561,6 +1561,12 @@ Status DBImpl::CompactFilesImpl(
compaction_job.Prepare();
std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
min_options_file_number_elem.reset(
new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
}
mutex_.Unlock();
TEST_SYNC_POINT("CompactFilesImpl:0");
TEST_SYNC_POINT("CompactFilesImpl:1");
@ -1570,6 +1576,10 @@ Status DBImpl::CompactFilesImpl(
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();
if (immutable_db_options().compaction_service != nullptr) {
ReleaseOptionsFileNumber(min_options_file_number_elem);
}
bool compaction_released = false;
Status status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
@ -3911,6 +3921,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare();
std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
min_options_file_number_elem.reset(
new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
}
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
@ -3920,6 +3936,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
if (immutable_db_options().compaction_service != nullptr) {
ReleaseOptionsFileNumber(min_options_file_number_elem);
}
status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
io_s = compaction_job.io_status();

View File

@ -43,6 +43,14 @@ uint64_t DBImpl::GetObsoleteSstFilesSize() {
return versions_->GetObsoleteSstFilesSize();
}
uint64_t DBImpl::MinOptionsFileNumberToKeep() {
mutex_.AssertHeld();
if (!min_options_file_numbers_.empty()) {
return *min_options_file_numbers_.begin();
}
return std::numeric_limits<uint64_t>::max();
}
Status DBImpl::DisableFileDeletions() {
Status s;
int my_disable_delete_obsolete_files;
@ -147,6 +155,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// here but later find newer generated unfinalized files while scanning.
job_context->min_pending_output = MinObsoleteSstNumberToKeep();
job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();
job_context->min_options_file_number = MinOptionsFileNumberToKeep();
// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
@ -498,7 +507,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
dbname_);
// File numbers of most recent two OPTIONS file in candidate_files (found in
// previos FindObsoleteFiles(full_scan=true))
// previous FindObsoleteFiles(full_scan=true))
// At this point, there must not be any duplicate file numbers in
// candidate_files.
uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
@ -519,6 +528,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}
}
// For remote compactions, we need to keep OPTIONS file that may get
// referenced by the remote worker
optsfile_num2 = std::min(optsfile_num2, state.min_options_file_number);
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.

View File

@ -951,22 +951,20 @@ Status DB::OpenAndCompact(
return s;
}
// 2. Load the options from latest OPTIONS file
// 2. Load the options
DBOptions db_options;
ConfigOptions config_options;
config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
s = LoadLatestOptions(config_options, name, &db_options,
&all_column_families);
// In a very rare scenario, loading options may fail if the options changed by
// the primary host at the same time. Just retry once for now.
if (!s.ok()) {
s = LoadLatestOptions(config_options, name, &db_options,
std::string options_file_name =
OptionsFileName(name, compaction_input.options_file_number);
s = LoadOptionsFromFile(config_options, options_file_name, &db_options,
&all_column_families);
if (!s.ok()) {
return s;
}
}
// 3. Override pointer configurations in DBOptions with
// CompactionServiceOptionsOverride

View File

@ -202,6 +202,10 @@ struct JobContext {
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number;
uint64_t pending_manifest_file_number;
// Used for remote compaction. To prevent OPTIONS files from getting
// purged by PurgeObsoleteFiles() of the primary host
uint64_t min_options_file_number;
uint64_t log_number;
uint64_t prev_log_number;

View File

@ -0,0 +1 @@
OPTIONS file to be loaded by remote worker is now preserved so that it does not get purged by the primary host. A similar technique as how we are preserving new SST files from getting purged is used for this. min_options_file_numbers_ is tracked like pending_outputs_ is tracked.