diff --git a/db/c.cc b/db/c.cc index 8048cb7c9e..a3883c9bf3 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3029,9 +3029,9 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } -void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt, - unsigned char v) { - opt->rep.experimental_allow_mempurge = v; +void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, + double v) { + opt->rep.experimental_mempurge_threshold = v; } void rocksdb_options_set_access_hint_on_compaction_start( diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index b3e435472a..824dc9e550 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" @@ -694,8 +695,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; uint32_t sst_count = 0; @@ -842,8 +843,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1046,8 +1047,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1122,8 +1123,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) { // Enforce size of a single MemTable to 128KB. options.write_buffer_size = 128 << 10; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); const size_t KVSIZE = 10; @@ -1239,7 +1240,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) { const uint32_t EXPECTED_SST_COUNT = 0; EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); - if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) { + if (options.experimental_mempurge_threshold == + std::numeric_limits::max()) { EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e3ca3f52d6..e46092ba4d 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -558,7 +558,7 @@ Status DBImpl::CloseHelper() { // flushing (but need to implement something // else than imm()->IsFlushPending() because the output // memtables added to imm() dont trigger flushes). - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { Status flush_ret; mutex_.Unlock(); for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 6f5e222586..7ec42c1fd6 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2410,7 +2410,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, // future changes. Therefore, we add the following if // statement - note that calling it twice (or more) // doesn't break anything. - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { // If imm() contains silent memtables, // requesting a flush will mark the imm_needed as true. cfd->imm()->FlushRequested(); @@ -2556,7 +2556,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { // If imm() contains silent memtables, // requesting a flush will mark the imm_needed as true. cfd->imm()->FlushRequested(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 3a258a57e1..bc4824af6a 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -195,7 +195,7 @@ void FlushJob::PickMemTable() { // If mempurge feature is activated, keep track of any potential // memtables coming from a previous mempurge operation. // Used for mempurge policy. - if (db_options_.experimental_allow_mempurge) { + if (db_options_.experimental_mempurge_threshold > 0.0) { contains_mempurge_outcome_ = false; for (MemTable* mt : mems_) { if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } Status mempurge_s = Status::NotFound("No MemPurge."); - if (db_options_.experimental_allow_mempurge && + if ((db_options_.experimental_mempurge_threshold > 0.0) && (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && (!mems_.empty()) && MemPurgeDecider()) { mempurge_s = MemPurge(); @@ -580,8 +580,6 @@ Status FlushJob::MemPurge() { // This addition will not trigger another flush, because // we do not call SchedulePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); - new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 / - mutable_cf_options_.write_buffer_size; new_mem->Ref(); db_mutex_->Unlock(); } else { @@ -622,16 +620,129 @@ Status FlushJob::MemPurge() { } bool FlushJob::MemPurgeDecider() { - MemPurgePolicy policy = db_options_.experimental_mempurge_policy; - if (policy == MemPurgePolicy::kAlways) { - return true; - } else if (policy == MemPurgePolicy::kAlternate) { - // Note: if at least one of the flushed memtables is - // an output of a previous mempurge process, then flush - // to storage. - return !(contains_mempurge_outcome_); + double threshold = db_options_.experimental_mempurge_threshold; + // Never trigger mempurge if threshold is not a strictly positive value. + if (!(threshold > 0.0)) { + return false; } - return false; + if (threshold > (1.0 * mems_.size())) { + return true; + } + // Payload and useful_payload (in bytes). + // The useful payload ratio of a given MemTable + // is estimated to be useful_payload/payload. + uint64_t payload = 0, useful_payload = 0; + // If estimated_useful_payload is > threshold, + // then flush to storage, else MemPurge. + double estimated_useful_payload = 0.0; + // Cochran formula for determining sample size. + // 95% confidence interval, 7% precision. + // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0 + double n0 = 196.0; + ReadOptions ro; + ro.total_order_seek = true; + + // Iterate over each memtable of the set. + for (MemTable* mt : mems_) { + // If the memtable is the output of a previous mempurge, + // its approximate useful payload ratio is already calculated. + if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { + // We make the assumption that this memtable is already + // free of garbage (garbage underestimation). + estimated_useful_payload += mt->ApproximateMemoryUsage(); + } else { + // Else sample from the table. + uint64_t nentries = mt->num_entries(); + // Corrected Cochran formula for small populations + // (converges to n0 for large populations). + uint64_t target_sample_size = + static_cast(ceil(n0 / (1.0 + (n0 / nentries)))); + std::unordered_set sentries = {}; + // Populate sample entries set. + mt->UniqueRandomSample(target_sample_size, &sentries); + + // Estimate the garbage ratio by comparing if + // each sample corresponds to a valid entry. + for (const char* ss : sentries) { + ParsedInternalKey res; + Slice entry_slice = GetLengthPrefixedSlice(ss); + Status parse_s = + ParseInternalKey(entry_slice, &res, true /*log_err_key*/); + if (!parse_s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Memtable Decider: ParseInternalKey did not parse " + "entry_slice %s" + "successfully.", + entry_slice.data()); + } + LookupKey lkey(res.user_key, kMaxSequenceNumber); + std::string vget; + Status s; + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0, sqno = 0; + + // Pick the oldest existing snapshot that is more recent + // than the sequence number of the sampled entry. + SequenceNumber min_seqno_snapshot = kMaxSequenceNumber; + SnapshotImpl min_snapshot; + for (SequenceNumber seq_num : existing_snapshots_) { + if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { + min_seqno_snapshot = seq_num; + } + } + min_snapshot.number_ = min_seqno_snapshot; + ro.snapshot = + min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr; + + // Estimate if the sample entry is valid or not. + bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context, + &max_covering_tombstone_seq, &sqno, ro); + if (!gres) { + ROCKS_LOG_WARN( + db_options_.info_log, + "Memtable Get returned false when Get(sampled entry). " + "Yet each sample entry should exist somewhere in the memtable, " + "unrelated to whether it has been deleted or not."); + } + payload += entry_slice.size(); + + // TODO(bjlemaire): evaluate typeMerge. + // This is where the sampled entry is estimated to be + // garbage or not. Note that this is a garbage *estimation* + // because we do not include certain items such as + // CompactionFitlers triggered at flush, or if the same delete + // has been inserted twice or more in the memtable. + if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) { + useful_payload += entry_slice.size(); + } else if (((res.type == kTypeDeletion) || + (res.type == kTypeSingleDeletion)) && + s.IsNotFound() && gres) { + useful_payload += entry_slice.size(); + } + } + if (payload > 0) { + // We used the estimated useful payload ratio + // to evaluate how much of the total memtable is useful bytes. + estimated_useful_payload += + (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); + ROCKS_LOG_INFO( + db_options_.info_log, + "Mempurge sampling - found garbage ratio from sampling: %f.\n", + (payload - useful_payload) * 1.0 / payload); + } else { + ROCKS_LOG_WARN( + db_options_.info_log, + "Mempurge kSampling policy: null payload measured, and collected " + "sample size is %zu\n.", + sentries.size()); + } + } + } + // We convert the total number of useful paylaod bytes + // into the proportion of memtable necessary to store all these bytes. + // We compare this proportion with the threshold value. + return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) < + threshold; } Status FlushJob::WriteLevel0Table() { @@ -843,7 +954,7 @@ Status FlushJob::WriteLevel0Table() { stats.num_output_files_blob = static_cast(blobs.size()); - if (db_options_.experimental_allow_mempurge && s.ok()) { + if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) { // The db_mutex is held at this point. for (MemTable* mt : mems_) { // Note: if m is not a previous mempurge output memtable, diff --git a/db/flush_job.h b/db/flush_job.h index 694dd71d2c..9050657de8 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -117,9 +117,9 @@ class FlushJob { // of development. At the moment it is only compatible with the Get, Put, // Delete operations as well as Iterators and CompactionFilters. // For this early version, "MemPurge" is called by setting the - // options.experimental_allow_mempurge flag as "true". When this is + // options.experimental_mempurge_threshold value as >0.0. When this is // the case, ALL automatic flush operations (kWRiteBufferManagerFull) will - // first go through the MemPurge process. herefore, we strongly + // first go through the MemPurge process. Therefore, we strongly // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. Status MemPurge(); @@ -192,7 +192,7 @@ class FlushJob { const std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; - // Used when experimental_allow_mempurge set to true. + // Used when experimental_mempurge_threshold > 0.0. bool contains_mempurge_outcome_; }; diff --git a/db/memtable.h b/db/memtable.h index 93060941a9..e6580379ff 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "db/dbformat.h" @@ -145,6 +146,26 @@ class MemTable { return approximate_memory_usage_.load(std::memory_order_relaxed); } + // Returns a vector of unique random memtable entries of size 'sample_size'. + // + // Note: the entries are stored in the unordered_set as length-prefixed keys, + // hence their representation in the set as "const char*". + // Note2: the size of the output set 'entries' is not enforced to be strictly + // equal to 'target_sample_size'. Its final size might be slightly + // greater or slightly less than 'target_sample_size' + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + // REQUIRES: SkipList memtable representation. This function is not + // implemented for any other type of memtable representation (vectorrep, + // hashskiplist,...). + void UniqueRandomSample(const uint64_t& target_sample_size, + std::unordered_set* entries) { + // TODO(bjlemaire): at the moment, only supported by skiplistrep. + // Extend it to all other memtable representations. + table_->UniqueRandomSample(num_entries(), target_sample_size, entries); + } + // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { diff --git a/db/memtable_list.h b/db/memtable_list.h index 17a7aa87f1..6dde850163 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -390,11 +390,7 @@ class MemTableList { // not freed, but put into a vector for future deref and reclamation. void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); - void AddMemPurgeOutputID(uint64_t mid) { - if (mempurged_ids_.find(mid) == mempurged_ids_.end()) { - mempurged_ids_.insert(mid); - } - } + void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); } void RemoveMemPurgeOutputID(uint64_t mid) { if (mempurged_ids_.find(mid) != mempurged_ids_.end()) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index ec3aa212e3..5db089b161 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -141,8 +141,7 @@ DECLARE_uint64(subcompactions); DECLARE_uint64(periodic_compaction_seconds); DECLARE_uint64(compaction_ttl); DECLARE_bool(allow_concurrent_memtable_write); -DECLARE_bool(experimental_allow_mempurge); -DECLARE_string(experimental_mempurge_policy); +DECLARE_double(experimental_mempurge_threshold); DECLARE_bool(enable_write_thread_adaptive_yield); DECLARE_int32(reopen); DECLARE_double(bloom_bits); @@ -341,18 +340,6 @@ inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( return ret_compression_type; } -inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy( - const char* mpolicy) { - assert(mpolicy); - if (!strcasecmp(mpolicy, "kAlways")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways; - } else if (!strcasecmp(mpolicy, "kAlternate")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; - } - fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy); - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; -} - inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType( const char* ctype) { assert(ctype); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index f1589d78ca..adb44084af 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -326,11 +326,9 @@ DEFINE_uint64(compaction_ttl, 1000, DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); -DEFINE_bool(experimental_allow_mempurge, false, - "Allow mempurge process to collect memtable garbage bytes."); - -DEFINE_string(experimental_mempurge_policy, "kAlternate", - "Set mempurge (MemTable Garbage Collection) policy."); +DEFINE_double(experimental_mempurge_threshold, 0.0, + "Maximum estimated useful payload that triggers a " + "mempurge process to collect memtable garbage bytes."); DEFINE_bool(enable_write_thread_adaptive_yield, true, "Use a yielding spin loop for brief writer thread waits."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index df42523987..9d9320ddec 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2266,9 +2266,8 @@ void StressTest::Open() { options_.max_subcompactions = static_cast(FLAGS_subcompactions); options_.allow_concurrent_memtable_write = FLAGS_allow_concurrent_memtable_write; - options_.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge; - options_.experimental_mempurge_policy = - StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str()); + options_.experimental_mempurge_threshold = + FLAGS_experimental_mempurge_threshold; options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds; options_.ttl = FLAGS_compaction_ttl; options_.enable_pipelined_write = FLAGS_enable_pipelined_write; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index b8701135d3..6974831fdc 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -38,8 +38,10 @@ #include #include #include + #include #include +#include namespace ROCKSDB_NAMESPACE { @@ -194,6 +196,17 @@ class MemTableRep { return 0; } + // Returns a vector of unique random memtable entries of approximate + // size 'target_sample_size' (this size is not strictly enforced). + virtual void UniqueRandomSample(const uint64_t& num_entries, + const uint64_t& target_sample_size, + std::unordered_set* entries) { + (void)num_entries; + (void)target_sample_size; + (void)entries; + assert(false); + } + // Report an approximation of how much memory has been used other than memory // that was allocated through the allocator. Safe to call from any thread. virtual size_t ApproximateMemoryUsage() = 0; @@ -230,6 +243,8 @@ class MemTableRep { virtual void SeekForPrev(const Slice& internal_key, const char* memtable_key) = 0; + virtual void RandomSeek() {} + // Position at the first entry in collection. // Final state of iterator is Valid() iff collection is not empty. virtual void SeekToFirst() = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d7ecb5b3dc..95a9ff46fa 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -369,11 +369,6 @@ struct DbPath { extern const char* kHostnameForDbHostId; -enum class MemPurgePolicy : char { - kAlternate = 0x00, - kAlways = 0x01, -}; - enum class CompactionServiceJobStatus : char { kSuccess, kFailure, @@ -787,14 +782,22 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; - // If true, allows for memtable purge instead of flush to storage. - // (experimental). - bool experimental_allow_mempurge = false; - // If experimental_allow_mempurge is true, will dictate MemPurge - // policy. - // Default: kAlternate - // (experimental). - MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate; + // [experimental] + // Used to activate or deactive the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then + // compared to this `threshold` value: + // - if ratio1.0 : aggressive mempurge. + // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold = 0.0; // Amount of data to build up in memtables across all column // families before writing to disk. diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index 1782288f08..028fde3a27 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -177,6 +177,9 @@ class InlineSkipList { // Retreat to the last entry with a key <= target void SeekForPrev(const char* target); + // Advance to a random entry in the list. + void RandomSeek(); + // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst(); @@ -252,6 +255,9 @@ class InlineSkipList { // Return head_ if list is empty. Node* FindLast() const; + // Returns a random entry. + Node* FindRandomEntry() const; + // Traverses a single level of the list, setting *out_prev to the last // node before the key and *out_next to the first node after. Assumes // that the key is not present in the skip list. On entry, before should @@ -412,6 +418,11 @@ inline void InlineSkipList::Iterator::SeekForPrev( } } +template +inline void InlineSkipList::Iterator::RandomSeek() { + node_ = list_->FindRandomEntry(); +} + template inline void InlineSkipList::Iterator::SeekToFirst() { node_ = list_->head_->Next(0); @@ -558,6 +569,48 @@ InlineSkipList::FindLast() const { } } +template +typename InlineSkipList::Node* +InlineSkipList::FindRandomEntry() const { + // TODO(bjlemaire): consider adding PREFETCH calls. + Node *x = head_, *scan_node = nullptr, *limit_node = nullptr; + + // We start at the max level. + // FOr each level, we look at all the nodes at the level, and + // we randomly pick one of them. Then decrement the level + // and reiterate the process. + // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes). + // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15. + // We will consider all the nodes between #15 (inclusive) and #67 + // (exclusive). #67 is called 'limit_node' here. + // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose + // #51. #67 remains 'limit_node'. + // [...] + // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57. + // Return Node #57. + std::vector lvl_nodes; + Random* rnd = Random::GetTLSInstance(); + int level = GetMaxHeight() - 1; + + while (level >= 0) { + lvl_nodes.clear(); + scan_node = x; + while (scan_node != limit_node) { + lvl_nodes.push_back(scan_node); + scan_node = scan_node->Next(level); + } + uint32_t rnd_idx = rnd->Next() % lvl_nodes.size(); + x = lvl_nodes[rnd_idx]; + if (rnd_idx + 1 < lvl_nodes.size()) { + limit_node = lvl_nodes[rnd_idx + 1]; + } + level--; + } + // There is a special case where x could still be the head_ + // (note that the head_ contains no key). + return x == head_ ? head_->Next(0) : x; +} + template uint64_t InlineSkipList::EstimateCount(const char* key) const { uint64_t count = 0; diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index eec15626c0..abe7144ab9 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -3,6 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // +#include + #include "db/memtable.h" #include "memory/arena.h" #include "memtable/inlineskiplist.h" @@ -95,6 +97,66 @@ public: return (end_count >= start_count) ? (end_count - start_count) : 0; } + void UniqueRandomSample(const uint64_t& num_entries, + const uint64_t& target_sample_size, + std::unordered_set* entries) override { + entries->clear(); + // Avoid divide-by-0. + assert(target_sample_size > 0); + assert(num_entries > 0); + // NOTE: the size of entries is not enforced to be exactly + // target_sample_size at the end of this function, it might be slightly + // greater or smaller. + SkipListRep::Iterator iter(&skip_list_); + // There are two methods to create the subset of samples (size m) + // from the table containing N elements: + // 1-Iterate linearly through the N memtable entries. For each entry i, + // add it to the sample set with a probability + // (target_sample_size - entries.size() ) / (N-i). + // + // 2-Pick m random elements without repetition. + // We pick Option 2 when m sqrt(N). + if (target_sample_size > + static_cast(std::sqrt(1.0 * num_entries))) { + Random* rnd = Random::GetTLSInstance(); + iter.SeekToFirst(); + uint64_t counter = 0, num_samples_left = target_sample_size; + for (; iter.Valid() && (num_samples_left > 0); iter.Next(), counter++) { + // Add entry to sample set with probability + // num_samples_left/(num_entries - counter). + if (rnd->Next() % (num_entries - counter) < num_samples_left) { + entries->insert(iter.key()); + num_samples_left--; + } + } + } else { + // Option 2: pick m random elements with no duplicates. + // If Option 2 is picked, then target_sample_size99.9% for N>4. + // At worst, for the final pick , when m=sqrt(N) there is + // a probability of p= 1/sqrt(N) chances to find a duplicate. + for (uint64_t j = 0; j < 5; j++) { + iter.RandomSeek(); + // unordered_set::insert returns pair. + // The second element is true if an insert successfully happened. + // If element is already in the set, this bool will be false, and + // true otherwise. + if ((entries->insert(iter.key())).second) { + break; + } + } + } + } + } + ~SkipListRep() override {} // Iteration over the contents of a skip list @@ -143,6 +205,8 @@ public: } } + void RandomSeek() override { iter_.RandomSeek(); } + // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst() override { iter_.SeekToFirst(); } diff --git a/options/db_options.cc b/options/db_options.cc index 743a0a9e1d..3e2c8bf13e 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -48,11 +48,6 @@ static std::unordered_map info_log_level_string_map = {"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL}, {"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}}; -static std::unordered_map - experimental_mempurge_policy_string_map = { - {"kAlternate", MemPurgePolicy::kAlternate}, - {"kAlways", MemPurgePolicy::kAlways}}; - static std::unordered_map db_mutable_options_type_info = { {"allow_os_buffer", @@ -197,14 +192,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, error_if_exists), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, - {"experimental_allow_mempurge", - {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge), - OptionType::kBoolean, OptionVerificationType::kNormal, + {"experimental_mempurge_threshold", + {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold), + OptionType::kDouble, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, - {"experimental_mempurge_policy", - OptionTypeInfo::Enum( - offsetof(struct ImmutableDBOptions, experimental_mempurge_policy), - &experimental_mempurge_policy_string_map)}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -615,8 +606,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), - experimental_allow_mempurge(options.experimental_allow_mempurge), - experimental_mempurge_policy(options.experimental_mempurge_policy), + experimental_mempurge_threshold(options.experimental_mempurge_threshold), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -750,12 +740,9 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); - ROCKS_LOG_HEADER(log, - " Options.experimental_allow_mempurge: %d", - experimental_allow_mempurge); - ROCKS_LOG_HEADER(log, - " Options.experimental_mempurge_policy: %d", - static_cast(experimental_mempurge_policy)); + ROCKS_LOG_HEADER( + log, " Options.experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index 50ec521f04..d2b0568026 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -57,8 +57,7 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; - bool experimental_allow_mempurge; - MemPurgePolicy experimental_mempurge_policy; + double experimental_mempurge_threshold; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options_test.cc b/options/options_test.cc index 3905a95776..73cbedd6b2 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -143,7 +143,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_allow_mempurge", "false"}, + {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -302,7 +302,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); + ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); @@ -2047,7 +2047,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_allow_mempurge", "false"}, + {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -2200,7 +2200,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); + ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 17be134cb7..c7c254735f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1039,19 +1039,6 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( return ROCKSDB_NAMESPACE::kSnappyCompression; // default value } -static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy( - const char* mpolicy) { - assert(mpolicy); - if (!strcasecmp(mpolicy, "kAlways")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways; - } else if (!strcasecmp(mpolicy, "kAlternate")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; - } - - fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy); - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; -} - static std::string ColumnFamilyName(size_t i) { if (i == 0) { return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName; @@ -1186,11 +1173,9 @@ DEFINE_bool( DEFINE_bool(allow_concurrent_memtable_write, true, "Allow multi-writers to update mem tables in parallel."); -DEFINE_bool(experimental_allow_mempurge, false, - "Allow memtable garbage collection."); - -DEFINE_string(experimental_mempurge_policy, "kAlternate", - "Specify memtable garbage collection policy."); +DEFINE_double(experimental_mempurge_threshold, 0.0, + "Maximum useful payload ratio estimate that triggers a mempurge " + "(memtable garbage collection)."); DEFINE_bool(inplace_update_support, ROCKSDB_NAMESPACE::Options().inplace_update_support, @@ -4275,9 +4260,8 @@ class Benchmark { options.delayed_write_rate = FLAGS_delayed_write_rate; options.allow_concurrent_memtable_write = FLAGS_allow_concurrent_memtable_write; - options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge; - options.experimental_mempurge_policy = - StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str()); + options.experimental_mempurge_threshold = + FLAGS_experimental_mempurge_threshold; options.inplace_update_support = FLAGS_inplace_update_support; options.inplace_update_num_locks = FLAGS_inplace_update_num_locks; options.enable_write_thread_adaptive_yield = diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 3215bec9fe..a133f35296 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -220,8 +220,7 @@ whitebox_default_params = { simple_default_params = { "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "column_families": 1, - "experimental_allow_mempurge": lambda: random.randint(0, 1), - "experimental_mempurge_policy": lambda: random.choice(["kAlways", "kAlternate"]), + "experimental_mempurge_threshold": lambda: 10.0*random.random(), "max_background_compactions": 1, "max_bytes_for_level_base": 67108864, "memtablerep": "skip_list",