diff --git a/db/column_family.cc b/db/column_family.cc
index 2579338d00..f7cb479cc9 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -21,13 +21,12 @@
 
 #include "db/compaction_picker.h"
 #include "db/db_impl.h"
-#include "db/job_context.h"
-#include "db/version_set.h"
-#include "db/writebuffer.h"
 #include "db/internal_stats.h"
+#include "db/job_context.h"
 #include "db/table_properties_collector.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
+#include "db/writebuffer.h"
 #include "util/autovector.h"
 #include "util/compression.h"
 #include "util/hash_skiplist_rep.h"
@@ -242,6 +241,9 @@ void SuperVersion::Cleanup() {
   imm->Unref(&to_delete);
   MemTable* m = mem->Unref();
   if (m != nullptr) {
+    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
+    assert(*memory_usage >= m->ApproximateMemoryUsage());
+    *memory_usage -= m->ApproximateMemoryUsage();
     to_delete.push_back(m);
   }
   current->Unref();
diff --git a/db/db_test.cc b/db/db_test.cc
index 573b311392..656c75edca 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2184,6 +2184,110 @@ TEST_F(DBTest, GetProperty) {
   }
 }
 
+TEST_F(DBTest, ApproximateMemoryUsage) {
+  const int kNumRounds = 10;
+  const int kFlushesPerRound = 10;
+  const int kWritesPerFlush = 10;
+  const int kKeySize = 100;
+  const int kValueSize = 1000;
+  Options options;
+  options.write_buffer_size = 1000;  // small write buffer
+  options.min_write_buffer_number_to_merge = 4;
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+
+  std::vector<Iterator*> iters;
+
+  uint64_t active_mem;
+  uint64_t unflushed_mem;
+  uint64_t all_mem;
+  uint64_t prev_all_mem;
+
+  // Phase 0. Verify that the initial values of all these properties are
+  // the same, as we have no mem-tables.
+  dbfull()->GetIntProperty("rocksdb.cur-size-active-mem-table", &active_mem);
+  dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables", &unflushed_mem);
+  dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+  ASSERT_EQ(all_mem, active_mem);
+  ASSERT_EQ(all_mem, unflushed_mem);
+
+  // Phase 1. Simply issue Put() and expect "cur-size-all-mem-tables"
+  // to equal "size-all-mem-tables".
+  for (int r = 0; r < kNumRounds; ++r) {
+    for (int f = 0; f < kFlushesPerRound; ++f) {
+      for (int w = 0; w < kWritesPerFlush; ++w) {
+        Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValueSize));
+      }
+    }
+    dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables",
+                             &unflushed_mem);
+    dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+    // With no iterators, these two numbers should be the same.
+    ASSERT_EQ(unflushed_mem, all_mem);
+  }
+  prev_all_mem = all_mem;
+
+  // Phase 2. Keep issuing Put() but also create new iterators. This time
+  // we expect "size-all-mem-tables" > "cur-size-all-mem-tables".
+  for (int r = 0; r < kNumRounds; ++r) {
+    iters.push_back(db_->NewIterator(ReadOptions()));
+    for (int f = 0; f < kFlushesPerRound; ++f) {
+      for (int w = 0; w < kWritesPerFlush; ++w) {
+        Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValueSize));
+      }
+    }
+    // With iterators alive, the total must exceed the other two and grow.
+    dbfull()->GetIntProperty("rocksdb.cur-size-active-mem-table", &active_mem);
+    dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables",
+                             &unflushed_mem);
+    dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+    ASSERT_GT(all_mem, active_mem);
+    ASSERT_GT(all_mem, unflushed_mem);
+    ASSERT_GT(all_mem, prev_all_mem);
+    prev_all_mem = all_mem;
+  }
+
+  // Phase 3. Delete iterators and expect "size-all-mem-tables"
+  // to shrink whenever we release an iterator.
+  for (auto* iter : iters) {
+    delete iter;
+    if (iters.size() != 0) {
+      dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+      // Expect the size to shrink.
+      ASSERT_LT(all_mem, prev_all_mem);
+    }
+    prev_all_mem = all_mem;
+  }
+  dbfull()->GetIntProperty("rocksdb.cur-size-active-mem-table", &active_mem);
+  dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables", &unflushed_mem);
+  dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+  // Now we expect "cur-size-all-mem-tables" and
+  // "size-all-mem-tables" to be the same again after we
+  // released all iterators.
+  ASSERT_EQ(all_mem, unflushed_mem);
+  ASSERT_GE(all_mem, active_mem);
+
+  // Phase 4. Perform flush, and expect all three counters to be the same.
+  Flush();
+  dbfull()->GetIntProperty("rocksdb.cur-size-active-mem-table", &active_mem);
+  dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables", &unflushed_mem);
+  dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+  ASSERT_EQ(active_mem, unflushed_mem);
+  ASSERT_EQ(unflushed_mem, all_mem);
+
+  // Phase 5. Reopen, and expect all three counters to be the same again.
+  Reopen(options);
+  dbfull()->GetIntProperty("rocksdb.cur-size-active-mem-table", &active_mem);
+  dbfull()->GetIntProperty("rocksdb.cur-size-all-mem-tables", &unflushed_mem);
+  dbfull()->GetIntProperty("rocksdb.size-all-mem-tables", &all_mem);
+  ASSERT_EQ(active_mem, unflushed_mem);
+  ASSERT_EQ(unflushed_mem, all_mem);
+}
+
 TEST_F(DBTest, FLUSH) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index a3004a8102..172aec6ef5 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -101,7 +101,9 @@ static const std::string compaction_pending = "compaction-pending";
 static const std::string background_errors = "background-errors";
 static const std::string cur_size_active_mem_table =
                           "cur-size-active-mem-table";
-static const std::string cur_size_all_mem_tables = "cur-size-all-mem-tables";
+static const std::string cur_size_unflushed_mem_tables =
+    "cur-size-all-mem-tables";
+static const std::string cur_size_all_mem_tables = "size-all-mem-tables";
 static const std::string num_entries_active_mem_table =
                           "num-entries-active-mem-table";
 static const std::string num_entries_imm_mem_tables =
@@ -138,7 +140,9 @@ const std::string DB::Properties::kBackgroundErrors =
 const std::string DB::Properties::kCurSizeActiveMemTable =
                       rocksdb_prefix + cur_size_active_mem_table;
 const std::string DB::Properties::kCurSizeAllMemTables =
-                      rocksdb_prefix + cur_size_all_mem_tables;
+    rocksdb_prefix + cur_size_unflushed_mem_tables;
+const std::string DB::Properties::kSizeAllMemTables =
+    rocksdb_prefix + cur_size_all_mem_tables;
 const std::string DB::Properties::kNumEntriesActiveMemTable =
                       rocksdb_prefix + num_entries_active_mem_table;
 const std::string DB::Properties::kNumEntriesImmMemTables =
@@ -202,8 +206,10 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
     return kBackgroundErrors;
   } else if (in == cur_size_active_mem_table) {
     return kCurSizeActiveMemTable;
-  } else if (in ==
cur_size_all_mem_tables) {
+  } else if (in == cur_size_unflushed_mem_tables) {
     return kCurSizeAllMemTables;
+  } else if (in == cur_size_all_mem_tables) {
+    return kSizeAllMemTables;
   } else if (in == num_entries_active_mem_table) {
     return kNumEntriesInMutableMemtable;
   } else if (in == num_entries_imm_mem_tables) {
@@ -347,6 +353,10 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
       return true;
     case kCurSizeAllMemTables:
       // Current size of the active memtable + immutable memtables
+      *value = cfd_->mem()->ApproximateMemoryUsage() +
+               cfd_->imm()->ApproximateUnflushedMemTablesMemoryUsage();
+      return true;
+    case kSizeAllMemTables:  // active + all not-yet-freed immutable memtables
       *value = cfd_->mem()->ApproximateMemoryUsage() +
                cfd_->imm()->ApproximateMemoryUsage();
       return true;
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 7b2775b049..866d95b51d 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -41,8 +41,10 @@ enum DBPropertyType : uint32_t {
   kCompactionPending,      // Return 1 if a compaction is pending. Otherwise 0.
   kBackgroundErrors,       // Return accumulated background errors encountered.
   kCurSizeActiveMemTable,  // Return current size of the active memtable
-  kCurSizeAllMemTables,    // Return current size of all (active + immutable)
-                           // memtables
+  kCurSizeAllMemTables,    // Return current size of the unflushed
+                           // (active + immutable) memtables
+  kSizeAllMemTables,       // Return current size of all memtables (active +
+                           // unflushed immutable + pinned)
   kNumEntriesInMutableMemtable,    // Return number of deletes in the mutable
                                    // memtable.
   kNumEntriesInImmutableMemtable,  // Return sum of number of entries in all
@@ -58,8 +60,8 @@ enum DBPropertyType : uint32_t {
   kNumSnapshots,        // Number of snapshots in the system
   kOldestSnapshotTime,  // Unix timestamp of the first snapshot
   kNumLiveVersions,
-  kEstimateLiveDataSize, // Estimated amount of live data in bytes
-  kBaseLevel,            // The level that L0 data is compacted to
+  kEstimateLiveDataSize,  // Estimated amount of live data in bytes
+  kBaseLevel,             // The level that L0 data is compacted to
 };
 
 extern DBPropertyType GetPropertyType(const Slice& property,
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index f74f1b3776..7ac3af60a8 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -27,9 +27,26 @@ class InternalKeyComparator;
 class Mutex;
 class VersionSet;
 
-MemTableListVersion::MemTableListVersion(MemTableListVersion* old)
+void MemTableListVersion::AddMemTable(MemTable* m) {
+  memlist_.push_front(m);
+  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
+}
+
+void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
+                                        MemTable* m) {
+  if (m->Unref()) {
+    to_delete->push_back(m);
+    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
+    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
+  } else {
+  }
+}
+
+MemTableListVersion::MemTableListVersion(
+    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
     : max_write_buffer_number_to_maintain_(
-          old->max_write_buffer_number_to_maintain_) {
+          old != nullptr ? old->max_write_buffer_number_to_maintain_ : 0),
+      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
   if (old != nullptr) {
     memlist_ = old->memlist_;
     for (auto& m : memlist_) {
@@ -44,12 +61,14 @@ MemTableListVersion::MemTableListVersion(MemTableListVersion* old)
 }
 
 MemTableListVersion::MemTableListVersion(
+    size_t* parent_memtable_list_memory_usage,
     int max_write_buffer_number_to_maintain)
-    : max_write_buffer_number_to_maintain_(
-          
max_write_buffer_number_to_maintain) {}
+    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
+      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}
 
 void MemTableListVersion::Ref() { ++refs_; }
 
+// Called by SuperVersion::Cleanup()
 void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
   assert(refs_ >= 1);
   --refs_;
@@ -58,16 +77,10 @@ void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
     // that refs_ will not be zero
     assert(to_delete != nullptr);
     for (const auto& m : memlist_) {
-      MemTable* x = m->Unref();
-      if (x != nullptr) {
-        to_delete->push_back(x);
-      }
+      UnrefMemTable(to_delete, m);
     }
     for (const auto& m : memlist_history_) {
-      MemTable* x = m->Unref();
-      if (x != nullptr) {
-        to_delete->push_back(x);
-      }
+      UnrefMemTable(to_delete, m);
     }
     delete this;
   }
@@ -180,7 +193,7 @@ SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
 // caller is responsible for referencing m
 void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
   assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
-  memlist_.push_front(m);
+  AddMemTable(m);
 
   TrimHistory(to_delete);
 }
@@ -195,9 +208,7 @@ void MemTableListVersion::Remove(MemTable* m,
     memlist_history_.push_front(m);
     TrimHistory(to_delete);
   } else {
-    if (m->Unref()) {
-      to_delete->push_back(m);
-    }
+    UnrefMemTable(to_delete, m);
   }
 }
 
@@ -209,9 +220,7 @@ void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
     MemTable* x = memlist_history_.back();
     memlist_history_.pop_back();
 
-    if (x->Unref()) {
-      to_delete->push_back(x);
-    }
+    UnrefMemTable(to_delete, x);
   }
 }
 
@@ -361,7 +370,7 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
 }
 
 // Returns an estimate of the number of bytes of data in use.
-size_t MemTableList::ApproximateMemoryUsage() {
+size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
   size_t total_size = 0;
   for (auto& memtable : current_->memlist_) {
     total_size += memtable->ApproximateMemoryUsage();
@@ -369,13 +378,15 @@ size_t MemTableList::ApproximateMemoryUsage() {
   return total_size;
 }
 
+size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
+
 void MemTableList::InstallNewVersion() {
   if (current_->refs_ == 1) {
     // we're the only one using the version, just keep using it
   } else {
     // somebody else holds the current version, we need to create new one
     MemTableListVersion* version = current_;
-    current_ = new MemTableListVersion(current_);
+    current_ = new MemTableListVersion(&current_memory_usage_, current_);
     current_->Ref();
     version->Unref();
   }
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 3d19290fd3..63e27732b2 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -10,14 +10,11 @@
 #include
 #include
 #include
-#include "rocksdb/db.h"
-#include "rocksdb/options.h"
-#include "rocksdb/iterator.h"
 
 #include "db/dbformat.h"
 #include "db/filename.h"
-#include "db/skiplist.h"
 #include "db/memtable.h"
+#include "db/skiplist.h"
 #include "rocksdb/db.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
@@ -41,8 +38,10 @@ class MergeIteratorBuilder;
 // (such as holding the db mutex or being on the write thread).
 class MemTableListVersion {
  public:
-  explicit MemTableListVersion(MemTableListVersion* old = nullptr);
-  explicit MemTableListVersion(int max_write_buffer_number_to_maintain);
+  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+                               MemTableListVersion* old = nullptr);
+  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
+                               int max_write_buffer_number_to_maintain);
 
   void Ref();
   void Unref(autovector<MemTable*>* to_delete = nullptr);
@@ -104,6 +103,10 @@ class MemTableListVersion {
            std::string* value, Status* s, MergeContext* merge_context,
            SequenceNumber* seq);
 
+  void AddMemTable(MemTable* m);
+
+  void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);
+
   friend class MemTableList;
 
   // Immutable MemTables that have not yet been flushed.
@@ -118,6 +121,8 @@ class MemTableListVersion {
   const int max_write_buffer_number_to_maintain_;
 
   int refs_ = 0;
+
+  size_t* parent_memtable_list_memory_usage_;
 };
 
 // This class stores references to all the immutable memtables.
@@ -138,11 +143,13 @@ class MemTableList {
                         int max_write_buffer_number_to_maintain)
       : imm_flush_needed(false),
         min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
-        current_(new MemTableListVersion(max_write_buffer_number_to_maintain)),
+        current_(new MemTableListVersion(&current_memory_usage_,
+                                         max_write_buffer_number_to_maintain)),
         num_flush_not_started_(0),
         commit_in_progress_(false),
         flush_requested_(false) {
     current_->Ref();
+    current_memory_usage_ = 0;
   }
 
   // Should not delete MemTableList without making sure MemTableList::current()
@@ -190,6 +197,10 @@ class MemTableList {
   // Returns an estimate of the number of bytes of data in use.
   size_t ApproximateMemoryUsage();
 
+  // Returns an estimate of the number of bytes of data used by
+  // the unflushed mem-tables.
+  size_t ApproximateUnflushedMemTablesMemoryUsage();
+
   // Request a flush of all existing memtables to storage.
This will
   // cause future calls to IsFlushPending() to return true if this list is
   // non-empty (regardless of the min_write_buffer_number_to_merge
@@ -201,6 +212,8 @@ class MemTableList {
   // MemTableList(const MemTableList&);
   // void operator=(const MemTableList&);
 
+  size_t* current_memory_usage() { return &current_memory_usage_; }
+
  private:
   // DB mutex held
   void InstallNewVersion();
@@ -218,6 +231,8 @@ class MemTableList {
 
   // Requested a flush of all memtables to storage
   bool flush_requested_;
+  // Memory used by memtables added to this list that are not yet freed.
+  size_t current_memory_usage_;
 };
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index c5aca3af48..624a8da82e 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -312,22 +312,23 @@ class DB {
   //  "rocksdb.compaction-pending" - 1 if at least one compaction is pending
   //  "rocksdb.background-errors" - accumulated number of background errors
   //  "rocksdb.cur-size-active-mem-table"
   //  "rocksdb.cur-size-all-mem-tables"
+  //  "rocksdb.size-all-mem-tables"
   //  "rocksdb.num-entries-active-mem-table"
   //  "rocksdb.num-entries-imm-mem-tables"
   //  "rocksdb.num-deletes-active-mem-table"
   //  "rocksdb.num-deletes-imm-mem-tables"
   //  "rocksdb.estimate-num-keys" - estimated keys in the column family
-  //  "rocksdb.estimate-table-readers-mem" - estimated memory used for reding
+  //  "rocksdb.estimate-table-readers-mem" - estimated memory used for reading
   //      SST tables, that is not counted as a part of block cache.
   //  "rocksdb.is-file-deletions-enabled"
   //  "rocksdb.num-snapshots"
   //  "rocksdb.oldest-snapshot-time"
   //  "rocksdb.num-live-versions" - `version` is an internal data structure.
   //      See version_set.h for details. More live versions often mean more SST
   //      files are held from being deleted, by iterators or unfinished
   //      compactions.
   //  "rocksdb.estimate-live-data-size"
 #ifndef ROCKSDB_LITE
   struct Properties {
     static const std::string kNumFilesAtLevelPrefix;
@@ -341,6 +342,7 @@ class DB {
     static const std::string kBackgroundErrors;
     static const std::string kCurSizeActiveMemTable;
     static const std::string kCurSizeAllMemTables;
+    static const std::string kSizeAllMemTables;
     static const std::string kNumEntriesActiveMemTable;
     static const std::string kNumEntriesImmMemTables;
     static const std::string kNumDeletesActiveMemTable;
@@ -370,6 +372,7 @@ class DB {
   //  "rocksdb.background-errors"
   //  "rocksdb.cur-size-active-mem-table"
   //  "rocksdb.cur-size-all-mem-tables"
+  //  "rocksdb.size-all-mem-tables"
   //  "rocksdb.num-entries-active-mem-table"
   //  "rocksdb.num-entries-imm-mem-tables"
   //  "rocksdb.num-deletes-active-mem-table"