Re-persist blob file metadata when a new manifest file is created (#6630)

Summary:
Does what it says on the can. Similarly to table files, we need to re-persist
the metadata of live blob files whenever a new manifest file is opened.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6630

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D20802126

Pulled By: ltamasi

fbshipit-source-id: 5738692d898790293bf09d66e9997369bbf89566
This commit is contained in:
Levi Tamasi 2020-04-02 11:51:17 -07:00 committed by Facebook GitHub Bot
parent 2b02ea25e2
commit 2165c3bacc
2 changed files with 99 additions and 0 deletions

View file

@ -5211,6 +5211,8 @@ Status VersionSet::WriteCurrentStateToManifest(
}
for (auto cfd : *column_family_set_) {
assert(cfd);
if (cfd->IsDropped()) {
continue;
}
@ -5243,6 +5245,9 @@ Status VersionSet::WriteCurrentStateToManifest(
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
assert(cfd->current());
assert(cfd->current()->storage_info());
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& f :
cfd->current()->storage_info()->LevelFiles(level)) {
@ -5254,6 +5259,24 @@ Status VersionSet::WriteCurrentStateToManifest(
f->file_checksum, f->file_checksum_func_name);
}
}
const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
for (const auto& pair : blob_files) {
const uint64_t blob_file_number = pair.first;
const auto& meta = pair.second;
assert(meta);
assert(blob_file_number == meta->GetBlobFileNumber());
edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
meta->GetChecksumValue());
if (meta->GetGarbageBlobCount() > 0) {
edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
meta->GetGarbageBlobBytes());
}
}
const auto iter = curr_state.find(cfd->GetID());
assert(iter != curr_state.end());
uint64_t log_number = iter->second.log_number;

View file

@ -827,6 +827,82 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
EXPECT_EQ(kGroupSize - 1, count);
}
TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
// Initialize the database and add a couple of blob files, one with some
// garbage in it, and one without any garbage.
NewDB();
VersionEdit edit;
{
constexpr uint64_t blob_file_number = 123;
constexpr uint64_t total_blob_count = 456;
constexpr uint64_t total_blob_bytes = 77777777;
constexpr char checksum_method[] = "SHA1";
constexpr char checksum_value[] =
"bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
constexpr uint64_t garbage_blob_count = 89;
constexpr uint64_t garbage_blob_bytes = 1000000;
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
edit.AddBlobFileGarbage(blob_file_number, garbage_blob_count,
garbage_blob_bytes);
}
{
constexpr uint64_t blob_file_number = 234;
constexpr uint64_t total_blob_count = 555;
constexpr uint64_t total_blob_bytes = 66666;
constexpr char checksum_method[] = "CRC32";
constexpr char checksum_value[] = "3d87ff57";
edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
checksum_method, checksum_value);
}
assert(versions_);
assert(versions_->GetColumnFamilySet());
mutex_.Lock();
Status s =
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options_, &edit, &mutex_);
mutex_.Unlock();
ASSERT_OK(s);
// Force the creation of a new manifest file and make sure metadata for
// the blob files is re-persisted.
size_t addition_encoded = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileAddition::EncodeTo::CustomFields",
[&](void* /* arg */) { ++addition_encoded; });
size_t garbage_encoded = 0;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileGarbage::EncodeTo::CustomFields",
[&](void* /* arg */) { ++garbage_encoded; });
SyncPoint::GetInstance()->EnableProcessing();
VersionEdit dummy;
mutex_.Lock();
constexpr FSDirectory* db_directory = nullptr;
constexpr bool new_descriptor_log = true;
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options_, &dummy, &mutex_, db_directory,
new_descriptor_log);
mutex_.Unlock();
ASSERT_OK(s);
ASSERT_EQ(addition_encoded, 2);
ASSERT_EQ(garbage_encoded, 1);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class VersionSetAtomicGroupTest : public VersionSetTestBase,
public testing::Test {
public: