From 74cfe7db601083b541f7a790acea599be10d716d Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Tue, 24 Aug 2021 12:42:31 -0700 Subject: [PATCH] Refactor WriteBufferManager::CacheRep into CacheReservationManager (#8506) Summary: Context: To help cap various memory usage by a single limit of the block cache capacity, we charge the memory usage through inserting/releasing dummy entries in the block cache. CacheReservationManager is such a class (not thread-safe) responsible for inserting/removing dummy entries to reserve cache space for memory used by the class user. - Refactored the inner private class CacheRep of WriteBufferManager into public CacheReservationManager class for reusability such as for https://github.com/facebook/rocksdb/pull/8428 - Encapsulated implementation details of cache key generation and dummy entries insertion/release in cache reservation as discussed in https://github.com/facebook/rocksdb/pull/8506#discussion_r666550838 - Consolidated increase/decrease cache reservation into one API - UpdateCacheReservation. - Adjusted the previous dummy entry release algorithm in decreasing cache reservation to be loop-releasing dummy entries to stay symmetric to dummy entry insertion algorithm - Made the previous dummy entry release algorithm in delayed decrease mode more aggressive for better decreasing cache reservation when memory used is less likely to increase back. Previously, the algorithm only released 1 dummy entry when new_mem_used < 3/4 * cache_allocated_size_ and cache_allocated_size_ - kSizeDummyEntry > new_mem_used. Now, the algorithm loop-releases as many dummy entries as possible when new_mem_used < 3/4 * cache_allocated_size_. 
- Updated WriteBufferManager's test cases to adapt to changes on the release algorithm mentioned above and left comments on some test cases for clarity - Replaced the previous cache key prefix generation (utilizing object address related to the cache client) with one that utilizes Cache->NewID() to prevent cache-key collision among dummy entry clients sharing the same cache. The specific collision we are preventing happens when the object address is reused for a new cache-key prefix while the old cache-key using that same object address in its prefix still exists in the cache. This can happen because, under the LRU cache policy, there is a possible delay in releasing a cache entry after the cache client object owning that cache entry gets deallocated. In this case, the object address related to the cache client object can get reused for another client object to generate a new cache-key prefix. This prefix generation can be made obsolete after Peter's unification of all the code generating cache key, mentioned in https://github.com/facebook/rocksdb/pull/8506#discussion_r667265255 Pull Request resolved: https://github.com/facebook/rocksdb/pull/8506 Test Plan: - Passing the added unit tests cache_reservation_manager_test.cc - Passing existing and adjusted write_buffer_manager_test.cc Reviewed By: ajkr Differential Revision: D29644135 Pulled By: hx235 fbshipit-source-id: 0fc93fbfe4a40bb41be85c314f8f2bafa8b741f7 --- CMakeLists.txt | 2 + Makefile | 4 +- TARGETS | 9 + cache/cache_reservation_manager.cc | 128 ++++++++ cache/cache_reservation_manager.h | 97 ++++++ cache/cache_reservation_manager_test.cc | 411 ++++++++++++++++++++++++ include/rocksdb/write_buffer_manager.h | 14 +- memtable/write_buffer_manager.cc | 135 +++----- memtable/write_buffer_manager_test.cc | 189 +++++++---- src.mk | 2 + 10 files changed, 829 insertions(+), 162 deletions(-) create mode 100644 cache/cache_reservation_manager.cc create mode 100644 cache/cache_reservation_manager.h create mode 100644 
cache/cache_reservation_manager_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d78743d95..d7ab5b488b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -624,6 +624,7 @@ find_package(Threads REQUIRED) set(SOURCES cache/cache.cc cache/cache_entry_roles.cc + cache/cache_reservation_manager.cc cache/clock_cache.cc cache/lru_cache.cc cache/sharded_cache.cc @@ -1123,6 +1124,7 @@ if(WITH_TESTS) ) if(WITH_ALL_TESTS) list(APPEND TESTS + cache/cache_reservation_manager_test.cc cache/cache_test.cc cache/lru_cache_test.cc db/blob/blob_counting_iterator_test.cc diff --git a/Makefile b/Makefile index 77ce2dd83a..e2790a2cc3 100644 --- a/Makefile +++ b/Makefile @@ -1897,7 +1897,9 @@ clipping_iterator_test: $(OBJ_DIR)/db/compaction/clipping_iterator_test.o $(TEST ribbon_bench: $(OBJ_DIR)/microbench/ribbon_bench.o $(LIBRARY) $(AM_LINK) - + +cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) #------------------------------------------------- # make install related stuff PREFIX ?= /usr/local diff --git a/TARGETS b/TARGETS index b5b992b6b6..ac10bcacb4 100644 --- a/TARGETS +++ b/TARGETS @@ -133,6 +133,7 @@ cpp_library( srcs = [ "cache/cache.cc", "cache/cache_entry_roles.cc", + "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", "cache/sharded_cache.cc", @@ -452,6 +453,7 @@ cpp_library( srcs = [ "cache/cache.cc", "cache/cache_entry_roles.cc", + "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", "cache/sharded_cache.cc", @@ -1025,6 +1027,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "cache_reservation_manager_test", + "cache/cache_reservation_manager_test.cc", + "parallel", + [], + [], + ], [ "cache_simulator_test", "utilities/simulator_cache/cache_simulator_test.cc", diff --git a/cache/cache_reservation_manager.cc b/cache/cache_reservation_manager.cc new file mode 100644 index 0000000000..d6f62d647e --- /dev/null +++ 
b/cache/cache_reservation_manager.cc @@ -0,0 +1,128 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "cache/cache_reservation_manager.h" + +#include +#include +#include +#include + +#include "cache/cache_entry_roles.h" +#include "rocksdb/cache.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "table/block_based/block_based_table_reader.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { +CacheReservationManager::CacheReservationManager(std::shared_ptr cache, + bool delayed_decrease) + : delayed_decrease_(delayed_decrease), cache_allocated_size_(0) { + assert(cache != nullptr); + cache_ = cache; + std::memset(cache_key_, 0, kCacheKeyPrefixSize + kMaxVarint64Length); + EncodeVarint64(cache_key_, cache_->NewId()); +} + +CacheReservationManager::~CacheReservationManager() { + for (auto* handle : dummy_handles_) { + cache_->Release(handle, true); + } +} + +template +Status CacheReservationManager::UpdateCacheReservation( + std::size_t new_mem_used) { + std::size_t cur_cache_allocated_size = + cache_allocated_size_.load(std::memory_order_relaxed); + if (new_mem_used == cur_cache_allocated_size) { + return Status::OK(); + } else if (new_mem_used > cur_cache_allocated_size) { + Status s = IncreaseCacheReservation(new_mem_used); + return s; + } else { + // In delayed decrease mode, we don't decrease cache reservation + // untill the memory usage is less than 3/4 of what we reserve + // in the cache. 
+ // We do this because + // (1) Dummy entry insertion is expensive in block cache + // (2) Delayed releasing previously inserted dummy entries can save such + // expensive dummy entry insertion on memory increase in the near future, + // which is likely to happen when the memory usage is greater than or equal + // to 3/4 of what we reserve + if (delayed_decrease_ && new_mem_used >= cur_cache_allocated_size / 4 * 3) { + return Status::OK(); + } else { + Status s = DecreaseCacheReservation(new_mem_used); + return s; + } + } +} + +// Explicitly instantiate templates for "CacheEntryRole" values we use. +// This makes it possible to keep the template definitions in the .cc file. +template Status CacheReservationManager::UpdateCacheReservation< + CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used); +// For cache reservation manager unit tests +template Status CacheReservationManager::UpdateCacheReservation< + CacheEntryRole::kMisc>(std::size_t new_mem_used); + +template +Status CacheReservationManager::IncreaseCacheReservation( + std::size_t new_mem_used) { + Status return_status = Status::OK(); + while (new_mem_used > cache_allocated_size_.load(std::memory_order_relaxed)) { + Cache::Handle* handle = nullptr; + return_status = cache_->Insert(GetNextCacheKey(), nullptr, kSizeDummyEntry, + GetNoopDeleterForRole(), &handle); + + if (return_status != Status::OK()) { + return return_status; + } + + dummy_handles_.push_back(handle); + cache_allocated_size_ += kSizeDummyEntry; + } + return return_status; +} + +Status CacheReservationManager::DecreaseCacheReservation( + std::size_t new_mem_used) { + Status return_status = Status::OK(); + + // Decrease to the smallest multiple of kSizeDummyEntry that is greater than + // or equal to new_mem_used We do addition instead of new_mem_used <= + // cache_allocated_size_.load(std::memory_order_relaxed) - kSizeDummyEntry to + // avoid underflow of size_t when cache_allocated_size_ = 0 + while (new_mem_used + kSizeDummyEntry <= + 
cache_allocated_size_.load(std::memory_order_relaxed)) { + assert(!dummy_handles_.empty()); + auto* handle = dummy_handles_.back(); + cache_->Release(handle, true); + dummy_handles_.pop_back(); + cache_allocated_size_ -= kSizeDummyEntry; + } + return return_status; +} + +std::size_t CacheReservationManager::GetTotalReservedCacheSize() { + return cache_allocated_size_.load(std::memory_order_relaxed); +} + +Slice CacheReservationManager::GetNextCacheKey() { + // Calling this function will have the side-effect of changing the + // underlying cache_key_ that is shared among other keys generated from this + // fucntion. Therefore please make sure the previous keys are saved/copied + // before calling this function. + std::memset(cache_key_ + kCacheKeyPrefixSize, 0, kMaxVarint64Length); + char* end = + EncodeVarint64(cache_key_ + kCacheKeyPrefixSize, next_cache_key_id_++); + return Slice(cache_key_, static_cast(end - cache_key_)); +} +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/cache/cache_reservation_manager.h b/cache/cache_reservation_manager.h new file mode 100644 index 0000000000..7c5ccf14ba --- /dev/null +++ b/cache/cache_reservation_manager.h @@ -0,0 +1,97 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once + +#include +#include +#include +#include +#include + +#include "cache/cache_entry_roles.h" +#include "rocksdb/cache.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "table/block_based/block_based_table_reader.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +// CacheReservationManager is for reserving cache space for the memory used +// through inserting/releasing dummy entries in the cache. +// This class is not thread-safe. +class CacheReservationManager { + public: + // Construct a CacheReservationManager + // @param cache The cache where dummy entries are inserted and released for + // reserving cache space + // @param delayed_decrease If set true, then dummy entries won't be released + // immediately when memory usage decreases. + // Instead, it will be released when the memory usage + // decreases to 3/4 of what we have reserved so far. + // This is for saving some future dummy entry + // insertion when memory usage increases are likely to + // happen in the near future. 
+ explicit CacheReservationManager(std::shared_ptr cache, + bool delayed_decrease = false); + + // no copy constructor, copy assignment, move constructor, move assignment + CacheReservationManager(const CacheReservationManager &) = delete; + CacheReservationManager &operator=(const CacheReservationManager &) = delete; + CacheReservationManager(CacheReservationManager &&) = delete; + CacheReservationManager &operator=(CacheReservationManager &&) = delete; + + ~CacheReservationManager(); + + template + + // Insert and release dummy entries in the cache to + // match the size of total dummy entries with the smallest multiple of + // kSizeDummyEntry that is greater than or equal to new_mem_used + // + // Insert dummy entries if new_memory_used > cache_allocated_size_; + // + // Release dummy entries if new_memory_used < cache_allocated_size_ + // (and new_memory_used < cache_allocated_size_ * 3/4 + // when delayed_decrease is set true); + // + // Keey dummy entries the same if (1) new_memory_used == cache_allocated_size_ + // or (2) new_memory_used is in the interval of + // [cache_allocated_size_ * 3/4, cache_allocated_size) when delayed_decrease + // is set true. + // + // On inserting dummy entries, it returns Status::OK() if all dummy entry + // insertions succeed. Otherwise, it returns the first non-ok status; + // On releasing dummy entries, it always returns Status::OK(). + // On keeping dummy entries the same, it always returns Status::OK(). + Status UpdateCacheReservation(std::size_t new_memory_used); + std::size_t GetTotalReservedCacheSize(); + + private: + static constexpr std::size_t kSizeDummyEntry = 256 * 1024; + // The key will be longer than keys for blocks in SST files so they won't + // conflict. 
+ static const std::size_t kCacheKeyPrefixSize = + BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length; + + Slice GetNextCacheKey(); + template + Status IncreaseCacheReservation(std::size_t new_mem_used); + Status DecreaseCacheReservation(std::size_t new_mem_used); + + std::shared_ptr cache_; + bool delayed_decrease_; + std::atomic cache_allocated_size_; + std::vector dummy_handles_; + std::uint64_t next_cache_key_id_ = 0; + // The non-prefix part will be updated according to the ID to use. + char cache_key_[kCacheKeyPrefixSize + kMaxVarint64Length]; +}; +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/cache/cache_reservation_manager_test.cc b/cache/cache_reservation_manager_test.cc new file mode 100644 index 0000000000..be548d3af3 --- /dev/null +++ b/cache/cache_reservation_manager_test.cc @@ -0,0 +1,411 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "cache/cache_reservation_manager.h" + +#include +#include +#include + +#include "cache/cache_entry_roles.h" +#include "rocksdb/cache.h" +#include "rocksdb/slice.h" +#include "table/block_based/block_based_table_reader.h" +#include "test_util/testharness.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { +class CacheReservationManagerTest : public ::testing::Test { + protected: + static constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024; + static constexpr int kNumShardBits = 0; // 2^0 shard + + static constexpr std::size_t kSizeDummyEntry = 256 * 1024; + static const std::size_t kCacheKeyPrefixSize = + BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length; + static constexpr std::size_t kMetaDataChargeOverhead = 10000; + + std::shared_ptr cache = NewLRUCache(kOneGigabyte, kNumShardBits); + std::unique_ptr test_cache_rev_mng; + + CacheReservationManagerTest() { + test_cache_rev_mng.reset(new CacheReservationManager(cache)); + } +}; + +TEST_F(CacheReservationManagerTest, GenerateCacheKey) { + // The first cache reservation manager owning the cache will have + // cache->NewId() = 1 + constexpr std::size_t kCacheNewId = 1; + // The first key generated inside of cache reservation manager will have + // next_cache_key_id = 0 + constexpr std::size_t kCacheKeyId = 0; + + char expected_cache_key[kCacheKeyPrefixSize + kMaxVarint64Length]; + std::memset(expected_cache_key, 0, kCacheKeyPrefixSize + kMaxVarint64Length); + + EncodeVarint64(expected_cache_key, kCacheNewId); + char* end = + EncodeVarint64(expected_cache_key + kCacheKeyPrefixSize, kCacheKeyId); + Slice expected_cache_key_slice( + expected_cache_key, static_cast(end - expected_cache_key)); + + std::size_t new_mem_used = 1 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + 
kMetaDataChargeOverhead); + + Cache::Handle* handle = cache->Lookup(expected_cache_key_slice); + EXPECT_NE(handle, nullptr) + << "Failed to generate the cache key for the dummy entry correctly"; + // Clean up the returned handle from Lookup() to prevent memory leak + cache->Release(handle); +} + +TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) { + std::size_t new_mem_used = 1 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry); + std::size_t initial_pinned_usage = cache->GetPinnedUsage(); + ASSERT_GE(initial_pinned_usage, 1 * kSizeDummyEntry); + ASSERT_LT(initial_pinned_usage, + 1 * kSizeDummyEntry + kMetaDataChargeOverhead); + + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to keep cache reservation the same when new_mem_used equals " + "to current cache reservation"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep correctly when new_mem_used equals to current " + "cache reservation"; + EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage) + << "Failed to keep underlying dummy entries the same when new_mem_used " + "equals to current cache reservation"; +} + +TEST_F(CacheReservationManagerTest, + IncreaseCacheReservationByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to increase cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation increase correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry) + << "Failed to increase underlying dummy entries in cache correctly"; + 
EXPECT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to increase underlying dummy entries in cache correctly"; +} + +TEST_F(CacheReservationManagerTest, + IncreaseCacheReservationNotByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry + kSizeDummyEntry / 2; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to increase cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 3 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation increase correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 3 * kSizeDummyEntry) + << "Failed to increase underlying dummy entries in cache correctly"; + EXPECT_LT(cache->GetPinnedUsage(), + 3 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to increase underlying dummy entries in cache correctly"; +} + +TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest, + IncreaseCacheReservationOnFullCache) { + constexpr std::size_t kOneMegabyte = 1024 * 1024; + constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024; + constexpr std::size_t kSizeDummyEntry = 256 * 1024; + constexpr std::size_t kMetaDataChargeOverhead = 10000; + + LRUCacheOptions lo; + lo.capacity = kOneMegabyte; + lo.num_shard_bits = 0; // 2^0 shard + lo.strict_capacity_limit = true; + std::shared_ptr cache = NewLRUCache(lo); + std::unique_ptr test_cache_rev_mng( + new CacheReservationManager(cache)); + + std::size_t new_mem_used = kOneMegabyte + 1; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::Incomplete()) + << "Failed to return status to indicate failure of dummy entry insertion " + "during cache reservation on full cache"; + EXPECT_GE(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep correctly before cache resevation failure happens " + "due to full cache"; + 
EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte) + << "Failed to bookkeep correctly (i.e, bookkeep only successful dummy " + "entry insertions) when encountering cache resevation failure due to " + "full cache"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + + new_mem_used = kOneMegabyte / 2; // 2 dummy entries + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation after encountering cache " + "reservation failure due to full cache"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation decrease correctly after " + "encountering cache reservation due to full cache"; + EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry) + << "Failed to release underlying dummy entries correctly on cache " + "reservation decrease after encountering cache resevation failure due " + "to full cache"; + EXPECT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to release underlying dummy entries correctly on cache " + "reservation decrease after encountering cache resevation failure due " + "to full cache"; + + // Create cache full again for subsequent tests + new_mem_used = kOneMegabyte + 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::Incomplete()) + << "Failed to return status to indicate failure of dummy entry insertion " + "during cache reservation on full cache"; + EXPECT_GE(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep correctly before cache 
resevation failure happens " + "due to full cache"; + EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte) + << "Failed to bookkeep correctly (i.e, bookkeep only successful dummy " + "entry insertions) when encountering cache resevation failure due to " + "full cache"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + + // Increase cache capacity so the previously failed insertion can fully + // succeed + cache->SetCapacity(kOneGigabyte); + new_mem_used = kOneMegabyte + 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to increase cache reservation after increasing cache capacity " + "and mitigating cache full error"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 5 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation increase correctly after " + "increasing cache capacity and mitigating cache full error"; + EXPECT_GE(cache->GetPinnedUsage(), 5 * kSizeDummyEntry) + << "Failed to insert underlying dummy entries correctly after increasing " + "cache capacity and mitigating cache full error"; + EXPECT_LT(cache->GetPinnedUsage(), + 5 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to insert underlying dummy entries correctly after increasing " + "cache capacity and mitigating cache full error"; +} + +TEST_F(CacheReservationManagerTest, + DecreaseCacheReservationByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry); + 
ASSERT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = 1 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation decrease correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to decrease underlying dummy entries in cache correctly"; + EXPECT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to decrease underlying dummy entries in cache correctly"; +} + +TEST_F(CacheReservationManagerTest, + DecreaseCacheReservationNotByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = kSizeDummyEntry / 2; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation decrease correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to decrease underlying dummy entries in cache correctly"; + EXPECT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to decrease underlying dummy entries in cache correctly"; +} + +TEST(CacheReservationManagerWithDelayedDecreaseTest, + DecreaseCacheReservationWithDelayedDecrease) { 
+ constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024; + constexpr std::size_t kSizeDummyEntry = 256 * 1024; + constexpr std::size_t kMetaDataChargeOverhead = 10000; + + LRUCacheOptions lo; + lo.capacity = kOneGigabyte; + lo.num_shard_bits = 0; + std::shared_ptr cache = NewLRUCache(lo); + std::unique_ptr test_cache_rev_mng( + new CacheReservationManager(cache, true /* delayed_decrease */)); + + std::size_t new_mem_used = 8 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry); + std::size_t initial_pinned_usage = cache->GetPinnedUsage(); + ASSERT_GE(initial_pinned_usage, 8 * kSizeDummyEntry); + ASSERT_LT(initial_pinned_usage, + 8 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = 6 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry) + << "Failed to bookkeep correctly when delaying cache reservation " + "decrease"; + EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage) + << "Failed to delay decreasing underlying dummy entries in cache"; + + new_mem_used = 7 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry) + << "Failed to bookkeep correctly when delaying cache reservation " + "decrease"; + EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage) + << "Failed to delay decreasing underlying dummy entries in cache"; + + new_mem_used = 6 * kSizeDummyEntry - 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache 
reservation correctly when new_mem_used < " + "GetTotalReservedCacheSize() * 3 / 4 on delayed decrease mode"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 6 * kSizeDummyEntry) + << "Failed to bookkeep correctly when new_mem_used < " + "GetTotalReservedCacheSize() * 3 / 4 on delayed decrease mode"; + EXPECT_GE(cache->GetPinnedUsage(), 6 * kSizeDummyEntry) + << "Failed to decrease underlying dummy entries in cache when " + "new_mem_used < GetTotalReservedCacheSize() * 3 / 4 on delayed " + "decrease mode"; + EXPECT_LT(cache->GetPinnedUsage(), + 6 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to decrease underlying dummy entries in cache when " + "new_mem_used < GetTotalReservedCacheSize() * 3 / 4 on delayed " + "decrease mode"; +} + +TEST(CacheReservationManagerDestructorTest, + ReleaseRemainingDummyEntriesOnDestruction) { + constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024; + constexpr std::size_t kSizeDummyEntry = 256 * 1024; + constexpr std::size_t kMetaDataChargeOverhead = 10000; + + LRUCacheOptions lo; + lo.capacity = kOneGigabyte; + lo.num_shard_bits = 0; + std::shared_ptr cache = NewLRUCache(lo); + { + std::unique_ptr test_cache_rev_mng( + new CacheReservationManager(cache)); + std::size_t new_mem_used = 1 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + kMetaDataChargeOverhead); + } + EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry) + << "Failed to release remaining underlying dummy entries in cache in " + "CacheReservationManager's destructor"; +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 
67aef7f8fe..add957d847 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -21,6 +21,7 @@ #include "rocksdb/cache.h" namespace ROCKSDB_NAMESPACE { +class CacheReservationManager; // Interface to block and signal DB instances. // Each DB instance contains ptr to StallInterface. @@ -60,7 +61,7 @@ class WriteBufferManager { bool enabled() const { return buffer_size() > 0; } // Returns true if pointer to cache is passed. - bool cost_to_cache() const { return cache_rep_ != nullptr; } + bool cost_to_cache() const { return cache_rev_mng_ != nullptr; } // Returns the total memory used by memtables. // Only valid if enabled() @@ -73,9 +74,7 @@ class WriteBufferManager { return memory_active_.load(std::memory_order_relaxed); } - size_t dummy_entries_in_cache_usage() const { - return dummy_size_.load(std::memory_order_relaxed); - } + size_t dummy_entries_in_cache_usage() const; // Returns the buffer_size. size_t buffer_size() const { @@ -163,9 +162,10 @@ class WriteBufferManager { std::atomic memory_used_; // Memory that hasn't been scheduled to free. std::atomic memory_active_; - std::atomic dummy_size_; - struct CacheRep; - std::unique_ptr cache_rep_; + std::unique_ptr cache_rev_mng_; + // Protects cache_rev_mng_ + std::mutex cache_rev_mng_mu_; + std::list queue_; // Protects the queue_ std::mutex mu_; diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index ecbccb82b3..c599b658c1 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -10,46 +10,12 @@ #include "rocksdb/write_buffer_manager.h" #include "cache/cache_entry_roles.h" +#include "cache/cache_reservation_manager.h" #include "db/db_impl/db_impl.h" +#include "rocksdb/status.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { -#ifndef ROCKSDB_LITE -namespace { -const size_t kSizeDummyEntry = 256 * 1024; -// The key will be longer than keys for blocks in SST files so they won't -// conflict. 
-const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1; -} // namespace - -struct WriteBufferManager::CacheRep { - std::shared_ptr cache_; - std::mutex cache_mutex_; - std::atomic cache_allocated_size_; - // The non-prefix part will be updated according to the ID to use. - char cache_key_[kCacheKeyPrefix + kMaxVarint64Length]; - uint64_t next_cache_key_id_ = 0; - std::vector dummy_handles_; - - explicit CacheRep(std::shared_ptr cache) - : cache_(cache), cache_allocated_size_(0) { - memset(cache_key_, 0, kCacheKeyPrefix); - size_t pointer_size = sizeof(const void*); - assert(pointer_size <= kCacheKeyPrefix); - memcpy(cache_key_, static_cast(this), pointer_size); - } - - Slice GetNextCacheKey() { - memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length); - char* end = - EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++); - return Slice(cache_key_, static_cast(end - cache_key_)); - } -}; -#else -struct WriteBufferManager::CacheRep {}; -#endif // ROCKSDB_LITE - WriteBufferManager::WriteBufferManager(size_t _buffer_size, std::shared_ptr cache, bool allow_stall) @@ -57,34 +23,34 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, mutable_limit_(buffer_size_ * 7 / 8), memory_used_(0), memory_active_(0), - dummy_size_(0), - cache_rep_(nullptr), + cache_rev_mng_(nullptr), allow_stall_(allow_stall), stall_active_(false) { #ifndef ROCKSDB_LITE if (cache) { - // Construct the cache key using the pointer to this. 
- cache_rep_.reset(new CacheRep(cache)); + // Memtable's memory usage tends to fluctuate frequently + // therefore we set delayed_decrease = true to save some dummy entry + // insertion on memory increase right after memory decrease + cache_rev_mng_.reset( + new CacheReservationManager(cache, true /* delayed_decrease */)); } #else (void)cache; #endif // ROCKSDB_LITE } -WriteBufferManager::~WriteBufferManager() { -#ifndef ROCKSDB_LITE - if (cache_rep_) { - for (auto* handle : cache_rep_->dummy_handles_) { - if (handle != nullptr) { - cache_rep_->cache_->Release(handle, true); - } - } +WriteBufferManager::~WriteBufferManager() = default; + +std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { + if (cache_rev_mng_ != nullptr) { + return cache_rev_mng_->GetTotalReservedCacheSize(); + } else { + return 0; } -#endif // ROCKSDB_LITE } void WriteBufferManager::ReserveMem(size_t mem) { - if (cache_rep_ != nullptr) { + if (cache_rev_mng_ != nullptr) { ReserveMemWithCache(mem); } else if (enabled()) { memory_used_.fetch_add(mem, std::memory_order_relaxed); @@ -97,32 +63,23 @@ void WriteBufferManager::ReserveMem(size_t mem) { // Should only be called from write thread void WriteBufferManager::ReserveMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE - assert(cache_rep_ != nullptr); + assert(cache_rev_mng_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. - std::lock_guard lock(cache_rep_->cache_mutex_); + std::lock_guard lock(cache_rev_mng_mu_); size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); - while (new_mem_used > cache_rep_->cache_allocated_size_) { - // Expand size by at least 256KB. 
- // Add a dummy record to the cache - Cache::Handle* handle = nullptr; - Status s = cache_rep_->cache_->Insert( - cache_rep_->GetNextCacheKey(), nullptr, kSizeDummyEntry, - GetNoopDeleterForRole(), &handle); - s.PermitUncheckedError(); // TODO: What to do on error? - // We keep the handle even if insertion fails and a null handle is - // returned, so that when memory shrinks, we don't release extra - // entries from cache. - // Ideallly we should prevent this allocation from happening if - // this insertion fails. However, the callers to this code path - // are not able to handle failures properly. We'll need to improve - // it in the future. - cache_rep_->dummy_handles_.push_back(handle); - cache_rep_->cache_allocated_size_ += kSizeDummyEntry; - dummy_size_.fetch_add(kSizeDummyEntry, std::memory_order_relaxed); - } + Status s = + cache_rev_mng_->UpdateCacheReservation( + new_mem_used); + + // We absorb the error since WriteBufferManager is not able to handle + // this failure properly. Ideally we should prevent this allocation + // from happening if this cache reservation fails. + // [TODO] We'll need to improve it in the future and figure out what to do on + // error + s.PermitUncheckedError(); #else (void)mem; #endif // ROCKSDB_LITE @@ -135,7 +92,7 @@ void WriteBufferManager::ScheduleFreeMem(size_t mem) { } void WriteBufferManager::FreeMem(size_t mem) { - if (cache_rep_ != nullptr) { + if (cache_rev_mng_ != nullptr) { FreeMemWithCache(mem); } else if (enabled()) { memory_used_.fetch_sub(mem, std::memory_order_relaxed); @@ -148,33 +105,21 @@ void WriteBufferManager::FreeMem(size_t mem) { void WriteBufferManager::FreeMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE - assert(cache_rep_ != nullptr); + assert(cache_rev_mng_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck.
- std::lock_guard lock(cache_rep_->cache_mutex_); + std::lock_guard lock(cache_rev_mng_mu_); size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); - // Gradually shrink memory costed in the block cache if the actual - // usage is less than 3/4 of what we reserve from the block cache. - // We do this because: - // 1. we don't pay the cost of the block cache immediately a memtable is - // freed, as block cache insert is expensive; - // 2. eventually, if we walk away from a temporary memtable size increase, - // we make sure shrink the memory costed in block cache over time. - // In this way, we only shrink costed memory showly even there is enough - // margin. - if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 && - cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) { - assert(!cache_rep_->dummy_handles_.empty()); - auto* handle = cache_rep_->dummy_handles_.back(); - // If insert failed, handle is null so we should not release. - if (handle != nullptr) { - cache_rep_->cache_->Release(handle, true); - } - cache_rep_->dummy_handles_.pop_back(); - cache_rep_->cache_allocated_size_ -= kSizeDummyEntry; - dummy_size_.fetch_sub(kSizeDummyEntry, std::memory_order_relaxed); - } + Status s = + cache_rev_mng_->UpdateCacheReservation( + new_mem_used); + + // We absorb the error since WriteBufferManager is not able to handle + // this failure properly. 
+ // [TODO] We'll need to improve it in the future and figure out what to do on + // error + s.PermitUncheckedError(); #else (void)mem; #endif // ROCKSDB_LITE diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc index 7e3de41d18..709a723e3c 100644 --- a/memtable/write_buffer_manager_test.cc +++ b/memtable/write_buffer_manager_test.cc @@ -78,6 +78,8 @@ TEST_F(WriteBufferManagerTest, ShouldFlush) { } TEST_F(WriteBufferManagerTest, CacheCost) { + constexpr std::size_t kMetaDataChargeOverhead = 10000; + LRUCacheOptions co; // 1GB cache co.capacity = 1024 * 1024 * 1024; @@ -88,137 +90,206 @@ TEST_F(WriteBufferManagerTest, CacheCost) { std::unique_ptr wbf( new WriteBufferManager(50 * 1024 * 1024, cache)); - // Allocate 333KB will allocate 512KB + // Allocate 333KB will allocate 512KB, memory_used_ = 333KB wbf->ReserveMem(333 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 2 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 2 * 256 * 1024 + 10000); - // 2 dummy entries are added for size 333 kb. + // 2 dummy entries are added for size 333 KB ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 2 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 2 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 2 * 256 * 1024 + kMetaDataChargeOverhead); - // Allocate another 512KB + // Allocate another 512KB, memory_used_ = 845KB wbf->ReserveMem(512 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + 10000); - // 2 more dummy entries are added for size 512. 
+ // 2 more dummy entries are added for size 512 KB + // since ceil((memory_used_ - dummy_entries_in_cache_usage) / kSizeDummyEntry) + // = 2 ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead); - // Allocate another 10MB + // Allocate another 10MB, memory_used_ = 11085KB wbf->ReserveMem(10 * 1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 11 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 11 * 1024 * 1024 + 10000); - // 40 more entries are added for size 10 * 1024 * 1024. + // 40 more entries are added for size 10 * 1024 KB ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead); - // Free 1MB will not cause any change in cache cost - wbf->FreeMem(1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 11 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 11 * 1024 * 1024 + 10000); + // Free 1MB, memory_used_ = 10061KB + // It will not cause any change in cache cost + // since memory_used_ > dummy_entries_in_cache_usage * (3/4) + wbf->FreeMem(1 * 1024 * 1024); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead); ASSERT_FALSE(wbf->ShouldFlush()); - // Allocate another 41MB + // Allocate another 41MB, memory_used_ = 52045KB wbf->ReserveMem(41 * 1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 204 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 204 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), + 204 * 256 * 1024 + kMetaDataChargeOverhead);
ASSERT_TRUE(wbf->ShouldFlush()); ASSERT_TRUE(wbf->ShouldFlush()); + // Schedule free 20MB, memory_used_ = 52045KB + // It will not cause any change in memory_used and cache cost wbf->ScheduleFreeMem(20 * 1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 204 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 204 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), + 204 * 256 * 1024 + kMetaDataChargeOverhead); // Still need flush as the hard limit hits ASSERT_TRUE(wbf->ShouldFlush()); - // Free 20MB will releae 256KB from cache + // Free 20MB, memory_used_ = 31565KB + // It will release 80 dummy entries from cache + // since memory_used_ < dummy_entries_in_cache_usage * (3/4) + // and floor((dummy_entries_in_cache_usage - memory_used_) / kSizeDummyEntry) + // = 80 wbf->FreeMem(20 * 1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 256 * 1024 + 10000); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 203 * kSizeDummyEntry); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 124 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 124 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), + 124 * 256 * 1024 + kMetaDataChargeOverhead); ASSERT_FALSE(wbf->ShouldFlush()); - // Every free will release 256KB if still not hit 3/4 + // Free 16KB, memory_used_ = 31549KB + // It will not release any dummy entry since memory_used_ >= + // dummy_entries_in_cache_usage * (3/4) wbf->FreeMem(16 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 2 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 2 * 256 * 1024 + 10000); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 202 * kSizeDummyEntry); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 124 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 124 * 256 * 1024); +
ASSERT_LT(cache->GetPinnedUsage(), + 124 * 256 * 1024 + kMetaDataChargeOverhead); - wbf->FreeMem(16 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024 + 10000); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 201 * kSizeDummyEntry); + // Free 20MB, memory_used_ = 11069KB + // It will release 80 dummy entries from cache + // since memory_used_ < dummy_entries_in_cache_usage * (3/4) + // and floor((dummy_entries_in_cache_usage - memory_used_) / kSizeDummyEntry) + // = 80 + wbf->FreeMem(20 * 1024 * 1024); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead); - // Reserve 512KB will not cause any change in cache cost + // Free 1MB, memory_used_ = 10045KB + // It will not cause any change in cache cost + // since memory_used_ > dummy_entries_in_cache_usage * (3/4) + wbf->FreeMem(1 * 1024 * 1024); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead); + + // Reserve 512KB, memory_used_ = 10557KB + // It will not cause any change in cache cost + // since memory_used_ > dummy_entries_in_cache_usage * (3/4) + // which reflects the benefit of saving dummy entry insertion on memory + // reservation after delay decrease wbf->ReserveMem(512 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024 + 10000); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 201 * kSizeDummyEntry); - - wbf->FreeMem(16 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 4 * 256 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 4 * 256 * 1024 + 10000); -
ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 200 * kSizeDummyEntry); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead); // Destory write buffer manger should free everything wbf.reset(); - ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024); + ASSERT_EQ(cache->GetPinnedUsage(), 0); } TEST_F(WriteBufferManagerTest, NoCapCacheCost) { + constexpr std::size_t kMetaDataChargeOverhead = 10000; // 1GB cache std::shared_ptr cache = NewLRUCache(1024 * 1024 * 1024, 4); // A write buffer manager of size 256MB std::unique_ptr wbf(new WriteBufferManager(0, cache)); - // Allocate 1.5MB will allocate 2MB + + // Allocate 10MB, memory_used_ = 10240KB + // It will allocate 40 dummy entries wbf->ReserveMem(10 * 1024 * 1024); - ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 10 * 1024 * 1024 + 10000); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 40 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 40 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 40 * 256 * 1024 + kMetaDataChargeOverhead); + ASSERT_FALSE(wbf->ShouldFlush()); + // Free 9MB, memory_used_ = 1024KB + // It will free 36 dummy entries wbf->FreeMem(9 * 1024 * 1024); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024); + ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead); + + // Free 160KB gradually, memory_used_ = 864KB + // It will not cause any change + // since memory_used_ > dummy_entries_in_cache_usage * 3/4 for (int i = 0; i < 40; i++) { wbf->FreeMem(4 * 1024); } - ASSERT_GE(cache->GetPinnedUsage(), 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024 + 10000); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024); + 
ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead); } TEST_F(WriteBufferManagerTest, CacheFull) { - // 15MB cache size with strict capacity + constexpr std::size_t kMetaDataChargeOverhead = 20000; + + // 12MB cache size with strict capacity LRUCacheOptions lo; lo.capacity = 12 * 1024 * 1024; lo.num_shard_bits = 0; lo.strict_capacity_limit = true; std::shared_ptr cache = NewLRUCache(lo); std::unique_ptr wbf(new WriteBufferManager(0, cache)); + + // Allocate 10MB, memory_used_ = 10240KB wbf->ReserveMem(10 * 1024 * 1024); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 40 * kSizeDummyEntry); - size_t prev_pinned = cache->GetPinnedUsage(); - ASSERT_GE(prev_pinned, 10 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 40 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 40 * kSizeDummyEntry + kMetaDataChargeOverhead); - // Some insert will fail + // Allocate 10MB, memory_used_ = 20480KB + // Some dummy entry insertion will fail due to full cache wbf->ReserveMem(10 * 1024 * 1024); + ASSERT_GE(cache->GetPinnedUsage(), 40 * kSizeDummyEntry); ASSERT_LE(cache->GetPinnedUsage(), 12 * 1024 * 1024); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry); + ASSERT_LT(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry); - // Increase capacity so next insert will succeed - cache->SetCapacity(30 * 1024 * 1024); + // Free 15MB after encountering cache full, memory_used_ = 5120KB + wbf->FreeMem(15 * 1024 * 1024); + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 20 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 20 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 20 * kSizeDummyEntry + kMetaDataChargeOverhead); + + // Reserve 15MB, creating cache full again, memory_used_ = 20480KB + wbf->ReserveMem(15 * 1024 * 1024); + ASSERT_LE(cache->GetPinnedUsage(), 12 * 1024 * 1024); + ASSERT_LT(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry); + + // Increase capacity so next insert will fully succeed +
cache->SetCapacity(40 * 1024 * 1024); + + // Allocate 10MB, memory_used_ = 30720KB wbf->ReserveMem(10 * 1024 * 1024); - ASSERT_GT(cache->GetPinnedUsage(), 20 * 1024 * 1024); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 120 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 120 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 120 * kSizeDummyEntry + kMetaDataChargeOverhead); // Gradually release 20 MB + // It ended up sequentially releasing 32, 24, 18 dummy entries when + // memory_used_ decreases to 22528KB, 16384KB, 11776KB. + // In total, it releases 74 dummy entries for (int i = 0; i < 40; i++) { wbf->FreeMem(512 * 1024); } - ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024); - ASSERT_LT(cache->GetPinnedUsage(), 20 * 1024 * 1024); - ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 95 * kSizeDummyEntry); + + ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 46 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 46 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 46 * kSizeDummyEntry + kMetaDataChargeOverhead); } #endif // ROCKSDB_LITE diff --git a/src.mk b/src.mk index 28d8380ad5..2da0584951 100644 --- a/src.mk +++ b/src.mk @@ -2,6 +2,7 @@ LIB_SOURCES = \ cache/cache.cc \ cache/cache_entry_roles.cc \ + cache/cache_reservation_manager.cc \ cache/clock_cache.cc \ cache/lru_cache.cc \ cache/sharded_cache.cc \ @@ -384,6 +385,7 @@ BENCH_MAIN_SOURCES = \ TEST_MAIN_SOURCES = \ cache/cache_test.cc \ + cache/cache_reservation_manager_test.cc \ cache/lru_cache_test.cc \ db/blob/blob_counting_iterator_test.cc \ db/blob/blob_file_addition_test.cc \