mirror of https://github.com/facebook/rocksdb.git
Fix a corruption bug in `CreateColumnFamilyWithImport()` (#12602)
Summary: when importing files from multiple CFs into a new CF, we were reusing the epoch numbers assigned by the original CFs. This means L0 files in the new CF can have the same epoch number (assigned originally by different CFs). While CreateColumnFamilyWithImport() requires each original CF to have disjoint key range, after an intra-l0 compaction, we still can end up with L0 files with the same epoch number but overlapping key range. This PR attempt to fix this by reassigning epoch numbers when importing multiple CFs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12602 Test Plan: a new repro unit test. Before this PR, it fails with ``` [ RUN ] ImportColumnFamilyTest.AssignEpochNumberToMultipleCF db/import_column_family_test.cc:1048: Failure db_->WaitForCompact(o) Corruption: force_consistency_checks(DEBUG): VersionBuilder: L0 files of same epoch number but overlapping range https://github.com/facebook/rocksdb/issues/44 , smallest key: '6B6579303030303030' seq:511, type:1 , largest key: '6B6579303031303239' seq:510, type:1 , epoch number: 3 vs. file https://github.com/facebook/rocksdb/issues/36 , smallest key: '6B6579303030313030' seq:401, type:1 , largest key: '6B6579303030313939' seq:500, type:1 , epoch number: 3 ``` Reviewed By: hx235 Differential Revision: D56851808 Pulled By: cbi42 fbshipit-source-id: 01b8c790c9f1f2a168047ead670e73633f705b84
This commit is contained in:
parent
3fdc7243f3
commit
6fdc4c5282
|
@ -175,22 +175,29 @@ Status ImportColumnFamilyJob::Run() {
|
|||
static_cast<uint64_t>(temp_current_time);
|
||||
}
|
||||
|
||||
// Recover files' epoch number using dummy VersionStorageInfo
|
||||
VersionBuilder dummy_version_builder(
|
||||
cfd_->current()->version_set()->file_options(), cfd_->ioptions(),
|
||||
cfd_->table_cache(), cfd_->current()->storage_info(),
|
||||
cfd_->current()->version_set(),
|
||||
cfd_->GetFileMetadataCacheReservationManager());
|
||||
VersionStorageInfo dummy_vstorage(
|
||||
&cfd_->internal_comparator(), cfd_->user_comparator(),
|
||||
cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
|
||||
nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
|
||||
EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock,
|
||||
cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay,
|
||||
cfd_->current()->version_set()->offpeak_time_option());
|
||||
Status s;
|
||||
|
||||
// When importing multiple CFs, we should not reuse epoch number from ingested
|
||||
// files. Since these epoch numbers were assigned by different CFs, there may
|
||||
// be different files from different CFs with the same epoch number. With a
|
||||
// subsequent intra-L0 compaction we may end up with files with overlapping
|
||||
// key range but the same epoch number. Here we will create a dummy
|
||||
// VersionStorageInfo per CF being imported. Each CF's files will be assigned
|
||||
// increasing epoch numbers to avoid duplicated epoch number. This is done by
|
||||
// only resetting epoch number of the new CF in the first call to
|
||||
// RecoverEpochNumbers() below.
|
||||
for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
|
||||
VersionBuilder dummy_version_builder(
|
||||
cfd_->current()->version_set()->file_options(), cfd_->ioptions(),
|
||||
cfd_->table_cache(), cfd_->current()->storage_info(),
|
||||
cfd_->current()->version_set(),
|
||||
cfd_->GetFileMetadataCacheReservationManager());
|
||||
VersionStorageInfo dummy_vstorage(
|
||||
&cfd_->internal_comparator(), cfd_->user_comparator(),
|
||||
cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
|
||||
nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
|
||||
EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock,
|
||||
cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay,
|
||||
cfd_->current()->version_set()->offpeak_time_option());
|
||||
for (size_t j = 0; s.ok() && j < files_to_import_[i].size(); ++j) {
|
||||
const auto& f = files_to_import_[i][j];
|
||||
const auto& file_metadata = *metadatas_[i][j];
|
||||
|
@ -218,42 +225,39 @@ Status ImportColumnFamilyJob::Run() {
|
|||
f.table_properties.user_defined_timestamps_persisted));
|
||||
s = dummy_version_builder.Apply(&dummy_version_edit);
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
s = dummy_version_builder.SaveTo(&dummy_vstorage);
|
||||
}
|
||||
if (s.ok()) {
|
||||
dummy_vstorage.RecoverEpochNumbers(cfd_);
|
||||
}
|
||||
|
||||
// Record changes from this CF import in VersionEdit, including files with
|
||||
// recovered epoch numbers
|
||||
if (s.ok()) {
|
||||
edit_.SetColumnFamily(cfd_->GetID());
|
||||
if (s.ok()) {
|
||||
s = dummy_version_builder.SaveTo(&dummy_vstorage);
|
||||
}
|
||||
if (s.ok()) {
|
||||
// force resetting epoch number for each file
|
||||
dummy_vstorage.RecoverEpochNumbers(cfd_, /*restart_epoch=*/i == 0,
|
||||
/*force=*/true);
|
||||
edit_.SetColumnFamily(cfd_->GetID());
|
||||
|
||||
for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
|
||||
for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
|
||||
edit_.AddFile(level, *file_meta);
|
||||
// If incoming sequence number is higher, update local sequence
|
||||
// number.
|
||||
if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
|
||||
versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
|
||||
versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
|
||||
versions_->SetLastSequence(file_meta->fd.largest_seqno);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Release resources occupied by the dummy VersionStorageInfo
|
||||
for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
|
||||
for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
|
||||
edit_.AddFile(level, *file_meta);
|
||||
// If incoming sequence number is higher, update local sequence number.
|
||||
if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
|
||||
versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
|
||||
versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
|
||||
versions_->SetLastSequence(file_meta->fd.largest_seqno);
|
||||
file_meta->refs--;
|
||||
if (file_meta->refs <= 0) {
|
||||
delete file_meta;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Release resources occupied by the dummy VersionStorageInfo
|
||||
for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
|
||||
for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
|
||||
file_meta->refs--;
|
||||
if (file_meta->refs <= 0) {
|
||||
delete file_meta;
|
||||
}
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
|
@ -946,6 +946,108 @@ TEST_F(ImportColumnFamilyTest, ImportMultiColumnFamilySeveralFilesWithOverlap) {
|
|||
ASSERT_OK(db_->DestroyColumnFamilyHandle(second_cfh));
|
||||
}
|
||||
|
||||
TEST_F(ImportColumnFamilyTest, AssignEpochNumberToMultipleCF) {
|
||||
// Test ingesting CFs where L0 files could have the same epoch number.
|
||||
Options options = CurrentOptions();
|
||||
options.level_compaction_dynamic_level_bytes = true;
|
||||
options.max_background_jobs = 8;
|
||||
env_->SetBackgroundThreads(2, Env::LOW);
|
||||
env_->SetBackgroundThreads(0, Env::BOTTOM);
|
||||
CreateAndReopenWithCF({"CF1", "CF2"}, options);
|
||||
|
||||
// CF1:
|
||||
// L6: [0, 99], [100, 199]
|
||||
// CF2:
|
||||
// L6: [1000, 1099], [1100, 1199]
|
||||
for (int i = 100; i < 200; ++i) {
|
||||
ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
|
||||
ASSERT_OK(Put(2, Key(1000 + i), Key(1000 + i) + "_val"));
|
||||
}
|
||||
ASSERT_OK(Flush(1));
|
||||
ASSERT_OK(Flush(2));
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
|
||||
ASSERT_OK(Put(2, Key(1000 + i), Key(1000 + i) + "_val"));
|
||||
}
|
||||
ASSERT_OK(Flush(1));
|
||||
ASSERT_OK(Flush(2));
|
||||
MoveFilesToLevel(6, 1);
|
||||
MoveFilesToLevel(6, 2);
|
||||
|
||||
// CF1:
|
||||
// level 0 epoch: 5 file num 30 smallest key000010 - key000019
|
||||
// level 0 epoch: 4 file num 27 smallest key000000 - key000009
|
||||
// level 0 epoch: 3 file num 23 smallest key000100 - key000199
|
||||
// level 6 epoch: 2 file num 20 smallest key000000 - key000099
|
||||
// level 6 epoch: 1 file num 17 smallest key000100 - key000199
|
||||
// CF2:
|
||||
// level 0 epoch: 5 file num 31 smallest key001010 - key001019
|
||||
// level 0 epoch: 4 file num 28 smallest key001000 - key001009
|
||||
// level 0 epoch: 3 file num 25 smallest key001020 - key001029
|
||||
// level 6 epoch: 2 file num 21 smallest key001000 - key001099
|
||||
// level 6 epoch: 1 file num 18 smallest key001100 - key001199
|
||||
for (int i = 100; i < 200; ++i) {
|
||||
ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
|
||||
}
|
||||
ASSERT_OK(Flush(1));
|
||||
for (int i = 20; i < 30; ++i) {
|
||||
ASSERT_OK(Put(2, Key(i + 1000), Key(i + 1000) + "_val"));
|
||||
}
|
||||
ASSERT_OK(Flush(2));
|
||||
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
|
||||
ASSERT_OK(Put(2, Key(i + 1000), Key(i + 1000) + "_val"));
|
||||
if (i % 10 == 9) {
|
||||
ASSERT_OK(Flush(1));
|
||||
ASSERT_OK(Flush(2));
|
||||
}
|
||||
}
|
||||
ASSERT_OK(Flush(1));
|
||||
ASSERT_OK(Flush(2));
|
||||
|
||||
// Create a CF by importing these two CF1 and CF2.
|
||||
// Then two compactions will be triggerred, one to compact from L0
|
||||
// to L6 (files #23 and #17), and another to do intra-L0 compaction
|
||||
// for the rest of the L0 files. Before a bug fix, we used to
|
||||
// directly use the epoch numbers from the ingested files in the new CF.
|
||||
// This means different files from different CFs can have the same epoch
|
||||
// number. If the intra-L0 compaction finishes first, it can cause a
|
||||
// corruption where two L0 files can have the same epoch number but
|
||||
// with overlapping key range.
|
||||
Checkpoint* checkpoint1;
|
||||
ASSERT_OK(Checkpoint::Create(db_, &checkpoint1));
|
||||
ASSERT_OK(checkpoint1->ExportColumnFamily(handles_[1], export_files_dir_,
|
||||
&metadata_ptr_));
|
||||
ASSERT_OK(checkpoint1->ExportColumnFamily(handles_[2], export_files_dir2_,
|
||||
&metadata_ptr2_));
|
||||
ASSERT_NE(metadata_ptr_, nullptr);
|
||||
ASSERT_NE(metadata_ptr2_, nullptr);
|
||||
|
||||
std::atomic_int compaction_counter = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
|
||||
[&compaction_counter](void*) {
|
||||
compaction_counter++;
|
||||
if (compaction_counter == 1) {
|
||||
// Wait for the next compaction to finish
|
||||
TEST_SYNC_POINT("WaitForSecondCompaction");
|
||||
}
|
||||
});
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::BackgroundCompaction:AfterCompaction",
|
||||
"WaitForSecondCompaction"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
ImportColumnFamilyOptions import_options;
|
||||
import_options.move_files = false;
|
||||
std::vector<const ExportImportFilesMetaData*> metadatas = {metadata_ptr_,
|
||||
metadata_ptr2_};
|
||||
ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "CF3", import_options,
|
||||
metadatas, &import_cfh_));
|
||||
WaitForCompactOptions o;
|
||||
ASSERT_OK(db_->WaitForCompact(o));
|
||||
delete checkpoint1;
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
|
@ -4617,25 +4617,27 @@ uint64_t VersionStorageInfo::GetMaxEpochNumberOfFiles() const {
|
|||
return max_epoch_number;
|
||||
}
|
||||
|
||||
void VersionStorageInfo::RecoverEpochNumbers(ColumnFamilyData* cfd) {
|
||||
cfd->ResetNextEpochNumber();
|
||||
void VersionStorageInfo::RecoverEpochNumbers(ColumnFamilyData* cfd,
|
||||
bool restart_epoch, bool force) {
|
||||
if (restart_epoch) {
|
||||
cfd->ResetNextEpochNumber();
|
||||
|
||||
bool reserve_epoch_num_for_file_ingested_behind =
|
||||
cfd->ioptions()->allow_ingest_behind;
|
||||
if (reserve_epoch_num_for_file_ingested_behind) {
|
||||
uint64_t reserved_epoch_number = cfd->NewEpochNumber();
|
||||
assert(reserved_epoch_number == kReservedEpochNumberForFileIngestedBehind);
|
||||
ROCKS_LOG_INFO(cfd->ioptions()->info_log.get(),
|
||||
"[%s]CF has reserved epoch number %" PRIu64
|
||||
" for files ingested "
|
||||
"behind since `Options::allow_ingest_behind` is true",
|
||||
cfd->GetName().c_str(), reserved_epoch_number);
|
||||
bool reserve_epoch_num_for_file_ingested_behind =
|
||||
cfd->ioptions()->allow_ingest_behind;
|
||||
if (reserve_epoch_num_for_file_ingested_behind) {
|
||||
uint64_t reserved_epoch_number = cfd->NewEpochNumber();
|
||||
assert(reserved_epoch_number ==
|
||||
kReservedEpochNumberForFileIngestedBehind);
|
||||
ROCKS_LOG_INFO(cfd->ioptions()->info_log.get(),
|
||||
"[%s]CF has reserved epoch number %" PRIu64
|
||||
" for files ingested "
|
||||
"behind since `Options::allow_ingest_behind` is true",
|
||||
cfd->GetName().c_str(), reserved_epoch_number);
|
||||
}
|
||||
}
|
||||
|
||||
if (HasMissingEpochNumber()) {
|
||||
assert(epoch_number_requirement_ == EpochNumberRequirement::kMightMissing);
|
||||
assert(num_levels_ >= 1);
|
||||
|
||||
bool missing_epoch_number = HasMissingEpochNumber();
|
||||
if (missing_epoch_number || force) {
|
||||
for (int level = num_levels_ - 1; level >= 1; --level) {
|
||||
auto& files_at_level = files_[level];
|
||||
if (files_at_level.empty()) {
|
||||
|
@ -4646,17 +4648,19 @@ void VersionStorageInfo::RecoverEpochNumbers(ColumnFamilyData* cfd) {
|
|||
f->epoch_number = next_epoch_number;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto file_meta_iter = files_[0].rbegin();
|
||||
file_meta_iter != files_[0].rend(); file_meta_iter++) {
|
||||
FileMetaData* f = *file_meta_iter;
|
||||
f->epoch_number = cfd->NewEpochNumber();
|
||||
}
|
||||
|
||||
ROCKS_LOG_WARN(cfd->ioptions()->info_log.get(),
|
||||
"[%s]CF's epoch numbers are inferred based on seqno",
|
||||
cfd->GetName().c_str());
|
||||
epoch_number_requirement_ = EpochNumberRequirement::kMustPresent;
|
||||
if (missing_epoch_number) {
|
||||
assert(epoch_number_requirement_ ==
|
||||
EpochNumberRequirement::kMightMissing);
|
||||
ROCKS_LOG_WARN(cfd->ioptions()->info_log.get(),
|
||||
"[%s]CF's epoch numbers are inferred based on seqno",
|
||||
cfd->GetName().c_str());
|
||||
epoch_number_requirement_ = EpochNumberRequirement::kMustPresent;
|
||||
}
|
||||
} else {
|
||||
assert(epoch_number_requirement_ == EpochNumberRequirement::kMustPresent);
|
||||
cfd->SetNextEpochNumber(
|
||||
|
|
|
@ -341,7 +341,15 @@ class VersionStorageInfo {
|
|||
EpochNumberRequirement epoch_number_requirement) {
|
||||
epoch_number_requirement_ = epoch_number_requirement;
|
||||
}
|
||||
void RecoverEpochNumbers(ColumnFamilyData* cfd);
|
||||
// Ensure all files have epoch number set.
|
||||
// If there is a file missing epoch number, all files' epoch number will be
|
||||
// reset according to CF's epoch number. Otherwise, the CF will be updated
|
||||
// with the max epoch number of the files.
|
||||
//
|
||||
// @param restart_epoch This CF's epoch number will be reset to start from 0.
|
||||
// @param force Force resetting all files' epoch number.
|
||||
void RecoverEpochNumbers(ColumnFamilyData* cfd, bool restart_epoch = true,
|
||||
bool force = false);
|
||||
|
||||
class FileLocation {
|
||||
public:
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
* Fix a bug in CreateColumnFamilyWithImport() where if multiple CFs are imported, we were not resetting files' epoch number and L0 files can have overlapping key range but the same epoch number.
|
Loading…
Reference in New Issue