diff --git a/HISTORY.md b/HISTORY.md index 4c9604cd74..ef55f6f56a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ ### New Features * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000. * For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated. +* Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`. ### Behavior changes * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794). diff --git a/cache/cache_entry_roles.cc b/cache/cache_entry_roles.cc index 237fcfcd6c..ea6bfe3fbb 100644 --- a/cache/cache_entry_roles.cc +++ b/cache/cache_entry_roles.cc @@ -21,6 +21,7 @@ std::array kCacheEntryRoleToCamelString{{ "WriteBuffer", "CompressionDictionaryBuildingBuffer", "FilterConstruction", + "BlockBasedTableReader", "Misc", }}; @@ -34,6 +35,7 @@ std::array kCacheEntryRoleToHyphenString{{ "write-buffer", "compression-dictionary-building-buffer", "filter-construction", + "block-based-table-reader", "misc", }}; diff --git a/cache/cache_entry_roles.h b/cache/cache_entry_roles.h index d0e3da2e32..acc3e4d4a5 100644 --- a/cache/cache_entry_roles.h +++ b/cache/cache_entry_roles.h @@ -39,6 +39,9 @@ enum class CacheEntryRole { // Filter reservations to account for // (new) bloom and ribbon filter construction's memory usage kFilterConstruction, + // BlockBasedTableReader reservations to account for + // its memory usage + kBlockBasedTableReader, // Default bucket, for miscellaneous cache entries. Do not use for // entries that could potentially add up to large usage. kMisc, diff --git a/cache/cache_reservation_manager.cc b/cache/cache_reservation_manager.cc index dfa2723d9d..3cc149b432 100644 --- a/cache/cache_reservation_manager.cc +++ b/cache/cache_reservation_manager.cc @@ -17,12 +17,30 @@ #include "rocksdb/cache.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "table/block_based/block_based_table_reader.h" +#include "table/block_based/reader_common.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { -CacheReservationManager::CacheReservationManager(std::shared_ptr cache, - bool delayed_decrease) + +template +CacheReservationManagerImpl::CacheReservationHandle::CacheReservationHandle( + std::size_t incremental_memory_used, + std::shared_ptr cache_res_mgr) + : incremental_memory_used_(incremental_memory_used) { + assert(cache_res_mgr); + cache_res_mgr_ = cache_res_mgr; +} + +template +CacheReservationManagerImpl< + R>::CacheReservationHandle::~CacheReservationHandle() { + Status s = cache_res_mgr_->ReleaseCacheReservation(incremental_memory_used_); + s.PermitUncheckedError(); +} + +template +CacheReservationManagerImpl::CacheReservationManagerImpl( + std::shared_ptr cache, bool delayed_decrease) : delayed_decrease_(delayed_decrease), cache_allocated_size_(0), memory_used_(0) { @@ -30,14 +48,15 @@ CacheReservationManager::CacheReservationManager(std::shared_ptr cache, cache_ = cache; } -CacheReservationManager::~CacheReservationManager() { +template +CacheReservationManagerImpl::~CacheReservationManagerImpl() { for (auto* handle : dummy_handles_) { cache_->Release(handle, true); } } template -Status CacheReservationManager::UpdateCacheReservation( +Status CacheReservationManagerImpl::UpdateCacheReservation( std::size_t new_mem_used) { memory_used_ = new_mem_used; std::size_t cur_cache_allocated_size = @@ -45,7 +64,7 @@ Status CacheReservationManager::UpdateCacheReservation( if (new_mem_used == cur_cache_allocated_size) { return Status::OK(); } else if (new_mem_used > cur_cache_allocated_size) { - Status s = IncreaseCacheReservation(new_mem_used); + Status s = IncreaseCacheReservation(new_mem_used); return s; } else { // In delayed decrease mode, we don't decrease cache reservation @@ -66,41 +85,32 @@ Status CacheReservationManager::UpdateCacheReservation( } } -// Explicitly instantiate templates for "CacheEntryRole" values we use. -// This makes it possible to keep the template definitions in the .cc file. -template Status CacheReservationManager::UpdateCacheReservation< - CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used); -template Status CacheReservationManager::UpdateCacheReservation< - CacheEntryRole::kCompressionDictionaryBuildingBuffer>( - std::size_t new_mem_used); -// For cache reservation manager unit tests -template Status CacheReservationManager::UpdateCacheReservation< - CacheEntryRole::kMisc>(std::size_t new_mem_used); - template -Status CacheReservationManager::MakeCacheReservation( +Status CacheReservationManagerImpl::MakeCacheReservation( std::size_t incremental_memory_used, - std::unique_ptr>* handle) { - assert(handle != nullptr); + std::unique_ptr* handle) { + assert(handle); Status s = - UpdateCacheReservation(GetTotalMemoryUsed() + incremental_memory_used); - (*handle).reset(new CacheReservationHandle(incremental_memory_used, - shared_from_this())); + UpdateCacheReservation(GetTotalMemoryUsed() + incremental_memory_used); + (*handle).reset(new CacheReservationManagerImpl::CacheReservationHandle( + incremental_memory_used, + std::enable_shared_from_this< + CacheReservationManagerImpl>::shared_from_this())); return s; } -template Status -CacheReservationManager::MakeCacheReservation( - std::size_t incremental_memory_used, - std::unique_ptr>* handle); -template Status CacheReservationManager::MakeCacheReservation< - CacheEntryRole::kFilterConstruction>( - std::size_t incremental_memory_used, - std::unique_ptr< - CacheReservationHandle>* handle); +template +Status CacheReservationManagerImpl::ReleaseCacheReservation( + std::size_t incremental_memory_used) { + assert(GetTotalMemoryUsed() >= incremental_memory_used); + std::size_t updated_total_mem_used = + GetTotalMemoryUsed() - incremental_memory_used; + Status s = UpdateCacheReservation(updated_total_mem_used); + return s; +} template -Status CacheReservationManager::IncreaseCacheReservation( +Status CacheReservationManagerImpl::IncreaseCacheReservation( std::size_t new_mem_used) { Status return_status = Status::OK(); while (new_mem_used > cache_allocated_size_.load(std::memory_order_relaxed)) { @@ -118,7 +128,8 @@ Status CacheReservationManager::IncreaseCacheReservation( return return_status; } -Status CacheReservationManager::DecreaseCacheReservation( +template +Status CacheReservationManagerImpl::DecreaseCacheReservation( std::size_t new_mem_used) { Status return_status = Status::OK(); @@ -137,15 +148,18 @@ Status CacheReservationManager::DecreaseCacheReservation( return return_status; } -std::size_t CacheReservationManager::GetTotalReservedCacheSize() { +template +std::size_t CacheReservationManagerImpl::GetTotalReservedCacheSize() { return cache_allocated_size_.load(std::memory_order_relaxed); } -std::size_t CacheReservationManager::GetTotalMemoryUsed() { +template +std::size_t CacheReservationManagerImpl::GetTotalMemoryUsed() { return memory_used_; } -Slice CacheReservationManager::GetNextCacheKey() { +template +Slice CacheReservationManagerImpl::GetNextCacheKey() { // Calling this function will have the side-effect of changing the // underlying cache_key_ that is shared among other keys generated from this // fucntion. Therefore please make sure the previous keys are saved/copied @@ -155,34 +169,15 @@ Slice CacheReservationManager::GetNextCacheKey() { } template -Cache::DeleterFn CacheReservationManager::TEST_GetNoopDeleterForRole() { +Cache::DeleterFn CacheReservationManagerImpl::TEST_GetNoopDeleterForRole() { return GetNoopDeleterForRole(); } -template Cache::DeleterFn CacheReservationManager::TEST_GetNoopDeleterForRole< - CacheEntryRole::kFilterConstruction>(); - -template -CacheReservationHandle::CacheReservationHandle( - std::size_t incremental_memory_used, - std::shared_ptr cache_res_mgr) - : incremental_memory_used_(incremental_memory_used) { - assert(cache_res_mgr != nullptr); - cache_res_mgr_ = cache_res_mgr; -} - -template -CacheReservationHandle::~CacheReservationHandle() { - assert(cache_res_mgr_ != nullptr); - assert(cache_res_mgr_->GetTotalMemoryUsed() >= incremental_memory_used_); - - Status s = cache_res_mgr_->UpdateCacheReservation( - cache_res_mgr_->GetTotalMemoryUsed() - incremental_memory_used_); - s.PermitUncheckedError(); -} - -// Explicitly instantiate templates for "CacheEntryRole" values we use. -// This makes it possible to keep the template definitions in the .cc file. -template class CacheReservationHandle; -template class CacheReservationHandle; +template class CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>; +template class CacheReservationManagerImpl< + CacheEntryRole::kCompressionDictionaryBuildingBuffer>; +template class CacheReservationManagerImpl; +template class CacheReservationManagerImpl; +template class CacheReservationManagerImpl; } // namespace ROCKSDB_NAMESPACE diff --git a/cache/cache_reservation_manager.h b/cache/cache_reservation_manager.h index c3d9a3226b..fd003ddc53 100644 --- a/cache/cache_reservation_manager.h +++ b/cache/cache_reservation_manager.h @@ -13,54 +13,90 @@ #include #include #include +#include #include #include "cache/cache_entry_roles.h" +#include "cache/cache_key.h" #include "rocksdb/cache.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "table/block_based/block_based_table_reader.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { +// CacheReservationManager is an interface for reserving cache space for the +// memory used +class CacheReservationManager { + public: + // CacheReservationHandle is for managing the lifetime of a cache reservation + // for an incremental amount of memory used (i.e, incremental_memory_used) + class CacheReservationHandle { + public: + virtual ~CacheReservationHandle() {} + }; + virtual ~CacheReservationManager() {} + virtual Status UpdateCacheReservation(std::size_t new_memory_used) = 0; + virtual Status MakeCacheReservation( + std::size_t incremental_memory_used, + std::unique_ptr + *handle) = 0; + virtual std::size_t GetTotalReservedCacheSize() = 0; + virtual std::size_t GetTotalMemoryUsed() = 0; +}; -template -class CacheReservationHandle; - -// CacheReservationManager is for reserving cache space for the memory used -// through inserting/releasing dummy entries in the cache. +// CacheReservationManagerImpl implements interface CacheReservationManager +// for reserving cache space for the memory used by inserting/releasing dummy +// entries in the cache. // // This class is NOT thread-safe, except that GetTotalReservedCacheSize() // can be called without external synchronization. -class CacheReservationManager - : public std::enable_shared_from_this { +template +class CacheReservationManagerImpl + : public CacheReservationManager, + public std::enable_shared_from_this> { public: - // Construct a CacheReservationManager + class CacheReservationHandle + : public CacheReservationManager::CacheReservationHandle { + public: + CacheReservationHandle( + std::size_t incremental_memory_used, + std::shared_ptr cache_res_mgr); + ~CacheReservationHandle() override; + + private: + std::size_t incremental_memory_used_; + std::shared_ptr cache_res_mgr_; + }; + + // Construct a CacheReservationManagerImpl // @param cache The cache where dummy entries are inserted and released for // reserving cache space // @param delayed_decrease If set true, then dummy entries won't be released - // immediately when memory usage decreases. + // immediately when memory usage decreases. // Instead, it will be released when the memory usage // decreases to 3/4 of what we have reserved so far. // This is for saving some future dummy entry // insertion when memory usage increases are likely to // happen in the near future. - explicit CacheReservationManager(std::shared_ptr cache, - bool delayed_decrease = false); + // + // REQUIRED: cache is not nullptr + explicit CacheReservationManagerImpl(std::shared_ptr cache, + bool delayed_decrease = false); // no copy constructor, copy assignment, move constructor, move assignment - CacheReservationManager(const CacheReservationManager &) = delete; - CacheReservationManager &operator=(const CacheReservationManager &) = delete; - CacheReservationManager(CacheReservationManager &&) = delete; - CacheReservationManager &operator=(CacheReservationManager &&) = delete; + CacheReservationManagerImpl(const CacheReservationManagerImpl &) = delete; + CacheReservationManagerImpl &operator=(const CacheReservationManagerImpl &) = + delete; + CacheReservationManagerImpl(CacheReservationManagerImpl &&) = delete; + CacheReservationManagerImpl &operator=(CacheReservationManagerImpl &&) = + delete; - ~CacheReservationManager(); + ~CacheReservationManagerImpl() override; - template - - // One of the two ways of reserving/releasing cache, - // see CacheReservationManager::MakeCacheReservation() for the other. - // Use ONLY one of them to prevent unexpected behavior. + // One of the two ways of reserving/releasing cache space, + // see MakeCacheReservation() for the other. + // + // Use ONLY one of these two ways to prevent unexpected behavior. // // Insert and release dummy entries in the cache to // match the size of total dummy entries with the least multiple of @@ -90,11 +126,13 @@ class CacheReservationManager // Otherwise, it returns the first non-ok status; // On releasing dummy entries, it always returns Status::OK(). // On keeping dummy entries the same, it always returns Status::OK(). - Status UpdateCacheReservation(std::size_t new_memory_used); + Status UpdateCacheReservation(std::size_t new_memory_used) override; - // One of the two ways of reserving/releasing cache, - // see CacheReservationManager::UpdateCacheReservation() for the other. - // Use ONLY one of them to prevent unexpected behavior. + // One of the two ways of reserving cache space and releasing is done through + // destruction of CacheReservationHandle. + // See UpdateCacheReservation() for the other way. + // + // Use ONLY one of these two ways to prevent unexpected behavior. // // Insert dummy entries in the cache for the incremental memory usage // to match the size of total dummy entries with the least multiple of @@ -118,21 +156,19 @@ class CacheReservationManager // calling MakeCacheReservation() is needed if you want // GetTotalMemoryUsed() indeed returns the latest memory used. // - // @param handle An pointer to std::unique_ptr> that - // manages the lifetime of the handle and its cache reservation. + // @param handle An pointer to std::unique_ptr that + // manages the lifetime of the cache reservation represented by the + // handle. // // @return It returns Status::OK() if all dummy // entry insertions succeed. // Otherwise, it returns the first non-ok status; // // REQUIRES: handle != nullptr - // REQUIRES: The CacheReservationManager object is NOT managed by - // std::unique_ptr as CacheReservationHandle needs to - // shares ownership to the CacheReservationManager object. - template Status MakeCacheReservation( std::size_t incremental_memory_used, - std::unique_ptr> *handle); + std::unique_ptr *handle) + override; // Return the size of the cache (which is a multiple of kSizeDummyEntry) // successfully reserved by calling UpdateCacheReservation(). @@ -142,25 +178,25 @@ class CacheReservationManager // smaller number than the actual reserved cache size due to // the returned number will always be a multiple of kSizeDummyEntry // and cache full might happen in the middle of inserting a dummy entry. - std::size_t GetTotalReservedCacheSize(); + std::size_t GetTotalReservedCacheSize() override; // Return the latest total memory used indicated by the most recent call of // UpdateCacheReservation(std::size_t new_memory_used); - std::size_t GetTotalMemoryUsed(); + std::size_t GetTotalMemoryUsed() override; static constexpr std::size_t GetDummyEntrySize() { return kSizeDummyEntry; } // For testing only - it is to help ensure the NoopDeleterForRole - // accessed from CacheReservationManager and the one accessed from the test - // are from the same translation units - template + // accessed from CacheReservationManagerImpl and the one accessed from the + // test are from the same translation units static Cache::DeleterFn TEST_GetNoopDeleterForRole(); private: static constexpr std::size_t kSizeDummyEntry = 256 * 1024; Slice GetNextCacheKey(); - template + + Status ReleaseCacheReservation(std::size_t incremental_memory_used); Status IncreaseCacheReservation(std::size_t new_mem_used); Status DecreaseCacheReservation(std::size_t new_mem_used); @@ -172,20 +208,81 @@ class CacheReservationManager CacheKey cache_key_; }; -// CacheReservationHandle is for managing the lifetime of a cache reservation -// This class is NOT thread-safe -template -class CacheReservationHandle { +class ConcurrentCacheReservationManager + : public CacheReservationManager, + public std::enable_shared_from_this { public: - // REQUIRES: cache_res_mgr != nullptr - explicit CacheReservationHandle( - std::size_t incremental_memory_used, - std::shared_ptr cache_res_mgr); + class CacheReservationHandle + : public CacheReservationManager::CacheReservationHandle { + public: + CacheReservationHandle( + std::shared_ptr cache_res_mgr, + std::unique_ptr + cache_res_handle) { + assert(cache_res_mgr && cache_res_handle); + cache_res_mgr_ = cache_res_mgr; + cache_res_handle_ = std::move(cache_res_handle); + } - ~CacheReservationHandle(); + ~CacheReservationHandle() override { + std::lock_guard lock(cache_res_mgr_->cache_res_mgr_mu_); + cache_res_handle_.reset(); + } + + private: + std::shared_ptr cache_res_mgr_; + std::unique_ptr + cache_res_handle_; + }; + + explicit ConcurrentCacheReservationManager( + std::shared_ptr cache_res_mgr) { + cache_res_mgr_ = std::move(cache_res_mgr); + } + ConcurrentCacheReservationManager(const ConcurrentCacheReservationManager &) = + delete; + ConcurrentCacheReservationManager &operator=( + const ConcurrentCacheReservationManager &) = delete; + ConcurrentCacheReservationManager(ConcurrentCacheReservationManager &&) = + delete; + ConcurrentCacheReservationManager &operator=( + ConcurrentCacheReservationManager &&) = delete; + + ~ConcurrentCacheReservationManager() override {} + + inline Status UpdateCacheReservation(std::size_t new_memory_used) override { + std::lock_guard lock(cache_res_mgr_mu_); + return cache_res_mgr_->UpdateCacheReservation(new_memory_used); + } + inline Status MakeCacheReservation( + std::size_t incremental_memory_used, + std::unique_ptr *handle) + override { + std::unique_ptr + wrapped_handle; + Status s; + { + std::lock_guard lock(cache_res_mgr_mu_); + s = cache_res_mgr_->MakeCacheReservation(incremental_memory_used, + &wrapped_handle); + } + (*handle).reset( + new ConcurrentCacheReservationManager::CacheReservationHandle( + std::enable_shared_from_this< + ConcurrentCacheReservationManager>::shared_from_this(), + std::move(wrapped_handle))); + return s; + } + inline std::size_t GetTotalReservedCacheSize() override { + return cache_res_mgr_->GetTotalReservedCacheSize(); + } + inline std::size_t GetTotalMemoryUsed() override { + std::lock_guard lock(cache_res_mgr_mu_); + return cache_res_mgr_->GetTotalMemoryUsed(); + } private: - std::size_t incremental_memory_used_; + std::mutex cache_res_mgr_mu_; std::shared_ptr cache_res_mgr_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/cache/cache_reservation_manager_test.cc b/cache/cache_reservation_manager_test.cc index a5b73ad961..0ed00a0081 100644 --- a/cache/cache_reservation_manager_test.cc +++ b/cache/cache_reservation_manager_test.cc @@ -15,7 +15,6 @@ #include "cache/cache_entry_roles.h" #include "rocksdb/cache.h" #include "rocksdb/slice.h" -#include "table/block_based/block_based_table_reader.h" #include "test_util/testharness.h" #include "util/coding.h" @@ -23,25 +22,24 @@ namespace ROCKSDB_NAMESPACE { class CacheReservationManagerTest : public ::testing::Test { protected: static constexpr std::size_t kSizeDummyEntry = - CacheReservationManager::GetDummyEntrySize(); + CacheReservationManagerImpl::GetDummyEntrySize(); static constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry; static constexpr int kNumShardBits = 0; // 2^0 shard static constexpr std::size_t kMetaDataChargeOverhead = 10000; std::shared_ptr cache = NewLRUCache(kCacheCapacity, kNumShardBits); - std::unique_ptr test_cache_rev_mng; + std::shared_ptr test_cache_rev_mng; CacheReservationManagerTest() { - test_cache_rev_mng.reset(new CacheReservationManager(cache)); + test_cache_rev_mng = + std::make_shared>( + cache); } }; TEST_F(CacheReservationManagerTest, GenerateCacheKey) { std::size_t new_mem_used = 1 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); ASSERT_LT(cache->GetPinnedUsage(), @@ -66,10 +64,7 @@ TEST_F(CacheReservationManagerTest, GenerateCacheKey) { TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) { std::size_t new_mem_used = 1 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 1 * kSizeDummyEntry); @@ -79,9 +74,7 @@ TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) { ASSERT_LT(initial_pinned_usage, 1 * kSizeDummyEntry + kMetaDataChargeOverhead); - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to keep cache reservation the same when new_mem_used equals " "to current cache reservation"; @@ -100,10 +93,7 @@ TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) { TEST_F(CacheReservationManagerTest, IncreaseCacheReservationByMultiplesOfDummyEntrySize) { std::size_t new_mem_used = 2 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to increase cache reservation correctly"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), @@ -121,10 +111,7 @@ TEST_F(CacheReservationManagerTest, TEST_F(CacheReservationManagerTest, IncreaseCacheReservationNotByMultiplesOfDummyEntrySize) { std::size_t new_mem_used = 2 * kSizeDummyEntry + kSizeDummyEntry / 2; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to increase cache reservation correctly"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), @@ -143,7 +130,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, IncreaseCacheReservationOnFullCache) { ; constexpr std::size_t kSizeDummyEntry = - CacheReservationManager::GetDummyEntrySize(); + CacheReservationManagerImpl::GetDummyEntrySize(); constexpr std::size_t kSmallCacheCapacity = 4 * kSizeDummyEntry; constexpr std::size_t kBigCacheCapacity = 4096 * kSizeDummyEntry; constexpr std::size_t kMetaDataChargeOverhead = 10000; @@ -153,14 +140,12 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, lo.num_shard_bits = 0; // 2^0 shard lo.strict_capacity_limit = true; std::shared_ptr cache = NewLRUCache(lo); - std::unique_ptr test_cache_rev_mng( - new CacheReservationManager(cache)); + std::shared_ptr test_cache_rev_mng = + std::make_shared>( + cache); std::size_t new_mem_used = kSmallCacheCapacity + 1; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::Incomplete()) << "Failed to return status to indicate failure of dummy entry insertion " "during cache reservation on full cache"; @@ -183,9 +168,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, "encountering cache resevation failure due to full cache"; new_mem_used = kSmallCacheCapacity / 2; // 2 dummy entries - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to decrease cache reservation after encountering cache " "reservation failure due to full cache"; @@ -207,9 +190,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, // Create cache full again for subsequent tests new_mem_used = kSmallCacheCapacity + 1; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::Incomplete()) << "Failed to return status to indicate failure of dummy entry insertion " "during cache reservation on full cache"; @@ -235,9 +216,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, // succeed cache->SetCapacity(kBigCacheCapacity); new_mem_used = kSmallCacheCapacity + 1; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to increase cache reservation after increasing cache capacity " "and mitigating cache full error"; @@ -259,10 +238,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, TEST_F(CacheReservationManagerTest, DecreaseCacheReservationByMultiplesOfDummyEntrySize) { std::size_t new_mem_used = 2 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 2 * kSizeDummyEntry); @@ -272,9 +248,7 @@ TEST_F(CacheReservationManagerTest, 2 * kSizeDummyEntry + kMetaDataChargeOverhead); new_mem_used = 1 * kSizeDummyEntry; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to decrease cache reservation correctly"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), @@ -292,10 +266,7 @@ TEST_F(CacheReservationManagerTest, TEST_F(CacheReservationManagerTest, DecreaseCacheReservationNotByMultiplesOfDummyEntrySize) { std::size_t new_mem_used = 2 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 2 * kSizeDummyEntry); @@ -305,9 +276,7 @@ TEST_F(CacheReservationManagerTest, 2 * kSizeDummyEntry + kMetaDataChargeOverhead); new_mem_used = kSizeDummyEntry / 2; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to decrease cache reservation correctly"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), @@ -325,7 +294,7 @@ TEST_F(CacheReservationManagerTest, TEST(CacheReservationManagerWithDelayedDecreaseTest, DecreaseCacheReservationWithDelayedDecrease) { constexpr std::size_t kSizeDummyEntry = - CacheReservationManager::GetDummyEntrySize(); + CacheReservationManagerImpl::GetDummyEntrySize(); constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry; constexpr std::size_t kMetaDataChargeOverhead = 10000; @@ -333,14 +302,12 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest, lo.capacity = kCacheCapacity; lo.num_shard_bits = 0; std::shared_ptr cache = NewLRUCache(lo); - std::unique_ptr test_cache_rev_mng( - new CacheReservationManager(cache, true /* delayed_decrease */)); + std::shared_ptr test_cache_rev_mng = + std::make_shared>( + cache, true /* delayed_decrease */); std::size_t new_mem_used = 8 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 8 * kSizeDummyEntry); @@ -351,9 +318,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest, 8 * kSizeDummyEntry + kMetaDataChargeOverhead); new_mem_used = 6 * kSizeDummyEntry; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 8 * kSizeDummyEntry) @@ -365,9 +330,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest, << "Failed to delay decreasing underlying dummy entries in cache"; new_mem_used = 7 * kSizeDummyEntry; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), 8 * kSizeDummyEntry) @@ -379,9 +342,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest, << "Failed to delay decreasing underlying dummy entries in cache"; new_mem_used = 6 * kSizeDummyEntry - 1; - s = test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); EXPECT_EQ(s, Status::OK()) << "Failed to decrease cache reservation correctly when new_mem_used < " "GetTotalReservedCacheSize() * 3 / 4 on delayed decrease mode"; @@ -405,7 +366,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest, TEST(CacheReservationManagerDestructorTest, ReleaseRemainingDummyEntriesOnDestruction) { constexpr std::size_t kSizeDummyEntry = - CacheReservationManager::GetDummyEntrySize(); + CacheReservationManagerImpl::GetDummyEntrySize(); constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry; constexpr std::size_t kMetaDataChargeOverhead = 10000; @@ -414,13 +375,11 @@ TEST(CacheReservationManagerDestructorTest, lo.num_shard_bits = 0; std::shared_ptr cache = NewLRUCache(lo); { - std::unique_ptr test_cache_rev_mng( - new CacheReservationManager(cache)); + std::shared_ptr test_cache_rev_mng = + std::make_shared>( + cache); std::size_t new_mem_used = 1 * kSizeDummyEntry; - Status s = - test_cache_rev_mng - ->UpdateCacheReservation( - new_mem_used); + Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used); ASSERT_EQ(s, Status::OK()); ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); ASSERT_LT(cache->GetPinnedUsage(), @@ -442,18 +401,19 @@ TEST(CacheReservationHandleTest, HandleTest) { std::shared_ptr cache = NewLRUCache(lo); std::shared_ptr test_cache_rev_mng( - std::make_shared(cache)); + std::make_shared>( + cache)); std::size_t mem_used = 0; const std::size_t incremental_mem_used_handle_1 = 1 * kSizeDummyEntry; const std::size_t incremental_mem_used_handle_2 = 2 * kSizeDummyEntry; - std::unique_ptr> handle_1, + std::unique_ptr handle_1, handle_2; // To test consecutive CacheReservationManager::MakeCacheReservation works // correctly in terms of returning the handle as well as updating cache // reservation and the latest total memory used - Status s = test_cache_rev_mng->MakeCacheReservation( + Status s = test_cache_rev_mng->MakeCacheReservation( incremental_mem_used_handle_1, &handle_1); mem_used = mem_used + incremental_mem_used_handle_1; ASSERT_EQ(s, Status::OK()); @@ -463,8 +423,8 @@ TEST(CacheReservationHandleTest, HandleTest) { EXPECT_GE(cache->GetPinnedUsage(), mem_used); EXPECT_LT(cache->GetPinnedUsage(), mem_used + kMetaDataChargeOverhead); - s = test_cache_rev_mng->MakeCacheReservation( - incremental_mem_used_handle_2, &handle_2); + s = test_cache_rev_mng->MakeCacheReservation(incremental_mem_used_handle_2, + &handle_2); mem_used = mem_used + incremental_mem_used_handle_2; ASSERT_EQ(s, Status::OK()); EXPECT_TRUE(handle_2 != nullptr); @@ -473,8 +433,9 @@ TEST(CacheReservationHandleTest, HandleTest) { EXPECT_GE(cache->GetPinnedUsage(), mem_used); EXPECT_LT(cache->GetPinnedUsage(), mem_used + kMetaDataChargeOverhead); - // To test CacheReservationHandle::~CacheReservationHandle() works correctly - // in releasing the cache reserved for the handle + // To test + // CacheReservationManager::CacheReservationHandle::~CacheReservationHandle() + // works correctly in releasing the cache reserved for the handle handle_1.reset(); EXPECT_TRUE(handle_1 == nullptr); mem_used = mem_used - incremental_mem_used_handle_1; diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index c613505c5f..af2a8bdf43 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -952,8 +952,8 @@ class FilterConstructResPeakTrackingCache : public CacheWrapper { const Cache::DeleterFn FilterConstructResPeakTrackingCache::kNoopDeleterForFilterConstruction = - CacheReservationManager::TEST_GetNoopDeleterForRole< - CacheEntryRole::kFilterConstruction>(); + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::TEST_GetNoopDeleterForRole(); // To align with the type of hash entry being reserved in implementation. using FilterConstructionReserveMemoryHash = uint64_t; @@ -983,7 +983,9 @@ class DBFilterConstructionReserveMemoryTestWithParam // trigger at least 1 dummy entry reservation each for hash entries and // final filter, we need a large number of keys to ensure we have at least // two partitions. - num_key_ = 18 * CacheReservationManager::GetDummyEntrySize() / + num_key_ = 18 * + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() / sizeof(FilterConstructionReserveMemoryHash); } else if (policy_ == kFastLocalBloom) { // For Bloom Filter + FullFilter case, since we design the num_key_ to @@ -993,7 +995,9 @@ class DBFilterConstructionReserveMemoryTestWithParam // behavior and we don't need a large number of keys to verify we // indeed charge the final filter for cache reservation, even though final // filter is a lot smaller than hash entries. - num_key_ = 1 * CacheReservationManager::GetDummyEntrySize() / + num_key_ = 1 * + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() / sizeof(FilterConstructionReserveMemoryHash); } else { // For Ribbon Filter + FullFilter case, we need a large enough number of @@ -1001,7 +1005,9 @@ class DBFilterConstructionReserveMemoryTestWithParam // reservation will trigger at least another dummy entry (or equivalently // to saying, causing another peak in cache reservation) as banding // reservation might not be a multiple of dummy entry. - num_key_ = 12 * CacheReservationManager::GetDummyEntrySize() / + num_key_ = 12 * + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() / sizeof(FilterConstructionReserveMemoryHash); } } @@ -1156,8 +1162,8 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) { return; } - const std::size_t kDummyEntrySize = - CacheReservationManager::GetDummyEntrySize(); + const std::size_t kDummyEntrySize = CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize(); const std::size_t predicted_hash_entries_cache_res = num_key * sizeof(FilterConstructionReserveMemoryHash); @@ -1345,9 +1351,12 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) { * */ if (!partition_filters) { - ASSERT_GE(std::floor(1.0 * predicted_final_filter_cache_res / - CacheReservationManager::GetDummyEntrySize()), - 1) + ASSERT_GE( + std::floor( + 1.0 * predicted_final_filter_cache_res / + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()), + 1) << "Final filter cache reservation too small for this test - please " "increase the number of keys"; if (!detect_filter_construct_corruption) { diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index b036e1ef96..fac924d311 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -11,7 +11,9 @@ #include "file/sst_file_manager_impl.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/cache.h" #include "rocksdb/sst_file_manager.h" +#include "rocksdb/table.h" #include "util/random.h" namespace ROCKSDB_NAMESPACE { @@ -1393,6 +1395,69 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) { } } +TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) { + for (bool reserve_table_builder_memory : {true, false}) { + // Open DB with infinite max open files + // - First iteration use 1 thread to open files + // - Second iteration use 5 threads to open files + for (int iter = 0; iter < 2; iter++) { + Options options; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.disable_auto_compactions = true; + options.max_open_files = -1; + + BlockBasedTableOptions table_options; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + if (iter == 0) { + options.max_file_opening_threads = 1; + } else { + options.max_file_opening_threads = 5; + } + + DestroyAndReopen(options); + + // Create 5 Files in L0 (then move then to L2) + for (int i = 0; i < 5; i++) { + std::string k = "L2_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()) << i; + } + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + + // Create 5 Files in L0 + for (int i = 0; i < 5; i++) { + std::string k = "L0_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()); + } + Close(); + + table_options.reserve_table_reader_memory = reserve_table_builder_memory; + table_options.block_cache = + NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */, + true /* strict_capacity_limit */); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + // Reopening the DB will try to load all existing files, conditionally + // subject to memory limit + Status s = TryReopen(options); + if (table_options.reserve_table_reader_memory) { + EXPECT_TRUE(s.IsMemoryLimit()); + EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") != + std::string::npos); + + } else { + EXPECT_TRUE(s.ok()); + ASSERT_EQ("5,0,5", FilesPerLevel(0)); + } + } + } +} + TEST_F(DBSSTTest, GetTotalSstFilesSize) { // We don't propagate oldest-key-time table property on compaction and // just write 0 as default value. This affect the exact table size, since diff --git a/db/db_table_properties_test.cc b/db/db_table_properties_test.cc index ef3ab7c489..b3ee8a41a2 100644 --- a/db/db_table_properties_test.cc +++ b/db/db_table_properties_test.cc @@ -7,6 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include #include @@ -18,6 +19,7 @@ #include "rocksdb/utilities/table_properties_collectors.h" #include "table/format.h" #include "table/meta_blocks.h" +#include "table/table_properties_internal.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/random.h" diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 1dea256283..f98cd1ec99 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -134,6 +134,7 @@ DECLARE_int32(set_in_place_one_in); DECLARE_int64(cache_size); DECLARE_int32(cache_numshardbits); DECLARE_bool(cache_index_and_filter_blocks); +DECLARE_bool(reserve_table_reader_memory); DECLARE_int32(top_level_index_pinning); DECLARE_int32(partition_pinning); DECLARE_int32(unpartitioned_pinning); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index e7eee76ebe..a137ad97d4 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -304,6 +304,11 @@ DEFINE_int32(cache_numshardbits, 6, DEFINE_bool(cache_index_and_filter_blocks, false, "True if indexes/filters should be cached in block cache."); +DEFINE_bool(reserve_table_reader_memory, false, + "A dynamically updating charge to block cache, loosely based on " + "the actual memory usage of table reader, will occur to account " + "the memory, if block cache available."); + DEFINE_int32( top_level_index_pinning, static_cast(ROCKSDB_NAMESPACE::PinningTier::kFallback), diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index e6afb7cfc1..3acaefd673 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2319,6 +2319,8 @@ void StressTest::Open() { block_based_options.block_cache_compressed = compressed_cache_; block_based_options.checksum = checksum_type_e; block_based_options.block_size = FLAGS_block_size; + block_based_options.reserve_table_reader_memory = + FLAGS_reserve_table_reader_memory; block_based_options.format_version = static_cast(FLAGS_format_version); block_based_options.index_block_restart_interval = diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e37e675d3d..9e9305e9cc 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -551,6 +551,10 @@ struct DBOptions { // on target_file_size_base and target_file_size_multiplier for level-based // compaction. For universal-style compaction, you can usually set it to -1. // + // A high value or -1 for this option can cause high memory usage. + // See BlockBasedTableOptions::reserve_table_reader_memory to constrain + // memory usage in case of block based table format. + // // Default: -1 // // Dynamically changeable through SetDBOptions() API. diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index ffb8bd26aa..53eb47d788 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -300,12 +300,35 @@ struct BlockBasedTableOptions { // // If additional temporary memory of Ribbon Filter uses up too much memory // relative to the avaible space left in the block cache - // at some point (i.e, causing a cache full when strict_capacity_limit = - // true), construction will fall back to Bloom Filter. + // at some point (i.e, causing a cache full under + // LRUCacheOptions::strict_capacity_limit = true), construction will fall back + // to Bloom Filter. // // Default: false bool reserve_table_builder_memory = false; + // If true, a dynamically updating charge to block cache, loosely based + // on the actual memory usage of table reader, will occur to account + // the memory, if block cache available. + // + // Charged memory usage includes: + // 1. Table properties + // 2. Index block/Filter block/Uncompression dictionary if stored in table + // reader (i.e, BlockBasedTableOptions::cache_index_and_filter_blocks == + // false) + // 3. Some internal data structures + // 4. More to come... + // + // Note: + // If creation of a table reader uses up too much memory + // relative to the avaible space left in the block cache + // at some point (i.e, causing a cache full under + // LRUCacheOptions::strict_capacity_limit = true), such creation will fail + // with Status::MemoryLimit(). + // + // Default: false + bool reserve_table_reader_memory = false; + // Note: currently this option requires kTwoLevelIndexSearch to be set as // well. // TODO(myabandeh): remove the note above once the limitation is lifted diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 0c91fa8543..b91ab604af 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -301,6 +301,10 @@ struct TableProperties { // between tables. Keys match field names in this class instead // of using full property names. std::map GetAggregatablePropertiesAsMap() const; + + // Return the approximated memory usage of this TableProperties object, + // including memory used by the string properties and UserCollectedProperties + std::size_t ApproximateMemoryUsage() const; }; // Extra properties diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 10cb6e0418..7fb18196d7 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -158,7 +158,7 @@ class WriteBufferManager final { std::atomic memory_used_; // Memory that hasn't been scheduled to free. std::atomic memory_active_; - std::unique_ptr cache_res_mgr_; + std::shared_ptr cache_res_mgr_; // Protects cache_res_mgr_ std::mutex cache_res_mgr_mu_; diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index a2a2a340c8..d539d2ed24 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -9,6 +9,8 @@ #include "rocksdb/write_buffer_manager.h" +#include + #include "cache/cache_entry_roles.h" #include "cache/cache_reservation_manager.h" #include "db/db_impl/db_impl.h" @@ -31,8 +33,9 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, // Memtable's memory usage tends to fluctuate frequently // therefore we set delayed_decrease = true to save some dummy entry // insertion on memory increase right after memory decrease - cache_res_mgr_.reset( - new CacheReservationManager(cache, true /* delayed_decrease */)); + cache_res_mgr_ = std::make_shared< + CacheReservationManagerImpl>( + cache, true /* delayed_decrease */); } #else (void)cache; @@ -75,9 +78,7 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) { size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); - Status s = - cache_res_mgr_->UpdateCacheReservation( - new_mem_used); + Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); // We absorb the error since WriteBufferManager is not able to handle // this failure properly. Ideallly we should prevent this allocation @@ -114,9 +115,7 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) { std::lock_guard lock(cache_res_mgr_mu_); size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); - Status s = - cache_res_mgr_->UpdateCacheReservation( - new_mem_used); + Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); // We absorb the error since WriteBufferManager is not able to handle // this failure properly. diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 9d399f0163..8786e3607c 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -189,6 +189,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) { "filter_policy=bloomfilter:4:true;whole_key_filtering=1;detect_filter_" "construct_corruption=false;" "reserve_table_builder_memory=false;" + "reserve_table_reader_memory=false;" "format_version=1;" "verify_compression=true;read_amp_bytes_per_bit=0;" "enable_index_compression=false;" diff --git a/options/options_test.cc b/options/options_test.cc index 00a40ff49f..58070b3ff2 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -855,6 +855,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) { "block_size_deviation=8;block_restart_interval=4;" "format_version=5;whole_key_filtering=1;" "reserve_table_builder_memory=true;" + "reserve_table_reader_memory=true;" "filter_policy=bloomfilter:4.567:false;detect_filter_construct_" "corruption=true;" // A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS @@ -877,6 +878,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) { ASSERT_EQ(new_opt.whole_key_filtering, true); ASSERT_EQ(new_opt.detect_filter_construct_corruption, true); ASSERT_EQ(new_opt.reserve_table_builder_memory, true); + ASSERT_EQ(new_opt.reserve_table_reader_memory, true); ASSERT_TRUE(new_opt.filter_policy != nullptr); auto bfp = new_opt.filter_policy->CheckedCast(); ASSERT_NE(bfp, nullptr); diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 41a49556b4..10d8f30063 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -328,7 +328,7 @@ struct BlockBasedTableBuilder::Rep { // `kBuffered` state is allowed only as long as the buffering of uncompressed // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`. uint64_t buffer_limit; - std::unique_ptr + std::shared_ptr compression_dict_buffer_cache_res_mgr; const bool use_delta_encoding_for_index_values; std::unique_ptr filter_builder; @@ -462,10 +462,12 @@ struct BlockBasedTableBuilder::Rep { compression_opts.max_dict_buffer_bytes); } if (table_options.no_block_cache || table_options.block_cache == nullptr) { - compression_dict_buffer_cache_res_mgr.reset(nullptr); + compression_dict_buffer_cache_res_mgr = nullptr; } else { - compression_dict_buffer_cache_res_mgr.reset( - new CacheReservationManager(table_options.block_cache)); + compression_dict_buffer_cache_res_mgr = + std::make_shared>( + table_options.block_cache); } for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { compression_ctxs[i].reset(new CompressionContext(compression_type)); @@ -946,8 +948,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { if (!exceeds_buffer_limit && r->compression_dict_buffer_cache_res_mgr != nullptr) { Status s = - r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation< - CacheEntryRole::kCompressionDictionaryBuildingBuffer>( + r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation( r->data_begin_offset); exceeds_global_block_cache_limit = s.IsIncomplete(); } @@ -1975,8 +1976,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() { r->data_begin_offset = 0; // Release all reserved cache for data block buffers if (r->compression_dict_buffer_cache_res_mgr != nullptr) { - Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation< - CacheEntryRole::kCompressionDictionaryBuildingBuffer>( + Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation( r->data_begin_offset); s.PermitUncheckedError(); } diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index a391fa06a9..dad82cac15 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -16,6 +16,7 @@ #include #include "cache/cache_entry_roles.h" +#include "cache/cache_reservation_manager.h" #include "logging/logging.h" #include "options/options_helper.h" #include "port/port.h" @@ -330,6 +331,10 @@ static std::unordered_map {offsetof(struct BlockBasedTableOptions, reserve_table_builder_memory), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"reserve_table_reader_memory", + {offsetof(struct BlockBasedTableOptions, reserve_table_reader_memory), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"skip_table_builder_flush", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, @@ -419,6 +424,14 @@ BlockBasedTableFactory::BlockBasedTableFactory( : table_options_(_table_options) { InitializeOptions(); RegisterOptions(&table_options_, &block_based_table_type_info); + + if (table_options_.reserve_table_reader_memory && + table_options_.no_block_cache == false) { + table_reader_cache_res_mgr_.reset(new ConcurrentCacheReservationManager( + std::make_shared>( + table_options_.block_cache))); + } } void BlockBasedTableFactory::InitializeOptions() { @@ -582,10 +595,10 @@ Status BlockBasedTableFactory::NewTableReader( return BlockBasedTable::Open( ro, table_reader_options.ioptions, table_reader_options.env_options, table_options_, table_reader_options.internal_comparator, std::move(file), - file_size, table_reader, table_reader_options.prefix_extractor, - prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, - table_reader_options.level, table_reader_options.immortal, - table_reader_options.largest_seqno, + file_size, table_reader, table_reader_cache_res_mgr_, + table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache, + table_reader_options.skip_filters, table_reader_options.level, + table_reader_options.immortal, table_reader_options.largest_seqno, table_reader_options.force_direct_prefetch, &tail_prefetch_stats_, table_reader_options.block_cache_tracer, table_reader_options.max_file_size_for_l0_meta_pin, @@ -620,6 +633,12 @@ Status BlockBasedTableFactory::ValidateOptions( "Enable pin_l0_filter_and_index_blocks_in_cache, " ", but block cache is disabled"); } + if (table_options_.reserve_table_reader_memory && + table_options_.no_block_cache) { + return Status::InvalidArgument( + "Enable reserve_table_reader_memory, " + ", but block cache is disabled"); + } if (!IsSupportedFormatVersion(table_options_.format_version)) { return Status::InvalidArgument( "Unsupported BlockBasedTable format_version. Please check " diff --git a/table/block_based/block_based_table_factory.h b/table/block_based/block_based_table_factory.h index cc93a3f4dd..3166cd3cc9 100644 --- a/table/block_based/block_based_table_factory.h +++ b/table/block_based/block_based_table_factory.h @@ -13,6 +13,7 @@ #include #include +#include "cache/cache_reservation_manager.h" #include "port/port.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/table.h" @@ -89,6 +90,7 @@ class BlockBasedTableFactory : public TableFactory { private: BlockBasedTableOptions table_options_; + std::shared_ptr table_reader_cache_res_mgr_; mutable TailPrefetchStats tail_prefetch_stats_; }; diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index a1186343e5..51ae6aa779 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -552,6 +553,7 @@ Status BlockBasedTable::Open( const InternalKeyComparator& internal_comparator, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, + std::shared_ptr table_reader_cache_res_mgr, const std::shared_ptr& prefix_extractor, const bool prefetch_index_and_filter_in_cache, const bool skip_filters, const int level, const bool immortal_table, @@ -715,10 +717,22 @@ Status BlockBasedTable::Open( tail_prefetch_stats->RecordEffectiveSize( static_cast(file_size) - prefetch_buffer->min_offset_read()); } - - *table_reader = std::move(new_table); } + if (s.ok() && table_reader_cache_res_mgr) { + std::size_t mem_usage = new_table->ApproximateMemoryUsage(); + s = table_reader_cache_res_mgr->MakeCacheReservation( + mem_usage, &(rep->table_reader_cache_res_handle)); + if (s.IsIncomplete()) { + s = Status::MemoryLimit( + "Can't allocate BlockBasedTableReader due to memory limit based on " + "cache capacity for memory allocation"); + } + } + + if (s.ok()) { + *table_reader = std::move(new_table); + } return s; } @@ -1108,6 +1122,11 @@ std::shared_ptr BlockBasedTable::GetTableProperties() size_t BlockBasedTable::ApproximateMemoryUsage() const { size_t usage = 0; + if (rep_) { + usage += rep_->ApproximateMemoryUsage(); + } else { + return usage; + } if (rep_->filter) { usage += rep_->filter->ApproximateMemoryUsage(); } @@ -1117,6 +1136,9 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const { if (rep_->uncompression_dict_reader) { usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage(); } + if (rep_->table_properties) { + usage += rep_->table_properties->ApproximateMemoryUsage(); + } return usage; } diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 9d056f73e0..2ca1ff08e6 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -10,8 +10,11 @@ #pragma once #include +#include +#include "cache/cache_entry_roles.h" #include "cache/cache_key.h" +#include "cache/cache_reservation_manager.h" #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" #include "rocksdb/slice_transform.h" @@ -98,6 +101,8 @@ class BlockBasedTable : public TableReader { const InternalKeyComparator& internal_key_comparator, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, + std::shared_ptr table_reader_cache_res_mgr = + nullptr, const std::shared_ptr& prefix_extractor = nullptr, bool prefetch_index_and_filter_in_cache = true, bool skip_filters = false, int level = -1, const bool immortal_table = false, @@ -626,6 +631,9 @@ struct BlockBasedTable::Rep { const bool immortal_table; + std::unique_ptr + table_reader_cache_res_handle = nullptr; + SequenceNumber get_global_seqno(BlockType block_type) const { return (block_type == BlockType::kFilter || block_type == BlockType::kCompressionDictionary) @@ -670,6 +678,16 @@ struct BlockBasedTable::Rep { implicit_auto_readahead, async_io); } } + + std::size_t ApproximateMemoryUsage() const { + std::size_t usage = 0; +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return usage; + } }; // This is an adapter class for `WritableFile` to be used for `std::ostream`. diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 305986eca5..d7a3105a3d 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -5,11 +5,17 @@ #include "table/block_based/block_based_table_reader.h" +#include +#include + +#include "cache/cache_reservation_manager.h" +#include "db/db_test_util.h" #include "db/table_properties_collector.h" #include "file/file_util.h" #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/compression_type.h" #include "rocksdb/db.h" #include "rocksdb/file_system.h" #include "table/block_based/block_based_table_builder.h" @@ -22,33 +28,53 @@ namespace ROCKSDB_NAMESPACE { -class BlockBasedTableReaderTest - : public testing::Test, - public testing::WithParamInterface> { +class BlockBasedTableReaderBaseTest : public testing::Test { protected: - CompressionType compression_type_; - bool use_direct_reads_; + // Prepare key-value pairs to occupy multiple blocks. + // Each value is 256B, every 16 pairs constitute 1 block. + // If mixed_with_human_readable_string_value == true, + // then adjacent blocks contain values with different compression + // complexity: human readable strings are easier to compress than random + // strings. + static std::map GenerateKVMap( + int num_block = 100, + bool mixed_with_human_readable_string_value = false) { + std::map kv; + + Random rnd(101); + uint32_t key = 0; + for (int block = 0; block < num_block; block++) { + for (int i = 0; i < 16; i++) { + char k[9] = {0}; + // Internal key is constructed directly from this key, + // and internal key size is required to be >= 8 bytes, + // so use %08u as the format string. + sprintf(k, "%08u", key); + std::string v; + if (mixed_with_human_readable_string_value) { + v = (block % 2) ? rnd.HumanReadableString(256) + : rnd.RandomString(256); + } else { + v = rnd.RandomString(256); + } + kv[std::string(k)] = v; + key++; + } + } + return kv; + } void SetUp() override { - BlockBasedTableOptions::IndexType index_type; - bool no_block_cache; - std::tie(compression_type_, use_direct_reads_, index_type, no_block_cache) = - GetParam(); - SetupSyncPointsToMockDirectIO(); test_dir_ = test::PerThreadDBPath("block_based_table_reader_test"); env_ = Env::Default(); fs_ = FileSystem::Default(); ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); - - BlockBasedTableOptions opts; - opts.index_type = index_type; - opts.no_block_cache = no_block_cache; - table_factory_.reset( - static_cast(NewBlockBasedTableFactory(opts))); + ConfigureTableFactory(); } + virtual void ConfigureTableFactory() = 0; + void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } // Creates a table with the specificied key value pairs (kv). @@ -59,18 +85,18 @@ class BlockBasedTableReaderTest NewFileWriter(table_name, &writer); // Create table builder. - Options options; - ImmutableOptions ioptions(options); - InternalKeyComparator comparator(options.comparator); + ImmutableOptions ioptions(options_); + InternalKeyComparator comparator(options_.comparator); ColumnFamilyOptions cf_options; MutableCFOptions moptions(cf_options); IntTblPropCollectorFactories factories; - std::unique_ptr table_builder(table_factory_->NewTableBuilder( - TableBuilderOptions(ioptions, moptions, comparator, &factories, - compression_type, CompressionOptions(), - 0 /* column_family_id */, kDefaultColumnFamilyName, - -1 /* level */), - writer.get())); + std::unique_ptr table_builder( + options_.table_factory->NewTableBuilder( + TableBuilderOptions(ioptions, moptions, comparator, &factories, + compression_type, CompressionOptions(), + 0 /* column_family_id */, + kDefaultColumnFamilyName, -1 /* level */), + writer.get())); // Build table. for (auto it = kv.begin(); it != kv.end(); it++) { @@ -85,35 +111,41 @@ class BlockBasedTableReaderTest const ImmutableOptions& ioptions, const InternalKeyComparator& comparator, const std::string& table_name, - std::unique_ptr* table) { + std::unique_ptr* table, + bool prefetch_index_and_filter_in_cache = true, + Status* status = nullptr) { + const MutableCFOptions moptions(options_); + TableReaderOptions table_reader_options = TableReaderOptions( + ioptions, moptions.prefix_extractor, EnvOptions(), comparator); + std::unique_ptr file; NewFileReader(table_name, foptions, &file); uint64_t file_size = 0; ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size)); - std::unique_ptr table_reader; - ReadOptions ro; - const auto* table_options = - table_factory_->GetOptions(); - ASSERT_NE(table_options, nullptr); - ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), *table_options, - comparator, std::move(file), file_size, - &table_reader)); + std::unique_ptr general_table; + Status s = options_.table_factory->NewTableReader( + ReadOptions(), table_reader_options, std::move(file), file_size, + &general_table, prefetch_index_and_filter_in_cache); - table->reset(reinterpret_cast(table_reader.release())); + if (s.ok()) { + table->reset(reinterpret_cast(general_table.release())); + } + + if (status) { + *status = s; + } } std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } - const std::shared_ptr& fs() const { return fs_; } - - private: std::string test_dir_; Env* env_; std::shared_ptr fs_; - std::unique_ptr table_factory_; + Options options_; + private: void WriteToFile(const std::string& content, const std::string& filename) { std::unique_ptr f; ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr)); @@ -146,35 +178,36 @@ class BlockBasedTableReaderTest } }; +class BlockBasedTableReaderTest + : public BlockBasedTableReaderBaseTest, + public testing::WithParamInterface> { + protected: + void SetUp() override { + compression_type_ = std::get<0>(GetParam()); + use_direct_reads_ = std::get<1>(GetParam()); + BlockBasedTableReaderBaseTest::SetUp(); + } + + void ConfigureTableFactory() override { + BlockBasedTableOptions opts; + opts.index_type = std::get<2>(GetParam()); + opts.no_block_cache = std::get<3>(GetParam()); + options_.table_factory.reset( + static_cast(NewBlockBasedTableFactory(opts))); + } + + CompressionType compression_type_; + bool use_direct_reads_; +}; + // Tests MultiGet in both direct IO and non-direct IO mode. // The keys should be in cache after MultiGet. TEST_P(BlockBasedTableReaderTest, MultiGet) { - // Prepare key-value pairs to occupy multiple blocks. - // Each value is 256B, every 16 pairs constitute 1 block. - // Adjacent blocks contain values with different compression complexity: - // human readable strings are easier to compress than random strings. - std::map kv; - { - Random rnd(101); - uint32_t key = 0; - for (int block = 0; block < 100; block++) { - for (int i = 0; i < 16; i++) { - char k[9] = {0}; - // Internal key is constructed directly from this key, - // and internal key size is required to be >= 8 bytes, - // so use %08u as the format string. - sprintf(k, "%08u", key); - std::string v; - if (block % 2) { - v = rnd.HumanReadableString(256); - } else { - v = rnd.RandomString(256); - } - kv[std::string(k)] = v; - key++; - } - } - } + std::map kv = + BlockBasedTableReaderBaseTest::GenerateKVMap( + 100 /* num_block */, + true /* mixed_with_human_readable_string_value */); // Prepare keys, values, and statuses for MultiGet. autovector keys; @@ -249,6 +282,220 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) { } } +class BlockBasedTableReaderResOnlyCache : public CacheWrapper { + public: + explicit BlockBasedTableReaderResOnlyCache(std::shared_ptr target) + : CacheWrapper(std::move(target)) {} + + using Cache::Insert; + Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle = nullptr, + Priority priority = Priority::LOW) override { + if (deleter == kNoopDeleterForBlockBasedTableReader) { + return target_->Insert(key, value, charge, deleter, handle, priority); + } else { + return Status::OK(); + } + } + + using Cache::Release; + bool Release(Handle* handle, bool force_erase = false) override { + auto deleter = GetDeleter(handle); + if (deleter == kNoopDeleterForBlockBasedTableReader) { + return target_->Release(handle, force_erase); + } else { + return true; + } + } + + private: + static const Cache::DeleterFn kNoopDeleterForBlockBasedTableReader; +}; + +const Cache::DeleterFn + BlockBasedTableReaderResOnlyCache::kNoopDeleterForBlockBasedTableReader = + CacheReservationManagerImpl:: + TEST_GetNoopDeleterForRole(); + +class BlockBasedTableReaderCapMemoryTest + : public BlockBasedTableReaderBaseTest, + public testing::WithParamInterface< + bool /* reserve_table_builder_memory */> { + protected: + static std::size_t CalculateMaxTableReaderNumBeforeCacheFull( + std::size_t cache_capacity, std::size_t approx_table_reader_mem) { + // To make calculation easier for testing + assert(cache_capacity % CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>:: + GetDummyEntrySize() == + 0 && + cache_capacity > 2 * CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>:: + GetDummyEntrySize()); + + // We need to subtract 1 for max_num_dummy_entry to account for dummy + // entries' overhead, assumed the overhead is no greater than 1 dummy entry + // size + std::size_t max_num_dummy_entry = + (size_t)std::floor(( + 1.0 * cache_capacity / + CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize())) - + 1; + std::size_t cache_capacity_rounded_to_dummy_entry_multiples = + max_num_dummy_entry * + CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize(); + std::size_t max_table_reader_num = static_cast( + std::floor(1.0 * cache_capacity_rounded_to_dummy_entry_multiples / + approx_table_reader_mem)); + + return max_table_reader_num; + } + + void SetUp() override { + // To cache and re-use the same kv map and compression type in the test + // suite for elimiating variance caused by these two factors + kv_ = BlockBasedTableReaderBaseTest::GenerateKVMap(); + compression_type_ = CompressionType::kNoCompression; + + table_reader_res_only_cache_.reset(new BlockBasedTableReaderResOnlyCache( + NewLRUCache(6 * CacheReservationManagerImpl< + CacheEntryRole::kBlockBasedTableReader>:: + GetDummyEntrySize(), + 0 /* num_shard_bits */, true /* strict_capacity_limit */))); + + // To ApproximateTableReaderMem() without encountering any potential errors + // caused by BlocBasedTableReader::reserve_table_reader_memory == true, we + // first turn off the feature to test + reserve_table_reader_memory_ = false; + BlockBasedTableReaderBaseTest::SetUp(); + approx_table_reader_mem_ = ApproximateTableReaderMem(); + + // Now we condtionally turn on the feature to test + reserve_table_reader_memory_ = GetParam(); + ConfigureTableFactory(); + } + + void ConfigureTableFactory() override { + BlockBasedTableOptions table_options; + table_options.reserve_table_reader_memory = reserve_table_reader_memory_; + table_options.block_cache = table_reader_res_only_cache_; + + table_options.cache_index_and_filter_blocks = false; + table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); + table_options.partition_filters = true; + table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch; + + options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); + } + + bool reserve_table_reader_memory_; + std::shared_ptr + table_reader_res_only_cache_; + std::size_t approx_table_reader_mem_; + std::map kv_; + CompressionType compression_type_; + + private: + std::size_t ApproximateTableReaderMem() { + std::size_t approx_table_reader_mem = 0; + + std::string table_name = "table_for_approx_table_reader_mem"; + CreateTable(table_name, compression_type_, kv_); + + std::unique_ptr table; + Status s; + NewBlockBasedTableReader( + FileOptions(), ImmutableOptions(options_), + InternalKeyComparator(options_.comparator), table_name, &table, + false /* prefetch_index_and_filter_in_cache */, &s); + assert(s.ok()); + + approx_table_reader_mem = table->ApproximateMemoryUsage(); + assert(approx_table_reader_mem > 0); + return approx_table_reader_mem; + } +}; + +INSTANTIATE_TEST_CASE_P(CapMemoryUsageUnderCacheCapacity, + BlockBasedTableReaderCapMemoryTest, ::testing::Bool()); + +TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) { + const std::size_t max_table_reader_num = BlockBasedTableReaderCapMemoryTest:: + CalculateMaxTableReaderNumBeforeCacheFull( + table_reader_res_only_cache_->GetCapacity(), + approx_table_reader_mem_); + + Status s = Status::OK(); + std::size_t opened_table_reader_num = 0; + std::string table_name; + std::vector> tables; + + // Keep creating BlockBasedTableReader till hiting the memory limit based on + // cache capacity and creation fails or reaching a big number of table readers + while (s.ok() && opened_table_reader_num < 2 * max_table_reader_num) { + table_name = "table_" + std::to_string(opened_table_reader_num); + CreateTable(table_name, compression_type_, kv_); + tables.push_back(std::unique_ptr()); + NewBlockBasedTableReader( + FileOptions(), ImmutableOptions(options_), + InternalKeyComparator(options_.comparator), table_name, &tables.back(), + false /* prefetch_index_and_filter_in_cache */, &s); + if (s.ok()) { + ++opened_table_reader_num; + } + } + + if (reserve_table_reader_memory_) { + EXPECT_TRUE(s.IsMemoryLimit() && + opened_table_reader_num < 2 * max_table_reader_num) + << "s: " << s.ToString() << " opened_table_reader_num: " + << std::to_string(opened_table_reader_num); + EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") != + std::string::npos); + + // Acceptable estimtation errors coming from + // 1. overstimate max_table_reader_num due to # dummy entries is high and + // results in metadata charge overhead greater than 1 dummy entry size + // (violating our assumption in calculating max_table_reader_nums) + // 2. overestimate/underestimate max_table_reader_num due to the gap between + // ApproximateTableReaderMem() and actual table reader mem + EXPECT_GE(opened_table_reader_num, max_table_reader_num * 0.99); + EXPECT_LE(opened_table_reader_num, max_table_reader_num * 1.01); + + std::size_t updated_max_table_reader_num = + BlockBasedTableReaderCapMemoryTest:: + CalculateMaxTableReaderNumBeforeCacheFull( + table_reader_res_only_cache_->GetCapacity() / 2, + approx_table_reader_mem_); + + // Keep deleting BlockBasedTableReader to lower down memory usage from the + // memory limit to make the next creation succeeds + while (opened_table_reader_num >= updated_max_table_reader_num) { + tables.pop_back(); + --opened_table_reader_num; + } + table_name = "table_for_successful_table_reader_open"; + CreateTable(table_name, compression_type_, kv_); + tables.push_back(std::unique_ptr()); + NewBlockBasedTableReader( + FileOptions(), ImmutableOptions(options_), + InternalKeyComparator(options_.comparator), table_name, &tables.back(), + false /* prefetch_index_and_filter_in_cache */, &s); + EXPECT_TRUE(s.ok()) << s.ToString(); + + tables.clear(); + EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0); + } else { + EXPECT_TRUE(s.ok() && opened_table_reader_num == 2 * max_table_reader_num) + << "s: " << s.ToString() << " opened_table_reader_num: " + << std::to_string(opened_table_reader_num); + EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0); + } +} + class BlockBasedTableReaderTestVerifyChecksum : public BlockBasedTableReaderTest { public: @@ -256,27 +503,8 @@ class BlockBasedTableReaderTestVerifyChecksum }; TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) { - // Prepare key-value pairs to occupy multiple blocks. - // Each value is 256B, every 16 pairs constitute 1 block. - // Adjacent blocks contain values with different compression complexity: - // human readable strings are easier to compress than random strings. - Random rnd(101); - std::map kv; - { - uint32_t key = 0; - for (int block = 0; block < 800; block++) { - for (int i = 0; i < 16; i++) { - char k[9] = {0}; - // Internal key is constructed directly from this key, - // and internal key size is required to be >= 8 bytes, - // so use %08u as the format string. - sprintf(k, "%08u", key); - std::string v = rnd.RandomString(256); - kv[std::string(k)] = v; - key++; - } - } - } + std::map kv = + BlockBasedTableReaderBaseTest::GenerateKVMap(800 /* num_block */); std::string table_name = "BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_); diff --git a/table/block_based/filter_policy.cc b/table/block_based/filter_policy.cc index cfbc658aa4..48c49242cf 100644 --- a/table/block_based/filter_policy.cc +++ b/table/block_based/filter_policy.cc @@ -91,11 +91,9 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { kUint64tHashEntryCacheResBucketSize) == kUint64tHashEntryCacheResBucketSize / 2)) { hash_entries_info_.cache_res_bucket_handles.emplace_back(nullptr); - Status s = - cache_res_mgr_ - ->MakeCacheReservation( - kUint64tHashEntryCacheResBucketSize * sizeof(hash), - &hash_entries_info_.cache_res_bucket_handles.back()); + Status s = cache_res_mgr_->MakeCacheReservation( + kUint64tHashEntryCacheResBucketSize * sizeof(hash), + &hash_entries_info_.cache_res_bucket_handles.back()); s.PermitUncheckedError(); } } @@ -113,7 +111,9 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { // Number of hash entries to accumulate before charging their memory usage to // the cache when cache reservation is available static const std::size_t kUint64tHashEntryCacheResBucketSize = - CacheReservationManager::GetDummyEntrySize() / sizeof(uint64_t); + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() / + sizeof(uint64_t); // For delegating between XXPH3FilterBitsBuilders void SwapEntriesWith(XXPH3FilterBitsBuilder* other) { @@ -259,8 +259,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { // For managing cache reservation for final filter in (new) Bloom and Ribbon // Filter construction - std::deque>> + std::deque> final_filter_cache_res_handles_; bool detect_filter_construct_corruption_; @@ -274,8 +273,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { // it manages cache reservation for buckets of hash entries in (new) Bloom // or Ribbon Filter construction. // Otherwise, it is empty. - std::deque>> + std::deque> cache_res_bucket_handles; // If detect_filter_construct_corruption_ == true, @@ -336,17 +334,14 @@ class FastLocalBloomBitsBuilder : public XXPH3FilterBitsBuilder { size_t len_with_metadata = CalculateSpace(num_entries); std::unique_ptr mutable_buf; - std::unique_ptr> + std::unique_ptr final_filter_cache_res_handle; len_with_metadata = AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf); // Cache reservation for mutable_buf if (cache_res_mgr_) { - Status s = - cache_res_mgr_ - ->MakeCacheReservation( - len_with_metadata * sizeof(char), - &final_filter_cache_res_handle); + Status s = cache_res_mgr_->MakeCacheReservation( + len_with_metadata * sizeof(char), &final_filter_cache_res_handle); s.PermitUncheckedError(); } @@ -661,13 +656,11 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder { Status status_banding_cache_res = Status::OK(); // Cache reservation for banding - std::unique_ptr> + std::unique_ptr banding_res_handle; if (cache_res_mgr_) { - status_banding_cache_res = - cache_res_mgr_ - ->MakeCacheReservation( - bytes_banding, &banding_res_handle); + status_banding_cache_res = cache_res_mgr_->MakeCacheReservation( + bytes_banding, &banding_res_handle); } if (status_banding_cache_res.IsIncomplete()) { @@ -720,17 +713,14 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder { assert(seed < 256); std::unique_ptr mutable_buf; - std::unique_ptr> + std::unique_ptr final_filter_cache_res_handle; len_with_metadata = AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf); // Cache reservation for mutable_buf if (cache_res_mgr_) { - Status s = - cache_res_mgr_ - ->MakeCacheReservation( - len_with_metadata * sizeof(char), - &final_filter_cache_res_handle); + Status s = cache_res_mgr_->MakeCacheReservation( + len_with_metadata * sizeof(char), &final_filter_cache_res_handle); s.PermitUncheckedError(); } @@ -1498,7 +1488,8 @@ FilterBitsBuilder* BloomLikeFilterPolicy::GetFastLocalBloomBuilderWithContext( context.table_options.block_cache); std::shared_ptr cache_res_mgr; if (reserve_filter_construction_mem) { - cache_res_mgr = std::make_shared( + cache_res_mgr = std::make_shared< + CacheReservationManagerImpl>( context.table_options.block_cache); } return new FastLocalBloomBitsBuilder( @@ -1539,7 +1530,8 @@ BloomLikeFilterPolicy::GetStandard128RibbonBuilderWithContext( context.table_options.block_cache); std::shared_ptr cache_res_mgr; if (reserve_filter_construction_mem) { - cache_res_mgr = std::make_shared( + cache_res_mgr = std::make_shared< + CacheReservationManagerImpl>( context.table_options.block_cache); } return new Standard128RibbonBitsBuilder( diff --git a/table/table_properties.cc b/table/table_properties.cc index 83ff31151d..01285e6989 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -5,6 +5,7 @@ #include "rocksdb/table_properties.h" +#include "port/malloc.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/unique_id.h" @@ -213,6 +214,36 @@ TableProperties::GetAggregatablePropertiesAsMap() const { return rv; } +// WARNING: manual update to this function is needed +// whenever a new string property is added to TableProperties +// to reduce approximation error. +// +// TODO: eliminate the need of manually updating this function +// for new string properties +std::size_t TableProperties::ApproximateMemoryUsage() const { + std::size_t usage = 0; +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size((void*)this); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + + std::size_t string_props_mem_usage = + db_id.size() + db_session_id.size() + db_host_id.size() + + column_family_name.size() + filter_policy_name.size() + + comparator_name.size() + merge_operator_name.size() + + prefix_extractor_name.size() + property_collectors_names.size() + + compression_name.size() + compression_options.size(); + usage += string_props_mem_usage; + + for (auto iter = user_collected_properties.begin(); + iter != user_collected_properties.end(); ++iter) { + usage += (iter->first.size() + iter->second.size()); + } + + return usage; +} + const std::string TablePropertiesNames::kDbId = "rocksdb.creating.db.identity"; const std::string TablePropertiesNames::kDbSessionId = "rocksdb.creating.session.identity"; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 7dc25c3775..4ce9ce2b4c 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1125,6 +1125,11 @@ DEFINE_bool(async_io, false, "When set true, RocksDB does asynchronous reads for internal auto " "readahead prefetching."); +DEFINE_bool(reserve_table_reader_memory, false, + "A dynamically updating charge to block cache, loosely based on " + "the actual memory usage of table reader, will occur to account " + "the memory, if block cache available."); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -4049,6 +4054,8 @@ class Benchmark { true; } block_based_options.block_cache = cache_; + block_based_options.reserve_table_reader_memory = + FLAGS_reserve_table_reader_memory; block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; block_based_options.block_restart_interval = FLAGS_block_restart_interval; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 96ec137b0d..bb8508665f 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -41,6 +41,7 @@ default_params = { random.lognormvariate(2.3, 1.3)]), "cache_index_and_filter_blocks": lambda: random.randint(0, 1), "cache_size": 8388608, + "reserve_table_reader_memory": lambda: random.choice([0, 1]), "checkpoint_one_in": 1000000, "compression_type": lambda: random.choice( ["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]), diff --git a/util/bloom_test.cc b/util/bloom_test.cc index f18e07a6d8..e20d3d4035 100644 --- a/util/bloom_test.cc +++ b/util/bloom_test.cc @@ -621,7 +621,8 @@ TEST_P(FullBloomTest, OptimizeForMemory) { TEST(FullBloomFilterConstructionReserveMemTest, RibbonFilterFallBackOnLargeBanding) { constexpr std::size_t kCacheCapacity = - 8 * CacheReservationManager::GetDummyEntrySize(); + 8 * CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize(); constexpr std::size_t num_entries_for_cache_full = kCacheCapacity / 8; for (bool reserve_builder_mem : {true, false}) { @@ -662,12 +663,19 @@ TEST(FullBloomFilterConstructionReserveMemTest, if (reserve_builder_mem) { const size_t dummy_entry_num = static_cast(std::ceil( - filter.size() * 1.0 / CacheReservationManager::GetDummyEntrySize())); - EXPECT_GE(cache->GetPinnedUsage(), - dummy_entry_num * CacheReservationManager::GetDummyEntrySize()); + filter.size() * 1.0 / + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize())); + EXPECT_GE( + cache->GetPinnedUsage(), + dummy_entry_num * + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()); EXPECT_LT( cache->GetPinnedUsage(), - (dummy_entry_num + 1) * CacheReservationManager::GetDummyEntrySize()); + (dummy_entry_num + 1) * + CacheReservationManagerImpl< + CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()); } else { EXPECT_EQ(cache->GetPinnedUsage(), 0); }