From 227b5d52df103ef8722e537bd3ecd3445082b288 Mon Sep 17 00:00:00 2001 From: haoyuhuang Date: Tue, 4 Jun 2019 10:51:22 -0700 Subject: [PATCH] Make RocksDB secondary instance respect atomic groups in version edits. (#5411) Summary: With this commit, RocksDB secondary instance respects atomic groups in version edits. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5411 Differential Revision: D15617512 Pulled By: HaoyuHuang fbshipit-source-id: 913f4ede391d772dcaf5649e3cd2099fa292d120 --- db/db_impl/db_secondary_test.cc | 2 +- db/version_edit.h | 1 + db/version_set.cc | 403 ++++++++++++++--------- db/version_set.h | 47 ++- db/version_set_test.cc | 566 +++++++++++++++++++++----------- 5 files changed, 659 insertions(+), 360 deletions(-) diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index e8eafd673e..5b375422f0 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -373,7 +373,7 @@ TEST_F(DBSecondaryTest, MissingTableFile) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->SetCallBack( - "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", + "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers", [&](void* arg) { Status s = *reinterpret_cast(arg); if (s.IsPathNotFound()) { diff --git a/db/version_edit.h b/db/version_edit.h index 471b4e095a..e1857b37fc 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -316,6 +316,7 @@ class VersionEdit { friend class ReactiveVersionSet; friend class VersionSet; friend class Version; + friend class AtomicGroupReadBuffer; bool GetLevel(Slice* input, int* level, const char** msg); diff --git a/db/version_set.cc b/db/version_set.cc index 26465a01a4..a60a4e87ca 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3313,6 +3313,51 @@ struct VersionSet::ManifestWriter { edit_list(e) {} }; +Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) { + assert(edit); + if (edit->is_in_atomic_group_) { + TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup"); + if (replay_buffer_.empty()) { + replay_buffer_.resize(edit->remaining_entries_ + 1); + TEST_SYNC_POINT_CALLBACK( + "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit); + } + read_edits_in_atomic_group_++; + if (read_edits_in_atomic_group_ + edit->remaining_entries_ != + static_cast(replay_buffer_.size())) { + TEST_SYNC_POINT_CALLBACK( + "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit); + return Status::Corruption("corrupted atomic group"); + } + replay_buffer_[read_edits_in_atomic_group_ - 1] = std::move(*edit); + if (read_edits_in_atomic_group_ == replay_buffer_.size()) { + TEST_SYNC_POINT_CALLBACK( + "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit); + return Status::OK(); + } + return Status::OK(); + } + + // A normal edit. + if (!replay_buffer().empty()) { + TEST_SYNC_POINT_CALLBACK( + "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit); + return Status::Corruption("corrupted atomic group"); + } + return Status::OK(); +} + +bool AtomicGroupReadBuffer::IsFull() const { + return read_edits_in_atomic_group_ == replay_buffer_.size(); +} + +bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); } + +void AtomicGroupReadBuffer::Clear() { + read_edits_in_atomic_group_ = 0; + replay_buffer_.clear(); +} + VersionSet::VersionSet(const std::string& dbname, const ImmutableDBOptions* _db_options, const EnvOptions& storage_options, Cache* table_cache, @@ -4071,6 +4116,74 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, Env* env, return Status::OK(); } +Status VersionSet::ReadAndRecover( + log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + const std::unordered_map& name_to_options, + std::unordered_map& column_families_not_found, + std::unordered_map>& + builders, + bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family) { + assert(reader != nullptr); + assert(read_buffer != nullptr); + Status s; + Slice record; + std::string scratch; + size_t recovered_edits = 0; + while (reader->ReadRecord(&record, &scratch) && s.ok()) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + s = read_buffer->AddEdit(&edit); + if (!s.ok()) { + break; + } + if (edit.is_in_atomic_group_) { + if (read_buffer->IsFull()) { + // Apply edits in an atomic group when we have read all edits in the + // group. + for (auto& e : read_buffer->replay_buffer()) { + s = ApplyOneVersionEditToBuilder( + e, name_to_options, column_families_not_found, builders, + have_log_number, log_number, have_prev_log_number, + previous_log_number, have_next_file, next_file, + have_last_sequence, last_sequence, min_log_number_to_keep, + max_column_family); + if (!s.ok()) { + break; + } + recovered_edits++; + } + if (!s.ok()) { + break; + } + read_buffer->Clear(); + } + } else { + // Apply a normal edit immediately. + s = ApplyOneVersionEditToBuilder( + edit, name_to_options, column_families_not_found, builders, + have_log_number, log_number, have_prev_log_number, + previous_log_number, have_next_file, next_file, have_last_sequence, + last_sequence, min_log_number_to_keep, max_column_family); + if (s.ok()) { + recovered_edits++; + } + } + } + if (!s.ok()) { + // Clear the buffer if we fail to decode/apply an edit. + read_buffer->Clear(); + } + TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits", + &recovered_edits); + return s; +} + Status VersionSet::Recover( const std::vector& column_families, bool read_only) { @@ -4148,66 +4261,12 @@ Status VersionSet::Recover( true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; - std::vector replay_buffer; - size_t num_entries_decoded = 0; - while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - if (edit.is_in_atomic_group_) { - if (replay_buffer.empty()) { - replay_buffer.resize(edit.remaining_entries_ + 1); - TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup", - &edit); - } - ++num_entries_decoded; - if (num_entries_decoded + edit.remaining_entries_ != - static_cast(replay_buffer.size())) { - TEST_SYNC_POINT_CALLBACK( - "VersionSet::Recover:IncorrectAtomicGroupSize", &edit); - s = Status::Corruption("corrupted atomic group"); - break; - } - replay_buffer[num_entries_decoded - 1] = std::move(edit); - if (num_entries_decoded == replay_buffer.size()) { - TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup", - &edit); - for (auto& e : replay_buffer) { - s = ApplyOneVersionEditToBuilder( - e, cf_name_to_options, column_families_not_found, builders, - &have_log_number, &log_number, &have_prev_log_number, - &previous_log_number, &have_next_file, &next_file, - &have_last_sequence, &last_sequence, &min_log_number_to_keep, - &max_column_family); - if (!s.ok()) { - break; - } - } - replay_buffer.clear(); - num_entries_decoded = 0; - } - TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup"); - } else { - if (!replay_buffer.empty()) { - TEST_SYNC_POINT_CALLBACK( - "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit); - s = Status::Corruption("corrupted atomic group"); - break; - } - s = ApplyOneVersionEditToBuilder( - edit, cf_name_to_options, column_families_not_found, builders, - &have_log_number, &log_number, &have_prev_log_number, - &previous_log_number, &have_next_file, &next_file, - &have_last_sequence, &last_sequence, &min_log_number_to_keep, - &max_column_family); - } - if (!s.ok()) { - break; - } - } + AtomicGroupReadBuffer read_buffer; + s = ReadAndRecover( + &reader, &read_buffer, cf_name_to_options, column_families_not_found, + builders, &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, &have_last_sequence, + &last_sequence, &min_log_number_to_keep, &max_column_family); } if (s.ok()) { @@ -5218,19 +5277,11 @@ Status ReactiveVersionSet::Recover( assert(reader != nullptr); Slice record; std::string scratch; - while (s.ok() && reader->ReadRecord(&record, &scratch)) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - s = ApplyOneVersionEditToBuilder( - edit, cf_name_to_options, column_families_not_found, builders, - &have_log_number, &log_number, &have_prev_log_number, - &previous_log_number, &have_next_file, &next_file, - &have_last_sequence, &last_sequence, &min_log_number_to_keep, - &max_column_family); - } + s = ReadAndRecover( + reader, &read_buffer_, cf_name_to_options, column_families_not_found, + builders, &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, &have_last_sequence, + &last_sequence, &min_log_number_to_keep, &max_column_family); if (s.ok()) { bool enough = have_next_file && have_log_number && have_last_sequence; if (enough) { @@ -5350,7 +5401,7 @@ Status ReactiveVersionSet::ReadAndApply( uint64_t previous_log_number = 0; uint32_t max_column_family = 0; uint64_t min_log_number_to_keep = 0; - + uint64_t applied_edits = 0; while (s.ok()) { Slice record; std::string scratch; @@ -5362,73 +5413,46 @@ Status ReactiveVersionSet::ReadAndApply( if (!s.ok()) { break; } - ColumnFamilyData* cfd = - column_family_set_->GetColumnFamily(edit.column_family_); - // If we cannot find this column family in our column family set, then it - // may be a new column family created by the primary after the secondary - // starts. Ignore it for now. - if (nullptr == cfd) { - continue; - } - if (active_version_builders_.find(edit.column_family_) == - active_version_builders_.end()) { - std::unique_ptr builder_guard( - new BaseReferencedVersionBuilder(cfd)); - active_version_builders_.insert( - std::make_pair(edit.column_family_, std::move(builder_guard))); - } - s = ApplyOneVersionEditToBuilder( - edit, &have_log_number, &log_number, &have_prev_log_number, - &previous_log_number, &have_next_file, &next_file, - &have_last_sequence, &last_sequence, &min_log_number_to_keep, - &max_column_family); + + s = read_buffer_.AddEdit(&edit); if (!s.ok()) { break; } - auto builder_iter = active_version_builders_.find(edit.column_family_); - assert(builder_iter != active_version_builders_.end()); - auto builder = builder_iter->second->version_builder(); - assert(builder != nullptr); - s = builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - false /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); - TEST_SYNC_POINT_CALLBACK( - "ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s); - if (!s.ok() && !s.IsPathNotFound()) { - break; - } else if (s.IsPathNotFound()) { - s = Status::OK(); - } else { // s.ok() == true - auto version = new Version(cfd, this, env_options_, - *cfd->GetLatestMutableCFOptions(), - current_version_number_++); - builder->SaveTo(version->storage_info()); - version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); - AppendVersion(cfd, version); - active_version_builders_.erase(builder_iter); - if (cfds_changed->count(cfd) == 0) { - cfds_changed->insert(cfd); + if (edit.is_in_atomic_group_) { + if (read_buffer_.IsFull()) { + // Apply edits in an atomic group when we have read all edits in the + // group. + for (auto& e : read_buffer_.replay_buffer()) { + s = ApplyOneVersionEditToBuilder( + e, cfds_changed, &have_log_number, &log_number, + &have_prev_log_number, &previous_log_number, &have_next_file, + &next_file, &have_last_sequence, &last_sequence, + &min_log_number_to_keep, &max_column_family); + if (!s.ok()) { + break; + } + applied_edits++; + } + if (!s.ok()) { + break; + } + read_buffer_.Clear(); + } + } else { + // Apply a normal edit immediately. + s = ApplyOneVersionEditToBuilder( + edit, cfds_changed, &have_log_number, &log_number, + &have_prev_log_number, &previous_log_number, &have_next_file, + &next_file, &have_last_sequence, &last_sequence, + &min_log_number_to_keep, &max_column_family); + if (s.ok()) { + applied_edits++; } } - if (have_next_file) { - next_file_number_.store(next_file + 1); - } - if (have_last_sequence) { - last_allocated_sequence_ = last_sequence; - last_published_sequence_ = last_sequence; - last_sequence_ = last_sequence; - } - if (have_prev_log_number) { - prev_log_number_ = previous_log_number; - MarkFileNumberUsed(previous_log_number); - } - if (have_log_number) { - MarkFileNumberUsed(log_number); - } - column_family_set_->UpdateMaxColumnFamily(max_column_family); - MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + if (!s.ok()) { + // Clear the buffer if we fail to decode/apply an edit. + read_buffer_.Clear(); } // It's possible that: // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. @@ -5457,52 +5481,113 @@ Status ReactiveVersionSet::ReadAndApply( } } } + TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits", + &applied_edits); return s; } Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( - VersionEdit& edit, bool* have_log_number, uint64_t* log_number, - bool* have_prev_log_number, uint64_t* previous_log_number, - bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, - SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, - uint32_t* max_column_family) { - ColumnFamilyData* cfd = nullptr; - Status status; + VersionEdit& edit, std::unordered_set* cfds_changed, + bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family) { + ColumnFamilyData* cfd = + column_family_set_->GetColumnFamily(edit.column_family_); + + // If we cannot find this column family in our column family set, then it + // may be a new column family created by the primary after the secondary + // starts. It is also possible that the secondary instance opens only a subset + // of column families. Ignore it for now. + if (nullptr == cfd) { + return Status::OK(); + } + if (active_version_builders_.find(edit.column_family_) == + active_version_builders_.end()) { + std::unique_ptr builder_guard( + new BaseReferencedVersionBuilder(cfd)); + active_version_builders_.insert( + std::make_pair(edit.column_family_, std::move(builder_guard))); + } + + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + if (edit.is_column_family_add_) { // TODO (yanqin) for now the secondary ignores column families created // after Open. This also simplifies handling of switching to a new MANIFEST // and processing the snapshot of the system at the beginning of the // MANIFEST. - return Status::OK(); } else if (edit.is_column_family_drop_) { - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - // Drop a CF created by primary after secondary starts? Then ignore - if (cfd == nullptr) { - return Status::OK(); - } // Drop the column family by setting it to be 'dropped' without destroying // the column family handle. + // TODO (haoyu) figure out how to handle column faimly drop for + // secondary instance. (Is it possible that the ref count for cfd is 0 but + // the ref count for its versions is higher than 0?) cfd->SetDropped(); if (cfd->Unref()) { delete cfd; cfd = nullptr; } } else { - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - // Operation on a CF created after Open? Then ignore - if (cfd == nullptr) { - return Status::OK(); - } - auto builder_iter = active_version_builders_.find(edit.column_family_); - assert(builder_iter != active_version_builders_.end()); - auto builder = builder_iter->second->version_builder(); - assert(builder != nullptr); builder->Apply(&edit); } - return ExtractInfoFromVersionEdit( + Status s = ExtractInfoFromVersionEdit( cfd, edit, have_log_number, log_number, have_prev_log_number, previous_log_number, have_next_file, next_file, have_last_sequence, last_sequence, min_log_number_to_keep, max_column_family); + if (!s.ok()) { + return s; + } + + if (cfd != nullptr) { + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + TEST_SYNC_POINT_CALLBACK( + "ReactiveVersionSet::ApplyOneVersionEditToBuilder:" + "AfterLoadTableHandlers", + &s); + + if (s.ok()) { + auto version = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } else if (s.IsPathNotFound()) { + s = Status::OK(); + } + // Some other error has occurred during LoadTableHandlers. + } + + if (have_next_file) { + next_file_number_.store(*next_file + 1); + } + if (have_last_sequence) { + last_allocated_sequence_ = *last_sequence; + last_published_sequence_ = *last_sequence; + last_sequence_ = *last_sequence; + } + if (have_prev_log_number) { + prev_log_number_ = *previous_log_number; + MarkFileNumberUsed(*previous_log_number); + } + if (have_log_number) { + MarkFileNumberUsed(*log_number); + } + column_family_set_->UpdateMaxColumnFamily(*max_column_family); + MarkMinLogNumberToKeep2PC(*min_log_number_to_keep); + return s; } Status ReactiveVersionSet::MaybeSwitchManifest( diff --git a/db/version_set.h b/db/version_set.h index c43e409144..dc9e759655 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -752,6 +752,23 @@ struct ObsoleteFileInfo { class BaseReferencedVersionBuilder; +class AtomicGroupReadBuffer { + public: + Status AddEdit(VersionEdit* edit); + void Clear(); + bool IsFull() const; + bool IsEmpty() const; + + uint64_t TEST_read_edits_in_atomic_group() const { + return read_edits_in_atomic_group_; + } + std::vector& replay_buffer() { return replay_buffer_; } + + private: + uint64_t read_edits_in_atomic_group_ = 0; + std::vector replay_buffer_; +}; + // VersionSet is the collection of versions of all the column families of the // database. Each database owns one VersionSet. A VersionSet has access to all // column families via ColumnFamilySet, i.e. set of the column families. @@ -1028,6 +1045,18 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, VersionEdit* edit); + Status ReadAndRecover( + log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + const std::unordered_map& + name_to_options, + std::unordered_map& column_families_not_found, + std::unordered_map< + uint32_t, std::unique_ptr>& builders, + bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family); + // REQUIRES db mutex Status ApplyOneVersionEditToBuilder( VersionEdit& edit, @@ -1135,16 +1164,23 @@ class ReactiveVersionSet : public VersionSet { std::unique_ptr* manifest_reporter, std::unique_ptr* manifest_reader_status); + uint64_t TEST_read_edits_in_atomic_group() const { + return read_buffer_.TEST_read_edits_in_atomic_group(); + } + std::vector& replay_buffer() { + return read_buffer_.replay_buffer(); + } + protected: using VersionSet::ApplyOneVersionEditToBuilder; // REQUIRES db mutex Status ApplyOneVersionEditToBuilder( - VersionEdit& edit, bool* have_log_number, uint64_t* log_number, - bool* have_prev_log_number, uint64_t* previous_log_number, - bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, - SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, - uint32_t* max_column_family); + VersionEdit& edit, std::unordered_set* cfds_changed, + bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number, + uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file, + bool* have_last_sequence, SequenceNumber* last_sequence, + uint64_t* min_log_number_to_keep, uint32_t* max_column_family); Status MaybeSwitchManifest( log::Reader::Reporter* reporter, @@ -1153,6 +1189,7 @@ class ReactiveVersionSet : public VersionSet { private: std::unordered_map> active_version_builders_; + AtomicGroupReadBuffer read_buffer_; using VersionSet::LogAndApply; using VersionSet::Recover; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 77890d8263..bf9ef8e39f 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -607,6 +607,7 @@ class VersionSetTestBase { const static std::string kColumnFamilyName1; const static std::string kColumnFamilyName2; const static std::string kColumnFamilyName3; + int num_initial_edits_; VersionSetTestBase() : env_(Env::Default()), @@ -618,6 +619,9 @@ class VersionSetTestBase { versions_(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_)), + reactive_versions_(std::make_shared( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_)), shutting_down_(false), mock_table_factory_(std::make_shared()) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); @@ -653,7 +657,7 @@ class VersionSetTestBase { new_cfs.emplace_back(new_cf); } *last_seqno = last_seq; - + num_initial_edits_ = static_cast(new_cfs.size() + 1); const std::string manifest = DescriptorFileName(dbname_, 1); std::unique_ptr file; Status s = env_->NewWritableFile( @@ -708,6 +712,7 @@ class VersionSetTestBase { WriteController write_controller_; WriteBufferManager write_buffer_manager_; std::shared_ptr versions_; + std::shared_ptr reactive_versions_; InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; @@ -758,216 +763,388 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { EXPECT_EQ(kGroupSize - 1, count); } -TEST_F(VersionSetTest, HandleValidAtomicGroup) { - std::vector column_families; - SequenceNumber last_seqno; - std::unique_ptr log_writer; - PrepareManifest(&column_families, &last_seqno, &log_writer); +class VersionSetAtomicGroupTest : public VersionSetTestBase, + public testing::Test { + public: + VersionSetAtomicGroupTest() : VersionSetTestBase() {} - // Append multiple version edits that form an atomic group + void SetUp() override { + PrepareManifest(&column_families_, &last_seqno_, &log_writer_); + SetupTestSyncPoints(); + } + + void SetupValidAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + edits_[i].MarkAtomicGroup(--remaining); + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupCorruptedAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != ((size_t)atomic_group_size / 2)) { + edits_[i].MarkAtomicGroup(--remaining); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupIncorrectAtomicGroup(int atomic_group_size) { + edits_.resize(atomic_group_size); + int remaining = atomic_group_size; + for (size_t i = 0; i != edits_.size(); ++i) { + edits_[i].SetLogNumber(0); + edits_[i].SetNextFile(2); + if (i != 1) { + edits_[i].MarkAtomicGroup(--remaining); + } else { + edits_[i].MarkAtomicGroup(remaining--); + } + edits_[i].SetLastSequence(last_seqno_++); + } + ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + } + + void SetupTestSyncPoints() { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits_.front().DebugString(), + e->DebugString()); // compare based on value + first_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) { + VersionEdit* e = reinterpret_cast(arg); + EXPECT_EQ(edits_.back().DebugString(), + e->DebugString()); // compare based on value + EXPECT_TRUE(first_in_atomic_group_); + last_in_atomic_group_ = true; + }); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) { + num_recovered_edits_ = *reinterpret_cast(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "ReactiveVersionSet::ReadAndApply:AppliedEdits", + [&](void* arg) { num_applied_edits_ = *reinterpret_cast(arg); }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroup", + [&](void* /* arg */) { ++num_edits_in_atomic_group_; }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", + [&](void* arg) { + corrupted_edit_ = *reinterpret_cast(arg); + }); + SyncPoint::GetInstance()->SetCallBack( + "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", + [&](void* arg) { + edit_with_incorrect_group_size_ = + *reinterpret_cast(arg); + }); + SyncPoint::GetInstance()->EnableProcessing(); + } + + void AddNewEditsToLog(int num_edits) { + for (int i = 0; i < num_edits; i++) { + std::string record; + edits_[i].EncodeTo(&record); + ASSERT_OK(log_writer_->AddRecord(record)); + } + } + + void TearDown() override { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + log_writer_.reset(); + } + + protected: + std::vector column_families_; + SequenceNumber last_seqno_; + std::vector edits_; + bool first_in_atomic_group_ = false; + bool last_in_atomic_group_ = false; + int num_edits_in_atomic_group_ = 0; + int num_recovered_edits_ = 0; + int num_applied_edits_ = 0; + VersionEdit corrupted_edit_; + VersionEdit edit_with_incorrect_group_size_; + std::unique_ptr log_writer_; +}; + +TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) { const int kAtomicGroupSize = 3; - std::vector edits(kAtomicGroupSize); - int remaining = kAtomicGroupSize; - for (size_t i = 0; i != edits.size(); ++i) { - edits[i].SetLogNumber(0); - edits[i].SetNextFile(2); - edits[i].MarkAtomicGroup(--remaining); - edits[i].SetLastSequence(last_seqno++); - } - Status s; - for (const auto& edit : edits) { - std::string record; - edit.EncodeTo(&record); - s = log_writer->AddRecord(record); - ASSERT_OK(s); - } - log_writer.reset(); - - s = SetCurrentFile(env_, dbname_, 1, nullptr); - ASSERT_OK(s); - - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - - bool first_in_atomic_group = false; - bool last_in_atomic_group = false; - - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { - VersionEdit* e = reinterpret_cast(arg); - EXPECT_EQ(edits.front().DebugString(), - e->DebugString()); // compare based on value - first_in_atomic_group = true; - }); - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:LastInAtomicGroup", [&](void* arg) { - VersionEdit* e = reinterpret_cast(arg); - EXPECT_EQ(edits.back().DebugString(), - e->DebugString()); // compare based on value - EXPECT_TRUE(first_in_atomic_group); - last_in_atomic_group = true; - }); - SyncPoint::GetInstance()->EnableProcessing(); - - EXPECT_OK(versions_->Recover(column_families, false)); - EXPECT_EQ(column_families.size(), + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); - EXPECT_TRUE(first_in_atomic_group); - EXPECT_TRUE(last_in_atomic_group); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); } -TEST_F(VersionSetTest, HandleIncompleteTrailingAtomicGroup) { - std::vector column_families; - SequenceNumber last_seqno; - std::unique_ptr log_writer; - PrepareManifest(&column_families, &last_seqno, &log_writer); +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} - // Append multiple version edits that form an atomic group +TEST_F(VersionSetAtomicGroupTest, + HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 3; + SetupValidAtomicGroup(kAtomicGroupSize); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + AddNewEditsToLog(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_TRUE(last_in_atomic_group_); + // The recover should clean up the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) { const int kAtomicGroupSize = 4; const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; - std::vector edits(kNumberOfPersistedVersionEdits); - int remaining = kAtomicGroupSize; - for (size_t i = 0; i != edits.size(); ++i) { - edits[i].SetLogNumber(0); - edits[i].SetNextFile(2); - edits[i].MarkAtomicGroup(--remaining); - edits[i].SetLastSequence(last_seqno++); - } - Status s; - for (const auto& edit : edits) { - std::string record; - edit.EncodeTo(&record); - s = log_writer->AddRecord(record); - ASSERT_OK(s); - } - log_writer.reset(); - - s = SetCurrentFile(env_, dbname_, 1, nullptr); - ASSERT_OK(s); - - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - - bool first_in_atomic_group = false; - bool last_in_atomic_group = false; - size_t num = 0; - - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { - VersionEdit* e = reinterpret_cast(arg); - EXPECT_EQ(edits.front().DebugString(), - e->DebugString()); // compare based on value - first_in_atomic_group = true; - }); - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:LastInAtomicGroup", - [&](void* /* arg */) { last_in_atomic_group = true; }); - SyncPoint::GetInstance()->SetCallBack("VersionSet::Recover:AtomicGroup", - [&](void* /* arg */) { ++num; }); - SyncPoint::GetInstance()->EnableProcessing(); - - EXPECT_OK(versions_->Recover(column_families, false)); - EXPECT_EQ(column_families.size(), + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); - EXPECT_TRUE(first_in_atomic_group); - EXPECT_FALSE(last_in_atomic_group); - EXPECT_EQ(kNumberOfPersistedVersionEdits, num); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); } -TEST_F(VersionSetTest, HandleCorruptedAtomicGroup) { - std::vector column_families; - SequenceNumber last_seqno; - std::unique_ptr log_writer; - PrepareManifest(&column_families, &last_seqno, &log_writer); - - // Append multiple version edits that form an atomic group +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) { const int kAtomicGroupSize = 4; - std::vector edits(kAtomicGroupSize); - int remaining = kAtomicGroupSize; - for (size_t i = 0; i != edits.size(); ++i) { - edits[i].SetLogNumber(0); - edits[i].SetNextFile(2); - if (i != (kAtomicGroupSize / 2)) { - edits[i].MarkAtomicGroup(--remaining); - } - edits[i].SetLastSequence(last_seqno++); - } - Status s; - for (const auto& edit : edits) { - std::string record; - edit.EncodeTo(&record); - s = log_writer->AddRecord(record); - ASSERT_OK(s); - } - log_writer.reset(); - - s = SetCurrentFile(env_, dbname_, 1, nullptr); - ASSERT_OK(s); - - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - - bool mixed = false; - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", [&](void* arg) { - VersionEdit* e = reinterpret_cast(arg); - EXPECT_EQ(edits[kAtomicGroupSize / 2].DebugString(), e->DebugString()); - mixed = true; - }); - SyncPoint::GetInstance()->EnableProcessing(); - EXPECT_NOK(versions_->Recover(column_families, false)); - EXPECT_EQ(column_families.size(), - versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); - EXPECT_TRUE(mixed); + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); + // Write the last record. The reactive version set should now apply all + // edits. + std::string last_record; + edits_[kAtomicGroupSize - 1].EncodeTo(&last_record); + EXPECT_OK(log_writer_->AddRecord(last_record)); + InstrumentedMutex mu; + std::unordered_set cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + // Reactive version set should be empty now. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); } -TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) { - std::vector column_families; - SequenceNumber last_seqno; - std::unique_ptr log_writer; - PrepareManifest(&column_families, &last_seqno, &log_writer); - - // Append multiple version edits that form an atomic group +TEST_F(VersionSetAtomicGroupTest, + HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) { const int kAtomicGroupSize = 4; - std::vector edits(kAtomicGroupSize); - int remaining = kAtomicGroupSize; - for (size_t i = 0; i != edits.size(); ++i) { - edits[i].SetLogNumber(0); - edits[i].SetNextFile(2); - if (i != 1) { - edits[i].MarkAtomicGroup(--remaining); - } else { - edits[i].MarkAtomicGroup(remaining--); - } - edits[i].SetLastSequence(last_seqno++); - } - Status s; - for (const auto& edit : edits) { - std::string record; - edit.EncodeTo(&record); - s = log_writer->AddRecord(record); - ASSERT_OK(s); - } - log_writer.reset(); + const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; + SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + // No edits in an atomic group. + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + // Write a few edits in an atomic group. + AddNewEditsToLog(kNumberOfPersistedVersionEdits); + InstrumentedMutex mu; + std::unordered_set cfds_changed; + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_TRUE(first_in_atomic_group_); + EXPECT_FALSE(last_in_atomic_group_); + EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); + // Reactive version set should store the edits in the replay buffer. + EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == + kNumberOfPersistedVersionEdits); + EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); + EXPECT_EQ(0, num_applied_edits_); +} - s = SetCurrentFile(env_, dbname_, 1, nullptr); - ASSERT_OK(s); - - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - - bool incorrect_group_size = false; - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::Recover:IncorrectAtomicGroupSize", [&](void* arg) { - VersionEdit* e = reinterpret_cast(arg); - EXPECT_EQ(edits[1].DebugString(), e->DebugString()); - incorrect_group_size = true; - }); - SyncPoint::GetInstance()->EnableProcessing(); - EXPECT_NOK(versions_->Recover(column_families, false)); - EXPECT_EQ(column_families.size(), +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); - EXPECT_TRUE(incorrect_group_size); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + SetupCorruptedAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set cfds_changed; + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + // Write the corrupted edits. + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), + corrupted_edit_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + EXPECT_NOK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + AddNewEditsToLog(kAtomicGroupSize); + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + EXPECT_EQ(column_families_.size(), + reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); +} + +TEST_F(VersionSetAtomicGroupTest, + HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) { + const int kAtomicGroupSize = 4; + SetupIncorrectAtomicGroup(kAtomicGroupSize); + InstrumentedMutex mu; + std::unordered_set cfds_changed; + std::unique_ptr manifest_reader; + std::unique_ptr manifest_reporter; + std::unique_ptr manifest_reader_status; + EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, + &manifest_reporter, + &manifest_reader_status)); + AddNewEditsToLog(kAtomicGroupSize); + mu.Lock(); + EXPECT_OK( + reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + mu.Unlock(); + EXPECT_EQ(edits_[1].DebugString(), + edit_with_incorrect_group_size_.DebugString()); } class VersionSetTestDropOneCF : public VersionSetTestBase, @@ -1088,7 +1265,6 @@ INSTANTIATE_TEST_CASE_P( testing::Values(VersionSetTestBase::kColumnFamilyName1, VersionSetTestBase::kColumnFamilyName2, VersionSetTestBase::kColumnFamilyName3)); - } // namespace rocksdb int main(int argc, char** argv) {