Make compaction always use the input version with extra ref protection (#12992)

Summary:
`Compaction` is already creating its own ref for the input Version: 4b1d595306/db/compaction/compaction.cc (L73)

And properly Unref it during destruction:
4b1d595306/db/compaction/compaction.cc (L450)

This PR redirects compaction's access of `cfd->current()` to this input `Version`, to prepare for when a column family's data can be replaced all together, and `cfd->current()` is not safe to access for a compaction job. Because a new `Version` with just some other external files could be installed as `cfd->current()`. The compaction job's expectation of the current `Version` and the corresponding storage info to always have its input files will no longer be guaranteed.

My next follow up is to do a similar thing for flush, also to prepare it for when a column family's data can be replaced. I will make it create its own reference of the current `MemTableListVersion` and use it as input, all flush job's access of memtables will be wired to that input `MemTableListVersion`. Similarly this reference will be unreffed during a flush job's destruction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12992

Test Plan: Existing tests

Reviewed By: pdillinger

Differential Revision: D62212625

Pulled By: jowlyzhang

fbshipit-source-id: 9a781213469cf366857a128d50a702af683a046a
This commit is contained in:
Yu Zhang 2024-09-06 14:07:33 -07:00 committed by Facebook GitHub Bot
parent a24574e80a
commit 0c6e9c036a
9 changed files with 39 additions and 37 deletions

View File

@ -1565,28 +1565,6 @@ Status ColumnFamilyData::SetOptions(
return s;
}
// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
return Env::WLTH_NOT_SET;
}
if (level == 0) {
return Env::WLTH_MEDIUM;
}
int base_level = current_->storage_info()->base_level();
// L1: medium, L2: long, ...
if (level - base_level >= 2) {
return Env::WLTH_EXTREME;
} else if (level < base_level) {
// There is no restriction which prevents level passed in to be smaller
// than base_level.
return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(
level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
Status ColumnFamilyData::AddDirectories(
std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
Status s;

View File

@ -511,8 +511,6 @@ class ColumnFamilyData {
return initial_cf_options_;
}
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
// created_dirs remembers directory created, so that we don't need to call
// the same data creation operation again.
Status AddDirectories(

View File

@ -251,12 +251,13 @@ void CompactionJob::Prepare() {
// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
ColumnFamilyData* cfd = c->column_family_data();
[[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
assert(cfd != nullptr);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
@ -297,8 +298,8 @@ void CompactionJob::Prepare() {
for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp;
Status s =
cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr);
Status s = c->input_version()->GetTableProperties(read_options, &tp,
fmd, nullptr);
if (s.ok()) {
s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
}

View File

@ -261,11 +261,11 @@ Status CompactionServiceCompactionJob::Run() {
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
write_hint_ =
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;

View File

@ -1681,7 +1681,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.oldest_ancester_time = current_time;
meta.epoch_number = cfd->NewEpochNumber();
{
auto write_hint = cfd->CalculateSSTWriteHint(0);
auto write_hint =
cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
mutex_.Unlock();
SequenceNumber earliest_write_conflict_snapshot;

View File

@ -861,7 +861,7 @@ Status FlushJob::WriteLevel0Table() {
std::vector<BlobFileAddition> blob_file_additions;
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
auto write_hint = base_->storage_info()->CalculateSSTWriteHint(/*level=*/0);
Env::IOPriority io_priority = GetRateLimiterPriority();
db_mutex_->Unlock();
if (log_buffer_) {

View File

@ -451,7 +451,8 @@ class Repairer {
meta.file_creation_time = current_time;
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
auto write_hint = cfd->CalculateSSTWriteHint(0);
auto write_hint =
cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0);
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter = mem->NewRangeTombstoneIterator(

View File

@ -4919,6 +4919,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun(
return false;
}
Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint(
int level) const {
if (compaction_style_ != kCompactionStyleLevel) {
return Env::WLTH_NOT_SET;
}
if (level == 0) {
return Env::WLTH_MEDIUM;
}
// L1: medium, L2: long, ...
if (level - base_level_ >= 2) {
return Env::WLTH_EXTREME;
} else if (level < base_level_) {
// There is no restriction which prevents level passed in to be smaller
// than base_level.
return Env::WLTH_MEDIUM;
}
return static_cast<Env::WriteLifeTimeHint>(
level - base_level_ + static_cast<int>(Env::WLTH_MEDIUM));
}
void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
std::vector<uint64_t>* live_blob_files) const {
assert(live_table_files);

View File

@ -626,6 +626,8 @@ class VersionStorageInfo {
const Slice& largest_user_key,
int last_level, int last_l0_idx);
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level) const;
private:
void ComputeCompensatedSizes();
void UpdateNumNonEmptyLevels();