diff --git a/db/column_family.cc b/db/column_family.cc
index f8b6a55a59..5dc16ef805 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -466,7 +466,7 @@ void SuperVersion::Cleanup() {
   // decrement reference to the immutable MemtableList
   // this SV object was pointing to.
   imm->Unref(&to_delete);
-  MemTable* m = mem->Unref();
+  ReadOnlyMemTable* m = mem->Unref();
   if (m != nullptr) {
     auto* memory_usage = current->cfd()->imm()->current_memory_usage();
     assert(*memory_usage >= m->ApproximateMemoryUsage());
@@ -693,9 +693,9 @@ ColumnFamilyData::~ColumnFamilyData() {
   if (mem_ != nullptr) {
     delete mem_->Unref();
  }
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
   imm_.current()->Unref(&to_delete);
-  for (MemTable* m : to_delete) {
+  for (auto* m : to_delete) {
    delete m;
  }
diff --git a/db/column_family.h b/db/column_family.h
index 9e7f52bd01..baaa9b9042 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -207,7 +207,7 @@ struct SuperVersion {
  // Accessing members of this class is not thread-safe and requires external
  // synchronization (ie db mutex held or on write thread).
  ColumnFamilyData* cfd;
-  MemTable* mem;
+  ReadOnlyMemTable* mem;
  MemTableListVersion* imm;
  Version* current;
  MutableCFOptions mutable_cf_options;
@@ -269,7 +269,7 @@ struct SuperVersion {
  // We need to_delete because during Cleanup(), imm->Unref() returns
  // all memtables that we need to free through this vector. We then
  // delete all those memtables outside of mutex, during destruction
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
 };

 Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index e4781858c0..a3061a3c61 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -4730,9 +4730,9 @@ void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
  // Convert user_key into a corresponding internal key.
  InternalKey k1(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey k2(limit.value(), kMaxSequenceNumber, kValueTypeForSeek);
-  MemTable::MemTableStats memStats =
+  ReadOnlyMemTable::MemTableStats memStats =
      sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
-  MemTable::MemTableStats immStats =
+  ReadOnlyMemTable::MemTableStats immStats =
      sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
  *count = memStats.count + immStats.count;
  *size = memStats.size + immStats.size;
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 614574bd38..f2b57f788e 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1709,7 +1709,7 @@ class DBImpl : public DB {

  struct WriteContext {
    SuperVersionContext superversion_context;
-    autovector<MemTable*> memtables_to_free_;
+    autovector<ReadOnlyMemTable*> memtables_to_free_;

    explicit WriteContext(bool create_superversion = false)
        : superversion_context(create_superversion) {}
@@ -2051,6 +2051,8 @@ class DBImpl : public DB {

  Status TrimMemtableHistory(WriteContext* context);

+  // Switches the current live memtable to an immutable/read-only memtable.
+  // A new WAL is created if the current WAL is not empty.
  Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);

  // Select and output column families qualified for atomic flush in
@@ -3007,7 +3009,8 @@ CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
 VersionEdit GetDBRecoveryEditForObsoletingMemTables(
     VersionSet* vset, const ColumnFamilyData& cfd,
     const autovector<VersionEdit*>& edit_list,
-    const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);
+    const autovector<ReadOnlyMemTable*>& memtables,
+    LogsWithPrepTracker* prep_tracker);

 // Return the earliest log file to keep after the memtable flush is
 // finalized.
@@ -3018,13 +3021,13 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
 uint64_t PrecomputeMinLogNumberToKeep2PC(
     VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
     const autovector<VersionEdit*>& edit_list,
-    const autovector<MemTable*>& memtables_to_flush,
+    const autovector<ReadOnlyMemTable*>& memtables_to_flush,
     LogsWithPrepTracker* prep_tracker);
 // For atomic flush.
 uint64_t PrecomputeMinLogNumberToKeep2PC(
     VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
     const autovector<autovector<VersionEdit*>>& edit_lists,
-    const autovector<const autovector<MemTable*>*>& memtables_to_flush,
+    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
     LogsWithPrepTracker* prep_tracker);

 // In non-2PC mode, WALs with log number < the returned number can be
@@ -3041,11 +3044,11 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
 // will not depend on any WAL file. nullptr means no memtable is being flushed.
 // The function is only applicable to 2pc mode.
 uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
+    VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush);
 // For atomic flush.
 uint64_t FindMinPrepLogReferencedByMemTable(
     VersionSet* vset,
-    const autovector<const autovector<MemTable*>*>& memtables_to_flush);
+    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush);

 // Fix user-supplied options to be reasonable
 template <class T, class V>
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index c75178525a..12de06d1d6 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -753,7 +753,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
-    autovector<const autovector<MemTable*>*> mems_list;
+    autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData> tmp_file_meta;
    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index c91a577960..df264dcd33 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -258,7 +258,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
 }

 uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
-  autovector<MemTable*> empty_list;
+  autovector<ReadOnlyMemTable*> empty_list;
  return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
 }
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index ff3054d100..96178b7065 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -739,7 +739,8 @@ void DBImpl::DeleteObsoleteFiles() {
 VersionEdit GetDBRecoveryEditForObsoletingMemTables(
     VersionSet* vset, const ColumnFamilyData& cfd,
     const autovector<VersionEdit*>& edit_list,
-    const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
+    const autovector<ReadOnlyMemTable*>& memtables,
+    LogsWithPrepTracker* prep_tracker) {
  VersionEdit wal_deletion_edit;
  uint64_t min_wal_number_to_keep = 0;
  assert(edit_list.size() > 0);
@@ -769,12 +770,12 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
 }

 uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
+    VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;
  // we must look through the memtables for two phase transactions
  // that have been committed but not yet flushed
-  std::unordered_set<MemTable*> memtables_to_flush_set(
+  std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set(
      memtables_to_flush.begin(), memtables_to_flush.end());
  for (auto loop_cfd : *vset->GetColumnFamilySet()) {
    if (loop_cfd->IsDropped()) {
@@ -799,12 +800,12 @@ uint64_t FindMinPrepLogReferencedByMemTable(
 }

 uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset,
-    const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
+    VersionSet* vset, const autovector<const autovector<ReadOnlyMemTable*>*>&
+                          memtables_to_flush) {
  uint64_t min_log = 0;

-  std::unordered_set<MemTable*> memtables_to_flush_set;
-  for (const autovector<MemTable*>* memtables : memtables_to_flush) {
+  std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set;
+  for (const autovector<ReadOnlyMemTable*>* memtables : memtables_to_flush) {
    memtables_to_flush_set.insert(memtables->begin(), memtables->end());
  }
  for (auto loop_cfd : *vset->GetColumnFamilySet()) {
@@ -896,7 +897,7 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
 uint64_t PrecomputeMinLogNumberToKeep2PC(
     VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
     const autovector<VersionEdit*>& edit_list,
-    const autovector<MemTable*>& memtables_to_flush,
+    const autovector<ReadOnlyMemTable*>& memtables_to_flush,
     LogsWithPrepTracker* prep_tracker) {
  assert(vset != nullptr);
  assert(prep_tracker != nullptr);
@@ -937,7 +938,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
 uint64_t PrecomputeMinLogNumberToKeep2PC(
     VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
     const autovector<autovector<VersionEdit*>>& edit_lists,
-    const autovector<const autovector<MemTable*>*>& memtables_to_flush,
+    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
     LogsWithPrepTracker* prep_tracker) {
  assert(vset != nullptr);
  assert(prep_tracker != nullptr);
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 3e1a6198ea..8586758f28 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1769,7 +1769,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
    s = io_s;
  }

-  uint64_t total_num_entries = mem->num_entries();
+  uint64_t total_num_entries = mem->NumEntries();
  if (s.ok() && total_num_entries != num_input_entries) {
    std::string msg = "Expected " + std::to_string(total_num_entries) +
                      " entries in memtable, but read " +
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 248ddb88de..91a2f0f4c6 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -687,7 +687,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
      }
    }
    // Note: if we are to resume after non-OK statuses we need to revisit how
-    // we reacts to non-OK statuses here.
+    // we react to non-OK statuses here.
    versions_->SetLastSequence(last_sequence);
  }
  MemTableInsertStatusCheck(w.status);
@@ -1610,6 +1610,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
 Status DBImpl::WriteRecoverableState() {
  mutex_.AssertHeld();
  if (!cached_recoverable_state_empty_) {
+    // Only for write-prepared and write-unprepared.
+    assert(seq_per_batch_);
    bool dont_care_bool;
    SequenceNumber next_seq;
    if (two_write_queues_) {
@@ -2251,8 +2253,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
    memtable_info.cf_name = cfd->GetName();
    memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
    memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
-    memtable_info.num_entries = cfd->mem()->num_entries();
-    memtable_info.num_deletes = cfd->mem()->num_deletes();
+    memtable_info.num_entries = cfd->mem()->NumEntries();
+    memtable_info.num_deletes = cfd->mem()->NumDeletion();
    if (!cfd->ioptions()->persist_user_defined_timestamps &&
        cfd->user_comparator()->timestamp_size() > 0) {
      const Slice& newest_udt = cfd->mem()->GetNewestUDT();
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 88499cd112..56f5170c69 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -157,7 +157,7 @@ void FlushJob::ReportStartedFlush() {
  IOSTATS_RESET(bytes_written);
 }

-void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
+void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
@@ -204,7 +204,7 @@ void FlushJob::PickMemTable() {
  // entries mems are (implicitly) sorted in ascending order by their created
  // time. We will use the first memtable's `edit` to keep the meta info for
  // this flush.
-  MemTable* m = mems_[0];
+  ReadOnlyMemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
@@ -420,7 +420,7 @@ Status FlushJob::MemPurge() {
  std::vector<InternalIterator*> memtables;
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
      range_del_iters;
-  for (MemTable* m : mems_) {
+  for (ReadOnlyMemTable* m : mems_) {
    memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
                                       &arena, /*prefix_extractor=*/nullptr));
    auto* range_del_iter = m->NewRangeTombstoneIterator(
@@ -713,11 +713,11 @@ bool FlushJob::MemPurgeDecider(double threshold) {

  // Iterate over each memtable of the set.
  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
-       mem_iter++) {
-    MemTable* mt = *mem_iter;
+       ++mem_iter) {
+    ReadOnlyMemTable* mt = *mem_iter;

    // Else sample from the table.
-    uint64_t nentries = mt->num_entries();
+    uint64_t nentries = mt->NumEntries();
    // Corrected Cochran formula for small populations
    // (converges to n0 for large populations).
    uint64_t target_sample_size =
@@ -894,11 +894,12 @@ Status FlushJob::WriteLevel0Table() {
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
                             &mems_size);
    assert(job_context_);
-    for (MemTable* m : mems_) {
-      ROCKS_LOG_INFO(
-          db_options_.info_log,
-          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
-          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
+    for (ReadOnlyMemTable* m : mems_) {
+      ROCKS_LOG_INFO(db_options_.info_log,
+                     "[%s] [JOB %d] Flushing memtable id %" PRIu64
+                     " with next log file: %" PRIu64 "\n",
+                     cfd_->GetName().c_str(), job_context_->job_id, m->GetID(),
+                     m->GetNextLogNumber());
      if (logical_strip_timestamp) {
        memtables.push_back(m->NewTimestampStrippingIterator(
            ro, /*seqno_to_time_mapping=*/nullptr, &arena,
@@ -917,11 +918,11 @@ Status FlushJob::WriteLevel0Table() {
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
-      total_num_entries += m->num_entries();
-      total_num_deletes += m->num_deletes();
-      total_data_size += m->get_data_size();
+      total_num_entries += m->NumEntries();
+      total_num_deletes += m->NumDeletion();
+      total_data_size += m->GetDataSize();
      total_memory_usage += m->ApproximateMemoryUsage();
-      total_num_range_deletes += m->num_range_deletes();
+      total_num_range_deletes += m->NumRangeDeletion();
    }

    // TODO(cbi): when memtable is flushed due to number of range deletions
@@ -1172,7 +1173,7 @@ void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
    return;
  }
  // Find the newest user-defined timestamps from all the flushed memtables.
-  for (MemTable* m : mems_) {
+  for (const ReadOnlyMemTable* m : mems_) {
    Slice table_newest_udt = m->GetNewestUDT();
    // Empty memtables can be legitimately created and flushed, for example
    // by error recovery flush attempts.
diff --git a/db/flush_job.h b/db/flush_job.h
index 596d5c2045..8cc8821bf5 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -91,7 +91,7 @@ class FlushJob {
             bool* skipped_since_bg_error = nullptr,
             ErrorHandler* error_handler = nullptr);
  void Cancel();
-  const autovector<MemTable*>& GetMemTables() const { return mems_; }
+  const autovector<ReadOnlyMemTable*>& GetMemTables() const { return mems_; }

  std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
    return &committed_flush_jobs_info_;
@@ -101,7 +101,7 @@ class FlushJob {
  friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;

  void ReportStartedFlush();
-  void ReportFlushInputSize(const autovector<MemTable*>& mems);
+  static void ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems);
  void RecordFlushIOStats();
  Status WriteLevel0Table();

@@ -205,7 +205,8 @@ class FlushJob {

  // Variables below are set by PickMemTable():
  FileMetaData meta_;
-  autovector<MemTable*> mems_;
+  // Memtables to be flushed by this job.
+  autovector<ReadOnlyMemTable*> mems_;
  VersionEdit* edit_;
  Version* base_;
  bool pick_memtable_called;
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index ee8e7d14a9..45b7f73d9b 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -264,7 +264,7 @@ TEST_F(FlushJobTest, NonEmpty) {
  }
  mock::SortKVVector(&inserted_keys);

-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  new_mem->ConstructFragmentedRangeTombstones();
  cfd->imm()->Add(new_mem, &to_delete);
  for (auto& m : to_delete) {
@@ -325,7 +325,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
    }
  }

-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  for (auto mem : new_mems) {
    mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(mem, &to_delete);
@@ -380,7 +380,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
  std::vector<uint64_t> memtable_ids;
  std::vector<SequenceNumber> smallest_seqs;
  std::vector<SequenceNumber> largest_seqs;
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  SequenceNumber curr_seqno = 0;
  size_t k = 0;
  for (auto cfd : all_cfds) {
@@ -439,7 +439,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
  for (auto& meta : file_metas) {
    file_meta_ptrs.push_back(&meta);
  }
-  autovector<const autovector<MemTable*>*> mems_list;
+  autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
  for (size_t i = 0; i != all_cfds.size(); ++i) {
    const auto& mems = flush_jobs[i]->GetMemTables();
    mems_list.push_back(&mems);
@@ -528,7 +528,7 @@ TEST_F(FlushJobTest, Snapshots) {
  }
  mock::SortKVVector(&inserted_keys);

-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  new_mem->ConstructFragmentedRangeTombstones();
  cfd->imm()->Add(new_mem, &to_delete);
  for (auto& m : to_delete) {
@@ -582,7 +582,7 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
    }
  }

-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  for (auto mem : new_mems) {
    mem->ConstructFragmentedRangeTombstones();
    cfd->imm()->Add(mem, &to_delete);
@@ -654,7 +654,7 @@ TEST_F(FlushJobTest, ReplaceTimedPutWriteTimeWithPreferredSeqno) {
  InternalKey largest_internal_key("foo", SequenceNumber(18), kTypeValue);
  inserted_entries.push_back(
      {largest_internal_key.Encode().ToString(), "fval"});
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
  new_mem->ConstructFragmentedRangeTombstones();
  cfd->imm()->Add(new_mem, &to_delete);
  for (auto& m : to_delete) {
@@ -744,7 +744,7 @@ class FlushJobTimestampTest

 TEST_P(FlushJobTimestampTest, AllKeysExpired) {
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;

  {
    MemTable* new_mem = cfd->ConstructNewMemtable(
@@ -810,7 +810,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {

 TEST_P(FlushJobTimestampTest, NoKeyExpired) {
  ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;

  {
    MemTable* new_mem = cfd->ConstructNewMemtable(
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index 8baa5b18dc..47c7e1c9a9 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -1301,7 +1301,7 @@ bool InternalStats::HandleNumEntriesActiveMemTable(uint64_t* value,
                                                   DBImpl* /*db*/,
                                                   Version* /*version*/) {
  // Current number of entires in the active memtable
-  *value = cfd_->mem()->num_entries();
+  *value = cfd_->mem()->NumEntries();
  return true;
 }

@@ -1317,7 +1317,7 @@ bool InternalStats::HandleNumDeletesActiveMemTable(uint64_t* value,
                                                   DBImpl* /*db*/,
                                                   Version* /*version*/) {
  // Current number of entires in the active memtable
-  *value = cfd_->mem()->num_deletes();
+  *value = cfd_->mem()->NumDeletion();
  return true;
 }

@@ -1334,11 +1334,11 @@ bool InternalStats::HandleEstimateNumKeys(uint64_t* value, DBImpl* /*db*/,
  // Estimate number of entries in the column family:
  // Use estimated entries in tables + total entries in memtables.
  const auto* vstorage = cfd_->current()->storage_info();
-  uint64_t estimate_keys = cfd_->mem()->num_entries() +
+  uint64_t estimate_keys = cfd_->mem()->NumEntries() +
                           cfd_->imm()->current()->GetTotalNumEntries() +
                           vstorage->GetEstimatedActiveKeys();
  uint64_t estimate_deletes =
-      cfd_->mem()->num_deletes() + cfd_->imm()->current()->GetTotalNumDeletes();
+      cfd_->mem()->NumDeletion() + cfd_->imm()->current()->GetTotalNumDeletes();
  *value = estimate_keys > estimate_deletes * 2
               ? estimate_keys - (estimate_deletes * 2)
               : 0;
diff --git a/db/job_context.h b/db/job_context.h
index a0cb3c8159..83e9f5faca 100644
--- a/db/job_context.h
+++ b/db/job_context.h
@@ -191,7 +191,7 @@ struct JobContext {
  std::vector<std::string> manifest_delete_files;

  // a list of memtables to be free
-  autovector<MemTable*> memtables_to_free;
+  autovector<ReadOnlyMemTable*> memtables_to_free;

  // contexts for installing superversions for multiple column families
  std::vector<SuperVersionContext> superversion_contexts;
diff --git a/db/memtable.cc b/db/memtable.cc
index 42117c639c..ef8ac07c78 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -79,7 +79,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
                   SequenceNumber latest_seq, uint32_t column_family_id)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
-      refs_(0),
      kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)),
      mem_tracker_(write_buffer_manager),
      arena_(moptions_.arena_block_size,
@@ -101,13 +100,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
      num_deletes_(0),
      num_range_deletes_(0),
      write_buffer_size_(mutable_cf_options.write_buffer_size),
-      flush_in_progress_(false),
-      flush_completed_(false),
-      file_number_(0),
      first_seqno_(0),
      earliest_seqno_(latest_seq),
      creation_seq_(latest_seq),
-      mem_next_logfile_number_(0),
      min_prep_log_referenced_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
@@ -118,7 +113,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
      insert_with_hint_prefix_extractor_(
          ioptions.memtable_insert_with_hint_prefix_extractor.get()),
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
-      atomic_flush_seqno_(kMaxSequenceNumber),
      approximate_memory_usage_(0),
      memtable_max_range_deletions_(
          mutable_cf_options.memtable_max_range_deletions) {
@@ -832,8 +826,8 @@ port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[GetSliceRangedNPHash(key, locks_.size())];
 }

-MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
-                                                   const Slice& end_ikey) {
+ReadOnlyMemTable::MemTableStats MemTable::ApproximateStats(
+    const Slice& start_ikey, const Slice& end_ikey) {
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  if (entry_count == 0) {
diff --git a/db/memtable.h b/db/memtable.h
index 1ed82f456e..ae86831176 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -76,88 +76,48 @@ struct MemTablePostProcessInfo {
 };

 using MultiGetRange = MultiGetContext::Range;
-// Note: Many of the methods in this class have comments indicating that
+
+// For each CF, RocksDB maintains an active memtable that accepts writes,
+// and zero or more sealed memtables that we call immutable memtables.
+// This interface contains all methods required for immutable memtables.
+// The MemTable class inherits from `ReadOnlyMemTable` and implements the
+// additional methods required for active memtables.
+// Immutable memtable list (MemTableList) maintains a list of ReadOnlyMemTable
+// objects. This interface enables features like direct ingestion of an
+// immutable memtable with a custom implementation, bypassing memtable writes.
+//
+// Note: Many of the methods in this class have comments indicating that
 // external synchronization is required as these methods are not thread-safe.
 // It is up to higher layers of code to decide how to prevent concurrent
-// invocation of these methods.  This is usually done by acquiring either
+// invocation of these methods. This is usually done by acquiring either
 // the db mutex or the single writer thread.
 //
 // Some of these methods are documented to only require external
-// synchronization if this memtable is immutable.  Calling MarkImmutable() is
+// synchronization if this memtable is immutable. Calling MarkImmutable() is
 // not sufficient to guarantee immutability. It is up to higher layers of
 // code to determine if this MemTable can still be modified by other threads.
 // Eg: The Superversion stores a pointer to the current MemTable (that can
 // be modified) and a separate list of the MemTables that can no longer be
 // written to (aka the 'immutable memtables').
-class MemTable {
+//
+// MemTables are reference counted. The initial reference count
+// is zero and the caller must call Ref() at least once.
+class ReadOnlyMemTable {
 public:
-  struct KeyComparator : public MemTableRep::KeyComparator {
-    const InternalKeyComparator comparator;
-    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
-    int operator()(const char* prefix_len_key1,
-                   const char* prefix_len_key2) const override;
-    int operator()(const char* prefix_len_key,
-                   const DecodedType& key) const override;
-  };
-
-  // MemTables are reference counted. The initial reference count
-  // is zero and the caller must call Ref() at least once.
-  //
-  // earliest_seq should be the current SequenceNumber in the db such that any
-  // key inserted into this memtable will have an equal or larger seq number.
-  // (When a db is first created, the earliest sequence number will be 0).
-  // If the earliest sequence number is not known, kMaxSequenceNumber may be
-  // used, but this may prevent some transactions from succeeding until the
-  // first key is inserted into the memtable.
-  explicit MemTable(const InternalKeyComparator& comparator,
-                    const ImmutableOptions& ioptions,
-                    const MutableCFOptions& mutable_cf_options,
-                    WriteBufferManager* write_buffer_manager,
-                    SequenceNumber earliest_seq, uint32_t column_family_id);
-  // No copying allowed
-  MemTable(const MemTable&) = delete;
-  MemTable& operator=(const MemTable&) = delete;
-  // Do not delete this MemTable unless Unref() indicates it not in use.
-  ~MemTable();
+  virtual ~ReadOnlyMemTable() = default;

-  // Increase reference count.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  void Ref() { ++refs_; }
-
-  // Drop reference count.
-  // If the refcount goes to zero return this memtable, otherwise return null.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  MemTable* Unref() {
-    --refs_;
-    assert(refs_ >= 0);
-    if (refs_ <= 0) {
-      return this;
-    }
-    return nullptr;
-  }
+  virtual const char* Name() const = 0;

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
-  size_t ApproximateMemoryUsage();
-
-  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
-  // require external synchronization. The value may be less accurate though
-  size_t ApproximateMemoryUsageFast() const {
-    return approximate_memory_usage_.load(std::memory_order_relaxed);
-  }
+  virtual size_t ApproximateMemoryUsage() = 0;

  // used by MemTableListVersion::MemoryAllocatedBytesExcludingLast
-  size_t MemoryAllocatedBytes() const {
-    return table_->ApproximateMemoryUsage() +
-           range_del_table_->ApproximateMemoryUsage() +
-           arena_.MemoryAllocatedBytes();
-  }
+  virtual size_t MemoryAllocatedBytes() const = 0;

  // Returns a vector of unique random memtable entries of size 'sample_size'.
  //
@@ -172,27 +132,8 @@ class MemTable {
  // REQUIRES: SkipList memtable representation. This function is not
  // implemented for any other type of memtable representation (vectorrep,
  // hashskiplist,...).
-  void UniqueRandomSample(const uint64_t& target_sample_size,
-                          std::unordered_set<const char*>* entries) {
-    // TODO(bjlemaire): at the moment, only supported by skiplistrep.
-    // Extend it to all other memtable representations.
-    table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
-  }
-
-  // This method heuristically determines if the memtable should continue to
-  // host more data.
-  bool ShouldScheduleFlush() const {
-    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
-  }
-
-  // Returns true if a flush should be scheduled and the caller should
-  // be the one to schedule it
-  bool MarkFlushScheduled() {
-    auto before = FLUSH_REQUESTED;
-    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
-                                                std::memory_order_relaxed,
-                                                std::memory_order_relaxed);
-  }
+  virtual void UniqueRandomSample(const uint64_t& target_sample_size,
+                                  std::unordered_set<const char*>* entries) = 0;

  // Return an iterator that yields the contents of the memtable.
  //
@@ -208,18 +149,18 @@ class MemTable {
  // those allocated in arena.
  // seqno_to_time_mapping: it's used to support return write unix time for the
  // data, currently only needed for iterators serving user reads.
-  InternalIterator* NewIterator(
+  virtual InternalIterator* NewIterator(
      const ReadOptions& read_options,
      UnownedPtr<SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
-      const SliceTransform* prefix_extractor);
+      const SliceTransform* prefix_extractor) = 0;

  // Returns an iterator that wraps a MemTableIterator and logically strips the
  // user-defined timestamp of each key. This API is only used by flush when
  // user-defined timestamps in MemTable only feature is enabled.
-  InternalIterator* NewTimestampStrippingIterator(
+  virtual InternalIterator* NewTimestampStrippingIterator(
      const ReadOptions& read_options,
      UnownedPtr<SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
-      const SliceTransform* prefix_extractor, size_t ts_sz);
+      const SliceTransform* prefix_extractor, size_t ts_sz) = 0;

  // Returns an iterator that yields the range tombstones of the memtable.
  // The caller must ensure that the underlying MemTable remains live
  // is constructed when a memtable becomes immutable. Setting the flag to false
  // will always yield correct result, but may incur performance penalty as it
  // always creates a new fragmented range tombstone list.
-  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+  virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
      const ReadOptions& read_options, SequenceNumber read_seq,
-      bool immutable_memtable);
+      bool immutable_memtable) = 0;

  // Returns an iterator that yields the range tombstones of the memtable and
  // logically strips the user-defined timestamp of each key (including start
  // key, and end key). This API is only used by flush when user-defined
  // timestamps in MemTable only feature is enabled.
-  FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
-      const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz);
-
-  Status VerifyEncodedEntry(Slice encoded,
-                            const ProtectionInfoKVOS64& kv_prot_info);
-
-  // Add an entry into memtable that maps key to value at the
-  // specified sequence number and with the specified type.
-  // Typically value will be empty if type==kTypeDeletion.
-  //
-  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
-  // simultaneous operations on the same MemTable.
-  //
-  // Returns `Status::TryAgain` if the `seq`, `key` combination already exists
-  // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
-  // The next attempt should try a larger value for `seq`.
-  Status Add(SequenceNumber seq, ValueType type, const Slice& key,
-             const Slice& value, const ProtectionInfoKVOS64* kv_prot_info,
-             bool allow_concurrent = false,
-             MemTablePostProcessInfo* post_process_info = nullptr,
-             void** hint = nullptr);
+  virtual FragmentedRangeTombstoneIterator*
+  NewTimestampStrippingRangeTombstoneIterator(const ReadOptions& read_options,
+                                              SequenceNumber read_seq,
+                                              size_t ts_sz) = 0;

  // Used to Get value associated with key or Get Merge Operands associated
  // with key.
+  // Keys are considered if they are no larger than the parameter `key` in
+  // the order defined by the comparator and share the same user key as `key`.
  // If do_merge = true the default behavior which is Get value for key is
  // executed. Expected behavior is described right below.
  // If memtable contains a value for key, store it in *value and return true.
@@ -291,14 +217,13 @@ class MemTable {
  // @param immutable_memtable Whether this memtable is immutable. Used
  // internally by NewRangeTombstoneIterator(). See comment above
  // NewRangeTombstoneIterator() for more detail.
-  bool Get(const LookupKey& key, std::string* value,
-           PinnableWideColumns* columns, std::string* timestamp, Status* s,
-           MergeContext* merge_context,
-           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
-           const ReadOptions& read_opts, bool immutable_memtable,
-           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
-           bool do_merge = true);
-
+  virtual bool Get(const LookupKey& key, std::string* value,
+                   PinnableWideColumns* columns, std::string* timestamp,
+                   Status* s, MergeContext* merge_context,
+                   SequenceNumber* max_covering_tombstone_seq,
+                   SequenceNumber* seq, const ReadOptions& read_opts,
+                   bool immutable_memtable, ReadCallback* callback = nullptr,
+                   bool* is_blob_index = nullptr, bool do_merge = true) = 0;
  bool Get(const LookupKey& key, std::string* value,
           PinnableWideColumns* columns, std::string* timestamp, Status* s,
           MergeContext* merge_context,
@@ -315,8 +240,282 @@ class MemTable {
  // @param immutable_memtable Whether this memtable is immutable. Used
  // internally by NewRangeTombstoneIterator(). See comment above
  // NewRangeTombstoneIterator() for more detail.
+  virtual void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+                        ReadCallback* callback, bool immutable_memtable) = 0;
+
+  // Get total number of entries in the mem table.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  virtual uint64_t NumEntries() const = 0;
+
+  // Get total number of point deletes in the mem table.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  virtual uint64_t NumDeletion() const = 0;
+
+  // Get total number of range deletions in the mem table.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  virtual uint64_t NumRangeDeletion() const = 0;
+
+  virtual uint64_t GetDataSize() const = 0;
+
+  // Returns the sequence number of the first element that was inserted
+  // into the memtable.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable (unless this Memtable is immutable).
+  virtual SequenceNumber GetFirstSequenceNumber() = 0;
+
+  // Returns the sequence number that is guaranteed to be smaller than or equal
+  // to the sequence number of any key that could be inserted into this
+  // memtable. It can then be assumed that any write with a larger (or equal)
+  // sequence number will be present in this memtable or a later memtable.
+  //
+  // If the earliest sequence number could not be determined,
+  // kMaxSequenceNumber will be returned.
+  virtual SequenceNumber GetEarliestSequenceNumber() = 0;
+
+  virtual uint64_t GetMinLogContainingPrepSection() = 0;
+
+  // Notify the underlying storage that no more items will be added.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  // After MarkImmutable() is called, you should not attempt to
+  // write anything to this MemTable(). (Ie. do not call Add() or Update()).
+  virtual void MarkImmutable() = 0;
+
+  // Notify the underlying storage that all data it contained has been
+  // persisted.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  virtual void MarkFlushed() = 0;
+
+  struct MemTableStats {
+    uint64_t size;
+    uint64_t count;
+  };
+  virtual MemTableStats ApproximateStats(const Slice& start_ikey,
+                                         const Slice& end_ikey) = 0;
+
+  virtual const InternalKeyComparator& GetInternalKeyComparator() const = 0;
+
+  virtual uint64_t ApproximateOldestKeyTime() const = 0;
+
+  // Returns whether a fragmented range tombstone list is already constructed
+  // for this memtable. It should be constructed right before a memtable is
+  // added to an immutable memtable list. Note that if a memtable does not have
+  // any range tombstone, then no range tombstone list will ever be constructed
+  // and true is returned in that case.
+  virtual bool IsFragmentedRangeTombstonesConstructed() const = 0;
+
+  // Get the newest user-defined timestamp contained in this MemTable. Check
+  // `newest_udt_` for what newer means. This method should only be invoked for
+  // a MemTable that has enabled user-defined timestamp feature and set
+  // `persist_user_defined_timestamps` to false. The tracked newest UDT will be
+  // used by flush job in the background to help check the MemTable's
+  // eligibility for Flush.
+  virtual const Slice& GetNewestUDT() const = 0;
+
+  // Increase reference count.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  void Ref() { ++refs_; }
+
+  // Drop reference count.
+  // If the refcount goes to zero return this memtable, otherwise return null.
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  ReadOnlyMemTable* Unref() {
+    --refs_;
+    assert(refs_ >= 0);
+    if (refs_ <= 0) {
+      return this;
+    }
+    return nullptr;
+  }
+
+  // Returns the edits area that is needed for flushing the memtable
+  VersionEdit* GetEdits() { return &edit_; }
+
+  // Returns the next active logfile number when this memtable is about to
+  // be flushed to storage
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  uint64_t GetNextLogNumber() const { return mem_next_logfile_number_; }
+
+  // Sets the next active logfile number when this memtable is about to
+  // be flushed to storage
+  // REQUIRES: external synchronization to prevent simultaneous
+  // operations on the same MemTable.
+  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
+
+  // REQUIRES: db_mutex held.
+  void SetID(uint64_t id) { id_ = id; }
+
+  uint64_t GetID() const { return id_; }
+
+  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
+
+  uint64_t GetFileNumber() const { return file_number_; }
+
+  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
+
+  void SetFlushInProgress(bool in_progress) {
+    flush_in_progress_ = in_progress;
+  }
+
+  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
+    flush_job_info_ = std::move(info);
+  };
+
+  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
+    return std::move(flush_job_info_);
+  }
+
+ protected:
+  friend class MemTableList;
+
+  int refs_{0};
+
+  // These are used to manage memtable flushes to storage
+  bool flush_in_progress_{false};  // started the flush
+  bool flush_completed_{false};    // finished the flush
+  uint64_t file_number_{0};
+
+  // The updates to be applied to the transaction log when this
+  // memtable is flushed to storage.
+  VersionEdit edit_;
+
+  // The log files earlier than this number can be deleted.
+  uint64_t mem_next_logfile_number_{0};
+
+  // Memtable id to track flush.
+  uint64_t id_ = 0;
+
+  // Sequence number of the atomic flush that is responsible for this memtable.
+  // The sequence number of atomic flush is a seq, such that no writes with
+  // sequence numbers greater than or equal to seq are flushed, while all
+  // writes with sequence number smaller than seq are flushed.
+  SequenceNumber atomic_flush_seqno_{kMaxSequenceNumber};
+
+  // Flush job info of the current memtable.
+  std::unique_ptr<FlushJobInfo> flush_job_info_;
+};
+
+class MemTable final : public ReadOnlyMemTable {
+ public:
+  struct KeyComparator final : public MemTableRep::KeyComparator {
+    const InternalKeyComparator comparator;
+    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
+    int operator()(const char* prefix_len_key1,
+                   const char* prefix_len_key2) const override;
+    int operator()(const char* prefix_len_key,
+                   const DecodedType& key) const override;
+  };
+
+  // earliest_seq should be the current SequenceNumber in the db such that any
+  // key inserted into this memtable will have an equal or larger seq number.
+  // (When a db is first created, the earliest sequence number will be 0).
+  // If the earliest sequence number is not known, kMaxSequenceNumber may be
+  // used, but this may prevent some transactions from succeeding until the
+  // first key is inserted into the memtable.
+  explicit MemTable(const InternalKeyComparator& comparator,
+                    const ImmutableOptions& ioptions,
+                    const MutableCFOptions& mutable_cf_options,
+                    WriteBufferManager* write_buffer_manager,
+                    SequenceNumber earliest_seq, uint32_t column_family_id);
+  // No copying allowed
+  MemTable(const MemTable&) = delete;
+  MemTable& operator=(const MemTable&) = delete;
+
+  ~MemTable() override;
+
+  const char* Name() const override { return "MemTable"; }
+
+  size_t ApproximateMemoryUsage() override;
+
+  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
+  // require external synchronization. The value may be less accurate though
+  size_t ApproximateMemoryUsageFast() const {
+    return approximate_memory_usage_.load(std::memory_order_relaxed);
+  }
+
+  size_t MemoryAllocatedBytes() const override {
+    return table_->ApproximateMemoryUsage() +
+           range_del_table_->ApproximateMemoryUsage() +
+           arena_.MemoryAllocatedBytes();
+  }
+
+  void UniqueRandomSample(const uint64_t& target_sample_size,
+                          std::unordered_set<const char*>* entries) override {
+    // TODO(bjlemaire): at the moment, only supported by skiplistrep.
+    // Extend it to all other memtable representations.
+    table_->UniqueRandomSample(NumEntries(), target_sample_size, entries);
+  }
+
+  // This method heuristically determines if the memtable should continue to
+  // host more data.
+  bool ShouldScheduleFlush() const {
+    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
+  }
+
+  // Returns true if a flush should be scheduled and the caller should
+  // be the one to schedule it
+  bool MarkFlushScheduled() {
+    auto before = FLUSH_REQUESTED;
+    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
+                                                std::memory_order_relaxed,
+                                                std::memory_order_relaxed);
+  }
+
+  InternalIterator* NewIterator(
+      const ReadOptions& read_options,
+      UnownedPtr<SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
+      const SliceTransform* prefix_extractor) override;
+
+  InternalIterator* NewTimestampStrippingIterator(
+      const ReadOptions& read_options,
+      UnownedPtr<SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
+      const SliceTransform* prefix_extractor, size_t ts_sz) override;
+
+  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
+      const ReadOptions& read_options, SequenceNumber read_seq,
+      bool immutable_memtable) override;
+
+  FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
+      const ReadOptions& read_options, SequenceNumber read_seq,
+      size_t ts_sz) override;
+
+  Status VerifyEncodedEntry(Slice encoded,
+                            const ProtectionInfoKVOS64& kv_prot_info);
+
+  // Add an entry into memtable that maps key to value at the
+  // specified sequence number and with the specified type.
+  // Typically, value will be empty if type==kTypeDeletion.
+  //
+  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
+  // simultaneous operations on the same MemTable.
+  //
+  // Returns `Status::TryAgain` if the `seq`, `key` combination already exists
+  // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
+  // The next attempt should try a larger value for `seq`.
+  Status Add(SequenceNumber seq, ValueType type, const Slice& key,
+             const Slice& value, const ProtectionInfoKVOS64* kv_prot_info,
+             bool allow_concurrent = false,
+             MemTablePostProcessInfo* post_process_info = nullptr,
+             void** hint = nullptr);
+
+  using ReadOnlyMemTable::Get;
+  bool Get(const LookupKey& key, std::string* value,
+           PinnableWideColumns* columns, std::string* timestamp, Status* s,
+           MergeContext* merge_context,
+           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
+           const ReadOptions& read_opts, bool immutable_memtable,
+           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
+           bool do_merge = true) override;
+
  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                ReadCallback* callback, bool immutable_memtable);
+                ReadCallback* callback, bool immutable_memtable) override;

  // If `key` exists in current memtable with type value_type and the existing
  // value is at least as large as the new value, updates it in-place. Otherwise
@@ -372,28 +571,19 @@ class MemTable {
    UpdateFlushState();
  }

-  // Get total number of entries in the mem table.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable (unless this Memtable is immutable).
-  uint64_t num_entries() const {
+  uint64_t NumEntries() const override {
    return num_entries_.load(std::memory_order_relaxed);
  }

-  // Get total number of deletes in the mem table.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable (unless this Memtable is immutable).
-  uint64_t num_deletes() const {
+  uint64_t NumDeletion() const override {
    return num_deletes_.load(std::memory_order_relaxed);
  }

-  // Get total number of range deletions in the mem table.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable (unless this Memtable is immutable).
-  uint64_t num_range_deletes() const {
+  uint64_t NumRangeDeletion() const override {
    return num_range_deletes_.load(std::memory_order_relaxed);
  }

-  uint64_t get_data_size() const {
+  uint64_t GetDataSize() const override {
    return data_size_.load(std::memory_order_relaxed);
  }

@@ -413,19 +603,12 @@ class MemTable {
    }
  }

-  // Returns the edits area that is needed for flushing the memtable
-  VersionEdit* GetEdits() { return &edit_; }
-
  // Returns if there is no entry inserted to the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

-  // Returns the sequence number of the first element that was inserted
-  // into the memtable.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable (unless this Memtable is immutable).
-  SequenceNumber GetFirstSequenceNumber() {
+  SequenceNumber GetFirstSequenceNumber() override {
    return first_seqno_.load(std::memory_order_relaxed);
  }

@@ -437,14 +620,8 @@ class MemTable {
    return first_seqno_.store(first_seqno, std::memory_order_relaxed);
  }

-  // Returns the sequence number that is guaranteed to be smaller than or equal
-  // to the sequence number of any key that could be inserted into this
-  // memtable. It can then be assumed that any write with a larger(or equal)
-  // sequence number will be present in this memtable or a later memtable.
-  //
-  // If the earliest sequence number could not be determined,
-  // kMaxSequenceNumber will be returned.
-  SequenceNumber GetEarliestSequenceNumber() {
+  SequenceNumber GetEarliestSequenceNumber() override {
+    // With file ingestion and empty memtable, this seqno needs to be fixed.
    return earliest_seqno_.load(std::memory_order_relaxed);
  }

@@ -463,40 +640,18 @@ class MemTable {
  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

-  // Returns the next active logfile number when this memtable is about to
-  // be flushed to storage
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
-
-  // Sets the next active logfile number when this memtable is about to
-  // be flushed to storage
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
-
-  // if this memtable contains data from a committed
-  // two phase transaction we must take note of the
-  // log which contains that data so we can know
-  // when to relese that log
+  // If this memtable contains data from a committed two phase transaction we
+  // must take note of the log which contains that data so we can know when
+  // to release that log.
  void RefLogContainingPrepSection(uint64_t log);
-  uint64_t GetMinLogContainingPrepSection();
+  uint64_t GetMinLogContainingPrepSection() override;

-  // Notify the underlying storage that no more items will be added.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  // After MarkImmutable() is called, you should not attempt to
-  // write anything to this MemTable(). (Ie. do not call Add() or Update()).
-  void MarkImmutable() {
+  void MarkImmutable() override {
    table_->MarkReadOnly();
    mem_tracker_.DoneAllocating();
  }

-  // Notify the underlying storage that all data it contained has been
-  // persisted.
-  // REQUIRES: external synchronization to prevent simultaneous
-  // operations on the same MemTable.
-  void MarkFlushed() { table_->MarkFlushed(); }
+  void MarkFlushed() override { table_->MarkFlushed(); }

  // return true if the current MemTableRep supports merge operator.
  bool IsMergeOperatorSupported() const {
@@ -509,18 +664,13 @@ class MemTable {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }

-  struct MemTableStats {
-    uint64_t size;
-    uint64_t count;
-  };
-
  MemTableStats ApproximateStats(const Slice& start_ikey,
-                                 const Slice& end_ikey);
+                                 const Slice& end_ikey) override;

  // Get the lock associated for the key
  port::RWMutex* GetLock(const Slice& key);

-  const InternalKeyComparator& GetInternalKeyComparator() const {
+  const InternalKeyComparator& GetInternalKeyComparator() const override {
    return comparator_.comparator;
  }

@@ -528,33 +678,10 @@ class MemTable {
    return &moptions_;
  }

-  uint64_t ApproximateOldestKeyTime() const {
+  uint64_t ApproximateOldestKeyTime() const override {
    return oldest_key_time_.load(std::memory_order_relaxed);
  }

-  // REQUIRES: db_mutex held.
-  void SetID(uint64_t id) { id_ = id; }
-
-  uint64_t GetID() const { return id_; }
-
-  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
-
-  uint64_t GetFileNumber() const { return file_number_; }
-
-  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
-
-  void SetFlushInProgress(bool in_progress) {
-    flush_in_progress_ = in_progress;
-  }
-
-  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
-    flush_job_info_ = std::move(info);
-  }
-
-  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
-    return std::move(flush_job_info_);
-  }
-
  // Returns a heuristic flush decision
  bool ShouldFlushNow();

@@ -565,23 +692,12 @@ class MemTable {
  // SwitchMemtable() may fail.
  void ConstructFragmentedRangeTombstones();

-  // Returns whether a fragmented range tombstone list is already constructed
-  // for this memtable. It should be constructed right before a memtable is
-  // added to an immutable memtable list. Note that if a memtable does not have
-  // any range tombstone, then no range tombstone list will ever be constructed
-  // and true is returned in that case.
-  bool IsFragmentedRangeTombstonesConstructed() const {
+  bool IsFragmentedRangeTombstonesConstructed() const override {
    return fragmented_range_tombstone_list_.get() != nullptr ||
           is_range_del_table_empty_;
  }

-  // Get the newest user-defined timestamp contained in this MemTable. Check
-  // `newest_udt_` for what newer means. This method should only be invoked for
-  // an MemTable that has enabled user-defined timestamp feature and set
-  // `persist_user_defined_timestamps` to false. The tracked newest UDT will be
-  // used by flush job in the background to help check the MemTable's
-  // eligibility for Flush.
-  const Slice& GetNewestUDT() const;
+  const Slice& GetNewestUDT() const override;

  // Returns Corruption status if verification fails.
  static Status VerifyEntryChecksum(const char* entry,
@@ -597,7 +713,6 @@ class MemTable {
  KeyComparator comparator_;
  const ImmutableMemTableOptions moptions_;
-  int refs_;
  const size_t kArenaBlockSize;
  AllocTracker mem_tracker_;
  ConcurrentArena arena_;
@@ -614,15 +729,6 @@ class MemTable {
  // Dynamically changeable memtable option
  std::atomic<uint64_t> write_buffer_size_;

-  // These are used to manage memtable flushes to storage
-  bool flush_in_progress_;  // started the flush
-  bool flush_completed_;    // finished the flush
-  uint64_t file_number_;    // filled up after flush is complete
-
-  // The updates to be applied to the transaction log when this
-  // memtable is flushed to storage.
-  VersionEdit edit_;
-
  // The sequence number of the kv that was inserted first
  std::atomic<uint64_t> first_seqno_;

  std::atomic<uint64_t> earliest_seqno_;

  SequenceNumber creation_seq_;

-  // The log files earlier than this number can be deleted.
-  uint64_t mem_next_logfile_number_;
-
  // the earliest log containing a prepared section
  // which has been inserted into this memtable.
  std::atomic<uint64_t> min_prep_log_referenced_;

  // Timestamp of oldest key
  std::atomic<uint64_t> oldest_key_time_;

-  // Memtable id to track flush.
-  uint64_t id_ = 0;
-
-  // Sequence number of the atomic flush that is responsible for this memtable.
-  // The sequence number of atomic flush is a seq, such that no writes with
-  // sequence numbers greater than or equal to seq are flushed, while all
-  // writes with sequence number smaller than seq are flushed.
-  SequenceNumber atomic_flush_seqno_;
-
  // keep track of memory usage in table_, arena_, and range_del_table_.
  // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
  std::atomic<uint64_t> approximate_memory_usage_;

@@ -675,9 +769,6 @@
  // unlimited.
  uint32_t memtable_max_range_deletions_ = 0;

-  // Flush job info of the current memtable.
-  std::unique_ptr<FlushJobInfo> flush_job_info_;
-
  // Size in bytes for the user-defined timestamps.
  size_t ts_sz_;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 8ad4efcc2b..5f4029af2e 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -31,13 +31,13 @@ class InternalKeyComparator;
 class Mutex;
 class VersionSet;

-void MemTableListVersion::AddMemTable(MemTable* m) {
+void MemTableListVersion::AddMemTable(ReadOnlyMemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
 }

-void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
-                                        MemTable* m) {
+void MemTableListVersion::UnrefMemTable(
+    autovector<ReadOnlyMemTable*>* to_delete, ReadOnlyMemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
@@ -74,7 +74,7 @@ MemTableListVersion::MemTableListVersion(
 void MemTableListVersion::Ref() { ++refs_; }

 // called by superversion::clean()
-void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
+void MemTableListVersion::Unref(autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
@@ -131,7 +131,7 @@ void MemTableListVersion::MultiGet(const ReadOptions& read_options,
 bool MemTableListVersion::GetMergeOperands(
     const LookupKey& key, Status* s, MergeContext* merge_context,
     SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
-  for (MemTable* memtable : memlist_) {
+  for (ReadOnlyMemTable* memtable : memlist_) {
    bool done = memtable->Get(
        key, /*value=*/nullptr, /*columns=*/nullptr, /*timestamp=*/nullptr, s,
        merge_context, max_covering_tombstone_seq, read_opts,
@@ -154,11 +154,11 @@ bool MemTableListVersion::GetFromHistory(
 }

 bool MemTableListVersion::GetFromList(
-    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
-    PinnableWideColumns* columns, std::string* timestamp, Status* s,
-    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
-    SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback,
-    bool* is_blob_index) {
+    std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
+    std::string* value, PinnableWideColumns* columns, std::string* timestamp,
+    Status* s, MergeContext* merge_context,
+    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
+    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
@@ -259,14 +259,14 @@ void MemTableListVersion::AddIterators(
 uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
-    total_num += m->num_entries();
+    total_num += m->NumEntries();
  }
  return total_num;
 }

-MemTable::MemTableStats MemTableListVersion::ApproximateStats(
-    const Slice& start_ikey, const Slice& end_ikey) {
-  MemTable::MemTableStats total_stats = {0, 0};
+ReadOnlyMemTable::MemTableStats MemTableListVersion::ApproximateStats(
+    const Slice& start_ikey, const Slice& end_ikey) const {
+  ReadOnlyMemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
@@ -278,7 +278,7 @@ MemTable::MemTableStats MemTableListVersion::ApproximateStats(

 uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
-    total_num += m->num_deletes();
+    total_num += m->NumDeletion();
  }
  return total_num;
 }
@@ -304,7 +304,8 @@ SequenceNumber MemTableListVersion::GetFirstSequenceNumber() const {
 }

 // caller is responsible for referencing m
-void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
+void MemTableListVersion::Add(ReadOnlyMemTable* m,
+                              autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);
  // m->MemoryAllocatedBytes() is added in MemoryAllocatedBytesExcludingLast
@@ -312,8 +313,8 @@ void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
 }

 // Removes m from list of memtables not flushed. Caller should NOT Unref m.
-void MemTableListVersion::Remove(MemTable* m,
-                                 autovector<MemTable*>* to_delete) {
+void MemTableListVersion::Remove(ReadOnlyMemTable* m,
+                                 autovector<ReadOnlyMemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

@@ -364,11 +365,11 @@ bool MemTableListVersion::HistoryShouldBeTrimmed(size_t usage) {
 }

 // Make sure we don't use up too much space in history
-bool MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
+bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
                                      size_t usage) {
  bool ret = false;
  while (HistoryShouldBeTrimmed(usage)) {
-    MemTable* x = memlist_history_.back();
+    ReadOnlyMemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
@@ -398,7 +399,7 @@ bool MemTableList::IsFlushPendingOrRunning() const {

 // Returns the memtables that need to be flushed.
 void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
-                                        autovector<MemTable*>* ret,
+                                        autovector<ReadOnlyMemTable*>* ret,
                                        uint64_t* max_next_log_number) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
@@ -412,7 +413,7 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
  // However, when the mempurge feature is activated, new memtables with older
  // IDs will be added to the memlist.
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
-    MemTable* m = *it;
+    ReadOnlyMemTable* m = *it;
     if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
       atomic_flush = true;
     }
@@ -445,20 +446,21 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
   }
 }
 
-void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
-                                         bool rollback_succeeding_memtables) {
+void MemTableList::RollbackMemtableFlush(
+    const autovector<ReadOnlyMemTable*>& mems,
+    bool rollback_succeeding_memtables) {
   TEST_SYNC_POINT("RollbackMemtableFlush");
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
 #ifndef NDEBUG
-  for (MemTable* m : mems) {
+  for (ReadOnlyMemTable* m : mems) {
     assert(m->flush_in_progress_);
     assert(m->file_number_ == 0);
   }
 #endif
 
   if (rollback_succeeding_memtables && !mems.empty()) {
-    std::list<MemTable*>& memlist = current_->memlist_;
+    std::list<ReadOnlyMemTable*>& memlist = current_->memlist_;
     auto it = memlist.rbegin();
     for (; *it != mems[0] && it != memlist.rend(); ++it) {
     }
@@ -468,7 +470,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
       ++it;
     }
     while (it != memlist.rend()) {
-      MemTable* m = *it;
+      ReadOnlyMemTable* m = *it;
       // Only rollback complete, not in-progress,
       // in_progress can be flushes that are still writing SSTs
       if (m->flush_completed_) {
@@ -484,7 +486,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
     }
   }
 
-  for (MemTable* m : mems) {
+  for (ReadOnlyMemTable* m : mems) {
     if (m->flush_in_progress_) {
       assert(m->file_number_ == 0);
       m->file_number_ = 0;
@@ -503,10 +505,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
 // Status::OK letting a concurrent flush to do actual the recording..
 Status MemTableList::TryInstallMemtableFlushResults(
     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
-    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
-    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
-    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
-    LogBuffer* log_buffer,
+    const autovector<ReadOnlyMemTable*>& mems,
+    LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu,
+    uint64_t file_number, autovector<ReadOnlyMemTable*>* to_delete,
+    FSDirectory* db_directory, LogBuffer* log_buffer,
     std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
     bool write_edits) {
   AutoThreadOperationStageUpdater stage_updater(
@@ -555,10 +557,10 @@ Status MemTableList::TryInstallMemtableFlushResults(
   uint64_t batch_file_number = 0;
   size_t batch_count = 0;
   autovector<VersionEdit*> edit_list;
-  autovector<MemTable*> memtables_to_flush;
+  autovector<ReadOnlyMemTable*> memtables_to_flush;
   // enumerate from the last (earliest) element to see how many batch finished
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
-    MemTable* m = *it;
+    ReadOnlyMemTable* m = *it;
     if (!m->flush_completed_) {
       break;
     }
@@ -646,7 +648,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
 }
 
 // New memtables are inserted at the front of the list.
-void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
+void MemTableList::Add(ReadOnlyMemTable* m,
+                       autovector<ReadOnlyMemTable*>* to_delete) {
   assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
   InstallNewVersion();
   // this method is used to move mutable memtable into an immutable list.
@@ -664,7 +667,8 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
   ResetTrimHistoryNeeded();
 }
 
-bool MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
+bool MemTableList::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
+                               size_t usage) {
   // Check if history trim is needed first, so that we can avoid installing a
   // new MemTableListVersion without installing a SuperVersion (installed based
   // on return value of this function).
@@ -734,7 +738,7 @@ void MemTableList::InstallNewVersion() {
 
 void MemTableList::RemoveMemTablesOrRestoreFlags(
     const Status& s, ColumnFamilyData* cfd, size_t batch_count,
-    LogBuffer* log_buffer, autovector<MemTable*>* to_delete,
+    LogBuffer* log_buffer, autovector<ReadOnlyMemTable*>* to_delete,
     InstrumentedMutex* mu) {
   assert(mu);
   mu->AssertHeld();
@@ -763,7 +767,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
   // the column family is dropped.
   if (s.ok() && !cfd->IsDropped()) {  // commit new state
     while (batch_count-- > 0) {
-      MemTable* m = current_->memlist_.back();
+      ReadOnlyMemTable* m = current_->memlist_.back();
       if (m->edit_.GetBlobFileAdditions().empty()) {
         ROCKS_LOG_BUFFER(log_buffer,
                          "[%s] Level-0 commit flush result of table #%" PRIu64
@@ -786,7 +790,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
     }
   } else {
     for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
-      MemTable* m = *it;
+      ReadOnlyMemTable* m = *it;
       // commit failed. setup state so that we can flush again.
       if (m->edit_.GetBlobFileAdditions().empty()) {
         ROCKS_LOG_BUFFER(log_buffer,
@@ -814,7 +818,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
 }
 
 uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
-    const std::unordered_set<MemTable*>* memtables_to_flush) {
+    const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) {
   uint64_t min_log = 0;
 
   for (auto& m : current_->memlist_) {
@@ -837,12 +841,12 @@ Status InstallMemtableAtomicFlushResults(
     const autovector<MemTableList*>* imm_lists,
     const autovector<ColumnFamilyData*>& cfds,
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
-    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
-    LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
+    const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
+    VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
     const autovector<FileMetaData*>& file_metas,
     const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
         committed_flush_jobs_info,
-    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+    autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
     LogBuffer* log_buffer) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
@@ -1006,14 +1010,14 @@ Status InstallMemtableAtomicFlushResults(
   return s;
 }
 
-void MemTableList::RemoveOldMemTables(uint64_t log_number,
-                                      autovector<MemTable*>* to_delete) {
+void MemTableList::RemoveOldMemTables(
+    uint64_t log_number, autovector<ReadOnlyMemTable*>* to_delete) {
   assert(to_delete != nullptr);
   InstallNewVersion();
   auto& memlist = current_->memlist_;
-  autovector<MemTable*> old_memtables;
+  autovector<ReadOnlyMemTable*> old_memtables;
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
-    MemTable* mem = *it;
+    ReadOnlyMemTable* mem = *it;
     if (mem->GetNextLogNumber() > log_number) {
       break;
     }
@@ -1021,7 +1025,7 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
   }
 
   for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
-    MemTable* mem = *it;
+    ReadOnlyMemTable* mem = *it;
     current_->Remove(mem, to_delete);
     --num_flush_not_started_;
     if (0 == num_flush_not_started_) {
@@ -1044,9 +1048,9 @@ VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
 
   uint64_t max_next_log_number = 0;
   autovector<VersionEdit*> edit_list;
-  autovector<MemTable*> memtables_to_drop;
+  autovector<ReadOnlyMemTable*> memtables_to_drop;
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
-    MemTable* m = *it;
+    ReadOnlyMemTable* m = *it;
     memtables_to_drop.push_back(m);
     max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
   }
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 75afb5018d..a1e9311aff 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -34,11 +34,11 @@ class MemTableList;
 
 struct FlushJobInfo;
 
-// keeps a list of immutable memtables in a vector. the list is immutable
-// if refcount is bigger than one. It is used as a state for Get() and
-// Iterator code paths
+// keeps a list of immutable memtables (ReadOnlyMemtable*) in a vector.
+// The list is immutable if refcount is bigger than one. It is used as
+// a state for Get() and iterator code paths.
 //
-// This class is not thread-safe.  External synchronization is required
+// This class is not thread-safe. External synchronization is required
 // (such as holding the db mutex or being on the write thread).
 class MemTableListVersion {
  public:
@@ -49,7 +49,7 @@ class MemTableListVersion {
                       int64_t max_write_buffer_size_to_maintain);
 
   void Ref();
-  void Unref(autovector<MemTable*>* to_delete = nullptr);
+  void Unref(autovector<ReadOnlyMemTable*>* to_delete = nullptr);
 
   // Search all the memtables starting from the most recent one.
   // Return the most recent value found, if any.
@@ -127,8 +127,8 @@ class MemTableListVersion {
 
   uint64_t GetTotalNumDeletes() const;
 
-  MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
-                                           const Slice& end_ikey);
+  ReadOnlyMemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
+                                                   const Slice& end_ikey) const;
 
   // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
   // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
@@ -153,27 +153,27 @@ class MemTableListVersion {
       const autovector<MemTableList*>* imm_lists,
       const autovector<ColumnFamilyData*>& cfds,
       const autovector<const MutableCFOptions*>& mutable_cf_options_list,
-      const autovector<const autovector<MemTable*>*>& mems_list,
+      const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
       VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
          committed_flush_jobs_info,
-      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+      autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
       LogBuffer* log_buffer);
 
   // REQUIRE: m is an immutable memtable
-  void Add(MemTable* m, autovector<MemTable*>* to_delete);
+  void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
   // REQUIRE: m is an immutable memtable
-  void Remove(MemTable* m, autovector<MemTable*>* to_delete);
+  void Remove(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
 
   // Return true if the memtable list should be trimmed to get memory usage
   // under budget.
   bool HistoryShouldBeTrimmed(size_t usage);
 
   // Trim history, Return true if memtable is trimmed
-  bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
+  bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
 
-  bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
+  bool GetFromList(std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
                    std::string* value, PinnableWideColumns* columns,
                    std::string* timestamp, Status* s,
                    MergeContext* merge_context,
@@ -182,9 +182,10 @@ class MemTableListVersion {
                    ReadCallback* callback = nullptr,
                    bool* is_blob_index = nullptr);
 
-  void AddMemTable(MemTable* m);
+  void AddMemTable(ReadOnlyMemTable* m);
 
-  void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
+  void UnrefMemTable(autovector<ReadOnlyMemTable*>* to_delete,
+                     ReadOnlyMemTable* m);
 
   // Calculate the total amount of memory used by memlist_ and memlist_history_
   // excluding the last MemTable in memlist_history_. The reason for excluding
@@ -199,11 +200,11 @@ class MemTableListVersion {
   bool MemtableLimitExceeded(size_t usage);
 
   // Immutable MemTables that have not yet been flushed.
-  std::list<MemTable*> memlist_;
+  std::list<ReadOnlyMemTable*> memlist_;
 
   // MemTables that have already been flushed
   // (used during Transaction validation)
-  std::list<MemTable*> memlist_history_;
+  std::list<ReadOnlyMemTable*> memlist_history_;
 
   // Maximum number of MemTables to keep in memory (including both flushed
   const int max_write_buffer_number_to_maintain_;
@@ -283,7 +284,7 @@ class MemTableList {
   // Returns the earliest memtables that needs to be flushed. The returned
   // memtables are guaranteed to be in the ascending order of created time.
   void PickMemtablesToFlush(uint64_t max_memtable_id,
-                            autovector<MemTable*>* mems,
+                            autovector<ReadOnlyMemTable*>* mems,
                             uint64_t* max_next_log_number = nullptr);
 
   // Reset status of the given memtable list back to pending state so that
@@ -300,16 +301,16 @@ class MemTableList {
   // Note that we also do rollback in `write_manifest_cb` by calling
   // `RemoveMemTablesOrRestoreFlags()`. There we rollback the entire batch so
   // it is similar to what we do here with rollback_succeeding_memtables=true.
-  void RollbackMemtableFlush(const autovector<MemTable*>& mems,
+  void RollbackMemtableFlush(const autovector<ReadOnlyMemTable*>& mems,
                              bool rollback_succeeding_memtables);
 
   // Try commit a successful flush in the manifest file. It might just return
   // Status::OK letting a concurrent flush to do the actual the recording.
   Status TryInstallMemtableFlushResults(
       ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
-      const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
+      const autovector<ReadOnlyMemTable*>& m, LogsWithPrepTracker* prep_tracker,
       VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
-      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+      autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
      LogBuffer* log_buffer,
      std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
      bool write_edits = true);
@@ -319,7 +320,7 @@ class MemTableList {
   // By default, adding memtables will flag that the memtable list needs to be
   // flushed, but in certain situations, like after a mempurge, we may want to
   // avoid flushing the memtable list upon addition of a memtable.
-  void Add(MemTable* m, autovector<MemTable*>* to_delete);
+  void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
 
   // Returns an estimate of the number of bytes of data in use.
   size_t ApproximateMemoryUsage();
@@ -341,7 +342,7 @@ class MemTableList {
   // memtable list.
   //
   // Return true if memtable is trimmed
-  bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
+  bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
 
   // Returns an estimate of the number of bytes of data used by
   // the unflushed mem-tables.
@@ -393,7 +394,8 @@ class MemTableList {
   // Returns the min log containing the prep section after memtables listsed in
   // `memtables_to_flush` are flushed and their status is persisted in manifest.
   uint64_t PrecomputeMinLogContainingPrepSection(
-      const std::unordered_set<MemTable*>* memtables_to_flush = nullptr);
+      const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush =
+          nullptr);
 
   uint64_t GetEarliestMemTableID() const {
     auto& memlist = current_->memlist_;
@@ -411,7 +413,7 @@ class MemTableList {
     if (for_atomic_flush) {
       // Scan the memtable list from new to old
       for (auto it = memlist.begin(); it != memlist.end(); ++it) {
-        MemTable* m = *it;
+        ReadOnlyMemTable* m = *it;
         if (m->atomic_flush_seqno_ != kMaxSequenceNumber) {
           return m->GetID();
         }
@@ -431,7 +433,7 @@ class MemTableList {
     // Iterating through the memlist starting at the end, the vector
     // ret is filled with memtables already sorted in increasing MemTable ID.
     for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
-      MemTable* m = *it;
+      ReadOnlyMemTable* m = *it;
       if (m->GetID() > max_memtable_id) {
         break;
       }
@@ -444,7 +446,7 @@ class MemTableList {
     const auto& memlist = current_->memlist_;
     // Scan the memtable list from new to old
     for (auto it = memlist.begin(); it != memlist.end(); ++it) {
-      MemTable* mem = *it;
+      ReadOnlyMemTable* mem = *it;
       if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
         mem->atomic_flush_seqno_ = seq;
       } else {
@@ -460,7 +462,7 @@ class MemTableList {
   // was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
   // not freed, but put into a vector for future deref and reclamation.
   void RemoveOldMemTables(uint64_t log_number,
-                          autovector<MemTable*>* to_delete);
+                          autovector<ReadOnlyMemTable*>* to_delete);
 
   // This API is only used by atomic date replacement. To get an edit for
   // dropping the current `MemTableListVersion`.
@@ -473,12 +475,12 @@ class MemTableList {
       const autovector<MemTableList*>* imm_lists,
       const autovector<ColumnFamilyData*>& cfds,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
-      const autovector<const autovector<MemTable*>*>& mems_list,
+      const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
      VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
          committed_flush_jobs_info,
-      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+      autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
       LogBuffer* log_buffer);
 
   // DB mutex held
@@ -488,7 +490,7 @@ class MemTableList {
   // Called after writing to MANIFEST
   void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
                                      size_t batch_count, LogBuffer* log_buffer,
-                                     autovector<MemTable*>* to_delete,
+                                     autovector<ReadOnlyMemTable*>* to_delete,
                                      InstrumentedMutex* mu);
 
   const int min_write_buffer_number_to_merge_;
@@ -529,11 +531,11 @@ Status InstallMemtableAtomicFlushResults(
     const autovector<MemTableList*>* imm_lists,
     const autovector<ColumnFamilyData*>& cfds,
     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
-    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
-    LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
+    const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
+    VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
    const autovector<FileMetaData*>& file_meta,
    const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
        committed_flush_jobs_info,
-    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
+    autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
     LogBuffer* log_buffer);
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 22a96d67ff..812c176d27 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -98,7 +98,8 @@ class MemTableListTest : public testing::Test {
   // structures needed to call this function.
   Status Mock_InstallMemtableFlushResults(
       MemTableList* list, const MutableCFOptions& mutable_cf_options,
-      const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
+      const autovector<ReadOnlyMemTable*>& m,
+      autovector<ReadOnlyMemTable*>* to_delete) {
     // Create a mock Logger
     test::NullLogger logger;
     LogBuffer log_buffer(DEBUG_LEVEL, &logger);
@@ -148,8 +149,8 @@ class MemTableListTest : public testing::Test {
   Status Mock_InstallMemtableAtomicFlushResults(
       autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
       const autovector<const MutableCFOptions*>& mutable_cf_options_list,
-      const autovector<const autovector<MemTable*>*>& mems_list,
-      autovector<MemTable*>* to_delete) {
+      const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
+      autovector<ReadOnlyMemTable*>* to_delete) {
     // Create a mock Logger
     test::NullLogger logger;
     LogBuffer log_buffer(DEBUG_LEVEL, &logger);
@@ -227,12 +228,12 @@ TEST_F(MemTableListTest, Empty) {
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   ASSERT_FALSE(list.IsFlushPending());
 
-  autovector<MemTable*> mems;
+  autovector<ReadOnlyMemTable*> mems;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &mems);
   ASSERT_EQ(0, mems.size());
 
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
   list.current()->Unref(&to_delete);
   ASSERT_EQ(0, to_delete.size());
 }
@@ -252,7 +253,7 @@ TEST_F(MemTableListTest, GetTest) {
   MergeContext merge_context;
   InternalKeyComparator ikey_cmp(options.comparator);
   SequenceNumber max_covering_tombstone_seq = 0;
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
   bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
@@ -322,8 +323,8 @@ TEST_F(MemTableListTest, GetTest) {
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value3.1");
 
-  ASSERT_EQ(5, mem->num_entries());
-  ASSERT_EQ(1, mem->num_deletes());
+  ASSERT_EQ(5, mem->NumEntries());
+  ASSERT_EQ(1, mem->NumDeletion());
 
   // Add memtable to list
   // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
@@ -398,7 +399,7 @@ TEST_F(MemTableListTest, GetTest) {
   ASSERT_EQ(2, list.NumNotFlushed());
 
   list.current()->Unref(&to_delete);
-  for (MemTable* m : to_delete) {
+  for (ReadOnlyMemTable* m : to_delete) {
     delete m;
   }
 }
@@ -418,7 +419,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   MergeContext merge_context;
   InternalKeyComparator ikey_cmp(options.comparator);
   SequenceNumber max_covering_tombstone_seq = 0;
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
   bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
@@ -491,7 +492,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Flush this memtable from the list.
   // (It will then be a part of the memtable history).
-  autovector<MemTable*> to_flush;
+  autovector<ReadOnlyMemTable*> to_flush;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());
@@ -636,7 +637,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Cleanup
   list.current()->Unref(&to_delete);
   ASSERT_EQ(3, to_delete.size());
-  for (MemTable* m : to_delete) {
+  for (ReadOnlyMemTable* m : to_delete) {
     delete m;
  }
 }
@@ -651,7 +652,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ImmutableOptions ioptions(options);
   InternalKeyComparator cmp(BytewiseComparator());
   WriteBufferManager wb(options.db_write_buffer_size);
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
 
   // Create MemTableList
   int min_write_buffer_number_to_merge = 3;
@@ -692,7 +693,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   // Nothing to flush
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
-  autovector<MemTable*> to_flush;
+  autovector<ReadOnlyMemTable*> to_flush;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
@@ -758,7 +759,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush again
-  autovector<MemTable*> to_flush2;
+  autovector<ReadOnlyMemTable*> to_flush2;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
   ASSERT_EQ(0, to_flush2.size());
@@ -811,7 +812,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush again
-  autovector<MemTable*> to_flush3;
+  autovector<ReadOnlyMemTable*> to_flush3;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
   // Picks newest (fifth oldest)
   ASSERT_EQ(1, to_flush3.size());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Nothing left to flush
-  autovector<MemTable*> to_flush4;
+  autovector<ReadOnlyMemTable*> to_flush4;
   list.PickMemtablesToFlush(
       std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
   ASSERT_EQ(0, to_flush4.size());
@@ -891,7 +892,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   memtable_id = 4;
   // Pick tables to flush. The tables to pick must have ID smaller than or
   // equal to 4. Therefore, no table will be selected in this case.
-  autovector<MemTable*> to_flush5;
+  autovector<ReadOnlyMemTable*> to_flush5;
   list.FlushRequested();
   ASSERT_TRUE(list.HasFlushRequested());
   list.PickMemtablesToFlush(memtable_id, &to_flush5);
@@ -932,8 +933,8 @@ TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
   autovector<MemTableList*> lists;
   autovector<uint32_t> cf_ids;
   autovector<const MutableCFOptions*> options_list;
-  autovector<const autovector<MemTable*>*> to_flush;
-  autovector<MemTable*> to_delete;
+  autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
+  autovector<ReadOnlyMemTable*> to_delete;
   Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
                                                     to_flush, &to_delete);
   ASSERT_OK(s);
@@ -995,7 +996,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
     cf_ids.push_back(cf_id++);
   }
 
-  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
+  std::vector<autovector<ReadOnlyMemTable*>> flush_candidates(num_cfs);
 
   // Nothing to flush
   for (auto i = 0; i != num_cfs; ++i) {
@@ -1014,7 +1015,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
     ASSERT_FALSE(list->IsFlushPending());
     ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
   }
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
   // Add tables to the immutable memtalbe lists associated with column families
   for (auto i = 0; i != num_cfs; ++i) {
     for (auto j = 0; j != num_tables_per_cf; ++j) {
@@ -1041,7 +1042,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
   autovector<MemTableList*> tmp_lists;
   autovector<uint32_t> tmp_cf_ids;
   autovector<const MutableCFOptions*> tmp_options_list;
-  autovector<const autovector<MemTable*>*> to_flush;
+  autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
   for (auto i = 0; i != num_cfs; ++i) {
     if (!flush_candidates[i].empty()) {
       to_flush.push_back(&flush_candidates[i]);
@@ -1122,7 +1123,7 @@ TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
   std::vector<MemTable*> tables;
   MutableCFOptions mutable_cf_options(options);
   uint64_t current_ts = 0;
-  autovector<MemTable*> to_delete;
+  autovector<ReadOnlyMemTable*> to_delete;
   std::vector<std::string> newest_udts;
 
   std::string key;
@@ -1162,7 +1163,7 @@ TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
   }
 
   list.current()->Unref(&to_delete);
-  for (MemTable* m : to_delete) {
+  for (ReadOnlyMemTable* m : to_delete) {
     delete m;
   }
   to_delete.clear();
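
Note (added for illustration; not part of the patch): with the to_delete vectors and MemTableListVersion list members now typed as ReadOnlyMemTable*, a caller that drops a reference on a list version owns whatever memtables fall out and must free them itself. The sketch below mirrors the cleanup pattern used in memtable_list_test.cc above; it assumes RocksDB's internal headers (db/memtable_list.h, util/autovector.h) are on the include path and that the caller already holds a reference on the version.

  #include "db/memtable_list.h"  // MemTableListVersion, ReadOnlyMemTable (internal header)
  #include "util/autovector.h"   // autovector

  // Drops one reference on `version`; any memtable whose refcount reaches
  // zero is returned through `to_delete` and freed here by the caller.
  void UnrefAndReclaim(ROCKSDB_NAMESPACE::MemTableListVersion* version) {
    ROCKSDB_NAMESPACE::autovector<ROCKSDB_NAMESPACE::ReadOnlyMemTable*> to_delete;
    version->Unref(&to_delete);
    for (ROCKSDB_NAMESPACE::ReadOnlyMemTable* m : to_delete) {
      delete m;
    }
  }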