Introduce an interface `ReadOnlyMemTable` for immutable memtables (#13107)

Summary:
This PR prepares for follow-up changes that add large transaction support. It introduces an interface that allows custom implementations of immutable memtables. Since transactions use a WriteBatchWithIndex to index their operations, I plan to add a ReadOnlyMemTable implementation backed by WriteBatchWithIndex. This will enable direct ingestion of a WriteBatchWithIndex into the DB as an immutable memtable, bypassing memtable writes for transactions.

The changes mostly involve moving the methods required for immutable memtables into the new ReadOnlyMemTable class.
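For context, the intended direction could look roughly like the hedged sketch below: a hypothetical `WBWIMemTable` class implementing the new interface on top of a `WriteBatchWithIndex`. The class name, members, and constructor are assumptions for illustration only; the overridden methods come from the `ReadOnlyMemTable` interface added in this diff, and the remaining pure-virtual methods (`Get()`, `NewIterator()`, `ApproximateStats()`, ...) are elided.

```
// Hedged sketch, not part of this PR: a hypothetical immutable memtable
// backed by a WriteBatchWithIndex.
class WBWIMemTable : public ReadOnlyMemTable {
 public:
  WBWIMemTable(std::shared_ptr<WriteBatchWithIndex> wbwi,
               const InternalKeyComparator& cmp)
      : wbwi_(std::move(wbwi)), comparator_(cmp) {}

  const char* Name() const override { return "WBWIMemTable"; }

  // Entry counts would be derived from the underlying write batch.
  uint64_t NumEntries() const override {
    return wbwi_->GetWriteBatch()->Count();
  }

  const InternalKeyComparator& GetInternalKeyComparator() const override {
    return comparator_;
  }

  // ... Get(), NewIterator(), NumDeletion(), GetDataSize(), etc. elided ...

 private:
  std::shared_ptr<WriteBatchWithIndex> wbwi_;
  const InternalKeyComparator& comparator_;
};
```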

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13107

Test Plan:
* Existing unit tests and stress tests.
* Performance: I do not expect this change to cause a noticeable performance regression, given LTO and devirtualization (see the standalone sketch after the benchmark output below). The memtable-only readrandom benchmark shows no consistent performance difference:
```
USE_LTO=1 OPTIMIZE_LEVEL="-O3"  DEBUG_LEVEL=0 make -j160 db_bench

(for I in $(seq 1 50);do ./db_bench --benchmarks=fillseq,readrandom --write_buffer_size=268435456 --writes=250000 --num=250000 --reads=500000  --seed=1723056275 2>&1 | grep "readrandom"; done;) | awk '{ t += $5; c++; print } END { print 1.0 * t / c }';

3 runs:
main: 760728, 752727, 739600
PR:   763036, 750696, 739022
```
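Below is a minimal, standalone illustration (not RocksDB code) of why the added virtual dispatch is expected to be cheap: when the compiler can prove the dynamic type of the object, for example via a `final` class or a whole-program view under LTO, the call through the base-class pointer can be devirtualized into a direct call.

```
// Standalone sketch: with the concrete type visible (here enforced by
// `final`), an optimizing compiler can turn the virtual call into a direct
// call, so the interface costs little on hot read paths.
#include <cstdint>

struct ReadOnlyBase {
  virtual ~ReadOnlyBase() = default;
  virtual uint64_t NumEntries() const = 0;
};

struct ConcreteTable final : ReadOnlyBase {
  uint64_t n = 42;
  uint64_t NumEntries() const override { return n; }
};

uint64_t CountEntries(const ConcreteTable& t) {
  const ReadOnlyBase& base = t;
  // The dynamic type is known to be ConcreteTable, so this call can be
  // devirtualized (and inlined) at -O2/-O3 or with LTO.
  return base.NumEntries();
}

int main() {
  ConcreteTable t;
  return static_cast<int>(CountEntries(t));
}
```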

Reviewed By: jowlyzhang

Differential Revision: D65365062

Pulled By: cbi42

fbshipit-source-id: 40c673ab856b91c65001ef6d6ac04b65286f2882
Author: changyubi, 2024-11-04 16:09:34 -08:00 (committed by Facebook GitHub Bot)
Parent: 24045549a6
Commit: 2ce6902cf5
19 changed files with 522 additions and 422 deletions


@ -466,7 +466,7 @@ void SuperVersion::Cleanup() {
// decrement reference to the immutable MemtableList
// this SV object was pointing to.
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
ReadOnlyMemTable* m = mem->Unref();
if (m != nullptr) {
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
@ -693,9 +693,9 @@ ColumnFamilyData::~ColumnFamilyData() {
if (mem_ != nullptr) {
delete mem_->Unref();
}
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
imm_.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
for (auto* m : to_delete) {
delete m;
}


@ -207,7 +207,7 @@ struct SuperVersion {
// Accessing members of this class is not thread-safe and requires external
// synchronization (ie db mutex held or on write thread).
ColumnFamilyData* cfd;
MemTable* mem;
ReadOnlyMemTable* mem;
MemTableListVersion* imm;
Version* current;
MutableCFOptions mutable_cf_options;
@ -269,7 +269,7 @@ struct SuperVersion {
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
};
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);


@ -4730,9 +4730,9 @@ void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
// Convert user_key into a corresponding internal key.
InternalKey k1(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
InternalKey k2(limit.value(), kMaxSequenceNumber, kValueTypeForSeek);
MemTable::MemTableStats memStats =
ReadOnlyMemTable::MemTableStats memStats =
sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
MemTable::MemTableStats immStats =
ReadOnlyMemTable::MemTableStats immStats =
sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
*count = memStats.count + immStats.count;
*size = memStats.size + immStats.size;


@ -1709,7 +1709,7 @@ class DBImpl : public DB {
struct WriteContext {
SuperVersionContext superversion_context;
autovector<MemTable*> memtables_to_free_;
autovector<ReadOnlyMemTable*> memtables_to_free_;
explicit WriteContext(bool create_superversion = false)
: superversion_context(create_superversion) {}
@ -2051,6 +2051,8 @@ class DBImpl : public DB {
Status TrimMemtableHistory(WriteContext* context);
// Switches the current live memtable to immutable/read-only memtable.
// A new WAL is created if the current WAL is not empty.
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Select and output column families qualified for atomic flush in
@ -3007,7 +3009,8 @@ CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker);
const autovector<ReadOnlyMemTable*>& memtables,
LogsWithPrepTracker* prep_tracker);
// Return the earliest log file to keep after the memtable flush is
// finalized.
@ -3018,13 +3021,13 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
const autovector<ReadOnlyMemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// For atomic flush.
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// In non-2PC mode, WALs with log number < the returned number can be
@ -3041,11 +3044,11 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush);
// For atomic flush.
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush);
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush);
// Fix user-supplied options to be reasonable
template <class T, class V>


@ -753,7 +753,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (s.ok()) {
autovector<ColumnFamilyData*> tmp_cfds;
autovector<const autovector<MemTable*>*> mems_list;
autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>


@ -258,7 +258,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
}
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
autovector<MemTable*> empty_list;
autovector<ReadOnlyMemTable*> empty_list;
return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
}


@ -739,7 +739,8 @@ void DBImpl::DeleteObsoleteFiles() {
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
VersionSet* vset, const ColumnFamilyData& cfd,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables, LogsWithPrepTracker* prep_tracker) {
const autovector<ReadOnlyMemTable*>& memtables,
LogsWithPrepTracker* prep_tracker) {
VersionEdit wal_deletion_edit;
uint64_t min_wal_number_to_keep = 0;
assert(edit_list.size() > 0);
@ -769,12 +770,12 @@ VersionEdit GetDBRecoveryEditForObsoletingMemTables(
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
std::unordered_set<MemTable*> memtables_to_flush_set(
std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set(
memtables_to_flush.begin(), memtables_to_flush.end());
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped()) {
@ -799,12 +800,12 @@ uint64_t FindMinPrepLogReferencedByMemTable(
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
VersionSet* vset, const autovector<const autovector<ReadOnlyMemTable*>*>&
memtables_to_flush) {
uint64_t min_log = 0;
std::unordered_set<MemTable*> memtables_to_flush_set;
for (const autovector<MemTable*>* memtables : memtables_to_flush) {
std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set;
for (const autovector<ReadOnlyMemTable*>* memtables : memtables_to_flush) {
memtables_to_flush_set.insert(memtables->begin(), memtables->end());
}
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
@ -896,7 +897,7 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
const autovector<ReadOnlyMemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
@ -937,7 +938,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);


@ -1769,7 +1769,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
s = io_s;
}
uint64_t total_num_entries = mem->num_entries();
uint64_t total_num_entries = mem->NumEntries();
if (s.ok() && total_num_entries != num_input_entries) {
std::string msg = "Expected " + std::to_string(total_num_entries) +
" entries in memtable, but read " +


@ -687,7 +687,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
// Note: if we are to resume after non-OK statuses we need to revisit how
// we reacts to non-OK statuses here.
// we react to non-OK statuses here.
versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
@ -1610,6 +1610,8 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
Status DBImpl::WriteRecoverableState() {
mutex_.AssertHeld();
if (!cached_recoverable_state_empty_) {
// Only for write-prepared and write-unprepared.
assert(seq_per_batch_);
bool dont_care_bool;
SequenceNumber next_seq;
if (two_write_queues_) {
@ -2251,8 +2253,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
memtable_info.cf_name = cfd->GetName();
memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
memtable_info.num_entries = cfd->mem()->num_entries();
memtable_info.num_deletes = cfd->mem()->num_deletes();
memtable_info.num_entries = cfd->mem()->NumEntries();
memtable_info.num_deletes = cfd->mem()->NumDeletion();
if (!cfd->ioptions()->persist_user_defined_timestamps &&
cfd->user_comparator()->timestamp_size() > 0) {
const Slice& newest_udt = cfd->mem()->GetNewestUDT();


@ -157,7 +157,7 @@ void FlushJob::ReportStartedFlush() {
IOSTATS_RESET(bytes_written);
}
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) {
uint64_t input_size = 0;
for (auto* mem : mems) {
input_size += mem->ApproximateMemoryUsage();
@ -204,7 +204,7 @@ void FlushJob::PickMemTable() {
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems_[0];
ReadOnlyMemTable* m = mems_[0];
edit_ = m->GetEdits();
edit_->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
@ -420,7 +420,7 @@ Status FlushJob::MemPurge() {
std::vector<InternalIterator*> memtables;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
for (MemTable* m : mems_) {
for (ReadOnlyMemTable* m : mems_) {
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr));
auto* range_del_iter = m->NewRangeTombstoneIterator(
@ -713,11 +713,11 @@ bool FlushJob::MemPurgeDecider(double threshold) {
// Iterate over each memtable of the set.
for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
mem_iter++) {
MemTable* mt = *mem_iter;
++mem_iter) {
ReadOnlyMemTable* mt = *mem_iter;
// Else sample from the table.
uint64_t nentries = mt->num_entries();
uint64_t nentries = mt->NumEntries();
// Corrected Cochran formula for small populations
// (converges to n0 for large populations).
uint64_t target_sample_size =
@ -894,11 +894,12 @@ Status FlushJob::WriteLevel0Table() {
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
&mems_size);
assert(job_context_);
for (MemTable* m : mems_) {
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
for (ReadOnlyMemTable* m : mems_) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Flushing memtable id %" PRIu64
" with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetID(),
m->GetNextLogNumber());
if (logical_strip_timestamp) {
memtables.push_back(m->NewTimestampStrippingIterator(
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
@ -917,11 +918,11 @@ Status FlushJob::WriteLevel0Table() {
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
total_data_size += m->get_data_size();
total_num_entries += m->NumEntries();
total_num_deletes += m->NumDeletion();
total_data_size += m->GetDataSize();
total_memory_usage += m->ApproximateMemoryUsage();
total_num_range_deletes += m->num_range_deletes();
total_num_range_deletes += m->NumRangeDeletion();
}
// TODO(cbi): when memtable is flushed due to number of range deletions
@ -1172,7 +1173,7 @@ void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
return;
}
// Find the newest user-defined timestamps from all the flushed memtables.
for (MemTable* m : mems_) {
for (const ReadOnlyMemTable* m : mems_) {
Slice table_newest_udt = m->GetNewestUDT();
// Empty memtables can be legitimately created and flushed, for example
// by error recovery flush attempts.


@ -91,7 +91,7 @@ class FlushJob {
bool* skipped_since_bg_error = nullptr,
ErrorHandler* error_handler = nullptr);
void Cancel();
const autovector<MemTable*>& GetMemTables() const { return mems_; }
const autovector<ReadOnlyMemTable*>& GetMemTables() const { return mems_; }
std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
return &committed_flush_jobs_info_;
@ -101,7 +101,7 @@ class FlushJob {
friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
static void ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table();
@ -205,7 +205,8 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
// Memtables to be flushed by this job.
autovector<ReadOnlyMemTable*> mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;


@ -264,7 +264,7 @@ TEST_F(FlushJobTest, NonEmpty) {
}
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
for (auto& m : to_delete) {
@ -325,7 +325,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
}
}
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
for (auto mem : new_mems) {
mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(mem, &to_delete);
@ -380,7 +380,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
std::vector<uint64_t> memtable_ids;
std::vector<SequenceNumber> smallest_seqs;
std::vector<SequenceNumber> largest_seqs;
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
SequenceNumber curr_seqno = 0;
size_t k = 0;
for (auto cfd : all_cfds) {
@ -439,7 +439,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
autovector<const autovector<MemTable*>*> mems_list;
autovector<const autovector<ReadOnlyMemTable*>*> mems_list;
for (size_t i = 0; i != all_cfds.size(); ++i) {
const auto& mems = flush_jobs[i]->GetMemTables();
mems_list.push_back(&mems);
@ -528,7 +528,7 @@ TEST_F(FlushJobTest, Snapshots) {
}
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
for (auto& m : to_delete) {
@ -582,7 +582,7 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
}
}
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
for (auto mem : new_mems) {
mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(mem, &to_delete);
@ -654,7 +654,7 @@ TEST_F(FlushJobTest, ReplaceTimedPutWriteTimeWithPreferredSeqno) {
InternalKey largest_internal_key("foo", SequenceNumber(18), kTypeValue);
inserted_entries.push_back(
{largest_internal_key.Encode().ToString(), "fval"});
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
new_mem->ConstructFragmentedRangeTombstones();
cfd->imm()->Add(new_mem, &to_delete);
for (auto& m : to_delete) {
@ -744,7 +744,7 @@ class FlushJobTimestampTest
TEST_P(FlushJobTimestampTest, AllKeysExpired) {
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
{
MemTable* new_mem = cfd->ConstructNewMemtable(
@ -810,7 +810,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {
TEST_P(FlushJobTimestampTest, NoKeyExpired) {
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
{
MemTable* new_mem = cfd->ConstructNewMemtable(


@ -1301,7 +1301,7 @@ bool InternalStats::HandleNumEntriesActiveMemTable(uint64_t* value,
DBImpl* /*db*/,
Version* /*version*/) {
// Current number of entires in the active memtable
*value = cfd_->mem()->num_entries();
*value = cfd_->mem()->NumEntries();
return true;
}
@ -1317,7 +1317,7 @@ bool InternalStats::HandleNumDeletesActiveMemTable(uint64_t* value,
DBImpl* /*db*/,
Version* /*version*/) {
// Current number of entires in the active memtable
*value = cfd_->mem()->num_deletes();
*value = cfd_->mem()->NumDeletion();
return true;
}
@ -1334,11 +1334,11 @@ bool InternalStats::HandleEstimateNumKeys(uint64_t* value, DBImpl* /*db*/,
// Estimate number of entries in the column family:
// Use estimated entries in tables + total entries in memtables.
const auto* vstorage = cfd_->current()->storage_info();
uint64_t estimate_keys = cfd_->mem()->num_entries() +
uint64_t estimate_keys = cfd_->mem()->NumEntries() +
cfd_->imm()->current()->GetTotalNumEntries() +
vstorage->GetEstimatedActiveKeys();
uint64_t estimate_deletes =
cfd_->mem()->num_deletes() + cfd_->imm()->current()->GetTotalNumDeletes();
cfd_->mem()->NumDeletion() + cfd_->imm()->current()->GetTotalNumDeletes();
*value = estimate_keys > estimate_deletes * 2
? estimate_keys - (estimate_deletes * 2)
: 0;


@ -191,7 +191,7 @@ struct JobContext {
std::vector<std::string> manifest_delete_files;
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
autovector<ReadOnlyMemTable*> memtables_to_free;
// contexts for installing superversions for multiple column families
std::vector<SuperVersionContext> superversion_contexts;


@ -79,7 +79,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
SequenceNumber latest_seq, uint32_t column_family_id)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)),
mem_tracker_(write_buffer_manager),
arena_(moptions_.arena_block_size,
@ -101,13 +100,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
num_deletes_(0),
num_range_deletes_(0),
write_buffer_size_(mutable_cf_options.write_buffer_size),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks
@ -118,7 +113,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0),
memtable_max_range_deletions_(
mutable_cf_options.memtable_max_range_deletions) {
@ -832,8 +826,8 @@ port::RWMutex* MemTable::GetLock(const Slice& key) {
return &locks_[GetSliceRangedNPHash(key, locks_.size())];
}
MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey) {
ReadOnlyMemTable::MemTableStats MemTable::ApproximateStats(
const Slice& start_ikey, const Slice& end_ikey) {
uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
if (entry_count == 0) {


@ -76,88 +76,48 @@ struct MemTablePostProcessInfo {
};
using MultiGetRange = MultiGetContext::Range;
// Note: Many of the methods in this class have comments indicating that
// For each CF, rocksdb maintains an active memtable that accepts writes,
// and zero or more sealed memtables that we call immutable memtables.
// This interface contains all methods required for immutable memtables.
// The MemTable class inherits from `ReadOnlyMemTable` and implements the
// additional methods required for active memtables.
// The immutable memtable list (MemTableList) maintains a list of
// ReadOnlyMemTable objects. This interface enables features like direct
// ingestion of an immutable memtable with a custom implementation,
// bypassing memtable writes.
//
// Note: Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods. This is usually done by acquiring either
// invocation of these methods. This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable. Calling MarkImmutable() is
// synchronization if this memtable is immutable. Calling MarkImmutable() is
// not sufficient to guarantee immutability. It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// Eg: The Superversion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
//
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
class ReadOnlyMemTable {
public:
struct KeyComparator : public MemTableRep::KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const override;
int operator()(const char* prefix_len_key,
const DecodedType& key) const override;
};
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
//
// earliest_seq should be the current SequenceNumber in the db such that any
// key inserted into this memtable will have an equal or larger seq number.
// (When a db is first created, the earliest sequence number will be 0).
// If the earliest sequence number is not known, kMaxSequenceNumber may be
// used, but this may prevent some transactions from succeeding until the
// first key is inserted into the memtable.
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
// Do not delete this MemTable unless Unref() indicates it not in use.
~MemTable();
virtual ~ReadOnlyMemTable() = default;
// Increase reference count.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Ref() { ++refs_; }
// Drop reference count.
// If the refcount goes to zero return this memtable, otherwise return null.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
MemTable* Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
return this;
}
return nullptr;
}
virtual const char* Name() const = 0;
// Returns an estimate of the number of bytes of data in use by this
// data structure.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
size_t ApproximateMemoryUsage();
// As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
// require external synchronization. The value may be less accurate though
size_t ApproximateMemoryUsageFast() const {
return approximate_memory_usage_.load(std::memory_order_relaxed);
}
virtual size_t ApproximateMemoryUsage() = 0;
// used by MemTableListVersion::MemoryAllocatedBytesExcludingLast
size_t MemoryAllocatedBytes() const {
return table_->ApproximateMemoryUsage() +
range_del_table_->ApproximateMemoryUsage() +
arena_.MemoryAllocatedBytes();
}
virtual size_t MemoryAllocatedBytes() const = 0;
// Returns a vector of unique random memtable entries of size 'sample_size'.
//
@ -172,27 +132,8 @@ class MemTable {
// REQUIRES: SkipList memtable representation. This function is not
// implemented for any other type of memtable representation (vectorrep,
// hashskiplist,...).
void UniqueRandomSample(const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) {
// TODO(bjlemaire): at the moment, only supported by skiplistrep.
// Extend it to all other memtable representations.
table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
}
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
}
// Returns true if a flush should be scheduled and the caller should
// be the one to schedule it
bool MarkFlushScheduled() {
auto before = FLUSH_REQUESTED;
return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
virtual void UniqueRandomSample(const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) = 0;
// Return an iterator that yields the contents of the memtable.
//
@ -208,18 +149,18 @@ class MemTable {
// those allocated in arena.
// seqno_to_time_mapping: it's used to support return write unix time for the
// data, currently only needed for iterators serving user reads.
InternalIterator* NewIterator(
virtual InternalIterator* NewIterator(
const ReadOptions& read_options,
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor);
const SliceTransform* prefix_extractor) = 0;
// Returns an iterator that wraps a MemTableIterator and logically strips the
// user-defined timestamp of each key. This API is only used by flush when
// user-defined timestamps in MemTable only feature is enabled.
InternalIterator* NewTimestampStrippingIterator(
virtual InternalIterator* NewTimestampStrippingIterator(
const ReadOptions& read_options,
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor, size_t ts_sz);
const SliceTransform* prefix_extractor, size_t ts_sz) = 0;
// Returns an iterator that yields the range tombstones of the memtable.
// The caller must ensure that the underlying MemTable remains live
@ -231,38 +172,23 @@ class MemTable {
// is constructed when a memtable becomes immutable. Setting the flag to false
// will always yield correct result, but may incur performance penalty as it
// always creates a new fragmented range tombstone list.
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable);
bool immutable_memtable) = 0;
// Returns an iterator that yields the range tombstones of the memtable and
// logically strips the user-defined timestamp of each key (including start
// key, and end key). This API is only used by flush when user-defined
// timestamps in MemTable only feature is enabled.
FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz);
Status VerifyEncodedEntry(Slice encoded,
const ProtectionInfoKVOS64& kv_prot_info);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
//
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
//
// Returns `Status::TryAgain` if the `seq`, `key` combination already exists
// in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
// The next attempt should try a larger value for `seq`.
Status Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, const ProtectionInfoKVOS64* kv_prot_info,
bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);
virtual FragmentedRangeTombstoneIterator*
NewTimestampStrippingRangeTombstoneIterator(const ReadOptions& read_options,
SequenceNumber read_seq,
size_t ts_sz) = 0;
// Used to Get value associated with key or Get Merge Operands associated
// with key.
// Keys are considered if they are no larger than the parameter `key` in
// the order defined by comparator and share the save user key with `key`.
// If do_merge = true the default behavior which is Get value for key is
// executed. Expected behavior is described right below.
// If memtable contains a value for key, store it in *value and return true.
@ -291,14 +217,13 @@ class MemTable {
// @param immutable_memtable Whether this memtable is immutable. Used
// internally by NewRangeTombstoneIterator(). See comment above
// NewRangeTombstoneIterator() for more detail.
bool Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true);
virtual bool Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts,
bool immutable_memtable, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr, bool do_merge = true) = 0;
bool Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp, Status* s,
MergeContext* merge_context,
@ -315,8 +240,282 @@ class MemTable {
// @param immutable_memtable Whether this memtable is immutable. Used
// internally by NewRangeTombstoneIterator(). See comment above
// NewRangeTombstoneIterator() for more detail.
virtual void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool immutable_memtable) = 0;
// Get total number of entries in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
virtual uint64_t NumEntries() const = 0;
// Get total number of point deletes in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
virtual uint64_t NumDeletion() const = 0;
// Get total number of range deletions in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
virtual uint64_t NumRangeDeletion() const = 0;
virtual uint64_t GetDataSize() const = 0;
// Returns the sequence number of the first element that was inserted
// into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
virtual SequenceNumber GetFirstSequenceNumber() = 0;
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal)
// sequence number will be present in this memtable or a later memtable.
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
virtual SequenceNumber GetEarliestSequenceNumber() = 0;
virtual uint64_t GetMinLogContainingPrepSection() = 0;
// Notify the underlying storage that no more items will be added.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
// After MarkImmutable() is called, you should not attempt to
// write anything to this MemTable(). (Ie. do not call Add() or Update()).
virtual void MarkImmutable() = 0;
// Notify the underlying storage that all data it contained has been
// persisted.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
virtual void MarkFlushed() = 0;
struct MemTableStats {
uint64_t size;
uint64_t count;
};
virtual MemTableStats ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey) = 0;
virtual const InternalKeyComparator& GetInternalKeyComparator() const = 0;
virtual uint64_t ApproximateOldestKeyTime() const = 0;
// Returns whether a fragmented range tombstone list is already constructed
// for this memtable. It should be constructed right before a memtable is
// added to an immutable memtable list. Note that if a memtable does not have
// any range tombstone, then no range tombstone list will ever be constructed
// and true is returned in that case.
virtual bool IsFragmentedRangeTombstonesConstructed() const = 0;
// Get the newest user-defined timestamp contained in this MemTable. Check
// `newest_udt_` for what newer means. This method should only be invoked for
// an MemTable that has enabled user-defined timestamp feature and set
// `persist_user_defined_timestamps` to false. The tracked newest UDT will be
// used by flush job in the background to help check the MemTable's
// eligibility for Flush.
virtual const Slice& GetNewestUDT() const = 0;
// Increase reference count.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Ref() { ++refs_; }
// Drop reference count.
// If the refcount goes to zero return this memtable, otherwise return null.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
ReadOnlyMemTable* Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
return this;
}
return nullptr;
}
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
uint64_t GetNextLogNumber() const { return mem_next_logfile_number_; }
// Sets the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// REQUIRES: db_mutex held.
void SetID(uint64_t id) { id_ = id; }
uint64_t GetID() const { return id_; }
void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
uint64_t GetFileNumber() const { return file_number_; }
void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
void SetFlushInProgress(bool in_progress) {
flush_in_progress_ = in_progress;
}
void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
flush_job_info_ = std::move(info);
};
std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
return std::move(flush_job_info_);
}
protected:
friend class MemTableList;
int refs_{0};
// These are used to manage memtable flushes to storage
bool flush_in_progress_{false}; // started the flush
bool flush_completed_{false}; // finished the flush
uint64_t file_number_{0};
// The updates to be applied to the transaction log when this
// memtable is flushed to storage.
VersionEdit edit_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_{0};
// Memtable id to track flush.
uint64_t id_ = 0;
// Sequence number of the atomic flush that is responsible for this memtable.
// The sequence number of atomic flush is a seq, such that no writes with
// sequence numbers greater than or equal to seq are flushed, while all
// writes with sequence number smaller than seq are flushed.
SequenceNumber atomic_flush_seqno_{kMaxSequenceNumber};
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
};
class MemTable final : public ReadOnlyMemTable {
public:
struct KeyComparator final : public MemTableRep::KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const override;
int operator()(const char* prefix_len_key,
const DecodedType& key) const override;
};
// earliest_seq should be the current SequenceNumber in the db such that any
// key inserted into this memtable will have an equal or larger seq number.
// (When a db is first created, the earliest sequence number will be 0).
// If the earliest sequence number is not known, kMaxSequenceNumber may be
// used, but this may prevent some transactions from succeeding until the
// first key is inserted into the memtable.
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
~MemTable() override;
const char* Name() const override { return "MemTable"; }
size_t ApproximateMemoryUsage() override;
// As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
// require external synchronization. The value may be less accurate though
size_t ApproximateMemoryUsageFast() const {
return approximate_memory_usage_.load(std::memory_order_relaxed);
}
size_t MemoryAllocatedBytes() const override {
return table_->ApproximateMemoryUsage() +
range_del_table_->ApproximateMemoryUsage() +
arena_.MemoryAllocatedBytes();
}
void UniqueRandomSample(const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) override {
// TODO(bjlemaire): at the moment, only supported by skiplistrep.
// Extend it to all other memtable representations.
table_->UniqueRandomSample(NumEntries(), target_sample_size, entries);
}
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
}
// Returns true if a flush should be scheduled and the caller should
// be the one to schedule it
bool MarkFlushScheduled() {
auto before = FLUSH_REQUESTED;
return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
InternalIterator* NewIterator(
const ReadOptions& read_options,
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor) override;
InternalIterator* NewTimestampStrippingIterator(
const ReadOptions& read_options,
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor, size_t ts_sz) override;
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable) override;
FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq,
size_t ts_sz) override;
Status VerifyEncodedEntry(Slice encoded,
const ProtectionInfoKVOS64& kv_prot_info);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically, value will be empty if type==kTypeDeletion.
//
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
//
// Returns `Status::TryAgain` if the `seq`, `key` combination already exists
// in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
// The next attempt should try a larger value for `seq`.
Status Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, const ProtectionInfoKVOS64* kv_prot_info,
bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);
using ReadOnlyMemTable::Get;
bool Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true) override;
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool immutable_memtable);
ReadCallback* callback, bool immutable_memtable) override;
// If `key` exists in current memtable with type value_type and the existing
// value is at least as large as the new value, updates it in-place. Otherwise
@ -372,28 +571,19 @@ class MemTable {
UpdateFlushState();
}
// Get total number of entries in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_entries() const {
uint64_t NumEntries() const override {
return num_entries_.load(std::memory_order_relaxed);
}
// Get total number of deletes in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_deletes() const {
uint64_t NumDeletion() const override {
return num_deletes_.load(std::memory_order_relaxed);
}
// Get total number of range deletions in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_range_deletes() const {
uint64_t NumRangeDeletion() const override {
return num_range_deletes_.load(std::memory_order_relaxed);
}
uint64_t get_data_size() const {
uint64_t GetDataSize() const override {
return data_size_.load(std::memory_order_relaxed);
}
@ -413,19 +603,12 @@ class MemTable {
}
}
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
// Returns if there is no entry inserted to the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
bool IsEmpty() const { return first_seqno_ == 0; }
// Returns the sequence number of the first element that was inserted
// into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
SequenceNumber GetFirstSequenceNumber() {
SequenceNumber GetFirstSequenceNumber() override {
return first_seqno_.load(std::memory_order_relaxed);
}
@ -437,14 +620,8 @@ class MemTable {
return first_seqno_.store(first_seqno, std::memory_order_relaxed);
}
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal)
// sequence number will be present in this memtable or a later memtable.
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
SequenceNumber GetEarliestSequenceNumber() {
SequenceNumber GetEarliestSequenceNumber() override {
// With file ingestion and empty memtable, this seqno needs to be fixed.
return earliest_seqno_.load(std::memory_order_relaxed);
}
@ -463,40 +640,18 @@ class MemTable {
void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
// Sets the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// if this memtable contains data from a committed
// two phase transaction we must take note of the
// log which contains that data so we can know
// when to relese that log
// If this memtable contains data from a committed two phase transaction we
// must take note of the log which contains that data so we can know when
// to release that log.
void RefLogContainingPrepSection(uint64_t log);
uint64_t GetMinLogContainingPrepSection();
uint64_t GetMinLogContainingPrepSection() override;
// Notify the underlying storage that no more items will be added.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
// After MarkImmutable() is called, you should not attempt to
// write anything to this MemTable(). (Ie. do not call Add() or Update()).
void MarkImmutable() {
void MarkImmutable() override {
table_->MarkReadOnly();
mem_tracker_.DoneAllocating();
}
// Notify the underlying storage that all data it contained has been
// persisted.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void MarkFlushed() { table_->MarkFlushed(); }
void MarkFlushed() override { table_->MarkFlushed(); }
// return true if the current MemTableRep supports merge operator.
bool IsMergeOperatorSupported() const {
@ -509,18 +664,13 @@ class MemTable {
return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
}
struct MemTableStats {
uint64_t size;
uint64_t count;
};
MemTableStats ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey);
const Slice& end_ikey) override;
// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);
const InternalKeyComparator& GetInternalKeyComparator() const {
const InternalKeyComparator& GetInternalKeyComparator() const override {
return comparator_.comparator;
}
@ -528,33 +678,10 @@ class MemTable {
return &moptions_;
}
uint64_t ApproximateOldestKeyTime() const {
uint64_t ApproximateOldestKeyTime() const override {
return oldest_key_time_.load(std::memory_order_relaxed);
}
// REQUIRES: db_mutex held.
void SetID(uint64_t id) { id_ = id; }
uint64_t GetID() const { return id_; }
void SetFlushCompleted(bool completed) { flush_completed_ = completed; }
uint64_t GetFileNumber() const { return file_number_; }
void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }
void SetFlushInProgress(bool in_progress) {
flush_in_progress_ = in_progress;
}
void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
flush_job_info_ = std::move(info);
}
std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
return std::move(flush_job_info_);
}
// Returns a heuristic flush decision
bool ShouldFlushNow();
@ -565,23 +692,12 @@ class MemTable {
// SwitchMemtable() may fail.
void ConstructFragmentedRangeTombstones();
// Returns whether a fragmented range tombstone list is already constructed
// for this memtable. It should be constructed right before a memtable is
// added to an immutable memtable list. Note that if a memtable does not have
// any range tombstone, then no range tombstone list will ever be constructed
// and true is returned in that case.
bool IsFragmentedRangeTombstonesConstructed() const {
bool IsFragmentedRangeTombstonesConstructed() const override {
return fragmented_range_tombstone_list_.get() != nullptr ||
is_range_del_table_empty_;
}
// Get the newest user-defined timestamp contained in this MemTable. Check
// `newest_udt_` for what newer means. This method should only be invoked for
// an MemTable that has enabled user-defined timestamp feature and set
// `persist_user_defined_timestamps` to false. The tracked newest UDT will be
// used by flush job in the background to help check the MemTable's
// eligibility for Flush.
const Slice& GetNewestUDT() const;
const Slice& GetNewestUDT() const override;
// Returns Corruption status if verification fails.
static Status VerifyEntryChecksum(const char* entry,
@ -597,7 +713,6 @@ class MemTable {
KeyComparator comparator_;
const ImmutableMemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
AllocTracker mem_tracker_;
ConcurrentArena arena_;
@ -614,15 +729,6 @@ class MemTable {
// Dynamically changeable memtable option
std::atomic<size_t> write_buffer_size_;
// These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush
bool flush_completed_; // finished the flush
uint64_t file_number_; // filled up after flush is complete
// The updates to be applied to the transaction log when this
// memtable is flushed to storage.
VersionEdit edit_;
// The sequence number of the kv that was inserted first
std::atomic<SequenceNumber> first_seqno_;
@ -632,9 +738,6 @@ class MemTable {
SequenceNumber creation_seq_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
// the earliest log containing a prepared section
// which has been inserted into this memtable.
std::atomic<uint64_t> min_prep_log_referenced_;
@ -658,15 +761,6 @@ class MemTable {
// Timestamp of oldest key
std::atomic<uint64_t> oldest_key_time_;
// Memtable id to track flush.
uint64_t id_ = 0;
// Sequence number of the atomic flush that is responsible for this memtable.
// The sequence number of atomic flush is a seq, such that no writes with
// sequence numbers greater than or equal to seq are flushed, while all
// writes with sequence number smaller than seq are flushed.
SequenceNumber atomic_flush_seqno_;
// keep track of memory usage in table_, arena_, and range_del_table_.
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;
@ -675,9 +769,6 @@ class MemTable {
// unlimited.
uint32_t memtable_max_range_deletions_ = 0;
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
// Size in bytes for the user-defined timestamps.
size_t ts_sz_;


@ -31,13 +31,13 @@ class InternalKeyComparator;
class Mutex;
class VersionSet;
void MemTableListVersion::AddMemTable(MemTable* m) {
void MemTableListVersion::AddMemTable(ReadOnlyMemTable* m) {
memlist_.push_front(m);
*parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}
void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
MemTable* m) {
void MemTableListVersion::UnrefMemTable(
autovector<ReadOnlyMemTable*>* to_delete, ReadOnlyMemTable* m) {
if (m->Unref()) {
to_delete->push_back(m);
assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
@ -74,7 +74,7 @@ MemTableListVersion::MemTableListVersion(
void MemTableListVersion::Ref() { ++refs_; }
// called by superversion::clean()
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
void MemTableListVersion::Unref(autovector<ReadOnlyMemTable*>* to_delete) {
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
@ -131,7 +131,7 @@ void MemTableListVersion::MultiGet(const ReadOptions& read_options,
bool MemTableListVersion::GetMergeOperands(
const LookupKey& key, Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
for (MemTable* memtable : memlist_) {
for (ReadOnlyMemTable* memtable : memlist_) {
bool done = memtable->Get(
key, /*value=*/nullptr, /*columns=*/nullptr, /*timestamp=*/nullptr, s,
merge_context, max_covering_tombstone_seq, read_opts,
@ -154,11 +154,11 @@ bool MemTableListVersion::GetFromHistory(
}
bool MemTableListVersion::GetFromList(
std::list<MemTable*>* list, const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp, Status* s,
MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
std::string* value, PinnableWideColumns* columns, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
@ -259,14 +259,14 @@ void MemTableListVersion::AddIterators(
uint64_t MemTableListVersion::GetTotalNumEntries() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_entries();
total_num += m->NumEntries();
}
return total_num;
}
MemTable::MemTableStats MemTableListVersion::ApproximateStats(
const Slice& start_ikey, const Slice& end_ikey) {
MemTable::MemTableStats total_stats = {0, 0};
ReadOnlyMemTable::MemTableStats MemTableListVersion::ApproximateStats(
const Slice& start_ikey, const Slice& end_ikey) const {
ReadOnlyMemTable::MemTableStats total_stats = {0, 0};
for (auto& m : memlist_) {
auto mStats = m->ApproximateStats(start_ikey, end_ikey);
total_stats.size += mStats.size;
@ -278,7 +278,7 @@ MemTable::MemTableStats MemTableListVersion::ApproximateStats(
uint64_t MemTableListVersion::GetTotalNumDeletes() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {
total_num += m->num_deletes();
total_num += m->NumDeletion();
}
return total_num;
}
@ -304,7 +304,8 @@ SequenceNumber MemTableListVersion::GetFirstSequenceNumber() const {
}
// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
void MemTableListVersion::Add(ReadOnlyMemTable* m,
autovector<ReadOnlyMemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
AddMemTable(m);
// m->MemoryAllocatedBytes() is added in MemoryAllocatedBytesExcludingLast
@ -312,8 +313,8 @@ void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
}
// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
autovector<MemTable*>* to_delete) {
void MemTableListVersion::Remove(ReadOnlyMemTable* m,
autovector<ReadOnlyMemTable*>* to_delete) {
assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable
memlist_.remove(m);
@ -364,11 +365,11 @@ bool MemTableListVersion::HistoryShouldBeTrimmed(size_t usage) {
}
// Make sure we don't use up too much space in history
bool MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
bool MemTableListVersion::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
size_t usage) {
bool ret = false;
while (HistoryShouldBeTrimmed(usage)) {
MemTable* x = memlist_history_.back();
ReadOnlyMemTable* x = memlist_history_.back();
memlist_history_.pop_back();
UnrefMemTable(to_delete, x);
@ -398,7 +399,7 @@ bool MemTableList::IsFlushPendingOrRunning() const {
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
autovector<MemTable*>* ret,
autovector<ReadOnlyMemTable*>* ret,
uint64_t* max_next_log_number) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
@ -412,7 +413,7 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
// However, when the mempurge feature is activated, new memtables with older
// IDs will be added to the memlist.
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
atomic_flush = true;
}
@ -445,20 +446,21 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
}
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
bool rollback_succeeding_memtables) {
void MemTableList::RollbackMemtableFlush(
const autovector<ReadOnlyMemTable*>& mems,
bool rollback_succeeding_memtables) {
TEST_SYNC_POINT("RollbackMemtableFlush");
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
#ifndef NDEBUG
for (MemTable* m : mems) {
for (ReadOnlyMemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
}
#endif
if (rollback_succeeding_memtables && !mems.empty()) {
std::list<MemTable*>& memlist = current_->memlist_;
std::list<ReadOnlyMemTable*>& memlist = current_->memlist_;
auto it = memlist.rbegin();
for (; *it != mems[0] && it != memlist.rend(); ++it) {
}
@ -468,7 +470,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
++it;
}
while (it != memlist.rend()) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
      // Only roll back completed flushes, not in-progress ones;
      // in-progress flushes may still be writing SSTs.
if (m->flush_completed_) {
@ -484,7 +486,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
}
}
for (MemTable* m : mems) {
for (ReadOnlyMemTable* m : mems) {
if (m->flush_in_progress_) {
assert(m->file_number_ == 0);
m->file_number_ = 0;
@ -503,10 +505,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Status::OK, letting a concurrent flush do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
const autovector<ReadOnlyMemTable*>& mems,
LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu,
uint64_t file_number, autovector<ReadOnlyMemTable*>* to_delete,
FSDirectory* db_directory, LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
bool write_edits) {
AutoThreadOperationStageUpdater stage_updater(
@ -555,10 +557,10 @@ Status MemTableList::TryInstallMemtableFlushResults(
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_flush;
autovector<ReadOnlyMemTable*> memtables_to_flush;
  // enumerate from the last (earliest) element to see how many batches have finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
if (!m->flush_completed_) {
break;
}
@ -646,7 +648,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
void MemTableList::Add(ReadOnlyMemTable* m,
autovector<ReadOnlyMemTable*>* to_delete) {
assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
InstallNewVersion();
  // this method is used to move a mutable memtable into the immutable list.
@ -664,7 +667,8 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
ResetTrimHistoryNeeded();
}
bool MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
bool MemTableList::TrimHistory(autovector<ReadOnlyMemTable*>* to_delete,
size_t usage) {
// Check if history trim is needed first, so that we can avoid installing a
// new MemTableListVersion without installing a SuperVersion (installed based
// on return value of this function).
@ -734,7 +738,7 @@ void MemTableList::InstallNewVersion() {
void MemTableList::RemoveMemTablesOrRestoreFlags(
const Status& s, ColumnFamilyData* cfd, size_t batch_count,
LogBuffer* log_buffer, autovector<MemTable*>* to_delete,
LogBuffer* log_buffer, autovector<ReadOnlyMemTable*>* to_delete,
InstrumentedMutex* mu) {
assert(mu);
mu->AssertHeld();
@ -763,7 +767,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
// the column family is dropped.
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
ReadOnlyMemTable* m = current_->memlist_.back();
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Level-0 commit flush result of table #%" PRIu64
@ -786,7 +790,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
      // Commit failed. Set up state so that we can flush again.
if (m->edit_.GetBlobFileAdditions().empty()) {
ROCKS_LOG_BUFFER(log_buffer,
@ -814,7 +818,7 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
}
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const std::unordered_set<MemTable*>* memtables_to_flush) {
const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush) {
uint64_t min_log = 0;
for (auto& m : current_->memlist_) {
@ -837,12 +841,12 @@ Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_metas,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
@ -1006,14 +1010,14 @@ Status InstallMemtableAtomicFlushResults(
return s;
}
void MemTableList::RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete) {
void MemTableList::RemoveOldMemTables(
uint64_t log_number, autovector<ReadOnlyMemTable*>* to_delete) {
assert(to_delete != nullptr);
InstallNewVersion();
auto& memlist = current_->memlist_;
autovector<MemTable*> old_memtables;
autovector<ReadOnlyMemTable*> old_memtables;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* mem = *it;
ReadOnlyMemTable* mem = *it;
if (mem->GetNextLogNumber() > log_number) {
break;
}
@ -1021,7 +1025,7 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number,
}
for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
MemTable* mem = *it;
ReadOnlyMemTable* mem = *it;
current_->Remove(mem, to_delete);
--num_flush_not_started_;
if (0 == num_flush_not_started_) {
@ -1044,9 +1048,9 @@ VersionEdit MemTableList::GetEditForDroppingCurrentVersion(
uint64_t max_next_log_number = 0;
autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_drop;
autovector<ReadOnlyMemTable*> memtables_to_drop;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
memtables_to_drop.push_back(m);
max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number);
}
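
The `Unref()`/`to_delete` pattern above is how retired immutable memtables are handed back to callers for deletion; with this change the vector carries `ReadOnlyMemTable*`, so custom immutable memtable implementations are destroyed through the same path. A minimal sketch of that pattern, assuming only the declarations shown in this diff (the helper function and its name are illustrative, not part of the PR):

```
// Illustrative sketch, not part of the PR: mirrors the Unref()/to_delete
// cleanup used by the call sites and tests in this diff.
#include "db/memtable_list.h"

namespace ROCKSDB_NAMESPACE {

void ReleaseImmutableMemtables(MemTableListVersion* version) {
  autovector<ReadOnlyMemTable*> to_delete;
  // Drops the caller's reference; memtables that are no longer referenced
  // are appended to to_delete for the caller to free.
  version->Unref(&to_delete);
  // Deletion goes through the ReadOnlyMemTable interface, so a custom
  // implementation (e.g. one backed by WriteBatchWithIndex) is freed the
  // same way as a plain MemTable.
  for (ReadOnlyMemTable* m : to_delete) {
    delete m;
  }
}

}  // namespace ROCKSDB_NAMESPACE
```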


@ -34,11 +34,11 @@ class MemTableList;
struct FlushJobInfo;
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths
// keeps a list of immutable memtables (ReadOnlyMemTable*) in a vector.
// The list is immutable if refcount is bigger than one. It is used as
// a state for Get() and iterator code paths.
//
// This class is not thread-safe. External synchronization is required
// This class is not thread-safe. External synchronization is required
// (such as holding the db mutex or being on the write thread).
class MemTableListVersion {
public:
@ -49,7 +49,7 @@ class MemTableListVersion {
int64_t max_write_buffer_size_to_maintain);
void Ref();
void Unref(autovector<MemTable*>* to_delete = nullptr);
void Unref(autovector<ReadOnlyMemTable*>* to_delete = nullptr);
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
@ -127,8 +127,8 @@ class MemTableListVersion {
uint64_t GetTotalNumDeletes() const;
MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey);
ReadOnlyMemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
const Slice& end_ikey) const;
// Returns the value of MemTable::GetEarliestSequenceNumber() on the most
// recent MemTable in this list or kMaxSequenceNumber if the list is empty.
@ -153,27 +153,27 @@ class MemTableListVersion {
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
// REQUIRE: m is an immutable memtable
void Add(MemTable* m, autovector<MemTable*>* to_delete);
void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
// REQUIRE: m is an immutable memtable
void Remove(MemTable* m, autovector<MemTable*>* to_delete);
void Remove(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
// Return true if the memtable list should be trimmed to get memory usage
// under budget.
bool HistoryShouldBeTrimmed(size_t usage);
  // Trim history. Returns true if any memtable was trimmed.
bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
bool GetFromList(std::list<ReadOnlyMemTable*>* list, const LookupKey& key,
std::string* value, PinnableWideColumns* columns,
std::string* timestamp, Status* s,
MergeContext* merge_context,
@ -182,9 +182,10 @@ class MemTableListVersion {
ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
void AddMemTable(MemTable* m);
void AddMemTable(ReadOnlyMemTable* m);
void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
void UnrefMemTable(autovector<ReadOnlyMemTable*>* to_delete,
ReadOnlyMemTable* m);
// Calculate the total amount of memory used by memlist_ and memlist_history_
// excluding the last MemTable in memlist_history_. The reason for excluding
@ -199,11 +200,11 @@ class MemTableListVersion {
bool MemtableLimitExceeded(size_t usage);
// Immutable MemTables that have not yet been flushed.
std::list<MemTable*> memlist_;
std::list<ReadOnlyMemTable*> memlist_;
// MemTables that have already been flushed
// (used during Transaction validation)
std::list<MemTable*> memlist_history_;
std::list<ReadOnlyMemTable*> memlist_history_;
// Maximum number of MemTables to keep in memory (including both flushed
const int max_write_buffer_number_to_maintain_;
@ -283,7 +284,7 @@ class MemTableList {
  // Returns the earliest memtables that need to be flushed. The returned
  // memtables are guaranteed to be in ascending order of creation time.
void PickMemtablesToFlush(uint64_t max_memtable_id,
autovector<MemTable*>* mems,
autovector<ReadOnlyMemTable*>* mems,
uint64_t* max_next_log_number = nullptr);
// Reset status of the given memtable list back to pending state so that
@ -300,16 +301,16 @@ class MemTableList {
// Note that we also do rollback in `write_manifest_cb` by calling
  // `RemoveMemTablesOrRestoreFlags()`. There we roll back the entire batch so
// it is similar to what we do here with rollback_succeeding_memtables=true.
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
void RollbackMemtableFlush(const autovector<ReadOnlyMemTable*>& mems,
bool rollback_succeeding_memtables);
  // Try to commit a successful flush in the manifest file. It might just return
  // Status::OK, letting a concurrent flush do the actual recording.
Status TryInstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
const autovector<ReadOnlyMemTable*>& m, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
bool write_edits = true);
@ -319,7 +320,7 @@ class MemTableList {
// By default, adding memtables will flag that the memtable list needs to be
// flushed, but in certain situations, like after a mempurge, we may want to
// avoid flushing the memtable list upon addition of a memtable.
void Add(MemTable* m, autovector<MemTable*>* to_delete);
void Add(ReadOnlyMemTable* m, autovector<ReadOnlyMemTable*>* to_delete);
// Returns an estimate of the number of bytes of data in use.
size_t ApproximateMemoryUsage();
@ -341,7 +342,7 @@ class MemTableList {
// memtable list.
//
// Return true if memtable is trimmed
bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);
bool TrimHistory(autovector<ReadOnlyMemTable*>* to_delete, size_t usage);
// Returns an estimate of the number of bytes of data used by
// the unflushed mem-tables.
@ -393,7 +394,8 @@ class MemTableList {
  // Returns the min log containing the prep section after memtables listed in
// `memtables_to_flush` are flushed and their status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection(
const std::unordered_set<MemTable*>* memtables_to_flush = nullptr);
const std::unordered_set<ReadOnlyMemTable*>* memtables_to_flush =
nullptr);
uint64_t GetEarliestMemTableID() const {
auto& memlist = current_->memlist_;
@ -411,7 +413,7 @@ class MemTableList {
if (for_atomic_flush) {
// Scan the memtable list from new to old
for (auto it = memlist.begin(); it != memlist.end(); ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
if (m->atomic_flush_seqno_ != kMaxSequenceNumber) {
return m->GetID();
}
@ -431,7 +433,7 @@ class MemTableList {
    // Iterating through the memlist starting at the end, `ret` is filled with
    // memtables already sorted in increasing MemTable ID.
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
ReadOnlyMemTable* m = *it;
if (m->GetID() > max_memtable_id) {
break;
}
@ -444,7 +446,7 @@ class MemTableList {
const auto& memlist = current_->memlist_;
// Scan the memtable list from new to old
for (auto it = memlist.begin(); it != memlist.end(); ++it) {
MemTable* mem = *it;
ReadOnlyMemTable* mem = *it;
if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
mem->atomic_flush_seqno_ = seq;
} else {
@ -460,7 +462,7 @@ class MemTableList {
// was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
autovector<ReadOnlyMemTable*>* to_delete);
  // This API is only used by atomic data replacement. To get an edit for
// dropping the current `MemTableListVersion`.
@ -473,12 +475,12 @@ class MemTableList {
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
// DB mutex held
@ -488,7 +490,7 @@ class MemTableList {
// Called after writing to MANIFEST
void RemoveMemTablesOrRestoreFlags(const Status& s, ColumnFamilyData* cfd,
size_t batch_count, LogBuffer* log_buffer,
autovector<MemTable*>* to_delete,
autovector<ReadOnlyMemTable*>* to_delete,
InstrumentedMutex* mu);
const int min_write_buffer_number_to_merge_;
@ -529,11 +531,11 @@ Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
VersionSet* vset, LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
autovector<ReadOnlyMemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
} // namespace ROCKSDB_NAMESPACE
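
For reference, the accessors renamed in this header (`NumEntries()`, `NumDeletion()`, the const `ApproximateStats()`) are what generic callers go through, so statistics can be aggregated without knowing the concrete memtable type. A small illustrative helper under that assumption (the helper itself is hypothetical, not part of the PR):

```
// Illustrative sketch, not part of the PR: aggregates per-memtable counters
// through the ReadOnlyMemTable interface introduced by this change.
#include <cstdint>
#include <list>

#include "db/memtable_list.h"

namespace ROCKSDB_NAMESPACE {

uint64_t TotalEntriesAndDeletes(const std::list<ReadOnlyMemTable*>& memlist) {
  uint64_t total = 0;
  for (ReadOnlyMemTable* m : memlist) {
    // NumEntries()/NumDeletion() replace the old num_entries()/num_deletes()
    // accessors, as shown in the .cc diff above.
    total += m->NumEntries() + m->NumDeletion();
  }
  return total;
}

}  // namespace ROCKSDB_NAMESPACE
```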


@ -98,7 +98,8 @@ class MemTableListTest : public testing::Test {
// structures needed to call this function.
Status Mock_InstallMemtableFlushResults(
MemTableList* list, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, autovector<MemTable*>* to_delete) {
const autovector<ReadOnlyMemTable*>& m,
autovector<ReadOnlyMemTable*>* to_delete) {
// Create a mock Logger
test::NullLogger logger;
LogBuffer log_buffer(DEBUG_LEVEL, &logger);
@ -148,8 +149,8 @@ class MemTableListTest : public testing::Test {
Status Mock_InstallMemtableAtomicFlushResults(
autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
autovector<MemTable*>* to_delete) {
const autovector<const autovector<ReadOnlyMemTable*>*>& mems_list,
autovector<ReadOnlyMemTable*>* to_delete) {
// Create a mock Logger
test::NullLogger logger;
LogBuffer log_buffer(DEBUG_LEVEL, &logger);
@ -227,12 +228,12 @@ TEST_F(MemTableListTest, Empty) {
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
ASSERT_FALSE(list.IsFlushPending());
autovector<MemTable*> mems;
autovector<ReadOnlyMemTable*> mems;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &mems);
ASSERT_EQ(0, mems.size());
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
list.current()->Unref(&to_delete);
ASSERT_EQ(0, to_delete.size());
}
@ -252,7 +253,7 @@ TEST_F(MemTableListTest, GetTest) {
MergeContext merge_context;
InternalKeyComparator ikey_cmp(options.comparator);
SequenceNumber max_covering_tombstone_seq = 0;
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
@ -322,8 +323,8 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value3.1");
ASSERT_EQ(5, mem->num_entries());
ASSERT_EQ(1, mem->num_deletes());
ASSERT_EQ(5, mem->NumEntries());
ASSERT_EQ(1, mem->NumDeletion());
// Add memtable to list
// This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
@ -398,7 +399,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ(2, list.NumNotFlushed());
list.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
for (ReadOnlyMemTable* m : to_delete) {
delete m;
}
}
@ -418,7 +419,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
MergeContext merge_context;
InternalKeyComparator ikey_cmp(options.comparator);
SequenceNumber max_covering_tombstone_seq = 0;
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
@ -491,7 +492,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Flush this memtable from the list.
// (It will then be a part of the memtable history).
autovector<MemTable*> to_flush;
autovector<ReadOnlyMemTable*> to_flush;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(1, to_flush.size());
@ -636,7 +637,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Cleanup
list.current()->Unref(&to_delete);
ASSERT_EQ(3, to_delete.size());
for (MemTable* m : to_delete) {
for (ReadOnlyMemTable* m : to_delete) {
delete m;
}
}
@ -651,7 +652,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
ImmutableOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBufferManager wb(options.db_write_buffer_size);
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
// Create MemTableList
int min_write_buffer_number_to_merge = 3;
@ -692,7 +693,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
// Nothing to flush
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
autovector<MemTable*> to_flush;
autovector<ReadOnlyMemTable*> to_flush;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
ASSERT_EQ(0, to_flush.size());
@ -758,7 +759,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
// Pick tables to flush again
autovector<MemTable*> to_flush2;
autovector<ReadOnlyMemTable*> to_flush2;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
ASSERT_EQ(0, to_flush2.size());
@ -811,7 +812,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
// Pick tables to flush again
autovector<MemTable*> to_flush3;
autovector<ReadOnlyMemTable*> to_flush3;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
// Picks newest (fifth oldest)
@ -821,7 +822,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
// Nothing left to flush
autovector<MemTable*> to_flush4;
autovector<ReadOnlyMemTable*> to_flush4;
list.PickMemtablesToFlush(
std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
ASSERT_EQ(0, to_flush4.size());
@ -891,7 +892,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
memtable_id = 4;
// Pick tables to flush. The tables to pick must have ID smaller than or
// equal to 4. Therefore, no table will be selected in this case.
autovector<MemTable*> to_flush5;
autovector<ReadOnlyMemTable*> to_flush5;
list.FlushRequested();
ASSERT_TRUE(list.HasFlushRequested());
list.PickMemtablesToFlush(memtable_id, &to_flush5);
@ -932,8 +933,8 @@ TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
autovector<MemTableList*> lists;
autovector<uint32_t> cf_ids;
autovector<const MutableCFOptions*> options_list;
autovector<const autovector<MemTable*>*> to_flush;
autovector<MemTable*> to_delete;
autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
autovector<ReadOnlyMemTable*> to_delete;
Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
to_flush, &to_delete);
ASSERT_OK(s);
@ -995,7 +996,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
cf_ids.push_back(cf_id++);
}
std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
std::vector<autovector<ReadOnlyMemTable*>> flush_candidates(num_cfs);
// Nothing to flush
for (auto i = 0; i != num_cfs; ++i) {
@ -1014,7 +1015,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
ASSERT_FALSE(list->IsFlushPending());
ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
}
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
  // Add tables to the immutable memtable lists associated with column families
for (auto i = 0; i != num_cfs; ++i) {
for (auto j = 0; j != num_tables_per_cf; ++j) {
@ -1041,7 +1042,7 @@ TEST_F(MemTableListTest, AtomicFlushTest) {
autovector<MemTableList*> tmp_lists;
autovector<uint32_t> tmp_cf_ids;
autovector<const MutableCFOptions*> tmp_options_list;
autovector<const autovector<MemTable*>*> to_flush;
autovector<const autovector<ReadOnlyMemTable*>*> to_flush;
for (auto i = 0; i != num_cfs; ++i) {
if (!flush_candidates[i].empty()) {
to_flush.push_back(&flush_candidates[i]);
@ -1122,7 +1123,7 @@ TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
std::vector<MemTable*> tables;
MutableCFOptions mutable_cf_options(options);
uint64_t current_ts = 0;
autovector<MemTable*> to_delete;
autovector<ReadOnlyMemTable*> to_delete;
std::vector<std::string> newest_udts;
std::string key;
@ -1162,7 +1163,7 @@ TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
}
list.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
for (ReadOnlyMemTable* m : to_delete) {
delete m;
}
to_delete.clear();