mirror of https://github.com/facebook/rocksdb.git
Best efforts recovery recover seqno prefix (#12938)
Summary: This PR make best efforts recovery more permissive by allowing it to recover incomplete Version that presents a valid point in time view from the user's perspective. Currently, a Version is only valid and saved if all files consisting that Version can be found. With this change, if only a suffix of L0 files (and their associated blob files) are missing, a valid Version is also available to be saved and recover to. Note that we don't do this if the column family was atomically flushed. Because atomic flush also need a consistent view across the column families, we cannot guarantee that if we are recovering to incomplete version. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12938 Test Plan: Existing tests and added unit tests. Reviewed By: anand1976 Differential Revision: D61414381 Pulled By: jowlyzhang fbshipit-source-id: f9b73deb34d35ad696ab42315928b656d586262a
This commit is contained in:
parent
4d3518951a
commit
295326b6ee
|
@ -3407,6 +3407,46 @@ class TableFileListener : public EventListener {
|
|||
InstrumentedMutex mutex_;
|
||||
std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
|
||||
};
|
||||
|
||||
class FlushTableFileListener : public EventListener {
|
||||
public:
|
||||
void OnTableFileCreated(const TableFileCreationInfo& info) override {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
if (info.reason != TableFileCreationReason::kFlush) {
|
||||
return;
|
||||
}
|
||||
cf_to_flushed_files_[info.cf_name].push_back(info.file_path);
|
||||
}
|
||||
std::vector<std::string>& GetFlushedFiles(const std::string& cf_name) {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
return cf_to_flushed_files_[cf_name];
|
||||
}
|
||||
|
||||
private:
|
||||
InstrumentedMutex mutex_;
|
||||
std::unordered_map<std::string, std::vector<std::string>>
|
||||
cf_to_flushed_files_;
|
||||
};
|
||||
|
||||
class FlushBlobFileListener : public EventListener {
|
||||
public:
|
||||
void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
if (info.reason != BlobFileCreationReason::kFlush) {
|
||||
return;
|
||||
}
|
||||
cf_to_flushed_blobs_files_[info.cf_name].push_back(info.file_path);
|
||||
}
|
||||
std::vector<std::string>& GetFlushedBlobFiles(const std::string& cf_name) {
|
||||
InstrumentedMutexLock lock(&mutex_);
|
||||
return cf_to_flushed_blobs_files_[cf_name];
|
||||
}
|
||||
|
||||
private:
|
||||
InstrumentedMutex mutex_;
|
||||
std::unordered_map<std::string, std::vector<std::string>>
|
||||
cf_to_flushed_blobs_files_;
|
||||
};
|
||||
} // anonymous namespace
|
||||
|
||||
TEST_F(DBBasicTest, LastSstFileNotInManifest) {
|
||||
|
@ -3512,6 +3552,121 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
|
|||
}
|
||||
}
|
||||
|
||||
// Param 0: whether to enable blob DB.
|
||||
// Param 1: when blob DB is enabled, whether to also delete the missing L0
|
||||
// file's associated blob file.
|
||||
class BestEffortsRecoverIncompleteVersionTest
|
||||
: public DBTestBase,
|
||||
public testing::WithParamInterface<std::tuple<bool, bool>> {
|
||||
public:
|
||||
BestEffortsRecoverIncompleteVersionTest()
|
||||
: DBTestBase("best_efforts_recover_incomplete_version_test",
|
||||
/*env_do_fsync=*/false) {}
|
||||
};
|
||||
|
||||
TEST_P(BestEffortsRecoverIncompleteVersionTest, Basic) {
|
||||
Options options = CurrentOptions();
|
||||
options.enable_blob_files = std::get<0>(GetParam());
|
||||
bool delete_blob_file_too = std::get<1>(GetParam());
|
||||
DestroyAndReopen(options);
|
||||
FlushTableFileListener* flush_table_listener = new FlushTableFileListener();
|
||||
FlushBlobFileListener* flush_blob_listener = new FlushBlobFileListener();
|
||||
// Disable auto compaction to simplify SST file name tracking.
|
||||
options.disable_auto_compactions = true;
|
||||
options.listeners.emplace_back(flush_table_listener);
|
||||
options.listeners.emplace_back(flush_blob_listener);
|
||||
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
|
||||
std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
|
||||
"eevee"};
|
||||
int num_cfs = static_cast<int>(handles_.size());
|
||||
ASSERT_EQ(3, num_cfs);
|
||||
std::string start = "a";
|
||||
Slice start_slice = start;
|
||||
std::string end = "d";
|
||||
Slice end_slice = end;
|
||||
for (int cf = 0; cf != num_cfs; ++cf) {
|
||||
ASSERT_OK(Put(cf, "a", "a_value"));
|
||||
ASSERT_OK(Flush(cf));
|
||||
// Compact file to L1 to avoid trivial file move in the next compaction
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf],
|
||||
&start_slice, &end_slice));
|
||||
ASSERT_OK(Put(cf, "a", "a_value_new"));
|
||||
ASSERT_OK(Flush(cf));
|
||||
ASSERT_OK(Put(cf, "b", "b_value"));
|
||||
ASSERT_OK(Flush(cf));
|
||||
ASSERT_OK(Put(cf, "f", "f_value"));
|
||||
ASSERT_OK(Flush(cf));
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf],
|
||||
&start_slice, &end_slice));
|
||||
}
|
||||
|
||||
dbfull()->TEST_DeleteObsoleteFiles();
|
||||
|
||||
// Delete the most recent L0 file which is before a compaction.
|
||||
for (int i = 0; i < num_cfs; ++i) {
|
||||
std::vector<std::string>& files =
|
||||
flush_table_listener->GetFlushedFiles(all_cf_names[i]);
|
||||
ASSERT_EQ(4, files.size());
|
||||
ASSERT_OK(env_->DeleteFile(files[files.size() - 1]));
|
||||
if (options.enable_blob_files) {
|
||||
std::vector<std::string>& blob_files =
|
||||
flush_blob_listener->GetFlushedBlobFiles(all_cf_names[i]);
|
||||
ASSERT_EQ(4, blob_files.size());
|
||||
if (delete_blob_file_too) {
|
||||
ASSERT_OK(env_->DeleteFile(blob_files[files.size() - 1]));
|
||||
}
|
||||
}
|
||||
}
|
||||
options.best_efforts_recovery = true;
|
||||
ReopenWithColumnFamilies(all_cf_names, options);
|
||||
|
||||
for (int i = 0; i < num_cfs; ++i) {
|
||||
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
|
||||
ColumnFamilyData* cfd = cfh->cfd();
|
||||
VersionStorageInfo* vstorage = cfd->current()->storage_info();
|
||||
// The L0 file flushed right before the last compaction is missing.
|
||||
ASSERT_EQ(0, vstorage->LevelFiles(0).size());
|
||||
// Only the output of the last compaction is available.
|
||||
ASSERT_EQ(1, vstorage->LevelFiles(1).size());
|
||||
}
|
||||
// Verify data
|
||||
ReadOptions read_opts;
|
||||
read_opts.total_order_seek = true;
|
||||
for (int i = 0; i < num_cfs; ++i) {
|
||||
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[i]));
|
||||
iter->SeekToFirst();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ("a", iter->key());
|
||||
ASSERT_EQ("a_value_new", iter->value());
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ("b", iter->key());
|
||||
ASSERT_EQ("b_value", iter->value());
|
||||
iter->Next();
|
||||
ASSERT_FALSE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
|
||||
// Write more data.
|
||||
for (int cf = 0; cf < num_cfs; ++cf) {
|
||||
ASSERT_OK(Put(cf, "g", "g_value"));
|
||||
ASSERT_OK(Flush(cf));
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
|
||||
nullptr));
|
||||
std::string value;
|
||||
ASSERT_OK(db_->Get(ReadOptions(), handles_[cf], "g", &value));
|
||||
ASSERT_EQ("g_value", value);
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(BestEffortsRecoverIncompleteVersionTest,
|
||||
BestEffortsRecoverIncompleteVersionTest,
|
||||
testing::Values(std::make_tuple(false, false),
|
||||
std::make_tuple(true, false),
|
||||
std::make_tuple(true, true)));
|
||||
|
||||
TEST_F(DBBasicTest, BestEffortsRecoveryTryMultipleManifests) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
|
|
|
@ -282,12 +282,45 @@ class VersionBuilder::Rep {
|
|||
ColumnFamilyData* cfd_;
|
||||
VersionEditHandler* version_edit_handler_;
|
||||
bool track_found_and_missing_files_;
|
||||
// If false, only a complete Version with all files consisting it found is
|
||||
// considered valid. If true, besides complete Version, if the Version is
|
||||
// never edited in an atomic group, an incomplete Version with only a suffix
|
||||
// of L0 files missing is also considered valid.
|
||||
bool allow_incomplete_valid_version_;
|
||||
|
||||
// These are only tracked if `track_found_and_missing_files_` are enabled.
|
||||
// These are only tracked if `track_found_and_missing_files_` is enabled.
|
||||
|
||||
// The SST files that are found (blob files not included yet).
|
||||
std::unordered_set<uint64_t> found_files_;
|
||||
std::unordered_set<uint64_t> missing_files_;
|
||||
// Missing SST files for L0
|
||||
std::unordered_set<uint64_t> l0_missing_files_;
|
||||
// Missing SST files for non L0 levels
|
||||
std::unordered_set<uint64_t> non_l0_missing_files_;
|
||||
// Intermediate SST files (blob files not included yet)
|
||||
std::vector<std::string> intermediate_files_;
|
||||
// The highest file number for all the missing blob files, useful to check
|
||||
// if a complete Version is available.
|
||||
uint64_t missing_blob_files_high_ = kInvalidBlobFileNumber;
|
||||
// Missing blob files, useful to check if only the missing L0 files'
|
||||
// associated blob files are missing.
|
||||
std::unordered_set<uint64_t> missing_blob_files_;
|
||||
// True if all files consisting the Version can be found. Or if
|
||||
// `allow_incomplete_valid_version_` is true and the version history is not
|
||||
// ever edited in an atomic group, this will be true if only a
|
||||
// suffix of L0 SST files and their associated blob files are missing.
|
||||
bool valid_version_available_;
|
||||
// True if version is ever edited in an atomic group.
|
||||
bool edited_in_atomic_group_;
|
||||
|
||||
// Flag to indicate if the Version is updated since last validity check. If no
|
||||
// `Apply` call is made between a `Rep`'s construction and a
|
||||
// `ValidVersionAvailable` check or between two `ValidVersionAvailable` calls.
|
||||
// This flag will be true to indicate the cached validity value can be
|
||||
// directly used without a recheck.
|
||||
bool version_updated_since_last_check_;
|
||||
|
||||
// End of fields that are only tracked when `track_found_and_missing_files_`
|
||||
// is enabled.
|
||||
|
||||
public:
|
||||
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
|
||||
|
@ -295,7 +328,7 @@ class VersionBuilder::Rep {
|
|||
VersionSet* version_set,
|
||||
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
|
||||
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
|
||||
bool track_found_and_missing_files)
|
||||
bool track_found_and_missing_files, bool allow_incomplete_valid_version)
|
||||
: file_options_(file_options),
|
||||
ioptions_(ioptions),
|
||||
table_cache_(table_cache),
|
||||
|
@ -311,13 +344,22 @@ class VersionBuilder::Rep {
|
|||
file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr),
|
||||
cfd_(cfd),
|
||||
version_edit_handler_(version_edit_handler),
|
||||
track_found_and_missing_files_(track_found_and_missing_files) {
|
||||
track_found_and_missing_files_(track_found_and_missing_files),
|
||||
allow_incomplete_valid_version_(allow_incomplete_valid_version) {
|
||||
assert(ioptions_);
|
||||
|
||||
levels_ = new LevelState[num_levels_];
|
||||
if (track_found_and_missing_files_) {
|
||||
assert(cfd_);
|
||||
assert(version_edit_handler_);
|
||||
// `track_found_and_missing_files_` mode used by VersionEditHandlerPIT
|
||||
// assumes the initial base version is valid. For best efforts recovery,
|
||||
// base will be empty. For manifest tailing usage like secondary instance,
|
||||
// they do not allow incomplete version, so the base version in subsequent
|
||||
// catch up attempts should be valid too.
|
||||
valid_version_available_ = true;
|
||||
edited_in_atomic_group_ = false;
|
||||
version_updated_since_last_check_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,10 +382,17 @@ class VersionBuilder::Rep {
|
|||
cfd_(other.cfd_),
|
||||
version_edit_handler_(other.version_edit_handler_),
|
||||
track_found_and_missing_files_(other.track_found_and_missing_files_),
|
||||
allow_incomplete_valid_version_(other.allow_incomplete_valid_version_),
|
||||
found_files_(other.found_files_),
|
||||
missing_files_(other.missing_files_),
|
||||
l0_missing_files_(other.l0_missing_files_),
|
||||
non_l0_missing_files_(other.non_l0_missing_files_),
|
||||
intermediate_files_(other.intermediate_files_),
|
||||
missing_blob_files_high_(other.missing_blob_files_high_) {
|
||||
missing_blob_files_high_(other.missing_blob_files_high_),
|
||||
missing_blob_files_(other.missing_blob_files_),
|
||||
valid_version_available_(other.valid_version_available_),
|
||||
edited_in_atomic_group_(other.edited_in_atomic_group_),
|
||||
version_updated_since_last_check_(
|
||||
other.version_updated_since_last_check_) {
|
||||
assert(ioptions_);
|
||||
levels_ = new LevelState[num_levels_];
|
||||
for (int level = 0; level < num_levels_; level++) {
|
||||
|
@ -729,6 +778,7 @@ class VersionBuilder::Rep {
|
|||
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
|
||||
missing_blob_files_high_ =
|
||||
std::max(missing_blob_files_high_, blob_file_number);
|
||||
missing_blob_files_.insert(blob_file_number);
|
||||
s = Status::OK();
|
||||
} else if (!s.ok()) {
|
||||
return s;
|
||||
|
@ -855,11 +905,13 @@ class VersionBuilder::Rep {
|
|||
|
||||
if (track_found_and_missing_files_) {
|
||||
assert(version_edit_handler_);
|
||||
auto fiter = missing_files_.find(file_number);
|
||||
if (fiter != missing_files_.end()) {
|
||||
missing_files_.erase(fiter);
|
||||
if (l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
|
||||
l0_missing_files_.erase(file_number);
|
||||
} else if (non_l0_missing_files_.find(file_number) !=
|
||||
non_l0_missing_files_.end()) {
|
||||
non_l0_missing_files_.erase(file_number);
|
||||
} else {
|
||||
fiter = found_files_.find(file_number);
|
||||
auto fiter = found_files_.find(file_number);
|
||||
// Only mark new files added during this catchup attempt for deletion.
|
||||
// These files were never installed in VersionStorageInfo.
|
||||
// Already referenced files that are deleted by a VersionEdit will
|
||||
|
@ -954,7 +1006,11 @@ class VersionBuilder::Rep {
|
|||
MakeTableFileName(ioptions_->cf_paths[0].path, file_number);
|
||||
s = version_edit_handler_->VerifyFile(cfd_, fpath, level, meta);
|
||||
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
|
||||
missing_files_.insert(file_number);
|
||||
if (0 == level) {
|
||||
l0_missing_files_.insert(file_number);
|
||||
} else {
|
||||
non_l0_missing_files_.insert(file_number);
|
||||
}
|
||||
if (s.IsCorruption()) {
|
||||
found_files_.insert(file_number);
|
||||
}
|
||||
|
@ -987,6 +1043,7 @@ class VersionBuilder::Rep {
|
|||
|
||||
// Apply all of the edits in *edit to the current state.
|
||||
Status Apply(const VersionEdit* edit) {
|
||||
bool version_updated = false;
|
||||
{
|
||||
const Status s = CheckConsistency(base_vstorage_);
|
||||
if (!s.ok()) {
|
||||
|
@ -1004,6 +1061,7 @@ class VersionBuilder::Rep {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
version_updated = true;
|
||||
}
|
||||
|
||||
// Increase the amount of garbage for blob files affected by GC
|
||||
|
@ -1012,6 +1070,7 @@ class VersionBuilder::Rep {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
version_updated = true;
|
||||
}
|
||||
|
||||
// Delete table files
|
||||
|
@ -1023,6 +1082,7 @@ class VersionBuilder::Rep {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
version_updated = true;
|
||||
}
|
||||
|
||||
// Add new table files
|
||||
|
@ -1034,6 +1094,7 @@ class VersionBuilder::Rep {
|
|||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
version_updated = true;
|
||||
}
|
||||
|
||||
// Populate compact cursors for round-robin compaction, leave
|
||||
|
@ -1046,6 +1107,13 @@ class VersionBuilder::Rep {
|
|||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
if (track_found_and_missing_files_ && version_updated) {
|
||||
version_updated_since_last_check_ = true;
|
||||
if (!edited_in_atomic_group_ && edit->IsInAtomicGroup()) {
|
||||
edited_in_atomic_group_ = true;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -1188,14 +1256,35 @@ class VersionBuilder::Rep {
|
|||
mutable_meta.GetGarbageBlobCount(), mutable_meta.GetGarbageBlobBytes());
|
||||
}
|
||||
|
||||
bool OnlyLinkedToMissingL0Files(
|
||||
const std::unordered_set<uint64_t>& linked_ssts) const {
|
||||
return std::all_of(
|
||||
linked_ssts.begin(), linked_ssts.end(), [&](const uint64_t& element) {
|
||||
return l0_missing_files_.find(element) != l0_missing_files_.end();
|
||||
});
|
||||
}
|
||||
|
||||
// Add the blob file specified by meta to *vstorage if it is determined to
|
||||
// contain valid data (blobs).
|
||||
template <typename Meta>
|
||||
static void AddBlobFileIfNeeded(VersionStorageInfo* vstorage, Meta&& meta) {
|
||||
void AddBlobFileIfNeeded(VersionStorageInfo* vstorage, Meta&& meta,
|
||||
uint64_t blob_file_number) const {
|
||||
assert(vstorage);
|
||||
assert(meta);
|
||||
|
||||
if (meta->GetLinkedSsts().empty() &&
|
||||
const auto& linked_ssts = meta->GetLinkedSsts();
|
||||
if (track_found_and_missing_files_) {
|
||||
if (missing_blob_files_.find(blob_file_number) !=
|
||||
missing_blob_files_.end()) {
|
||||
return;
|
||||
}
|
||||
// Leave the empty case for the below blob garbage collection logic.
|
||||
if (!linked_ssts.empty() && OnlyLinkedToMissingL0Files(linked_ssts)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (linked_ssts.empty() &&
|
||||
meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
|
||||
return;
|
||||
}
|
||||
|
@ -1207,6 +1296,7 @@ class VersionBuilder::Rep {
|
|||
// applied, and save the result into *vstorage.
|
||||
void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
|
||||
assert(vstorage);
|
||||
assert(!track_found_and_missing_files_ || valid_version_available_);
|
||||
|
||||
assert(base_vstorage_);
|
||||
vstorage->ReserveBlob(base_vstorage_->GetBlobFiles().size() +
|
||||
|
@ -1222,22 +1312,24 @@ class VersionBuilder::Rep {
|
|||
}
|
||||
|
||||
auto process_base =
|
||||
[vstorage](const std::shared_ptr<BlobFileMetaData>& base_meta) {
|
||||
[this, vstorage](const std::shared_ptr<BlobFileMetaData>& base_meta) {
|
||||
assert(base_meta);
|
||||
|
||||
AddBlobFileIfNeeded(vstorage, base_meta);
|
||||
AddBlobFileIfNeeded(vstorage, base_meta,
|
||||
base_meta->GetBlobFileNumber());
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
auto process_mutable =
|
||||
[vstorage](const MutableBlobFileMetaData& mutable_meta) {
|
||||
AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
|
||||
[this, vstorage](const MutableBlobFileMetaData& mutable_meta) {
|
||||
AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta),
|
||||
mutable_meta.GetBlobFileNumber());
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
auto process_both = [vstorage](
|
||||
auto process_both = [this, vstorage](
|
||||
const std::shared_ptr<BlobFileMetaData>& base_meta,
|
||||
const MutableBlobFileMetaData& mutable_meta) {
|
||||
assert(base_meta);
|
||||
|
@ -1250,12 +1342,14 @@ class VersionBuilder::Rep {
|
|||
mutable_meta.GetGarbageBlobBytes());
|
||||
assert(base_meta->GetLinkedSsts() == mutable_meta.GetLinkedSsts());
|
||||
|
||||
AddBlobFileIfNeeded(vstorage, base_meta);
|
||||
AddBlobFileIfNeeded(vstorage, base_meta,
|
||||
base_meta->GetBlobFileNumber());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta));
|
||||
AddBlobFileIfNeeded(vstorage, CreateBlobFileMetaData(mutable_meta),
|
||||
mutable_meta.GetBlobFileNumber());
|
||||
|
||||
return true;
|
||||
};
|
||||
|
@ -1267,6 +1361,10 @@ class VersionBuilder::Rep {
|
|||
void MaybeAddFile(VersionStorageInfo* vstorage, int level,
|
||||
FileMetaData* f) const {
|
||||
const uint64_t file_number = f->fd.GetNumber();
|
||||
if (track_found_and_missing_files_ && level == 0 &&
|
||||
l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto& level_state = levels_[level];
|
||||
|
||||
|
@ -1290,16 +1388,16 @@ class VersionBuilder::Rep {
|
|||
}
|
||||
}
|
||||
|
||||
bool ContainsCompletePIT() {
|
||||
bool ContainsCompleteVersion() const {
|
||||
assert(track_found_and_missing_files_);
|
||||
return missing_files_.empty() &&
|
||||
return l0_missing_files_.empty() && non_l0_missing_files_.empty() &&
|
||||
(missing_blob_files_high_ == kInvalidBlobFileNumber ||
|
||||
missing_blob_files_high_ < GetMinOldestBlobFileNumber());
|
||||
}
|
||||
|
||||
bool HasMissingFiles() const {
|
||||
assert(track_found_and_missing_files_);
|
||||
return !missing_files_.empty() ||
|
||||
return !l0_missing_files_.empty() || !non_l0_missing_files_.empty() ||
|
||||
missing_blob_files_high_ != kInvalidBlobFileNumber;
|
||||
}
|
||||
|
||||
|
@ -1321,6 +1419,16 @@ class VersionBuilder::Rep {
|
|||
const auto& unordered_added_files = levels_[level].added_files;
|
||||
vstorage->Reserve(level, base_files.size() + unordered_added_files.size());
|
||||
|
||||
MergeUnorderdAddedFilesWithBase(
|
||||
base_files, unordered_added_files, cmp,
|
||||
[&](FileMetaData* file) { MaybeAddFile(vstorage, level, file); });
|
||||
}
|
||||
|
||||
template <typename Cmp, typename AddFileFunc>
|
||||
void MergeUnorderdAddedFilesWithBase(
|
||||
const std::vector<FileMetaData*>& base_files,
|
||||
const std::unordered_map<uint64_t, FileMetaData*>& unordered_added_files,
|
||||
Cmp cmp, AddFileFunc add_file_func) const {
|
||||
// Sort added files for the level.
|
||||
std::vector<FileMetaData*> added_files;
|
||||
added_files.reserve(unordered_added_files.size());
|
||||
|
@ -1336,9 +1444,9 @@ class VersionBuilder::Rep {
|
|||
while (added_iter != added_end || base_iter != base_end) {
|
||||
if (base_iter == base_end ||
|
||||
(added_iter != added_end && cmp(*added_iter, *base_iter))) {
|
||||
MaybeAddFile(vstorage, level, *added_iter++);
|
||||
add_file_func(*added_iter++);
|
||||
} else {
|
||||
MaybeAddFile(vstorage, level, *base_iter++);
|
||||
add_file_func(*base_iter++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1397,8 +1505,110 @@ class VersionBuilder::Rep {
|
|||
}
|
||||
}
|
||||
|
||||
bool ValidVersionAvailable() {
|
||||
assert(track_found_and_missing_files_);
|
||||
if (version_updated_since_last_check_) {
|
||||
valid_version_available_ = ContainsCompleteVersion();
|
||||
if (!valid_version_available_ && !edited_in_atomic_group_ &&
|
||||
allow_incomplete_valid_version_) {
|
||||
valid_version_available_ = OnlyMissingL0Suffix();
|
||||
}
|
||||
version_updated_since_last_check_ = false;
|
||||
}
|
||||
return valid_version_available_;
|
||||
}
|
||||
|
||||
bool OnlyMissingL0Suffix() const {
|
||||
if (!non_l0_missing_files_.empty()) {
|
||||
return false;
|
||||
}
|
||||
assert(!(l0_missing_files_.empty() && missing_blob_files_.empty()));
|
||||
|
||||
if (!l0_missing_files_.empty() && !MissingL0FilesAreL0Suffix()) {
|
||||
return false;
|
||||
}
|
||||
if (!missing_blob_files_.empty() &&
|
||||
!RemainingSstFilesNotMissingBlobFiles()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check missing L0 files are a suffix of expected sorted L0 files.
|
||||
bool MissingL0FilesAreL0Suffix() const {
|
||||
assert(non_l0_missing_files_.empty());
|
||||
assert(!l0_missing_files_.empty());
|
||||
std::vector<FileMetaData*> expected_sorted_l0_files;
|
||||
const auto& base_files = base_vstorage_->LevelFiles(0);
|
||||
const auto& unordered_added_files = levels_[0].added_files;
|
||||
expected_sorted_l0_files.reserve(base_files.size() +
|
||||
unordered_added_files.size());
|
||||
EpochNumberRequirement epoch_number_requirement =
|
||||
base_vstorage_->GetEpochNumberRequirement();
|
||||
|
||||
if (epoch_number_requirement == EpochNumberRequirement::kMightMissing) {
|
||||
MergeUnorderdAddedFilesWithBase(
|
||||
base_files, unordered_added_files, *level_zero_cmp_by_seqno_,
|
||||
[&](FileMetaData* file) {
|
||||
expected_sorted_l0_files.push_back(file);
|
||||
});
|
||||
} else {
|
||||
MergeUnorderdAddedFilesWithBase(
|
||||
base_files, unordered_added_files, *level_zero_cmp_by_epochno_,
|
||||
[&](FileMetaData* file) {
|
||||
expected_sorted_l0_files.push_back(file);
|
||||
});
|
||||
}
|
||||
assert(expected_sorted_l0_files.size() >= l0_missing_files_.size());
|
||||
std::unordered_set<uint64_t> unaddressed_missing_files = l0_missing_files_;
|
||||
for (auto iter = expected_sorted_l0_files.begin();
|
||||
iter != expected_sorted_l0_files.end(); iter++) {
|
||||
uint64_t file_number = (*iter)->fd.GetNumber();
|
||||
if (l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
|
||||
assert(unaddressed_missing_files.find(file_number) !=
|
||||
unaddressed_missing_files.end());
|
||||
unaddressed_missing_files.erase(file_number);
|
||||
} else if (!unaddressed_missing_files.empty()) {
|
||||
return false;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check for each of the missing blob file missing, it either is older than
|
||||
// the minimum oldest blob file required by this Version or only linked to
|
||||
// the missing L0 files.
|
||||
bool RemainingSstFilesNotMissingBlobFiles() const {
|
||||
assert(non_l0_missing_files_.empty());
|
||||
assert(!missing_blob_files_.empty());
|
||||
bool no_l0_files_missing = l0_missing_files_.empty();
|
||||
uint64_t min_oldest_blob_file_num = GetMinOldestBlobFileNumber();
|
||||
for (const auto& missing_blob_file : missing_blob_files_) {
|
||||
if (missing_blob_file < min_oldest_blob_file_num) {
|
||||
continue;
|
||||
}
|
||||
auto iter = mutable_blob_file_metas_.find(missing_blob_file);
|
||||
assert(iter != mutable_blob_file_metas_.end());
|
||||
auto linked_ssts = iter->second.GetLinkedSsts();
|
||||
// TODO(yuzhangyu): In theory, if no L0 SST files ara missing, and only
|
||||
// blob files exclusively linked to a L0 suffix are missing, we can
|
||||
// recover to a valid point in time too. We don't recover that type of
|
||||
// incomplete Version yet.
|
||||
if (!linked_ssts.empty() && no_l0_files_missing) {
|
||||
return false;
|
||||
}
|
||||
if (!OnlyLinkedToMissingL0Files(linked_ssts)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Save the current state in *vstorage.
|
||||
Status SaveTo(VersionStorageInfo* vstorage) const {
|
||||
assert(!track_found_and_missing_files_ || valid_version_available_);
|
||||
Status s;
|
||||
|
||||
#ifndef NDEBUG
|
||||
|
@ -1431,6 +1641,7 @@ class VersionBuilder::Rep {
|
|||
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
|
||||
uint8_t block_protection_bytes_per_key) {
|
||||
assert(table_cache_ != nullptr);
|
||||
assert(!track_found_and_missing_files_ || valid_version_available_);
|
||||
|
||||
size_t table_cache_capacity =
|
||||
table_cache_->get_cache().get()->GetCapacity();
|
||||
|
@ -1470,6 +1681,11 @@ class VersionBuilder::Rep {
|
|||
for (int level = 0; level < num_levels_; level++) {
|
||||
for (auto& file_meta_pair : levels_[level].added_files) {
|
||||
auto* file_meta = file_meta_pair.second;
|
||||
uint64_t file_number = file_meta->fd.GetNumber();
|
||||
if (track_found_and_missing_files_ && level == 0 &&
|
||||
l0_missing_files_.find(file_number) != l0_missing_files_.end()) {
|
||||
continue;
|
||||
}
|
||||
// If the file has been opened before, just skip it.
|
||||
if (!file_meta->table_reader_handle) {
|
||||
files_meta.emplace_back(file_meta, level);
|
||||
|
@ -1536,10 +1752,11 @@ VersionBuilder::VersionBuilder(
|
|||
VersionSet* version_set,
|
||||
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr,
|
||||
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
|
||||
bool track_found_and_missing_files)
|
||||
bool track_found_and_missing_files, bool allow_incomplete_valid_version)
|
||||
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
|
||||
version_set, file_metadata_cache_res_mgr, cfd,
|
||||
version_edit_handler, track_found_and_missing_files)) {}
|
||||
version_edit_handler, track_found_and_missing_files,
|
||||
allow_incomplete_valid_version)) {}
|
||||
|
||||
VersionBuilder::~VersionBuilder() = default;
|
||||
|
||||
|
@ -1573,8 +1790,8 @@ void VersionBuilder::CreateOrReplaceSavePoint() {
|
|||
rep_ = std::make_unique<Rep>(*savepoint_);
|
||||
}
|
||||
|
||||
bool VersionBuilder::ContainsCompletePIT() const {
|
||||
return rep_->ContainsCompletePIT();
|
||||
bool VersionBuilder::ValidVersionAvailable() {
|
||||
return rep_->ValidVersionAvailable();
|
||||
}
|
||||
|
||||
bool VersionBuilder::HasMissingFiles() const { return rep_->HasMissingFiles(); }
|
||||
|
@ -1586,7 +1803,7 @@ std::vector<std::string>& VersionBuilder::GetAndClearIntermediateFiles() {
|
|||
void VersionBuilder::ClearFoundFiles() { return rep_->ClearFoundFiles(); }
|
||||
|
||||
Status VersionBuilder::SaveSavePointTo(VersionStorageInfo* vstorage) const {
|
||||
if (!savepoint_) {
|
||||
if (!savepoint_ || !savepoint_->ValidVersionAvailable()) {
|
||||
return Status::InvalidArgument();
|
||||
}
|
||||
return savepoint_->SaveTo(vstorage);
|
||||
|
@ -1598,7 +1815,7 @@ Status VersionBuilder::LoadSavePointTableHandlers(
|
|||
const std::shared_ptr<const SliceTransform>& prefix_extractor,
|
||||
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
|
||||
uint8_t block_protection_bytes_per_key) {
|
||||
if (!savepoint_) {
|
||||
if (!savepoint_ || !savepoint_->ValidVersionAvailable()) {
|
||||
return Status::InvalidArgument();
|
||||
}
|
||||
return savepoint_->LoadTableHandlers(
|
||||
|
@ -1611,25 +1828,27 @@ void VersionBuilder::ClearSavePoint() { savepoint_.reset(nullptr); }
|
|||
|
||||
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
|
||||
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler,
|
||||
bool track_found_and_missing_files)
|
||||
bool track_found_and_missing_files, bool allow_incomplete_valid_version)
|
||||
: version_builder_(new VersionBuilder(
|
||||
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
||||
cfd->table_cache(), cfd->current()->storage_info(),
|
||||
cfd->current()->version_set(),
|
||||
cfd->GetFileMetadataCacheReservationManager(), cfd,
|
||||
version_edit_handler, track_found_and_missing_files)),
|
||||
version_edit_handler, track_found_and_missing_files,
|
||||
allow_incomplete_valid_version)),
|
||||
version_(cfd->current()) {
|
||||
version_->Ref();
|
||||
}
|
||||
|
||||
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
|
||||
ColumnFamilyData* cfd, Version* v, VersionEditHandler* version_edit_handler,
|
||||
bool track_found_and_missing_files)
|
||||
bool track_found_and_missing_files, bool allow_incomplete_valid_version)
|
||||
: version_builder_(new VersionBuilder(
|
||||
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
||||
cfd->table_cache(), v->storage_info(), v->version_set(),
|
||||
cfd->GetFileMetadataCacheReservationManager(), cfd,
|
||||
version_edit_handler, track_found_and_missing_files)),
|
||||
version_edit_handler, track_found_and_missing_files,
|
||||
allow_incomplete_valid_version)),
|
||||
version_(v) {
|
||||
assert(version_ != cfd->current());
|
||||
}
|
||||
|
|
|
@ -42,7 +42,8 @@ class VersionBuilder {
|
|||
file_metadata_cache_res_mgr = nullptr,
|
||||
ColumnFamilyData* cfd = nullptr,
|
||||
VersionEditHandler* version_edit_handler = nullptr,
|
||||
bool track_found_and_missing_files = false);
|
||||
bool track_found_and_missing_files = false,
|
||||
bool allow_incomplete_valid_version = false);
|
||||
~VersionBuilder();
|
||||
|
||||
bool CheckConsistencyForNumLevels();
|
||||
|
@ -66,12 +67,20 @@ class VersionBuilder {
|
|||
// VersionEdits applied to the builder will not affect the Version in this
|
||||
// save point. VersionBuilder currently only supports creating one save point,
|
||||
// so when `CreateOrReplaceSavePoint` is called again, the previous save point
|
||||
// is cleared. `CreateOrReplaceSavePoint` can be called explicitly to clear
|
||||
// is cleared. `ClearSavePoint` can be called explicitly to clear
|
||||
// the save point too.
|
||||
void CreateOrReplaceSavePoint();
|
||||
|
||||
// The builder can find all the files to build a `Version`.
|
||||
bool ContainsCompletePIT() const;
|
||||
// The builder can find all the files to build a `Version`. Or if
|
||||
// `allow_incomplete_valid_version_` is true and the version history is never
|
||||
// edited in an atomic group, and only a suffix of L0 SST files and their
|
||||
// associated blob files are missing.
|
||||
// From the users' perspective, missing a suffix of L0 files means missing the
|
||||
// user's most recently written data. So the remaining available files still
|
||||
// presents a valid point in time view, although for some previous time.
|
||||
// This validity check result will be cached and reused if the Version is not
|
||||
// updated between two validity checks.
|
||||
bool ValidVersionAvailable();
|
||||
|
||||
bool HasMissingFiles() const;
|
||||
|
||||
|
@ -114,11 +123,13 @@ class BaseReferencedVersionBuilder {
|
|||
public:
|
||||
explicit BaseReferencedVersionBuilder(
|
||||
ColumnFamilyData* cfd, VersionEditHandler* version_edit_handler = nullptr,
|
||||
bool track_found_and_missing_files = false);
|
||||
bool track_found_and_missing_files = false,
|
||||
bool allow_incomplete_valid_version = false);
|
||||
BaseReferencedVersionBuilder(
|
||||
ColumnFamilyData* cfd, Version* v,
|
||||
VersionEditHandler* version_edit_handler = nullptr,
|
||||
bool track_found_and_missing_files = false);
|
||||
bool track_found_and_missing_files = false,
|
||||
bool allow_incomplete_valid_version = false);
|
||||
~BaseReferencedVersionBuilder();
|
||||
VersionBuilder* version_builder() const { return version_builder_.get(); }
|
||||
|
||||
|
|
|
@ -155,6 +155,7 @@ VersionEditHandler::VersionEditHandler(
|
|||
VersionSet* version_set, bool track_found_and_missing_files,
|
||||
bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const ReadOptions& read_options, bool skip_load_table_files,
|
||||
bool allow_incomplete_valid_version,
|
||||
EpochNumberRequirement epoch_number_requirement)
|
||||
: VersionEditHandlerBase(read_options),
|
||||
read_only_(read_only),
|
||||
|
@ -165,6 +166,7 @@ VersionEditHandler::VersionEditHandler(
|
|||
io_tracer_(io_tracer),
|
||||
skip_load_table_files_(skip_load_table_files),
|
||||
initialized_(false),
|
||||
allow_incomplete_valid_version_(allow_incomplete_valid_version),
|
||||
epoch_number_requirement_(epoch_number_requirement) {
|
||||
assert(version_set_ != nullptr);
|
||||
}
|
||||
|
@ -475,7 +477,8 @@ ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
|
|||
cfd->set_initialized();
|
||||
assert(builders_.find(cf_id) == builders_.end());
|
||||
builders_.emplace(cf_id, VersionBuilderUPtr(new BaseReferencedVersionBuilder(
|
||||
cfd, this, track_found_and_missing_files_)));
|
||||
cfd, this, track_found_and_missing_files_,
|
||||
allow_incomplete_valid_version_)));
|
||||
return cfd;
|
||||
}
|
||||
|
||||
|
@ -687,12 +690,13 @@ Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles(
|
|||
VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
|
||||
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
|
||||
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const ReadOptions& read_options,
|
||||
const ReadOptions& read_options, bool allow_incomplete_valid_version,
|
||||
EpochNumberRequirement epoch_number_requirement)
|
||||
: VersionEditHandler(read_only, column_families, version_set,
|
||||
/*track_found_and_missing_files=*/true,
|
||||
/*no_error_if_files_missing=*/true, io_tracer,
|
||||
read_options, epoch_number_requirement) {}
|
||||
read_options, allow_incomplete_valid_version,
|
||||
epoch_number_requirement) {}
|
||||
|
||||
VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
|
||||
for (const auto& cfid_and_version : atomic_update_versions_) {
|
||||
|
@ -841,19 +845,19 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit(
|
|||
auto builder_iter = builders_.find(cfd->GetID());
|
||||
assert(builder_iter != builders_.end());
|
||||
VersionBuilder* builder = builder_iter->second->version_builder();
|
||||
const bool valid_pit_before_edit = builder->ContainsCompletePIT();
|
||||
const bool valid_pit_before_edit = builder->ValidVersionAvailable();
|
||||
builder->CreateOrReplaceSavePoint();
|
||||
s = builder->Apply(&edit);
|
||||
const bool valid_pit_after_edit = builder->ContainsCompletePIT();
|
||||
const bool valid_pit_after_edit = builder->ValidVersionAvailable();
|
||||
|
||||
// A new version will be created if:
|
||||
// 1) no error has occurred so far, and
|
||||
// 2) log_number_, next_file_number_ and last_sequence_ are known, and
|
||||
// 3) not in an AtomicGroup
|
||||
// 4) any of the following:
|
||||
// a) a complete point in time view is available before applying the edit
|
||||
// and a complete point in time view is not available after the edit.
|
||||
// b) a complete point in time view is available after the edit and the
|
||||
// a) a valid Version is available before applying the edit
|
||||
// and a valid Version is not available after the edit.
|
||||
// b) a valid Version is available after the edit and the
|
||||
// caller explicitly request that a new version be created.
|
||||
if (s.ok() && !missing_info && !in_atomic_group_ &&
|
||||
((!valid_pit_after_edit && valid_pit_before_edit) ||
|
||||
|
@ -1017,7 +1021,8 @@ Status ManifestTailer::Initialize() {
|
|||
assert(base_version);
|
||||
base_version->Ref();
|
||||
VersionBuilderUPtr new_builder(new BaseReferencedVersionBuilder(
|
||||
default_cfd, base_version, this, track_found_and_missing_files_));
|
||||
default_cfd, base_version, this, track_found_and_missing_files_,
|
||||
allow_incomplete_valid_version_));
|
||||
builder_iter->second = std::move(new_builder);
|
||||
|
||||
initialized_ = true;
|
||||
|
|
|
@ -119,13 +119,14 @@ class VersionEditHandler : public VersionEditHandlerBase {
|
|||
VersionSet* version_set, bool track_found_and_missing_files,
|
||||
bool no_error_if_files_missing,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const ReadOptions& read_options,
|
||||
const ReadOptions& read_options, bool allow_incomplete_valid_version,
|
||||
EpochNumberRequirement epoch_number_requirement =
|
||||
EpochNumberRequirement::kMustPresent)
|
||||
: VersionEditHandler(read_only, column_families, version_set,
|
||||
track_found_and_missing_files,
|
||||
no_error_if_files_missing, io_tracer, read_options,
|
||||
/*skip_load_table_files=*/false,
|
||||
allow_incomplete_valid_version,
|
||||
epoch_number_requirement) {}
|
||||
|
||||
~VersionEditHandler() override {}
|
||||
|
@ -159,6 +160,7 @@ class VersionEditHandler : public VersionEditHandlerBase {
|
|||
bool no_error_if_files_missing,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const ReadOptions& read_options, bool skip_load_table_files,
|
||||
bool allow_incomplete_valid_version,
|
||||
EpochNumberRequirement epoch_number_requirement =
|
||||
EpochNumberRequirement::kMustPresent);
|
||||
|
||||
|
@ -213,6 +215,11 @@ class VersionEditHandler : public VersionEditHandlerBase {
|
|||
bool skip_load_table_files_;
|
||||
bool initialized_;
|
||||
std::unique_ptr<std::unordered_map<uint32_t, std::string>> cf_to_cmp_names_;
|
||||
// If false, only a complete Version for which all files consisting it can be
|
||||
// found is considered a valid Version. If true, besides complete Version, an
|
||||
// incomplete Version with only a suffix of L0 files missing is also
|
||||
// considered valid if the Version is never edited in an atomic group.
|
||||
const bool allow_incomplete_valid_version_;
|
||||
EpochNumberRequirement epoch_number_requirement_;
|
||||
std::unordered_set<uint32_t> cfds_to_mark_no_udt_;
|
||||
|
||||
|
@ -234,7 +241,9 @@ class VersionEditHandler : public VersionEditHandlerBase {
|
|||
|
||||
// A class similar to its base class, i.e. VersionEditHandler.
|
||||
// VersionEditHandlerPointInTime restores the versions to the most recent point
|
||||
// in time such that at this point, the version does not have missing files.
|
||||
// in time such that at this point, the version does not have missing files. Or
|
||||
// if `allow_incomplete_valid_version` is true, only a suffix of L0 files (and
|
||||
// their associated blob files) are missing.
|
||||
//
|
||||
// Not thread-safe, external synchronization is necessary if an object of
|
||||
// VersionEditHandlerPointInTime is shared by multiple threads.
|
||||
|
@ -243,7 +252,7 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
|
|||
VersionEditHandlerPointInTime(
|
||||
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
|
||||
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const ReadOptions& read_options,
|
||||
const ReadOptions& read_options, bool allow_incomplete_valid_version,
|
||||
EpochNumberRequirement epoch_number_requirement =
|
||||
EpochNumberRequirement::kMustPresent);
|
||||
~VersionEditHandlerPointInTime() override;
|
||||
|
@ -311,6 +320,7 @@ class ManifestTailer : public VersionEditHandlerPointInTime {
|
|||
EpochNumberRequirement::kMustPresent)
|
||||
: VersionEditHandlerPointInTime(/*read_only=*/false, column_families,
|
||||
version_set, io_tracer, read_options,
|
||||
/*allow_incomplete_valid_version=*/false,
|
||||
epoch_number_requirement),
|
||||
mode_(Mode::kRecovery) {}
|
||||
|
||||
|
@ -359,7 +369,9 @@ class DumpManifestHandler : public VersionEditHandler {
|
|||
/*read_only=*/true, column_families, version_set,
|
||||
/*track_found_and_missing_files=*/false,
|
||||
/*no_error_if_files_missing=*/false, io_tracer, read_options,
|
||||
/*skip_load_table_files=*/true),
|
||||
/*skip_load_table_files=*/true,
|
||||
/*allow_incomplete_valid_version=*/false,
|
||||
/*epoch_number_requirement=*/EpochNumberRequirement::kMustPresent),
|
||||
verbose_(verbose),
|
||||
hex_(hex),
|
||||
json_(json),
|
||||
|
|
|
@ -6080,7 +6080,8 @@ Status VersionSet::Recover(
|
|||
VersionEditHandler handler(
|
||||
read_only, column_families, const_cast<VersionSet*>(this),
|
||||
/*track_found_and_missing_files=*/false, no_error_if_files_missing,
|
||||
io_tracer_, read_options, EpochNumberRequirement::kMightMissing);
|
||||
io_tracer_, read_options, /*allow_incomplete_valid_version=*/false,
|
||||
EpochNumberRequirement::kMightMissing);
|
||||
handler.Iterate(reader, &log_read_status);
|
||||
s = handler.status();
|
||||
if (s.ok()) {
|
||||
|
@ -6256,7 +6257,8 @@ Status VersionSet::TryRecoverFromOneManifest(
|
|||
/*checksum=*/true, /*log_num=*/0);
|
||||
VersionEditHandlerPointInTime handler_pit(
|
||||
read_only, column_families, const_cast<VersionSet*>(this), io_tracer_,
|
||||
read_options, EpochNumberRequirement::kMightMissing);
|
||||
read_options, /*allow_incomplete_valid_version=*/true,
|
||||
EpochNumberRequirement::kMightMissing);
|
||||
|
||||
handler_pit.Iterate(reader, &s);
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#include <algorithm>
|
||||
|
||||
#include "db/blob/blob_log_writer.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "db/db_test_util.h"
|
||||
#include "db/log_writer.h"
|
||||
|
@ -1345,18 +1346,27 @@ class VersionSetTestBase {
|
|||
std::string key; // the only key
|
||||
int level = 0;
|
||||
uint64_t epoch_number;
|
||||
bool file_missing = false;
|
||||
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
|
||||
SstInfo(uint64_t file_num, const std::string& cf_name,
|
||||
const std::string& _key,
|
||||
uint64_t _epoch_number = kUnknownEpochNumber)
|
||||
: SstInfo(file_num, cf_name, _key, 0, _epoch_number) {}
|
||||
uint64_t _epoch_number = kUnknownEpochNumber,
|
||||
bool _file_missing = false,
|
||||
uint64_t _oldest_blob_file_number = kInvalidBlobFileNumber)
|
||||
: SstInfo(file_num, cf_name, _key, 0, _epoch_number, _file_missing,
|
||||
_oldest_blob_file_number) {}
|
||||
SstInfo(uint64_t file_num, const std::string& cf_name,
|
||||
const std::string& _key, int lvl,
|
||||
uint64_t _epoch_number = kUnknownEpochNumber)
|
||||
uint64_t _epoch_number = kUnknownEpochNumber,
|
||||
bool _file_missing = false,
|
||||
uint64_t _oldest_blob_file_number = kInvalidBlobFileNumber)
|
||||
: file_number(file_num),
|
||||
column_family(cf_name),
|
||||
key(_key),
|
||||
level(lvl),
|
||||
epoch_number(_epoch_number) {}
|
||||
epoch_number(_epoch_number),
|
||||
file_missing(_file_missing),
|
||||
oldest_blob_file_number(_oldest_blob_file_number) {}
|
||||
};
|
||||
|
||||
// Create dummy sst, return their metadata. Note that only file name and size
|
||||
|
@ -1395,9 +1405,13 @@ class VersionSetTestBase {
|
|||
ASSERT_NE(0, file_size);
|
||||
file_metas->emplace_back(
|
||||
file_num, /*file_path_id=*/0, file_size, ikey, ikey, 0, 0, false,
|
||||
Temperature::kUnknown, 0, 0, 0, info.epoch_number,
|
||||
kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2,
|
||||
0, 0, /* user_defined_timestamps_persisted */ true);
|
||||
Temperature::kUnknown, info.oldest_blob_file_number, 0, 0,
|
||||
info.epoch_number, kUnknownFileChecksum, kUnknownFileChecksumFuncName,
|
||||
kNullUniqueId64x2, 0, 0,
|
||||
/* user_defined_timestamps_persisted */ true);
|
||||
if (info.file_missing) {
|
||||
ASSERT_OK(fs_->DeleteFile(fname, IOOptions(), nullptr));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3869,8 +3883,9 @@ INSTANTIATE_TEST_CASE_P(
|
|||
class VersionSetTestMissingFiles : public VersionSetTestBase,
|
||||
public testing::Test {
|
||||
public:
|
||||
VersionSetTestMissingFiles()
|
||||
: VersionSetTestBase("version_set_test_missing_files"),
|
||||
explicit VersionSetTestMissingFiles(
|
||||
const std::string& test_name = "version_set_test_missing_files")
|
||||
: VersionSetTestBase(test_name),
|
||||
internal_comparator_(
|
||||
std::make_shared<InternalKeyComparator>(options_.comparator)) {}
|
||||
|
||||
|
@ -3947,7 +3962,8 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
|
|||
// This method updates last_sequence_.
|
||||
void WriteFileAdditionAndDeletionToManifest(
|
||||
uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
|
||||
const std::vector<std::pair<int, uint64_t>>& deleted_files) {
|
||||
const std::vector<std::pair<int, uint64_t>>& deleted_files,
|
||||
const std::vector<BlobFileAddition>& blob_files = {}) {
|
||||
VersionEdit edit;
|
||||
edit.SetColumnFamily(cf);
|
||||
for (const auto& elem : added_files) {
|
||||
|
@ -3958,6 +3974,9 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
|
|||
int level = elem.first;
|
||||
edit.DeleteFile(level, elem.second);
|
||||
}
|
||||
for (const auto& elem : blob_files) {
|
||||
edit.AddBlobFile(elem);
|
||||
}
|
||||
edit.SetLastSequence(last_seqno_);
|
||||
++last_seqno_;
|
||||
assert(log_writer_.get() != nullptr);
|
||||
|
@ -4171,6 +4190,251 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
|
|||
}
|
||||
}
|
||||
|
||||
class BestEffortsRecoverIncompleteVersionTest
|
||||
: public VersionSetTestMissingFiles {
|
||||
public:
|
||||
BestEffortsRecoverIncompleteVersionTest()
|
||||
: VersionSetTestMissingFiles("best_efforts_recover_incomplete_version") {}
|
||||
|
||||
struct BlobInfo {
|
||||
uint64_t file_number;
|
||||
bool file_missing;
|
||||
std::string key;
|
||||
std::string blob;
|
||||
BlobInfo(uint64_t _file_number, bool _file_missing, std::string _key,
|
||||
std::string _blob)
|
||||
: file_number(_file_number),
|
||||
file_missing(_file_missing),
|
||||
key(_key),
|
||||
blob(_blob) {}
|
||||
};
|
||||
|
||||
void CreateDummyBlobFiles(const std::vector<BlobInfo>& infos,
|
||||
std::vector<BlobFileAddition>* blob_metas) {
|
||||
for (const auto& info : infos) {
|
||||
if (!info.file_missing) {
|
||||
WriteDummyBlobFile(info.file_number, info.key, info.blob);
|
||||
}
|
||||
blob_metas->emplace_back(
|
||||
info.file_number, 1 /*total_blob_count*/,
|
||||
info.key.size() + info.blob.size() /*total_blob_bytes*/,
|
||||
"" /*checksum_method*/, "" /*check_sum_value*/);
|
||||
}
|
||||
}
|
||||
// Creates a test blob file that is valid so it can pass the
|
||||
// `VersionEditHandlerPointInTime::VerifyBlobFile` check.
|
||||
void WriteDummyBlobFile(uint64_t blob_file_number, const Slice& key,
|
||||
const Slice& blob) {
|
||||
ImmutableOptions options;
|
||||
std::string blob_file_path = BlobFileName(dbname_, blob_file_number);
|
||||
|
||||
std::unique_ptr<FSWritableFile> file;
|
||||
ASSERT_OK(
|
||||
fs_->NewWritableFile(blob_file_path, FileOptions(), &file, nullptr));
|
||||
|
||||
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
|
||||
std::move(file), blob_file_path, FileOptions(), options.clock));
|
||||
|
||||
BlobLogWriter blob_log_writer(std::move(file_writer), options.clock,
|
||||
/*statistics*/ nullptr, blob_file_number,
|
||||
/*use_fsync*/ true,
|
||||
/*do_flush*/ false);
|
||||
|
||||
constexpr ExpirationRange expiration_range;
|
||||
BlobLogHeader header(/*column_family_id*/ 0, kNoCompression,
|
||||
/*has_ttl*/ false, expiration_range);
|
||||
ASSERT_OK(blob_log_writer.WriteHeader(WriteOptions(), header));
|
||||
std::string compressed_blob;
|
||||
uint64_t key_offset = 0;
|
||||
uint64_t blob_offset = 0;
|
||||
ASSERT_OK(blob_log_writer.AddRecord(WriteOptions(), key, blob, &key_offset,
|
||||
&blob_offset));
|
||||
BlobLogFooter footer;
|
||||
footer.blob_count = 1;
|
||||
footer.expiration_range = expiration_range;
|
||||
std::string checksum_method;
|
||||
std::string checksum_value;
|
||||
ASSERT_OK(blob_log_writer.AppendFooter(WriteOptions(), footer,
|
||||
&checksum_method, &checksum_value));
|
||||
}
|
||||
|
||||
void RecoverFromManifestWithMissingFiles(
|
||||
const std::vector<std::pair<int, FileMetaData>>& added_files,
|
||||
const std::vector<BlobFileAddition>& blob_files) {
|
||||
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
|
||||
WriteFileAdditionAndDeletionToManifest(
|
||||
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>(),
|
||||
blob_files);
|
||||
log_writer_.reset();
|
||||
Status s = SetCurrentFile(WriteOptions(), fs_.get(), dbname_, 1, nullptr);
|
||||
ASSERT_OK(s);
|
||||
std::string manifest_path;
|
||||
VerifyManifest(&manifest_path);
|
||||
std::string db_id;
|
||||
bool has_missing_table_file = false;
|
||||
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
|
||||
/*read_only=*/false, &db_id,
|
||||
&has_missing_table_file);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_TRUE(has_missing_table_file);
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(BestEffortsRecoverIncompleteVersionTest, NonL0MissingFiles) {
|
||||
std::vector<SstInfo> sst_files = {
|
||||
SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
|
||||
100 /* epoch_number */, true /* file_missing */),
|
||||
SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
101 /* epoch_number */, false /* file_missing */),
|
||||
SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
102 /* epoch_number */, false /* file_missing */),
|
||||
};
|
||||
std::vector<FileMetaData> file_metas;
|
||||
CreateDummyTableFiles(sst_files, &file_metas);
|
||||
|
||||
std::vector<std::pair<int, FileMetaData>> added_files;
|
||||
for (size_t i = 0; i < sst_files.size(); i++) {
|
||||
const auto& info = sst_files[i];
|
||||
const auto& meta = file_metas[i];
|
||||
added_files.emplace_back(info.level, meta);
|
||||
}
|
||||
RecoverFromManifestWithMissingFiles(added_files,
|
||||
std::vector<BlobFileAddition>());
|
||||
std::vector<uint64_t> all_table_files;
|
||||
std::vector<uint64_t> all_blob_files;
|
||||
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
|
||||
ASSERT_TRUE(all_table_files.empty());
|
||||
}
|
||||
|
||||
TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingNonSuffixL0Files) {
|
||||
std::vector<SstInfo> sst_files = {
|
||||
SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
|
||||
100 /* epoch_number */, false /* file_missing */),
|
||||
SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
101 /* epoch_number */, true /* file_missing */),
|
||||
SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
102 /* epoch_number */, false /* file_missing */),
|
||||
};
|
||||
std::vector<FileMetaData> file_metas;
|
||||
CreateDummyTableFiles(sst_files, &file_metas);
|
||||
|
||||
std::vector<std::pair<int, FileMetaData>> added_files;
|
||||
for (size_t i = 0; i < sst_files.size(); i++) {
|
||||
const auto& info = sst_files[i];
|
||||
const auto& meta = file_metas[i];
|
||||
added_files.emplace_back(info.level, meta);
|
||||
}
|
||||
RecoverFromManifestWithMissingFiles(added_files,
|
||||
std::vector<BlobFileAddition>());
|
||||
std::vector<uint64_t> all_table_files;
|
||||
std::vector<uint64_t> all_blob_files;
|
||||
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
|
||||
ASSERT_TRUE(all_table_files.empty());
|
||||
}
|
||||
|
||||
TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingBlobFiles) {
|
||||
std::vector<SstInfo> sst_files = {
|
||||
SstInfo(100, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
100 /* epoch_number */, false /* file_missing */,
|
||||
102 /*oldest_blob_file_number*/),
|
||||
SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
101 /* epoch_number */, false /* file_missing */,
|
||||
103 /*oldest_blob_file_number*/),
|
||||
};
|
||||
std::vector<FileMetaData> file_metas;
|
||||
CreateDummyTableFiles(sst_files, &file_metas);
|
||||
|
||||
std::vector<BlobInfo> blob_files = {
|
||||
BlobInfo(102, true /*file_missing*/, "a", "blob1"),
|
||||
BlobInfo(103, true /*file_missing*/, "a", "blob2"),
|
||||
};
|
||||
std::vector<BlobFileAddition> blob_meta;
|
||||
CreateDummyBlobFiles(blob_files, &blob_meta);
|
||||
|
||||
std::vector<std::pair<int, FileMetaData>> added_files;
|
||||
for (size_t i = 0; i < sst_files.size(); i++) {
|
||||
const auto& info = sst_files[i];
|
||||
const auto& meta = file_metas[i];
|
||||
added_files.emplace_back(info.level, meta);
|
||||
}
|
||||
RecoverFromManifestWithMissingFiles(added_files, blob_meta);
|
||||
std::vector<uint64_t> all_table_files;
|
||||
std::vector<uint64_t> all_blob_files;
|
||||
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
|
||||
ASSERT_TRUE(all_table_files.empty());
|
||||
}
|
||||
|
||||
TEST_F(BestEffortsRecoverIncompleteVersionTest, MissingL0SuffixOnly) {
|
||||
std::vector<SstInfo> sst_files = {
|
||||
SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
|
||||
100 /* epoch_number */, false /* file_missing */),
|
||||
SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
101 /* epoch_number */, false /* file_missing */),
|
||||
SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
102 /* epoch_number */, true /* file_missing */),
|
||||
};
|
||||
std::vector<FileMetaData> file_metas;
|
||||
CreateDummyTableFiles(sst_files, &file_metas);
|
||||
|
||||
std::vector<std::pair<int, FileMetaData>> added_files;
|
||||
for (size_t i = 0; i < sst_files.size(); i++) {
|
||||
const auto& info = sst_files[i];
|
||||
const auto& meta = file_metas[i];
|
||||
added_files.emplace_back(info.level, meta);
|
||||
}
|
||||
RecoverFromManifestWithMissingFiles(added_files,
|
||||
std::vector<BlobFileAddition>());
|
||||
std::vector<uint64_t> all_table_files;
|
||||
std::vector<uint64_t> all_blob_files;
|
||||
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
|
||||
ASSERT_EQ(2, all_table_files.size());
|
||||
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
VersionStorageInfo* vstorage = cfd->current()->storage_info();
|
||||
ASSERT_EQ(1, vstorage->LevelFiles(0).size());
|
||||
ASSERT_EQ(1, vstorage->LevelFiles(1).size());
|
||||
}
|
||||
|
||||
TEST_F(BestEffortsRecoverIncompleteVersionTest,
|
||||
MissingL0SuffixAndTheirBlobFiles) {
|
||||
std::vector<SstInfo> sst_files = {
|
||||
SstInfo(100, kDefaultColumnFamilyName, "a", 1 /* level */,
|
||||
100 /* epoch_number */, false /* file_missing */),
|
||||
SstInfo(101, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
101 /* epoch_number */, false /* file_missing */,
|
||||
103 /*oldest_blob_file_number*/),
|
||||
SstInfo(102, kDefaultColumnFamilyName, "a", 0 /* level */,
|
||||
102 /* epoch_number */, true /* file_missing */,
|
||||
104 /*oldest_blob_file_number*/),
|
||||
};
|
||||
std::vector<FileMetaData> file_metas;
|
||||
CreateDummyTableFiles(sst_files, &file_metas);
|
||||
|
||||
std::vector<BlobInfo> blob_files = {
|
||||
BlobInfo(103, false /*file_missing*/, "a", "blob1"),
|
||||
BlobInfo(104, true /*file_missing*/, "a", "blob2"),
|
||||
};
|
||||
std::vector<BlobFileAddition> blob_meta;
|
||||
CreateDummyBlobFiles(blob_files, &blob_meta);
|
||||
|
||||
std::vector<std::pair<int, FileMetaData>> added_files;
|
||||
for (size_t i = 0; i < sst_files.size(); i++) {
|
||||
const auto& info = sst_files[i];
|
||||
const auto& meta = file_metas[i];
|
||||
added_files.emplace_back(info.level, meta);
|
||||
}
|
||||
RecoverFromManifestWithMissingFiles(added_files, blob_meta);
|
||||
std::vector<uint64_t> all_table_files;
|
||||
std::vector<uint64_t> all_blob_files;
|
||||
versions_->AddLiveFiles(&all_table_files, &all_blob_files);
|
||||
ASSERT_EQ(2, all_table_files.size());
|
||||
ASSERT_EQ(1, all_blob_files.size());
|
||||
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
VersionStorageInfo* vstorage = cfd->current()->storage_info();
|
||||
ASSERT_EQ(1, vstorage->LevelFiles(0).size());
|
||||
ASSERT_EQ(1, vstorage->LevelFiles(1).size());
|
||||
ASSERT_EQ(1, vstorage->GetBlobFiles().size());
|
||||
}
|
||||
|
||||
class ChargeFileMetadataTest : public DBTestBase {
|
||||
public:
|
||||
ChargeFileMetadataTest()
|
||||
|
|
|
@ -1434,7 +1434,17 @@ struct DBOptions {
|
|||
// For example, if an SST or blob file referenced by the MANIFEST is missing,
|
||||
// BER might be able to find a set of files corresponding to an old "point in
|
||||
// time" version of the column family, possibly from an older MANIFEST
|
||||
// file. Some other kinds of DB files (e.g. CURRENT, LOCK, IDENTITY) are
|
||||
// file.
|
||||
// Besides complete "point in time" version, an incomplete version with
|
||||
// only a suffix of L0 files missing can also be recovered to if the
|
||||
// versioning history doesn't include an atomic flush. From the users'
|
||||
// perspective, missing a suffix of L0 files means missing the
|
||||
// user's most recently written data. So the remaining available files still
|
||||
// presents a valid point in time view, although for some previous time. It's
|
||||
// not done for atomic flush because that guarantees a consistent view across
|
||||
// column families. We cannot guarantee that if recovering an incomplete
|
||||
// version.
|
||||
// Some other kinds of DB files (e.g. CURRENT, LOCK, IDENTITY) are
|
||||
// either ignored or replaced with BER, or quietly fixed regardless of BER
|
||||
// setting. BER does require at least one valid MANIFEST to recover to a
|
||||
// non-trivial DB state, unlike `ldb repair`.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
*Best efforts recovery supports recovering to incomplete Version with a clean seqno cut that presents a valid point in time view from the user's perspective, if versioning history doesn't include atomic flush.
|
Loading…
Reference in New Issue