Move file tracking in VersionEditHandlerPointInTime to VersionBuilder (#12928)

Summary:
`VersionEditHandlerPointInTime` tracks found files, missing files, and intermediate files in order to decide whether to build a `Version` on a negative edge trigger (a transition from valid to invalid) without applying the current `VersionEdit`. However, applying a `VersionEdit` and checking the completeness of a `Version` are specializations of `VersionBuilder`. More importantly, when we augment best-efforts recovery to recover not just a complete point-in-time `Version` but also a prefix of seqnos for a point-in-time `Version`, such checks would need to be duplicated in `VersionEditHandlerPointInTime` and `VersionBuilder`.

To avoid this, this refactor moves all the file tracking functionality in `VersionEditHandlerPointInTime` into `VersionBuilder`. To continue to let `VersionEditHandlerPointInTime` do the edge trigger check and build a `Version` before applying the current `VersionEdit`, a suite of APIs that supports creating a save point, together with its associated functions, is added to `VersionBuilder`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12928

Test Plan: Existing tests

Reviewed By: anand1976

Differential Revision: D61171320

Pulled By: jowlyzhang

fbshipit-source-id: 604f66f8b1e3a3e13da59d8ba357c74e8a366dbc
This commit is contained in:
Yu Zhang 2024-08-12 21:09:37 -07:00 committed by Facebook GitHub Bot
parent c21fe1a47f
commit d458331ee9
6 changed files with 428 additions and 288 deletions

View File

@ -289,10 +289,12 @@ TEST_F(DBFollowerTest, RetryCatchup) {
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"DBImpl::BackgroundCompaction:Start", {"DBImpl::BackgroundCompaction:Start",
"DBImplFollower::TryCatchupWithLeader:Begin2"}, "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1", {"VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin1",
"DBImpl::BackgroundCompaction:BeforeCompaction"}, "DBImpl::BackgroundCompaction:BeforeCompaction"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"}, "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"}, {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
@ -335,10 +337,12 @@ TEST_F(DBFollowerTest, RetryCatchupManifestRollover) {
SyncPoint::GetInstance()->LoadDependency({ SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"}, {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"}, {"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1", {"VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin1",
"Leader::Done"}, "Leader::Done"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"}, "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", {"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"}, "Follower::WaitForCatchup:1"},
}); });

View File

@ -29,6 +29,7 @@
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/version_edit_handler.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "port/port.h" #include "port/port.h"
#include "table/table_reader.h" #include "table/table_reader.h"
@ -37,6 +38,25 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class VersionBuilder::Rep { class VersionBuilder::Rep {
// Comparator over FileMetaData pointers that places "newer" files first:
// a file with a greater largest_seqno sorts before one with a smaller
// largest_seqno; ties are resolved by the greater smallest_seqno, and
// finally by the greater file number.
class NewestFirstBySeqNo {
 public:
  bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
    assert(lhs);
    assert(rhs);

    const auto& left = lhs->fd;
    const auto& right = rhs->fd;

    if (left.largest_seqno == right.largest_seqno) {
      if (left.smallest_seqno == right.smallest_seqno) {
        // Both seqno bounds are equal: fall back to the file number.
        return left.GetNumber() > right.GetNumber();
      }
      return left.smallest_seqno > right.smallest_seqno;
    }
    return left.largest_seqno > right.largest_seqno;
  }
};
class NewestFirstByEpochNumber { class NewestFirstByEpochNumber {
private: private:
inline static const NewestFirstBySeqNo seqno_cmp; inline static const NewestFirstBySeqNo seqno_cmp;
@ -249,9 +269,9 @@ class VersionBuilder::Rep {
std::unordered_map<uint64_t, int> table_file_levels_; std::unordered_map<uint64_t, int> table_file_levels_;
// Current compact cursors that should be changed after the last compaction // Current compact cursors that should be changed after the last compaction
std::unordered_map<int, InternalKey> updated_compact_cursors_; std::unordered_map<int, InternalKey> updated_compact_cursors_;
NewestFirstByEpochNumber level_zero_cmp_by_epochno_; std::shared_ptr<NewestFirstByEpochNumber> level_zero_cmp_by_epochno_;
NewestFirstBySeqNo level_zero_cmp_by_seqno_; std::shared_ptr<NewestFirstBySeqNo> level_zero_cmp_by_seqno_;
BySmallestKey level_nonzero_cmp_; std::shared_ptr<BySmallestKey> level_nonzero_cmp_;
// Mutable metadata objects for all blob files affected by the series of // Mutable metadata objects for all blob files affected by the series of
// version edits. // version edits.
@ -259,11 +279,23 @@ class VersionBuilder::Rep {
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_; std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
ColumnFamilyData* cfd_;
VersionEditHandler* version_edit_handler_;
bool track_found_and_missing_files_;
// These are only tracked if `track_found_and_missing_files_` is enabled.
std::unordered_set<uint64_t> found_files_;
std::unordered_set<uint64_t> missing_files_;
std::vector<std::string> intermediate_files_;
uint64_t missing_blob_files_high_ = kInvalidBlobFileNumber;
public: public:
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions, Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
TableCache* table_cache, VersionStorageInfo* base_vstorage, TableCache* table_cache, VersionStorageInfo* base_vstorage,
VersionSet* version_set, VersionSet* version_set,
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr) std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
bool track_found_and_missing_files)
: file_options_(file_options), : file_options_(file_options),
ioptions_(ioptions), ioptions_(ioptions),
table_cache_(table_cache), table_cache_(table_cache),
@ -271,11 +303,60 @@ class VersionBuilder::Rep {
version_set_(version_set), version_set_(version_set),
num_levels_(base_vstorage->num_levels()), num_levels_(base_vstorage->num_levels()),
has_invalid_levels_(false), has_invalid_levels_(false),
level_nonzero_cmp_(base_vstorage_->InternalComparator()), level_zero_cmp_by_epochno_(
file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr) { std::make_shared<NewestFirstByEpochNumber>()),
level_zero_cmp_by_seqno_(std::make_shared<NewestFirstBySeqNo>()),
level_nonzero_cmp_(std::make_shared<BySmallestKey>(
base_vstorage_->InternalComparator())),
file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr),
cfd_(cfd),
version_edit_handler_(version_edit_handler),
track_found_and_missing_files_(track_found_and_missing_files) {
assert(ioptions_); assert(ioptions_);
levels_ = new LevelState[num_levels_]; levels_ = new LevelState[num_levels_];
if (track_found_and_missing_files_) {
assert(cfd_);
assert(version_edit_handler_);
}
}
// Copy constructor: builds a second Rep from the state of `other`.
// Pointer and shared_ptr members (options, caches, comparators, cfd, edit
// handler) are copied as-is, so they refer to the same objects as `other`.
// The file-tracking containers are copied by value.
// `levels_` is NOT shared: a fresh array is allocated and filled per level.
Rep(const Rep& other)
: file_options_(other.file_options_),
ioptions_(other.ioptions_),
table_cache_(other.table_cache_),
base_vstorage_(other.base_vstorage_),
version_set_(other.version_set_),
num_levels_(other.num_levels_),
invalid_level_sizes_(other.invalid_level_sizes_),
has_invalid_levels_(other.has_invalid_levels_),
table_file_levels_(other.table_file_levels_),
updated_compact_cursors_(other.updated_compact_cursors_),
level_zero_cmp_by_epochno_(other.level_zero_cmp_by_epochno_),
level_zero_cmp_by_seqno_(other.level_zero_cmp_by_seqno_),
level_nonzero_cmp_(other.level_nonzero_cmp_),
mutable_blob_file_metas_(other.mutable_blob_file_metas_),
file_metadata_cache_res_mgr_(other.file_metadata_cache_res_mgr_),
cfd_(other.cfd_),
version_edit_handler_(other.version_edit_handler_),
track_found_and_missing_files_(other.track_found_and_missing_files_),
found_files_(other.found_files_),
missing_files_(other.missing_files_),
intermediate_files_(other.intermediate_files_),
missing_blob_files_high_(other.missing_blob_files_high_) {
assert(ioptions_);
// Own a separate per-level array rather than aliasing `other.levels_`.
levels_ = new LevelState[num_levels_];
for (int level = 0; level < num_levels_; level++) {
levels_[level] = other.levels_[level];
const auto& added = levels_[level].added_files;
// Every added file is now held by this copy as well, so take one more
// reference on it via RefFile.
for (auto& pair : added) {
RefFile(pair.second);
}
}
// Same invariant as the primary constructor: tracking requires both a
// column family and an edit handler.
if (track_found_and_missing_files_) {
assert(cfd_);
assert(version_edit_handler_);
}
} }
~Rep() { ~Rep() {
@ -289,6 +370,12 @@ class VersionBuilder::Rep {
delete[] levels_; delete[] levels_;
} }
// Takes one additional reference on `f`.
// `f` must be non-null and must already have a positive reference count.
void RefFile(FileMetaData* f) {
  assert(f);
  assert(f->refs > 0);
  ++f->refs;
}
void UnrefFile(FileMetaData* f) { void UnrefFile(FileMetaData* f) {
f->refs--; f->refs--;
if (f->refs <= 0) { if (f->refs <= 0) {
@ -397,7 +484,7 @@ class VersionBuilder::Rep {
if (epoch_number_requirement == if (epoch_number_requirement ==
EpochNumberRequirement::kMightMissing) { EpochNumberRequirement::kMightMissing) {
if (!level_zero_cmp_by_seqno_(lhs, rhs)) { if (!level_zero_cmp_by_seqno_->operator()(lhs, rhs)) {
std::ostringstream oss; std::ostringstream oss;
oss << "L0 files are not sorted properly: files #" oss << "L0 files are not sorted properly: files #"
<< lhs->fd.GetNumber() << " with seqnos (largest, smallest) " << lhs->fd.GetNumber() << " with seqnos (largest, smallest) "
@ -429,7 +516,7 @@ class VersionBuilder::Rep {
} }
} }
if (!level_zero_cmp_by_epochno_(lhs, rhs)) { if (!level_zero_cmp_by_epochno_->operator()(lhs, rhs)) {
std::ostringstream oss; std::ostringstream oss;
oss << "L0 files are not sorted properly: files #" oss << "L0 files are not sorted properly: files #"
<< lhs->fd.GetNumber() << " with epoch number " << lhs->fd.GetNumber() << " with epoch number "
@ -458,7 +545,7 @@ class VersionBuilder::Rep {
assert(lhs); assert(lhs);
assert(rhs); assert(rhs);
if (!level_nonzero_cmp_(lhs, rhs)) { if (!level_nonzero_cmp_->operator()(lhs, rhs)) {
std::ostringstream oss; std::ostringstream oss;
oss << 'L' << level << " files are not sorted properly: files #" oss << 'L' << level << " files are not sorted properly: files #"
<< lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber(); << lhs->fd.GetNumber() << ", #" << rhs->fd.GetNumber();
@ -634,7 +721,21 @@ class VersionBuilder::Rep {
mutable_blob_file_metas_.emplace( mutable_blob_file_metas_.emplace(
blob_file_number, MutableBlobFileMetaData(std::move(shared_meta))); blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
return Status::OK(); Status s;
if (track_found_and_missing_files_) {
assert(version_edit_handler_);
s = version_edit_handler_->VerifyBlobFile(cfd_, blob_file_number,
blob_file_addition);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_blob_files_high_ =
std::max(missing_blob_files_high_, blob_file_number);
s = Status::OK();
} else if (!s.ok()) {
return s;
}
}
return s;
} }
Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) { Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
@ -752,6 +853,27 @@ class VersionBuilder::Rep {
table_file_levels_[file_number] = table_file_levels_[file_number] =
VersionStorageInfo::FileLocation::Invalid().GetLevel(); VersionStorageInfo::FileLocation::Invalid().GetLevel();
if (track_found_and_missing_files_) {
assert(version_edit_handler_);
auto fiter = missing_files_.find(file_number);
if (fiter != missing_files_.end()) {
missing_files_.erase(fiter);
} else {
fiter = found_files_.find(file_number);
// Only mark new files added during this catchup attempt for deletion.
// These files were never installed in VersionStorageInfo.
// Already referenced files that are deleted by a VersionEdit will
// be added to the VersionStorageInfo's obsolete files when the old
// version is dereferenced.
if (fiter != found_files_.end()) {
assert(!ioptions_->cf_paths.empty());
intermediate_files_.emplace_back(
MakeTableFileName(ioptions_->cf_paths[0].path, file_number));
found_files_.erase(fiter);
}
}
}
return Status::OK(); return Status::OK();
} }
@ -824,7 +946,27 @@ class VersionBuilder::Rep {
table_file_levels_[file_number] = level; table_file_levels_[file_number] = level;
return Status::OK(); Status s;
if (track_found_and_missing_files_) {
assert(version_edit_handler_);
assert(!ioptions_->cf_paths.empty());
const std::string fpath =
MakeTableFileName(ioptions_->cf_paths[0].path, file_number);
s = version_edit_handler_->VerifyFile(cfd_, fpath, level, meta);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files_.insert(file_number);
if (s.IsCorruption()) {
found_files_.insert(file_number);
}
s = Status::OK();
} else if (!s.ok()) {
return s;
} else {
found_files_.insert(file_number);
}
}
return s;
} }
Status ApplyCompactCursors(int level, Status ApplyCompactCursors(int level,
@ -1148,6 +1290,29 @@ class VersionBuilder::Rep {
} }
} }
// Returns true when no table file is recorded as missing and either no blob
// file is recorded as missing, or the highest missing blob file number is
// below GetMinOldestBlobFileNumber().
// Only valid when file tracking is enabled.
bool ContainsCompletePIT() {
  assert(track_found_and_missing_files_);
  if (!missing_files_.empty()) {
    return false;
  }
  if (missing_blob_files_high_ == kInvalidBlobFileNumber) {
    return true;
  }
  return missing_blob_files_high_ < GetMinOldestBlobFileNumber();
}
// Returns true if at least one table file is recorded as missing, or if a
// missing blob file number has been recorded.
// Only valid when file tracking is enabled.
bool HasMissingFiles() const {
  assert(track_found_and_missing_files_);
  const bool no_missing_table_files = missing_files_.empty();
  const bool no_missing_blob_files =
      missing_blob_files_high_ == kInvalidBlobFileNumber;
  return !(no_missing_table_files && no_missing_blob_files);
}
// Returns a mutable reference to the list of intermediate file paths.
// NOTE: despite the name, this function does not clear the list itself; it
// only hands out the reference, so any clearing is left to the caller.
// Only valid when file tracking is enabled.
std::vector<std::string>& GetAndClearIntermediateFiles() {
assert(track_found_and_missing_files_);
return intermediate_files_;
}
// Empties the set of found table file numbers.
// Only valid when file tracking is enabled.
void ClearFoundFiles() {
assert(track_found_and_missing_files_);
found_files_.clear();
}
template <typename Cmp> template <typename Cmp>
void SaveSSTFilesTo(VersionStorageInfo* vstorage, int level, Cmp cmp) const { void SaveSSTFilesTo(VersionStorageInfo* vstorage, int level, Cmp cmp) const {
// Merge the set of added files with the set of pre-existing files. // Merge the set of added files with the set of pre-existing files.
@ -1215,13 +1380,13 @@ class VersionBuilder::Rep {
} }
if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) { if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) {
SaveSSTFilesTo(vstorage, /* level */ 0, level_zero_cmp_by_seqno_); SaveSSTFilesTo(vstorage, /* level */ 0, *level_zero_cmp_by_seqno_);
} else { } else {
SaveSSTFilesTo(vstorage, /* level */ 0, level_zero_cmp_by_epochno_); SaveSSTFilesTo(vstorage, /* level */ 0, *level_zero_cmp_by_epochno_);
} }
for (int level = 1; level < num_levels_; ++level) { for (int level = 1; level < num_levels_; ++level) {
SaveSSTFilesTo(vstorage, level, level_nonzero_cmp_); SaveSSTFilesTo(vstorage, level, *level_nonzero_cmp_);
} }
} }
@ -1369,9 +1534,12 @@ VersionBuilder::VersionBuilder(
const FileOptions& file_options, const ImmutableCFOptions* ioptions, const FileOptions& file_options, const ImmutableCFOptions* ioptions,
TableCache* table_cache, VersionStorageInfo* base_vstorage, TableCache* table_cache, VersionStorageInfo* base_vstorage,
VersionSet* version_set, VersionSet* version_set,
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr) std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
bool track_found_and_missing_files)
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage, : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
version_set, file_metadata_cache_res_mgr)) {} version_set, file_metadata_cache_res_mgr, cfd,
version_edit_handler, track_found_and_missing_files)) {}
VersionBuilder::~VersionBuilder() = default; VersionBuilder::~VersionBuilder() = default;
@ -1399,27 +1567,69 @@ Status VersionBuilder::LoadTableHandlers(
read_options, block_protection_bytes_per_key); read_options, block_protection_bytes_per_key);
} }
uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const { void VersionBuilder::CreateOrReplaceSavePoint() {
return rep_->GetMinOldestBlobFileNumber(); assert(rep_);
savepoint_ = std::move(rep_);
rep_ = std::make_unique<Rep>(*savepoint_);
} }
// Forwards to the current Rep (not the save point).
bool VersionBuilder::ContainsCompletePIT() const {
return rep_->ContainsCompletePIT();
}
// Forwards to the current Rep (not the save point).
bool VersionBuilder::HasMissingFiles() const { return rep_->HasMissingFiles(); }
// Forwards to the current Rep and returns its intermediate-file list by
// mutable reference. Nothing is cleared here.
std::vector<std::string>& VersionBuilder::GetAndClearIntermediateFiles() {
return rep_->GetAndClearIntermediateFiles();
}
// Forwards to the current Rep to drop its set of found files.
void VersionBuilder::ClearFoundFiles() { rep_->ClearFoundFiles(); }
// Saves the Version held in the save point into `vstorage`.
// Returns InvalidArgument (with an explanatory message) when no save point
// exists; otherwise returns whatever the save point's SaveTo returns.
Status VersionBuilder::SaveSavePointTo(VersionStorageInfo* vstorage) const {
  if (!savepoint_) {
    // Carry a message so the failure can be diagnosed from the Status alone.
    return Status::InvalidArgument(
        "VersionBuilder has no save point to save from");
  }
  return savepoint_->SaveTo(vstorage);
}
// Loads the table handlers for the Version held in the save point by
// forwarding every argument unchanged to the save point's LoadTableHandlers.
// Returns InvalidArgument (with an explanatory message) when no save point
// exists.
Status VersionBuilder::LoadSavePointTableHandlers(
    InternalStats* internal_stats, int max_threads,
    bool prefetch_index_and_filter_in_cache, bool is_initial_load,
    const std::shared_ptr<const SliceTransform>& prefix_extractor,
    size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
    uint8_t block_protection_bytes_per_key) {
  if (!savepoint_) {
    // Carry a message so the failure can be diagnosed from the Status alone.
    return Status::InvalidArgument(
        "VersionBuilder has no save point to load table handlers for");
  }
  return savepoint_->LoadTableHandlers(
      internal_stats, max_threads, prefetch_index_and_filter_in_cache,
      is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin,
      read_options, block_protection_bytes_per_key);
}
// Destroys the save point, if any; afterwards `savepoint_` is empty.
void VersionBuilder::ClearSavePoint() { savepoint_.reset(); }
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd) ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
bool track_found_and_missing_files)
: version_builder_(new VersionBuilder( : version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->ioptions(), cfd->current()->version_set()->file_options(), cfd->ioptions(),
cfd->table_cache(), cfd->current()->storage_info(), cfd->table_cache(), cfd->current()->storage_info(),
cfd->current()->version_set(), cfd->current()->version_set(),
cfd->GetFileMetadataCacheReservationManager())), cfd->GetFileMetadataCacheReservationManager(), cfd,
version_edit_handler, track_found_and_missing_files)),
version_(cfd->current()) { version_(cfd->current()) {
version_->Ref(); version_->Ref();
} }
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder( BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd, Version* v) ColumnFamilyData* cfd, Version* v, VersionEditHandler* version_edit_handler,
bool track_found_and_missing_files)
: version_builder_(new VersionBuilder( : version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->ioptions(), cfd->current()->version_set()->file_options(), cfd->ioptions(),
cfd->table_cache(), v->storage_info(), v->version_set(), cfd->table_cache(), v->storage_info(), v->version_set(),
cfd->GetFileMetadataCacheReservationManager())), cfd->GetFileMetadataCacheReservationManager(), cfd,
version_edit_handler, track_found_and_missing_files)),
version_(v) { version_(v) {
assert(version_ != cfd->current()); assert(version_ != cfd->current());
} }

View File

@ -26,6 +26,7 @@ struct FileMetaData;
class InternalStats; class InternalStats;
class Version; class Version;
class VersionSet; class VersionSet;
class VersionEditHandler;
class ColumnFamilyData; class ColumnFamilyData;
class CacheReservationManager; class CacheReservationManager;
@ -38,22 +39,71 @@ class VersionBuilder {
const ImmutableCFOptions* ioptions, TableCache* table_cache, const ImmutableCFOptions* ioptions, TableCache* table_cache,
VersionStorageInfo* base_vstorage, VersionSet* version_set, VersionStorageInfo* base_vstorage, VersionSet* version_set,
std::shared_ptr<CacheReservationManager> std::shared_ptr<CacheReservationManager>
file_metadata_cache_res_mgr = nullptr); file_metadata_cache_res_mgr = nullptr,
ColumnFamilyData* cfd = nullptr,
VersionEditHandler* version_edit_handler = nullptr,
bool track_found_and_missing_files = false);
~VersionBuilder(); ~VersionBuilder();
bool CheckConsistencyForNumLevels(); bool CheckConsistencyForNumLevels();
Status Apply(const VersionEdit* edit); Status Apply(const VersionEdit* edit);
// Save the current Version to the provided `vstorage`.
Status SaveTo(VersionStorageInfo* vstorage) const; Status SaveTo(VersionStorageInfo* vstorage) const;
// Load all the table handlers for the current Version in the builder.
Status LoadTableHandlers( Status LoadTableHandlers(
InternalStats* internal_stats, int max_threads, InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load, bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor, const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options, size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key); uint8_t block_protection_bytes_per_key);
uint64_t GetMinOldestBlobFileNumber() const;
//============APIs only used by VersionEditHandlerPointInTime ============//
// Creates a save point for the Version that has been built so far. Subsequent
// VersionEdits applied to the builder will not affect the Version in this
// save point. VersionBuilder currently only supports creating one save point,
// so when `CreateOrReplaceSavePoint` is called again, the previous save point
// is replaced. `ClearSavePoint` can be called explicitly to clear
// the save point too.
void CreateOrReplaceSavePoint();
// The builder can find all the files to build a `Version`.
bool ContainsCompletePIT() const;
bool HasMissingFiles() const;
// When applying a sequence of VersionEdits, intermediate files are the ones
// that are added and then deleted. The caller should clear this intermediate
// file tracking after calling this API, so that the tracking for subsequent
// VersionEdits can start over with a clean state.
std::vector<std::string>& GetAndClearIntermediateFiles();
// Clears all the found files in this Version.
void ClearFoundFiles();
// Save the Version in the save point to the provided `vstorage`.
// Non-OK status will be returned if there is not a valid save point.
Status SaveSavePointTo(VersionStorageInfo* vstorage) const;
// Load all the table handlers for the Version in the save point.
// Non-OK status will be returned if there is not a valid save point.
Status LoadSavePointTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key);
void ClearSavePoint();
//======= End of APIs only used by VersionEditHandlerPointInTime ==========//
private: private:
class Rep; class Rep;
std::unique_ptr<Rep> savepoint_;
std::unique_ptr<Rep> rep_; std::unique_ptr<Rep> rep_;
}; };
@ -62,8 +112,13 @@ class VersionBuilder {
// Both of the constructor and destructor need to be called inside DB Mutex. // Both of the constructor and destructor need to be called inside DB Mutex.
class BaseReferencedVersionBuilder { class BaseReferencedVersionBuilder {
public: public:
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd); explicit BaseReferencedVersionBuilder(
BaseReferencedVersionBuilder(ColumnFamilyData* cfd, Version* v); ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler = nullptr,
bool track_found_and_missing_files = false);
BaseReferencedVersionBuilder(
ColumnFamilyData* cfd, Version* v,
VersionEditHandler* version_edit_handler = nullptr,
bool track_found_and_missing_files = false);
~BaseReferencedVersionBuilder(); ~BaseReferencedVersionBuilder();
VersionBuilder* version_builder() const { return version_builder_.get(); } VersionBuilder* version_builder() const { return version_builder_.get(); }
@ -71,23 +126,4 @@ class BaseReferencedVersionBuilder {
std::unique_ptr<VersionBuilder> version_builder_; std::unique_ptr<VersionBuilder> version_builder_;
Version* version_; Version* version_;
}; };
// Comparator over FileMetaData pointers that places "newer" files first:
// a file with a greater largest_seqno sorts before one with a smaller
// largest_seqno; ties are resolved by the greater smallest_seqno, and
// finally by the greater file number.
class NewestFirstBySeqNo {
 public:
  bool operator()(const FileMetaData* lhs, const FileMetaData* rhs) const {
    assert(lhs);
    assert(rhs);

    const auto& left = lhs->fd;
    const auto& right = rhs->fd;

    if (left.largest_seqno == right.largest_seqno) {
      if (left.smallest_seqno == right.smallest_seqno) {
        // Both seqno bounds are equal: fall back to the file number.
        return left.GetNumber() > right.GetNumber();
      }
      return left.smallest_seqno > right.smallest_seqno;
    }
    return left.largest_seqno > right.largest_seqno;
  }
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -218,15 +218,15 @@ Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit, Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
ColumnFamilyData** cfd) { ColumnFamilyData** cfd) {
bool cf_in_not_found = false; bool do_not_open_cf = false;
bool cf_in_builders = false; bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
assert(cfd != nullptr); assert(cfd != nullptr);
*cfd = nullptr; *cfd = nullptr;
const std::string& cf_name = edit.GetColumnFamilyName(); const std::string& cf_name = edit.GetColumnFamilyName();
Status s; Status s;
if (cf_in_builders || cf_in_not_found) { if (cf_in_builders || do_not_open_cf) {
s = Status::Corruption("MANIFEST adding the same column family twice: " + s = Status::Corruption("MANIFEST adding the same column family twice: " +
cf_name); cf_name);
} }
@ -239,7 +239,7 @@ Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
cf_name.compare(kPersistentStatsColumnFamilyName) == 0; cf_name.compare(kPersistentStatsColumnFamilyName) == 0;
if (cf_options == name_to_options_.end() && if (cf_options == name_to_options_.end() &&
!is_persistent_stats_column_family) { !is_persistent_stats_column_family) {
column_families_not_found_.emplace(edit.GetColumnFamily(), cf_name); do_not_open_column_families_.emplace(edit.GetColumnFamily(), cf_name);
} else { } else {
if (is_persistent_stats_column_family) { if (is_persistent_stats_column_family) {
ColumnFamilyOptions cfo; ColumnFamilyOptions cfo;
@ -256,9 +256,9 @@ Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit, Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
ColumnFamilyData** cfd) { ColumnFamilyData** cfd) {
bool cf_in_not_found = false; bool do_not_open_cf = false;
bool cf_in_builders = false; bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
assert(cfd != nullptr); assert(cfd != nullptr);
*cfd = nullptr; *cfd = nullptr;
@ -266,8 +266,8 @@ Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
Status s; Status s;
if (cf_in_builders) { if (cf_in_builders) {
tmp_cfd = DestroyCfAndCleanup(edit); tmp_cfd = DestroyCfAndCleanup(edit);
} else if (cf_in_not_found) { } else if (do_not_open_cf) {
column_families_not_found_.erase(edit.GetColumnFamily()); do_not_open_column_families_.erase(edit.GetColumnFamily());
} else { } else {
s = Status::Corruption("MANIFEST - dropping non-existing column family"); s = Status::Corruption("MANIFEST - dropping non-existing column family");
} }
@ -288,22 +288,20 @@ Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) {
Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
ColumnFamilyData** cfd) { ColumnFamilyData** cfd) {
bool cf_in_not_found = false; bool do_not_open_cf = false;
bool cf_in_builders = false; bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders); CheckColumnFamilyId(edit, &do_not_open_cf, &cf_in_builders);
assert(cfd != nullptr); assert(cfd != nullptr);
*cfd = nullptr; *cfd = nullptr;
Status s; Status s;
if (!cf_in_not_found) { if (!do_not_open_cf) {
if (!cf_in_builders) { if (!cf_in_builders) {
s = Status::Corruption( s = Status::Corruption(
"MANIFEST record referencing unknown column family"); "MANIFEST record referencing unknown column family");
} }
ColumnFamilyData* tmp_cfd = nullptr; ColumnFamilyData* tmp_cfd = nullptr;
if (s.ok()) { if (s.ok()) {
auto builder_iter = builders_.find(edit.GetColumnFamily());
assert(builder_iter != builders_.end());
tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
edit.GetColumnFamily()); edit.GetColumnFamily());
assert(tmp_cfd != nullptr); assert(tmp_cfd != nullptr);
@ -318,56 +316,33 @@ Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false); s = MaybeCreateVersionBeforeApplyEdit(edit, tmp_cfd,
if (s.ok()) { /*force_create_version=*/false);
s = builder_iter->second->version_builder()->Apply(&edit);
}
} }
*cfd = tmp_cfd; *cfd = tmp_cfd;
} }
return s; return s;
} }
// TODO maybe cache the computation result
// Returns true if any column family has a non-empty set of missing table
// files, or if any column family has a recorded missing blob file number.
// The table-file map is scanned first, then the blob-file map.
bool VersionEditHandler::HasMissingFiles() const {
  for (const auto& cf_and_missing : cf_to_missing_files_) {
    if (!cf_and_missing.second.empty()) {
      return true;
    }
  }
  for (const auto& cf_and_blob_high : cf_to_missing_blob_files_high_) {
    if (cf_and_blob_high.second != kInvalidBlobFileNumber) {
      return true;
    }
  }
  return false;
}
void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit, void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
bool* cf_in_not_found, bool* do_not_open_cf,
bool* cf_in_builders) const { bool* cf_in_builders) const {
assert(cf_in_not_found != nullptr); assert(do_not_open_cf != nullptr);
assert(cf_in_builders != nullptr); assert(cf_in_builders != nullptr);
// Not found means that user didn't supply that column // Not found means that user didn't supply that column
// family option AND we encountered column family add // family option AND we encountered column family add
// record. Once we encounter column family drop record, // record. Once we encounter column family drop record,
// we will delete the column family from // we will delete the column family from
// column_families_not_found. // do_not_open_column_families_.
uint32_t cf_id = edit.GetColumnFamily(); uint32_t cf_id = edit.GetColumnFamily();
bool in_not_found = column_families_not_found_.find(cf_id) != bool in_do_not_open = do_not_open_column_families_.find(cf_id) !=
column_families_not_found_.end(); do_not_open_column_families_.end();
// in builders means that user supplied that column family // in builders means that user supplied that column family
// option AND that we encountered column family add record // option AND that we encountered column family add record
bool in_builders = builders_.find(cf_id) != builders_.end(); bool in_builders = builders_.find(cf_id) != builders_.end();
// They cannot both be true // They cannot both be true
assert(!(in_not_found && in_builders)); assert(!(in_do_not_open && in_builders));
*cf_in_not_found = in_not_found; *do_not_open_cf = in_do_not_open;
*cf_in_builders = in_builders; *cf_in_builders = in_builders;
} }
@ -396,9 +371,9 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
// There were some column families in the MANIFEST that weren't specified // There were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode // in the argument. This is OK in read_only mode
if (s->ok() && MustOpenAllColumnFamilies() && if (s->ok() && MustOpenAllColumnFamilies() &&
!column_families_not_found_.empty()) { !do_not_open_column_families_.empty()) {
std::string msg; std::string msg;
for (const auto& cf : column_families_not_found_) { for (const auto& cf : do_not_open_column_families_) {
msg.append(", "); msg.append(", ");
msg.append(cf.second); msg.append(cf.second);
} }
@ -453,7 +428,8 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
} }
assert(cfd->initialized()); assert(cfd->initialized());
VersionEdit edit; VersionEdit edit;
*s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true); *s = MaybeCreateVersionBeforeApplyEdit(edit, cfd,
/*force_create_version=*/true);
if (!s->ok()) { if (!s->ok()) {
break; break;
} }
@ -498,13 +474,8 @@ ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
assert(cfd != nullptr); assert(cfd != nullptr);
cfd->set_initialized(); cfd->set_initialized();
assert(builders_.find(cf_id) == builders_.end()); assert(builders_.find(cf_id) == builders_.end());
builders_.emplace(cf_id, builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(
VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd))); cfd, this, track_found_and_missing_files_)));
if (track_found_and_missing_files_) {
cf_to_found_files_.emplace(cf_id, std::unordered_set<uint64_t>());
cf_to_missing_files_.emplace(cf_id, std::unordered_set<uint64_t>());
cf_to_missing_blob_files_high_.emplace(cf_id, kInvalidBlobFileNumber);
}
return cfd; return cfd;
} }
@ -514,21 +485,6 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
auto builder_iter = builders_.find(cf_id); auto builder_iter = builders_.find(cf_id);
assert(builder_iter != builders_.end()); assert(builder_iter != builders_.end());
builders_.erase(builder_iter); builders_.erase(builder_iter);
if (track_found_and_missing_files_) {
auto found_files_iter = cf_to_found_files_.find(cf_id);
assert(found_files_iter != cf_to_found_files_.end());
cf_to_found_files_.erase(found_files_iter);
auto missing_files_iter = cf_to_missing_files_.find(cf_id);
assert(missing_files_iter != cf_to_missing_files_.end());
cf_to_missing_files_.erase(missing_files_iter);
auto missing_blob_files_high_iter =
cf_to_missing_blob_files_high_.find(cf_id);
assert(missing_blob_files_high_iter !=
cf_to_missing_blob_files_high_.end());
cf_to_missing_blob_files_high_.erase(missing_blob_files_high_iter);
}
ColumnFamilyData* ret = ColumnFamilyData* ret =
version_set_->GetColumnFamilySet()->GetColumnFamily(cf_id); version_set_->GetColumnFamilySet()->GetColumnFamily(cf_id);
assert(ret != nullptr); assert(ret != nullptr);
@ -538,15 +494,14 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
return ret; return ret;
} }
Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, Status VersionEditHandler::MaybeCreateVersionBeforeApplyEdit(
ColumnFamilyData* cfd, const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
bool force_create_version) {
assert(cfd->initialized()); assert(cfd->initialized());
Status s; Status s;
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
if (force_create_version) { if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* v = new Version(cfd, version_set_, version_set_->file_options_, auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(), io_tracer_, *cfd->GetLatestMutableCFOptions(), io_tracer_,
version_set_->current_version_number_++, version_set_->current_version_number_++,
@ -562,6 +517,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
delete v; delete v;
} }
} }
s = builder->Apply(&edit);
return s; return s;
} }
@ -762,7 +718,8 @@ Status VersionEditHandlerPointInTime::OnAtomicGroupReplayBegin() {
assert(!cfd->IsDropped()); assert(!cfd->IsDropped());
assert(cfd->initialized()); assert(cfd->initialized());
VersionEdit edit; VersionEdit edit;
Status s = MaybeCreateVersion(edit, cfd, true /* force_create_version */); Status s = MaybeCreateVersionBeforeApplyEdit(
edit, cfd, true /* force_create_version */);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -824,17 +781,17 @@ void VersionEditHandlerPointInTime::CheckIterationResult(
} }
assert(cfd->initialized()); assert(cfd->initialized());
auto v_iter = versions_.find(cfd->GetID()); auto v_iter = versions_.find(cfd->GetID());
auto builder_iter = builders_.find(cfd->GetID());
if (v_iter != versions_.end()) { if (v_iter != versions_.end()) {
assert(v_iter->second != nullptr); assert(v_iter->second != nullptr);
assert(builder_iter != builders_.end());
version_set_->AppendVersion(cfd, v_iter->second); version_set_->AppendVersion(cfd, v_iter->second);
versions_.erase(v_iter); versions_.erase(v_iter);
// Let's clear found_files, since any files in that are part of the // Let's clear found_files, since any files in that are part of the
// installed Version. Any files that got obsoleted would have already // installed Version. Any files that got obsoleted would have already
// been moved to intermediate_files_ // been moved to intermediate_files_
auto found_files_iter = cf_to_found_files_.find(cfd->GetID()); builder_iter->second->version_builder()->ClearFoundFiles();
assert(found_files_iter != cf_to_found_files_.end());
found_files_iter->second.clear();
} }
} }
} else { } else {
@ -863,147 +820,50 @@ ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
return cfd; return cfd;
} }
Status VersionEditHandlerPointInTime::MaybeCreateVersion( Status VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit(
const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) { const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1"); TEST_SYNC_POINT(
TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"); "VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin1");
TEST_SYNC_POINT(
"VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit:"
"Begin2");
assert(cfd != nullptr); assert(cfd != nullptr);
if (!force_create_version) { if (!force_create_version) {
assert(edit.GetColumnFamily() == cfd->GetID()); assert(edit.GetColumnFamily() == cfd->GetID());
} }
auto found_files_iter = cf_to_found_files_.find(cfd->GetID());
assert(found_files_iter != cf_to_found_files_.end());
std::unordered_set<uint64_t>& found_files = found_files_iter->second;
auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
assert(missing_files_iter != cf_to_missing_files_.end());
std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
auto missing_blob_files_high_iter =
cf_to_missing_blob_files_high_.find(cfd->GetID());
assert(missing_blob_files_high_iter != cf_to_missing_blob_files_high_.end());
const uint64_t prev_missing_blob_file_high =
missing_blob_files_high_iter->second;
VersionBuilder* builder = nullptr;
if (prev_missing_blob_file_high != kInvalidBlobFileNumber) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
builder = builder_iter->second->version_builder();
assert(builder != nullptr);
}
// At this point, we have not yet applied the new version edits read from the
// MANIFEST. We check whether we have any missing table and blob files.
const bool prev_has_missing_files =
!missing_files.empty() ||
(prev_missing_blob_file_high != kInvalidBlobFileNumber &&
prev_missing_blob_file_high >= builder->GetMinOldestBlobFileNumber());
for (const auto& file : edit.GetDeletedFiles()) {
uint64_t file_num = file.second;
auto fiter = missing_files.find(file_num);
if (fiter != missing_files.end()) {
missing_files.erase(fiter);
} else {
fiter = found_files.find(file_num);
// Only mark new files added during this catchup attempt for deletion.
// These files were never installed in VersionStorageInfo.
// Already referenced files that are deleted by a VersionEdit will
// be added to the VersionStorageInfo's obsolete files when the old
// version is dereferenced.
if (fiter != found_files.end()) {
intermediate_files_.emplace_back(
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num));
found_files.erase(fiter);
}
}
}
assert(!cfd->ioptions()->cf_paths.empty());
Status s;
for (const auto& elem : edit.GetNewFiles()) {
int level = elem.first;
const FileMetaData& meta = elem.second;
const FileDescriptor& fd = meta.fd;
uint64_t file_num = fd.GetNumber();
const std::string fpath =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
s = VerifyFile(cfd, fpath, level, meta);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
if (s.IsCorruption()) {
found_files.insert(file_num);
}
s = Status::OK();
} else if (!s.ok()) {
break;
} else {
found_files.insert(file_num);
}
}
uint64_t missing_blob_file_num = prev_missing_blob_file_high;
for (const auto& elem : edit.GetBlobFileAdditions()) {
uint64_t file_num = elem.GetBlobFileNumber();
s = VerifyBlobFile(cfd, file_num, elem);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_blob_file_num = std::max(missing_blob_file_num, file_num);
s = Status::OK();
} else if (!s.ok()) {
break;
}
}
bool has_missing_blob_files = false;
if (missing_blob_file_num != kInvalidBlobFileNumber &&
missing_blob_file_num >= prev_missing_blob_file_high) {
missing_blob_files_high_iter->second = missing_blob_file_num;
has_missing_blob_files = true;
} else if (missing_blob_file_num < prev_missing_blob_file_high) {
assert(false);
}
// We still have not applied the new version edit, but have tried to add new
// table and blob files after verifying their presence and consistency.
// Therefore, we know whether we will see new missing table and blob files
// later after actually applying the version edit. We perform the check here
// and record the result.
const bool has_missing_files =
!missing_files.empty() || has_missing_blob_files;
bool missing_info = !version_edit_params_.HasLogNumber() || bool missing_info = !version_edit_params_.HasLogNumber() ||
!version_edit_params_.HasNextFile() || !version_edit_params_.HasNextFile() ||
!version_edit_params_.HasLastSequence(); !version_edit_params_.HasLastSequence();
// Create version before apply edit. The version will represent the state Status s;
// before applying the version edit. auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
VersionBuilder* builder = builder_iter->second->version_builder();
const bool valid_pit_before_edit = builder->ContainsCompletePIT();
builder->CreateOrReplaceSavePoint();
s = builder->Apply(&edit);
const bool valid_pit_after_edit = builder->ContainsCompletePIT();
// A new version will be created if: // A new version will be created if:
// 1) no error has occurred so far, and // 1) no error has occurred so far, and
// 2) log_number_, next_file_number_ and last_sequence_ are known, and // 2) log_number_, next_file_number_ and last_sequence_ are known, and
// 3) not in an AtomicGroup // 3) not in an AtomicGroup
// 4) any of the following: // 4) any of the following:
// a) no missing file before, but will have missing file(s) after applying // a) a complete point in time view is available before applying the edit
// this version edit. // and a complete point in time view is not available after the edit.
// b) no missing file after applying the version edit, and the caller // b) a complete point in time view is available after the edit and the
// explicitly request that a new version be created. // caller explicitly request that a new version be created.
if (s.ok() && !missing_info && !in_atomic_group_ && if (s.ok() && !missing_info && !in_atomic_group_ &&
((has_missing_files && !prev_has_missing_files) || ((!valid_pit_after_edit && valid_pit_before_edit) ||
(!has_missing_files && force_create_version))) { (valid_pit_after_edit && force_create_version))) {
if (!builder) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
builder = builder_iter->second->version_builder();
assert(builder);
}
const MutableCFOptions* cf_opts_ptr = cfd->GetLatestMutableCFOptions(); const MutableCFOptions* cf_opts_ptr = cfd->GetLatestMutableCFOptions();
auto* version = new Version(cfd, version_set_, version_set_->file_options_, auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cf_opts_ptr, io_tracer_, *cf_opts_ptr, io_tracer_,
version_set_->current_version_number_++, version_set_->current_version_number_++,
epoch_number_requirement_); epoch_number_requirement_);
s = builder->LoadTableHandlers( s = builder->LoadSavePointTableHandlers(
cfd->internal_stats(), cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads, false, true, version_set_->db_options_->max_file_opening_threads, false, true,
cf_opts_ptr->prefix_extractor, MaxFileSizeForL0MetaPin(*cf_opts_ptr), cf_opts_ptr->prefix_extractor, MaxFileSizeForL0MetaPin(*cf_opts_ptr),
@ -1015,7 +875,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
} }
return s; return s;
} }
s = builder->SaveTo(version->storage_info()); s = builder->SaveSavePointTo(version->storage_info());
if (s.ok()) { if (s.ok()) {
if (AtomicUpdateVersionsContains(cfd->GetID())) { if (AtomicUpdateVersionsContains(cfd->GetID())) {
AtomicUpdateVersionsPut(version); AtomicUpdateVersionsPut(version);
@ -1038,6 +898,8 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
delete version; delete version;
} }
} }
builder->ClearSavePoint();
return s; return s;
} }
@ -1072,6 +934,15 @@ Status VersionEditHandlerPointInTime::LoadTables(
return Status::OK(); return Status::OK();
} }
bool VersionEditHandlerPointInTime::HasMissingFiles() const {
for (const auto& builder : builders_) {
if (builder.second->version_builder()->HasMissingFiles()) {
return true;
}
}
return false;
}
bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() { bool VersionEditHandlerPointInTime::AtomicUpdateVersionsCompleted() {
return atomic_update_versions_missing_ == 0; return atomic_update_versions_missing_ == 0;
} }
@ -1145,8 +1016,8 @@ Status ManifestTailer::Initialize() {
Version* base_version = dummy_version->Next(); Version* base_version = dummy_version->Next();
assert(base_version); assert(base_version);
base_version->Ref(); base_version->Ref();
VersionBuilderUPtr new_builder( VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
new BaseReferencedVersionBuilder(default_cfd, base_version)); default_cfd, base_version, this, track_found_and_missing_files_));
builder_iter->second = std::move(new_builder); builder_iter->second = std::move(new_builder);
initialized_ = true; initialized_ = true;
@ -1189,8 +1060,8 @@ Status ManifestTailer::OnColumnFamilyAdd(VersionEdit& edit,
Version* base_version = dummy_version->Next(); Version* base_version = dummy_version->Next();
assert(base_version); assert(base_version);
base_version->Ref(); base_version->Ref();
VersionBuilderUPtr new_builder( VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
new BaseReferencedVersionBuilder(tmp_cfd, base_version)); tmp_cfd, base_version, this, track_found_and_missing_files_));
builder_iter->second = std::move(new_builder); builder_iter->second = std::move(new_builder);
#ifndef NDEBUG #ifndef NDEBUG
@ -1213,6 +1084,18 @@ void ManifestTailer::CheckIterationResult(const log::Reader& reader,
} }
} }
// Gathers the intermediate files tracked by every column family's version
// builder into a single list and returns it by value, so the caller takes
// ownership of the file names.
std::vector<std::string> ManifestTailer::GetAndClearIntermediateFiles() {
  std::vector<std::string> res;
  for (const auto& builder : builders_) {
    // Bind with `auto&&` rather than `auto`: if the builder hands back a
    // reference to its own list, a plain `auto` would copy it and the
    // clearing below would only touch the copy, leaving the builder's list
    // populated and reported again on the next call. A by-value return binds
    // here just as well (the temporary's lifetime is extended).
    auto&& files =
        builder.second->version_builder()->GetAndClearIntermediateFiles();
    res.insert(res.end(), std::make_move_iterator(files.begin()),
               std::make_move_iterator(files.end()));
    // The strings were moved out above; drop the moved-from husks.
    files.clear();
  }
  return res;
}
Status ManifestTailer::VerifyFile(ColumnFamilyData* cfd, Status ManifestTailer::VerifyFile(ColumnFamilyData* cfd,
const std::string& fpath, int level, const std::string& fpath, int level,
const FileMetaData& fmeta) { const FileMetaData& fmeta) {

View File

@ -134,14 +134,24 @@ class VersionEditHandler : public VersionEditHandlerBase {
return version_edit_params_; return version_edit_params_;
} }
bool HasMissingFiles() const;
void GetDbId(std::string* db_id) const { void GetDbId(std::string* db_id) const {
if (db_id && version_edit_params_.HasDbId()) { if (db_id && version_edit_params_.HasDbId()) {
*db_id = version_edit_params_.GetDbId(); *db_id = version_edit_params_.GetDbId();
} }
} }
// Overridable hook for verifying a table file. This base implementation
// ignores all of its arguments and always returns Status::OK().
virtual Status VerifyFile(ColumnFamilyData* /*cfd*/,
const std::string& /*fpath*/, int /*level*/,
const FileMetaData& /*fmeta*/) {
return Status::OK();
}
// Overridable hook for verifying a blob file. This base implementation
// ignores all of its arguments and always returns Status::OK().
virtual Status VerifyBlobFile(ColumnFamilyData* /*cfd*/,
uint64_t /*blob_file_num*/,
const BlobFileAddition& /*blob_addition*/) {
return Status::OK();
}
protected: protected:
explicit VersionEditHandler( explicit VersionEditHandler(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families, bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
@ -166,7 +176,7 @@ class VersionEditHandler : public VersionEditHandlerBase {
Status Initialize() override; Status Initialize() override;
void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found, void CheckColumnFamilyId(const VersionEdit& edit, bool* do_not_open_cf,
bool* cf_in_builders) const; bool* cf_in_builders) const;
void CheckIterationResult(const log::Reader& reader, Status* s) override; void CheckIterationResult(const log::Reader& reader, Status* s) override;
@ -176,9 +186,9 @@ class VersionEditHandler : public VersionEditHandlerBase {
virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit); virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit);
virtual Status MaybeCreateVersion(const VersionEdit& edit, virtual Status MaybeCreateVersionBeforeApplyEdit(const VersionEdit& edit,
ColumnFamilyData* cfd, ColumnFamilyData* cfd,
bool force_create_version); bool force_create_version);
virtual Status LoadTables(ColumnFamilyData* cfd, virtual Status LoadTables(ColumnFamilyData* cfd,
bool prefetch_index_and_filter_in_cache, bool prefetch_index_and_filter_in_cache,
@ -191,16 +201,13 @@ class VersionEditHandler : public VersionEditHandlerBase {
VersionSet* version_set_; VersionSet* version_set_;
std::unordered_map<uint32_t, VersionBuilderUPtr> builders_; std::unordered_map<uint32_t, VersionBuilderUPtr> builders_;
std::unordered_map<std::string, ColumnFamilyOptions> name_to_options_; std::unordered_map<std::string, ColumnFamilyOptions> name_to_options_;
// Keeps track of column families in manifest that were not found in
// column families parameters. if those column families are not dropped
// by subsequent manifest records, Recover() will return failure status.
std::unordered_map<uint32_t, std::string> column_families_not_found_;
VersionEditParams version_edit_params_;
const bool track_found_and_missing_files_; const bool track_found_and_missing_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>> cf_to_found_files_; // Keeps track of column families in manifest that were not found in
std::unordered_map<uint32_t, std::unordered_set<uint64_t>> // column families parameters. Namely, the user asks to not open these column
cf_to_missing_files_; // families. In non read only mode, if those column families are not dropped
std::unordered_map<uint32_t, uint64_t> cf_to_missing_blob_files_high_; // by subsequent manifest records, Recover() will return failure status.
std::unordered_map<uint32_t, std::string> do_not_open_column_families_;
VersionEditParams version_edit_params_;
bool no_error_if_files_missing_; bool no_error_if_files_missing_;
std::shared_ptr<IOTracer> io_tracer_; std::shared_ptr<IOTracer> io_tracer_;
bool skip_load_table_files_; bool skip_load_table_files_;
@ -241,23 +248,27 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
EpochNumberRequirement::kMustPresent); EpochNumberRequirement::kMustPresent);
~VersionEditHandlerPointInTime() override; ~VersionEditHandlerPointInTime() override;
bool HasMissingFiles() const;
virtual Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath,
int level, const FileMetaData& fmeta) override;
virtual Status VerifyBlobFile(ColumnFamilyData* cfd, uint64_t blob_file_num,
const BlobFileAddition& blob_addition) override;
protected: protected:
Status OnAtomicGroupReplayBegin() override; Status OnAtomicGroupReplayBegin() override;
Status OnAtomicGroupReplayEnd() override; Status OnAtomicGroupReplayEnd() override;
void CheckIterationResult(const log::Reader& reader, Status* s) override; void CheckIterationResult(const log::Reader& reader, Status* s) override;
ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override; ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override;
// `MaybeCreateVersion(..., false)` creates a version upon a negative edge // `MaybeCreateVersionBeforeApplyEdit(..., false)` creates a version upon a
// trigger (transition from valid to invalid). // negative edge trigger (transition from valid to invalid).
// //
// `MaybeCreateVersion(..., true)` creates a version on a positive level // `MaybeCreateVersionBeforeApplyEdit(..., true)` creates a version on a
// trigger (state is valid). // positive level trigger (state is valid).
Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd, Status MaybeCreateVersionBeforeApplyEdit(const VersionEdit& edit,
bool force_create_version) override; ColumnFamilyData* cfd,
virtual Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, bool force_create_version) override;
int level, const FileMetaData& fmeta);
virtual Status VerifyBlobFile(ColumnFamilyData* cfd, uint64_t blob_file_num,
const BlobFileAddition& blob_addition);
Status LoadTables(ColumnFamilyData* cfd, Status LoadTables(ColumnFamilyData* cfd,
bool prefetch_index_and_filter_in_cache, bool prefetch_index_and_filter_in_cache,
@ -275,8 +286,6 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
bool in_atomic_group_ = false; bool in_atomic_group_ = false;
std::vector<std::string> intermediate_files_;
private: private:
bool AtomicUpdateVersionsCompleted(); bool AtomicUpdateVersionsCompleted();
bool AtomicUpdateVersionsContains(uint32_t cfid); bool AtomicUpdateVersionsContains(uint32_t cfid);
@ -305,6 +314,9 @@ class ManifestTailer : public VersionEditHandlerPointInTime {
epoch_number_requirement), epoch_number_requirement),
mode_(Mode::kRecovery) {} mode_(Mode::kRecovery) {}
Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, int level,
const FileMetaData& fmeta) override;
void PrepareToReadNewManifest() { void PrepareToReadNewManifest() {
initialized_ = false; initialized_ = false;
ClearReadBuffer(); ClearReadBuffer();
@ -314,9 +326,7 @@ class ManifestTailer : public VersionEditHandlerPointInTime {
return cfds_changed_; return cfds_changed_;
} }
std::vector<std::string>& GetIntermediateFiles() { std::vector<std::string> GetAndClearIntermediateFiles();
return intermediate_files_;
}
protected: protected:
Status Initialize() override; Status Initialize() override;
@ -329,9 +339,6 @@ class ManifestTailer : public VersionEditHandlerPointInTime {
void CheckIterationResult(const log::Reader& reader, Status* s) override; void CheckIterationResult(const log::Reader& reader, Status* s) override;
Status VerifyFile(ColumnFamilyData* cfd, const std::string& fpath, int level,
const FileMetaData& fmeta) override;
enum Mode : uint8_t { enum Mode : uint8_t {
kRecovery = 0, kRecovery = 0,
kCatchUp = 1, kCatchUp = 1,

View File

@ -7477,7 +7477,7 @@ Status ReactiveVersionSet::ReadAndApply(
*cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies()); *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
} }
if (files_to_delete) { if (files_to_delete) {
*files_to_delete = std::move(manifest_tailer_->GetIntermediateFiles()); *files_to_delete = manifest_tailer_->GetAndClearIntermediateFiles();
} }
return s; return s;