diff --git a/CMakeLists.txt b/CMakeLists.txt index 5d4dc5004d..f43d668bdc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -598,6 +598,7 @@ set(SOURCES cache/cache_entry_roles.cc cache/cache_key.cc cache/cache_reservation_manager.cc + cache/charged_cache.cc cache/clock_cache.cc cache/compressed_secondary_cache.cc cache/fast_lru_cache.cc diff --git a/HISTORY.md b/HISTORY.md index 330a01bf6b..e6d01636ea 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,9 @@ # Rocksdb Change Log ## Unreleased + * Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) gets flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in the case of remote file systems, since it involves network traffic and higher latencies. +* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions. +* Charge memory usage of blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for the blob cache exceeds the available space left in the block cache at some point (i.e., causing the cache to become full when `LRUCacheOptions::strict_capacity_limit` is true), creation will fail with `Status::MemoryLimit()`. To opt in to this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`. + ### Public API changes * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions. @@ -14,9 +18,7 @@ * Added support for blob caching in order to cache frequently used blobs for BlobDB. * User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching. * Either sharing the backend cache with the block cache or using a completely separate cache is supported. - * A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`). - * Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies. - * Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
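For illustration, here is a minimal sketch (an editor's example, not part of the diff) of how a user might opt in to the blob cache charging described in the changelog entry above. The capacities and the helper name are made up; per the new `BlockBasedTableFactory::ValidateOptions()` checks later in this diff, the blob cache must be a different cache object from the block cache and its capacity must not exceed the block cache's.

```cpp
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper: builds Options with a standalone blob cache whose
// memory usage is charged against the block cache.
Options MakeOptionsWithChargedBlobCache() {
  Options options;
  options.enable_blob_files = true;

  // Standalone blob cache (must be distinct from the block cache for the
  // charging path to apply).
  LRUCacheOptions blob_cache_opts;
  blob_cache_opts.capacity = 64 << 20;  // 64 MiB, illustrative
  options.blob_cache = NewLRUCache(blob_cache_opts);

  // Block cache at least as large as the blob cache, with charging of
  // CacheEntryRole::kBlobCache enabled.
  BlockBasedTableOptions bbto;
  bbto.block_cache = NewLRUCache(128 << 20);  // 128 MiB, illustrative
  bbto.cache_usage_options.options_overrides.insert(
      {CacheEntryRole::kBlobCache,
       {/*.charged = */ CacheEntryRoleOptions::Decision::kEnabled}});
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));

  return options;
}
```

If the reservation would push a block cache configured with `strict_capacity_limit = true` past its capacity, the corresponding insertion fails with `Status::MemoryLimit()`, as noted in the changelog entry.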
+ * A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`). * Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete). ### Public API changes diff --git a/TARGETS b/TARGETS index b53c1860a2..a3150c20b3 100644 --- a/TARGETS +++ b/TARGETS @@ -13,6 +13,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "cache/cache_entry_roles.cc", "cache/cache_key.cc", "cache/cache_reservation_manager.cc", + "cache/charged_cache.cc", "cache/clock_cache.cc", "cache/compressed_secondary_cache.cc", "cache/fast_lru_cache.cc", @@ -348,6 +349,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/cache_entry_roles.cc", "cache/cache_key.cc", "cache/cache_reservation_manager.cc", + "cache/charged_cache.cc", "cache/clock_cache.cc", "cache/compressed_secondary_cache.cc", "cache/fast_lru_cache.cc", diff --git a/cache/cache_entry_roles.cc b/cache/cache_entry_roles.cc index 8b373c4164..1aebedd24d 100644 --- a/cache/cache_entry_roles.cc +++ b/cache/cache_entry_roles.cc @@ -23,6 +23,7 @@ std::array kCacheEntryRoleToCamelString{{ "FilterConstruction", "BlockBasedTableReader", "FileMetadata", + "BlobCache", "Misc", }}; @@ -38,6 +39,7 @@ std::array kCacheEntryRoleToHyphenString{{ "filter-construction", "block-based-table-reader", "file-metadata", + "blob-cache", "misc", }}; diff --git a/cache/cache_reservation_manager.cc b/cache/cache_reservation_manager.cc index fb4f2ad50a..53dee5d790 100644 --- a/cache/cache_reservation_manager.cc +++ b/cache/cache_reservation_manager.cc @@ -181,4 +181,5 @@ template class CacheReservationManagerImpl; template class CacheReservationManagerImpl; template class CacheReservationManagerImpl; template class CacheReservationManagerImpl; +template class CacheReservationManagerImpl; } // namespace ROCKSDB_NAMESPACE diff --git a/cache/charged_cache.cc b/cache/charged_cache.cc new file mode 100644 index 0000000000..a9ff969b81 --- /dev/null +++ b/cache/charged_cache.cc @@ -0,0 +1,117 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/charged_cache.h" + +#include "cache/cache_reservation_manager.h" + +namespace ROCKSDB_NAMESPACE { + +ChargedCache::ChargedCache(std::shared_ptr cache, + std::shared_ptr block_cache) + : cache_(cache), + cache_res_mgr_(std::make_shared( + std::make_shared< + CacheReservationManagerImpl>( + block_cache))) {} + +Status ChargedCache::Insert(const Slice& key, void* value, size_t charge, + DeleterFn deleter, Handle** handle, + Priority priority) { + Status s = cache_->Insert(key, value, charge, deleter, handle, priority); + if (s.ok()) { + // Insert may cause the cache entry eviction if the cache is full. 
So we + // directly call the reservation manager to update the total memory used + // in the cache. + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); + } + return s; +} + +Status ChargedCache::Insert(const Slice& key, void* value, + const CacheItemHelper* helper, size_t charge, + Handle** handle, Priority priority) { + Status s = cache_->Insert(key, value, helper, charge, handle, priority); + if (s.ok()) { + // Insert may cause the cache entry eviction if the cache is full. So we + // directly call the reservation manager to update the total memory used + // in the cache. + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); + } + return s; +} + +Cache::Handle* ChargedCache::Lookup(const Slice& key, Statistics* stats) { + return cache_->Lookup(key, stats); +} + +Cache::Handle* ChargedCache::Lookup(const Slice& key, + const CacheItemHelper* helper, + const CreateCallback& create_cb, + Priority priority, bool wait, + Statistics* stats) { + auto handle = cache_->Lookup(key, helper, create_cb, priority, wait, stats); + // Lookup may promote the KV pair from the secondary cache to the primary + // cache. So we directly call the reservation manager to update the total + // memory used in the cache. + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); + return handle; +} + +bool ChargedCache::Release(Cache::Handle* handle, bool useful, + bool erase_if_last_ref) { + size_t memory_used_delta = cache_->GetUsage(handle); + bool erased = cache_->Release(handle, useful, erase_if_last_ref); + if (erased) { + assert(cache_res_mgr_); + cache_res_mgr_ + ->UpdateCacheReservation(memory_used_delta, /* increase */ false) + .PermitUncheckedError(); + } + return erased; +} + +bool ChargedCache::Release(Cache::Handle* handle, bool erase_if_last_ref) { + size_t memory_used_delta = cache_->GetUsage(handle); + bool erased = cache_->Release(handle, erase_if_last_ref); + if (erased) { + assert(cache_res_mgr_); + cache_res_mgr_ + ->UpdateCacheReservation(memory_used_delta, /* increase */ false) + .PermitUncheckedError(); + } + return erased; +} + +void ChargedCache::Erase(const Slice& key) { + cache_->Erase(key); + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); +} + +void ChargedCache::EraseUnRefEntries() { + cache_->EraseUnRefEntries(); + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); +} + +void ChargedCache::SetCapacity(size_t capacity) { + cache_->SetCapacity(capacity); + // SetCapacity can result in evictions when the cache capacity is decreased, + // so we would want to update the cache reservation here as well. + assert(cache_res_mgr_); + cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage()) + .PermitUncheckedError(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/cache/charged_cache.h b/cache/charged_cache.h new file mode 100644 index 0000000000..1739e40889 --- /dev/null +++ b/cache/charged_cache.h @@ -0,0 +1,121 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
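An editorial aside on the wrapper implemented above: the reservation that ChargedCache keeps in sync is, in effect, the blob cache's current usage rounded up to whole dummy entries inserted into the block cache. A minimal sketch of that arithmetic follows; the 256 KiB dummy-entry size is an assumption for illustration only, since the real value comes from `CacheReservationManagerImpl<CacheEntryRole::kBlobCache>::GetDummyEntrySize()`.

```cpp
#include <cstddef>

// Assumed dummy-entry size for illustration; the actual size is defined by
// CacheReservationManagerImpl.
constexpr std::size_t kDummyEntrySize = 256 * 1024;

// Amount charged to the block cache for a given blob cache usage: the usage
// rounded up to whole dummy entries.
constexpr std::size_t ChargedToBlockCache(std::size_t blob_cache_usage) {
  const std::size_t num_dummy_entries =
      (blob_cache_usage + kDummyEntrySize - 1) / kDummyEntrySize;
  return num_dummy_entries * kDummyEntrySize;
}

static_assert(ChargedToBlockCache(0) == 0, "no usage, no charge");
static_assert(ChargedToBlockCache(1) == kDummyEntrySize, "rounds up");
static_assert(ChargedToBlockCache(kDummyEntrySize + 1) == 2 * kDummyEntrySize,
              "one extra byte costs another dummy entry");
```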
+ +#pragma once + +#include + +#include "port/port.h" +#include "rocksdb/cache.h" + +namespace ROCKSDB_NAMESPACE { + +class ConcurrentCacheReservationManager; + +// A cache interface which wraps around another cache and takes care of +// reserving space in block cache towards a single global memory limit, and +// forwards all the calls to the underlying cache. +class ChargedCache : public Cache { + public: + ChargedCache(std::shared_ptr cache, + std::shared_ptr block_cache); + ~ChargedCache() override = default; + + Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter, + Handle** handle, Priority priority) override; + Status Insert(const Slice& key, void* value, const CacheItemHelper* helper, + size_t charge, Handle** handle = nullptr, + Priority priority = Priority::LOW) override; + + Cache::Handle* Lookup(const Slice& key, Statistics* stats) override; + Cache::Handle* Lookup(const Slice& key, const CacheItemHelper* helper, + const CreateCallback& create_cb, Priority priority, + bool wait, Statistics* stats = nullptr) override; + + bool Release(Cache::Handle* handle, bool useful, + bool erase_if_last_ref = false) override; + bool Release(Cache::Handle* handle, bool erase_if_last_ref = false) override; + + void Erase(const Slice& key) override; + void EraseUnRefEntries() override; + + static const char* kClassName() { return "ChargedCache"; } + const char* Name() const override { return kClassName(); } + + uint64_t NewId() override { return cache_->NewId(); } + + void SetCapacity(size_t capacity) override; + + void SetStrictCapacityLimit(bool strict_capacity_limit) override { + cache_->SetStrictCapacityLimit(strict_capacity_limit); + } + + bool HasStrictCapacityLimit() const override { + return cache_->HasStrictCapacityLimit(); + } + + void* Value(Cache::Handle* handle) override { return cache_->Value(handle); } + + bool IsReady(Cache::Handle* handle) override { + return cache_->IsReady(handle); + } + + void Wait(Cache::Handle* handle) override { cache_->Wait(handle); } + + void WaitAll(std::vector& handles) override { + cache_->WaitAll(handles); + } + + bool Ref(Cache::Handle* handle) override { return cache_->Ref(handle); } + + size_t GetCapacity() const override { return cache_->GetCapacity(); } + + size_t GetUsage() const override { return cache_->GetUsage(); } + + size_t GetUsage(Cache::Handle* handle) const override { + return cache_->GetUsage(handle); + } + + size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); } + + size_t GetCharge(Cache::Handle* handle) const override { + return cache_->GetCharge(handle); + } + + Cache::DeleterFn GetDeleter(Cache::Handle* handle) const override { + return cache_->GetDeleter(handle); + } + + void ApplyToAllEntries( + const std::function& callback, + const Cache::ApplyToAllEntriesOptions& opts) override { + cache_->ApplyToAllEntries(callback, opts); + } + + void ApplyToAllCacheEntries(void (*callback)(void* value, size_t charge), + bool thread_safe) override { + cache_->ApplyToAllCacheEntries(callback, thread_safe); + } + + std::string GetPrintableOptions() const override { + return cache_->GetPrintableOptions(); + } + + void DisownData() override { return cache_->DisownData(); } + + inline Cache* GetCache() const { return cache_.get(); } + + inline ConcurrentCacheReservationManager* TEST_GetCacheReservationManager() + const { + return cache_res_mgr_.get(); + } + + private: + std::shared_ptr cache_; + std::shared_ptr cache_res_mgr_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/cache/lru_cache.h 
b/cache/lru_cache.h index f52d7bfc66..67cb97e870 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -484,9 +484,9 @@ class LRUCache virtual void WaitAll(std::vector& handles) override; std::string GetPrintableOptions() const override; - // Retrieves number of elements in LRU, for unit test purpose only. + // Retrieves number of elements in LRU, for unit test purpose only. size_t TEST_GetLRUSize(); - // Retrieves high pri pool ratio. + // Retrieves high pri pool ratio. double GetHighPriPoolRatio(); private: diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 1792038f15..e76ba64f69 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -1480,10 +1480,10 @@ class LRUCacheWithStat : public LRUCache { return LRUCache::Insert(key, value, charge, deleter, handle, priority); } Status Insert(const Slice& key, void* value, const CacheItemHelper* helper, - size_t chargge, Handle** handle = nullptr, + size_t charge, Handle** handle = nullptr, Priority priority = Priority::LOW) override { insert_count_++; - return LRUCache::Insert(key, value, helper, chargge, handle, priority); + return LRUCache::Insert(key, value, helper, charge, handle, priority); } Handle* Lookup(const Slice& key, Statistics* stats) override { lookup_count_++; diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h index 6263b24df8..c0bb60a216 100644 --- a/cache/sharded_cache.h +++ b/cache/sharded_cache.h @@ -86,7 +86,7 @@ class ShardedCache : public Cache { DeleterFn deleter, Handle** handle, Priority priority) override; virtual Status Insert(const Slice& key, void* value, - const CacheItemHelper* helper, size_t chargge, + const CacheItemHelper* helper, size_t charge, Handle** handle = nullptr, Priority priority = Priority::LOW) override; virtual Handle* Lookup(const Slice& key, Statistics* stats) override; diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc index c02fae9df9..a57d089e52 100644 --- a/db/blob/blob_source.cc +++ b/db/blob/blob_source.cc @@ -8,6 +8,8 @@ #include #include +#include "cache/cache_reservation_manager.h" +#include "cache/charged_cache.h" #include "db/blob/blob_file_reader.h" #include "db/blob/blob_log_format.h" #include "monitoring/statistics.h" @@ -26,7 +28,18 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options, statistics_(immutable_options->statistics.get()), blob_file_cache_(blob_file_cache), blob_cache_(immutable_options->blob_cache), - lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {} + lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) { +#ifndef ROCKSDB_LITE + auto bbto = + immutable_options->table_factory->GetOptions(); + if (bbto && + bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache) + .charged == CacheEntryRoleOptions::Decision::kEnabled) { + blob_cache_ = std::make_shared(immutable_options->blob_cache, + bbto->block_cache); + } +#endif // ROCKSDB_LITE +} BlobSource::~BlobSource() = default; diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index c45bc17975..4999a31205 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -99,6 +99,8 @@ class BlobSource { blob_file_reader); } + inline Cache* GetBlobCache() const { return blob_cache_.get(); } + bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size, uint64_t offset) const; diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc index a96a3a3d62..9f84a190f7 100644 --- a/db/blob/blob_source_test.cc +++ b/db/blob/blob_source_test.cc @@ -11,6 +11,7 @@ #include #include +#include 
"cache/charged_cache.h" #include "cache/compressed_secondary_cache.h" #include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_reader.h" @@ -183,9 +184,10 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { FileOptions file_options; constexpr HistogramImpl* blob_file_read_hist = nullptr; - std::unique_ptr blob_file_cache(new BlobFileCache( - backing_cache.get(), &immutable_options, &file_options, column_family_id, - blob_file_read_hist, nullptr /*IOTracer*/)); + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + column_family_id, blob_file_read_hist, nullptr /*IOTracer*/); BlobSource blob_source(&immutable_options, db_id_, db_session_id_, blob_file_cache.get()); @@ -481,9 +483,10 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) { auto backing_cache = NewLRUCache(capacity); // Blob file cache FileOptions file_options; - std::unique_ptr blob_file_cache(new BlobFileCache( - backing_cache.get(), &immutable_options, &file_options, column_family_id, - nullptr /*HistogramImpl*/, nullptr /*IOTracer*/)); + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/); BlobSource blob_source(&immutable_options, db_id_, db_session_id_, blob_file_cache.get()); @@ -625,9 +628,10 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) { FileOptions file_options; constexpr HistogramImpl* blob_file_read_hist = nullptr; - std::unique_ptr blob_file_cache(new BlobFileCache( - backing_cache.get(), &immutable_options, &file_options, column_family_id, - blob_file_read_hist, nullptr /*IOTracer*/)); + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + column_family_id, blob_file_read_hist, nullptr /*IOTracer*/); BlobSource blob_source(&immutable_options, db_id_, db_session_id_, blob_file_cache.get()); @@ -807,9 +811,10 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) { FileOptions file_options; constexpr HistogramImpl* blob_file_read_hist = nullptr; - std::unique_ptr blob_file_cache(new BlobFileCache( - backing_cache.get(), &immutable_options, &file_options, column_family_id, - blob_file_read_hist, nullptr /*IOTracer*/)); + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + column_family_id, blob_file_read_hist, nullptr /*IOTracer*/); BlobSource blob_source(&immutable_options, db_id_, db_session_id_, blob_file_cache.get()); @@ -1258,6 +1263,270 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { } } +class BlobSourceCacheReservationTest : public DBTestBase { + public: + explicit BlobSourceCacheReservationTest() + : DBTestBase("blob_source_cache_reservation_test", + /*env_do_fsync=*/true) { + options_.env = env_; + options_.enable_blob_files = true; + options_.create_if_missing = true; + + LRUCacheOptions co; + co.capacity = kCacheCapacity; + co.num_shard_bits = kNumShardBits; + co.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr blob_cache = NewLRUCache(co); + std::shared_ptr block_cache = NewLRUCache(co); + + options_.blob_cache = blob_cache; + options_.lowest_used_cache_tier = CacheTier::kVolatileTier; + + BlockBasedTableOptions block_based_options; + block_based_options.no_block_cache = false; + block_based_options.block_cache = block_cache; + block_based_options.cache_usage_options.options_overrides.insert( + {CacheEntryRole::kBlobCache, + {/* charged = */ 
CacheEntryRoleOptions::Decision::kEnabled}}); + options_.table_factory.reset( + NewBlockBasedTableFactory(block_based_options)); + + assert(db_->GetDbIdentity(db_id_).ok()); + assert(db_->GetDbSessionId(db_session_id_).ok()); + } + + void GenerateKeysAndBlobs() { + for (size_t i = 0; i < kNumBlobs; ++i) { + key_strs_.push_back("key" + std::to_string(i)); + blob_strs_.push_back("blob" + std::to_string(i)); + } + + blob_file_size_ = BlobLogHeader::kSize; + for (size_t i = 0; i < kNumBlobs; ++i) { + keys_.push_back({key_strs_[i]}); + blobs_.push_back({blob_strs_[i]}); + blob_file_size_ += + BlobLogRecord::kHeaderSize + keys_[i].size() + blobs_[i].size(); + } + blob_file_size_ += BlobLogFooter::kSize; + } + + static constexpr std::size_t kSizeDummyEntry = CacheReservationManagerImpl< + CacheEntryRole::kBlobCache>::GetDummyEntrySize(); + static constexpr std::size_t kCacheCapacity = 1 * kSizeDummyEntry; + static constexpr int kNumShardBits = 0; // 2^0 shard + + static constexpr uint32_t kColumnFamilyId = 1; + static constexpr bool kHasTTL = false; + static constexpr uint64_t kBlobFileNumber = 1; + static constexpr size_t kNumBlobs = 16; + + std::vector keys_; + std::vector blobs_; + std::vector key_strs_; + std::vector blob_strs_; + uint64_t blob_file_size_; + + Options options_; + std::string db_id_; + std::string db_session_id_; +}; + +#ifndef ROCKSDB_LITE +TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) { + options_.cf_paths.emplace_back( + test::PerThreadDBPath( + env_, "BlobSourceCacheReservationTest_SimpleCacheReservation"), + 0); + + GenerateKeysAndBlobs(); + + DestroyAndReopen(options_); + + ImmutableOptions immutable_options(options_); + + constexpr ExpirationRange expiration_range; + + std::vector blob_offsets(keys_.size()); + std::vector blob_sizes(keys_.size()); + + WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range, + expiration_range, kBlobFileNumber, keys_, blobs_, + kNoCompression, blob_offsets, blob_sizes); + + constexpr size_t capacity = 10; + std::shared_ptr backing_cache = NewLRUCache(capacity); + + FileOptions file_options; + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/); + + BlobSource blob_source(&immutable_options, db_id_, db_session_id_, + blob_file_cache.get()); + + ConcurrentCacheReservationManager* cache_res_mgr = + static_cast(blob_source.GetBlobCache()) + ->TEST_GetCacheReservationManager(); + ASSERT_NE(cache_res_mgr, nullptr); + + ReadOptions read_options; + read_options.verify_checksums = true; + + std::vector values(keys_.size()); + + { + read_options.fill_cache = false; + + for (size_t i = 0; i < kNumBlobs; ++i) { + ASSERT_OK(blob_source.GetBlob( + read_options, keys_[i], kBlobFileNumber, blob_offsets[i], + blob_file_size_, blob_sizes[i], kNoCompression, + nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); + ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0); + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0); + } + } + + { + read_options.fill_cache = true; + + // num_blobs is 16, so the total blob cache usage is less than a single + // dummy entry. Therefore, cache reservation manager only reserves one dummy + // entry here. 
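An editorial aside on the expectation stated in the comment above (not part of the test); the sketch reuses the fixture's constants and assumes the test file's includes.

```cpp
// The 16 cached values are the short strings "blob0" .. "blob15" (86 bytes in
// total), far below one dummy entry, so every iteration of the loop below sees
//   reserved cache size == kSizeDummyEntry  (usage rounded up to one dummy entry)
//   total memory used   == exact blob bytes (metadata is not charged because the
//                                            fixture sets kDontChargeCacheMetadata)
static_assert(kNumBlobs * sizeof("blob15") < kSizeDummyEntry,
              "all 16 blobs fit within a single dummy-entry reservation");
```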
+ uint64_t blob_bytes = 0; + for (size_t i = 0; i < kNumBlobs; ++i) { + ASSERT_OK(blob_source.GetBlob( + read_options, keys_[i], kBlobFileNumber, blob_offsets[i], + blob_file_size_, blob_sizes[i], kNoCompression, + nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); + blob_bytes += blob_sizes[i]; + ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry); + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes); + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), + options_.blob_cache->GetUsage()); + } + } + + { + OffsetableCacheKey base_cache_key(db_id_, db_session_id_, kBlobFileNumber, + blob_file_size_); + size_t blob_bytes = options_.blob_cache->GetUsage(); + + for (size_t i = 0; i < kNumBlobs; ++i) { + CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[i]); + // We didn't call options_.blob_cache->Erase() here, this is because + // the cache wrapper's Erase() method must be called to update the + // cache usage after erasing the cache entry. + blob_source.GetBlobCache()->Erase(cache_key.AsSlice()); + if (i == kNumBlobs - 1) { + // The last blob is not in the cache. cache_res_mgr should not reserve + // any space for it. + ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0); + } else { + ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry); + } + blob_bytes -= blob_sizes[i]; + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes); + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), + options_.blob_cache->GetUsage()); + } + } +} + +TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) { + options_.cf_paths.emplace_back( + test::PerThreadDBPath( + env_, + "BlobSourceCacheReservationTest_IncreaseCacheReservationOnFullCache"), + 0); + + GenerateKeysAndBlobs(); + + DestroyAndReopen(options_); + + ImmutableOptions immutable_options(options_); + constexpr size_t blob_size = kSizeDummyEntry / (kNumBlobs / 2); + for (size_t i = 0; i < kNumBlobs; ++i) { + blob_file_size_ -= blobs_[i].size(); // old blob size + blob_strs_[i].resize(blob_size, '@'); + blobs_[i] = Slice(blob_strs_[i]); + blob_file_size_ += blobs_[i].size(); // new blob size + } + + std::vector blob_offsets(keys_.size()); + std::vector blob_sizes(keys_.size()); + + constexpr ExpirationRange expiration_range; + WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range, + expiration_range, kBlobFileNumber, keys_, blobs_, + kNoCompression, blob_offsets, blob_sizes); + + constexpr size_t capacity = 10; + std::shared_ptr backing_cache = NewLRUCache(capacity); + + FileOptions file_options; + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr blob_file_cache = + std::make_unique( + backing_cache.get(), &immutable_options, &file_options, + kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/); + + BlobSource blob_source(&immutable_options, db_id_, db_session_id_, + blob_file_cache.get()); + + ConcurrentCacheReservationManager* cache_res_mgr = + static_cast(blob_source.GetBlobCache()) + ->TEST_GetCacheReservationManager(); + ASSERT_NE(cache_res_mgr, nullptr); + + ReadOptions read_options; + read_options.verify_checksums = true; + + std::vector values(keys_.size()); + + { + read_options.fill_cache = false; + + for (size_t i = 0; i < kNumBlobs; ++i) { + ASSERT_OK(blob_source.GetBlob( + read_options, keys_[i], kBlobFileNumber, blob_offsets[i], + blob_file_size_, blob_sizes[i], kNoCompression, + nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); + 
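Another editorial aside (not part of the diff) on how this second test is sized; the eviction it provokes shows up in the fill_cache=true loop further below.

```cpp
// Each blob was resized above to kSizeDummyEntry / (kNumBlobs / 2), i.e. one
// eighth of a dummy entry, while the blob cache capacity is exactly one dummy
// entry (kCacheCapacity). Only the most recent ~8 blobs fit at a time, so the
// second half of the inserts evicts the first half: the reservation stays at one
// dummy entry and memory used plateaus at kSizeDummyEntry instead of growing to
// the full 2 * kSizeDummyEntry of blob bytes written.
static_assert(kNumBlobs * (kSizeDummyEntry / (kNumBlobs / 2)) ==
                  2 * kCacheCapacity,
              "total blob bytes are twice the blob cache capacity");
```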
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0); + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0); + } + } + + { + read_options.fill_cache = true; + + // Since we resized each blob to be kSizeDummyEntry / (num_blobs/ 2), we + // should observe cache eviction for the second half blobs. + uint64_t blob_bytes = 0; + for (size_t i = 0; i < kNumBlobs; ++i) { + ASSERT_OK(blob_source.GetBlob( + read_options, keys_[i], kBlobFileNumber, blob_offsets[i], + blob_file_size_, blob_sizes[i], kNoCompression, + nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); + blob_bytes += blob_sizes[i]; + ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry); + if (i >= kNumBlobs / 2) { + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), kSizeDummyEntry); + } else { + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes); + } + ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), + options_.blob_cache->GetUsage()); + } + } +} +#endif // ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index c445f1e9d1..494dc5a3fd 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -141,6 +141,7 @@ DECLARE_bool(charge_compression_dictionary_building_buffer); DECLARE_bool(charge_filter_construction); DECLARE_bool(charge_table_reader); DECLARE_bool(charge_file_metadata); +DECLARE_bool(charge_blob_cache); DECLARE_int32(top_level_index_pinning); DECLARE_int32(partition_pinning); DECLARE_int32(unpartitioned_pinning); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 5bfccd2cd0..0bb5fb4996 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -316,24 +316,29 @@ DEFINE_bool(cache_index_and_filter_blocks, false, DEFINE_bool(charge_compression_dictionary_building_buffer, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kCompressionDictionaryBuildingBuffer"); DEFINE_bool(charge_filter_construction, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kFilterConstruction"); DEFINE_bool(charge_table_reader, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kBlockBasedTableReader"); DEFINE_bool(charge_file_metadata, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "kFileMetadata"); +DEFINE_bool(charge_blob_cache, false, + "Setting for " + "CacheEntryRoleOptions::charged of " + "kBlobCache"); + DEFINE_int32( top_level_index_pinning, static_cast(ROCKSDB_NAMESPACE::PinningTier::kFallback), diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3847d6c8a0..833ae9fc88 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2907,6 +2907,11 @@ void InitializeOptionsFromFlags( {/*.charged = */ FLAGS_charge_file_metadata ? CacheEntryRoleOptions::Decision::kEnabled : CacheEntryRoleOptions::Decision::kDisabled}}); + block_based_options.cache_usage_options.options_overrides.insert( + {CacheEntryRole::kBlobCache, + {/*.charged = */ FLAGS_charge_blob_cache + ? 
CacheEntryRoleOptions::Decision::kEnabled + : CacheEntryRoleOptions::Decision::kDisabled}}); block_based_options.format_version = static_cast(FLAGS_format_version); block_based_options.index_block_restart_interval = diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index ddc52613d3..3010882596 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -571,12 +571,12 @@ enum class CacheEntryRole { // Filter's charge to account for // (new) bloom and ribbon filter construction's memory usage kFilterConstruction, - // BlockBasedTableReader's charge to account for - // its memory usage + // BlockBasedTableReader's charge to account for its memory usage kBlockBasedTableReader, - // FileMetadata's charge to account for - // its memory usage + // FileMetadata's charge to account for its memory usage kFileMetadata, + // Blob cache's charge to account for its memory usage + kBlobCache, // Default bucket, for miscellaneous cache entries. Do not use for // entries that could potentially add up to large usage. kMisc, diff --git a/src.mk b/src.mk index dc7b426edb..2dcf525250 100644 --- a/src.mk +++ b/src.mk @@ -4,6 +4,7 @@ LIB_SOURCES = \ cache/cache_entry_roles.cc \ cache/cache_key.cc \ cache/cache_reservation_manager.cc \ + cache/charged_cache.cc \ cache/clock_cache.cc \ cache/fast_lru_cache.cc \ cache/lru_cache.cc \ diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index cc7caae115..aa936ea831 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -695,12 +695,13 @@ Status BlockBasedTableFactory::ValidateOptions( static const std::set kMemoryChargingSupported = { CacheEntryRole::kCompressionDictionaryBuildingBuffer, CacheEntryRole::kFilterConstruction, - CacheEntryRole::kBlockBasedTableReader, CacheEntryRole::kFileMetadata}; + CacheEntryRole::kBlockBasedTableReader, CacheEntryRole::kFileMetadata, + CacheEntryRole::kBlobCache}; if (options.charged != CacheEntryRoleOptions::Decision::kFallback && kMemoryChargingSupported.count(role) == 0) { return Status::NotSupported( "Enable/Disable CacheEntryRoleOptions::charged" - "for CacheEntryRole " + + " for CacheEntryRole " + kCacheEntryRoleToCamelString[static_cast(role)] + " is not supported"); } @@ -708,10 +709,42 @@ Status BlockBasedTableFactory::ValidateOptions( options.charged == CacheEntryRoleOptions::Decision::kEnabled) { return Status::InvalidArgument( "Enable CacheEntryRoleOptions::charged" - "for CacheEntryRole " + + " for CacheEntryRole " + kCacheEntryRoleToCamelString[static_cast(role)] + " but block cache is disabled"); } + if (role == CacheEntryRole::kBlobCache && + options.charged == CacheEntryRoleOptions::Decision::kEnabled) { + if (cf_opts.blob_cache == nullptr) { + return Status::InvalidArgument( + "Enable CacheEntryRoleOptions::charged" + " for CacheEntryRole " + + kCacheEntryRoleToCamelString[static_cast(role)] + + " but blob cache is not configured"); + } + if (table_options_.no_block_cache) { + return Status::InvalidArgument( + "Enable CacheEntryRoleOptions::charged" + " for CacheEntryRole " + + kCacheEntryRoleToCamelString[static_cast(role)] + + " but block cache is disabled"); + } + if (table_options_.block_cache == cf_opts.blob_cache) { + return Status::InvalidArgument( + "Enable CacheEntryRoleOptions::charged" + " for CacheEntryRole " + + kCacheEntryRoleToCamelString[static_cast(role)] + + " but blob cache is the same as block cache"); + } + if (cf_opts.blob_cache->GetCapacity() > + 
table_options_.block_cache->GetCapacity()) { + return Status::InvalidArgument( + "Enable CacheEntryRoleOptions::charged" + " for CacheEntryRole " + + kCacheEntryRoleToCamelString[static_cast(role)] + + " but blob cache capacity is larger than block cache capacity"); + } + } } { Status s = CheckCacheOptionCompatibility(table_options_); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 3df9fcb78b..f0b5eb493f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1170,24 +1170,29 @@ DEFINE_bool(async_io, false, DEFINE_bool(charge_compression_dictionary_building_buffer, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kCompressionDictionaryBuildingBuffer"); DEFINE_bool(charge_filter_construction, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kFilterConstruction"); DEFINE_bool(charge_table_reader, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kBlockBasedTableReader"); DEFINE_bool(charge_file_metadata, false, "Setting for " - "CacheEntryRoleOptions::charged of" + "CacheEntryRoleOptions::charged of " "CacheEntryRole::kFileMetadata"); +DEFINE_bool(charge_blob_cache, false, + "Setting for " + "CacheEntryRoleOptions::charged of " + "CacheEntryRole::kBlobCache"); + DEFINE_uint64(backup_rate_limit, 0ull, "If non-zero, db_bench will rate limit reads and writes for DB " "backup. This " @@ -4294,6 +4299,11 @@ class Benchmark { {/*.charged = */ FLAGS_charge_file_metadata ? CacheEntryRoleOptions::Decision::kEnabled : CacheEntryRoleOptions::Decision::kDisabled}}); + block_based_options.cache_usage_options.options_overrides.insert( + {CacheEntryRole::kBlobCache, + {/*.charged = */ FLAGS_charge_blob_cache + ? 
CacheEntryRoleOptions::Decision::kEnabled + : CacheEntryRoleOptions::Decision::kDisabled}}); block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; block_based_options.block_restart_interval = FLAGS_block_restart_interval; @@ -4369,6 +4379,46 @@ class Benchmark { #endif } + + if (FLAGS_use_blob_cache) { + if (FLAGS_use_shared_block_and_blob_cache) { + options.blob_cache = cache_; + } else { + if (FLAGS_blob_cache_size > 0) { + LRUCacheOptions co; + co.capacity = FLAGS_blob_cache_size; + co.num_shard_bits = FLAGS_blob_cache_numshardbits; + options.blob_cache = NewLRUCache(co); + } else { + fprintf( + stderr, + "Unable to create a standalone blob cache if blob_cache_size " + "<= 0.\n"); + exit(1); + } + } + switch (FLAGS_prepopulate_blob_cache) { + case 0: + options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable; + break; + case 1: + options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly; + break; + default: + fprintf(stderr, "Unknown prepopulate blob cache mode\n"); + exit(1); + } + fprintf(stdout, + "Integrated BlobDB: blob cache enabled, block and blob caches " + "shared: %d, blob cache size %" PRIu64 + ", blob cache num shard bits: %d, hot/warm blobs prepopulated: " + "%d\n", + FLAGS_use_shared_block_and_blob_cache, FLAGS_blob_cache_size, + FLAGS_blob_cache_numshardbits, FLAGS_prepopulate_blob_cache); + } else { + fprintf(stdout, "Integrated BlobDB: blob cache disabled\n"); + } + options.table_factory.reset( NewBlockBasedTableFactory(block_based_options)); } @@ -4512,44 +4562,6 @@ class Benchmark { FLAGS_blob_compaction_readahead_size; options.blob_file_starting_level = FLAGS_blob_file_starting_level; - if (FLAGS_use_blob_cache) { - if (FLAGS_use_shared_block_and_blob_cache) { - options.blob_cache = cache_; - } else { - if (FLAGS_blob_cache_size > 0) { - LRUCacheOptions co; - co.capacity = FLAGS_blob_cache_size; - co.num_shard_bits = FLAGS_blob_cache_numshardbits; - options.blob_cache = NewLRUCache(co); - } else { - fprintf(stderr, - "Unable to create a standalone blob cache if blob_cache_size " - "<= 0.\n"); - exit(1); - } - } - switch (FLAGS_prepopulate_blob_cache) { - case 0: - options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable; - break; - case 1: - options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly; - break; - default: - fprintf(stderr, "Unknown prepopulate blob cache mode\n"); - exit(1); - } - fprintf( - stdout, - "Integrated BlobDB: blob cache enabled, block and blob caches " - "shared: %d, blob cache size %" PRIu64 - ", blob cache num shard bits: %d, hot/warm blobs prepopulated: %d\n", - FLAGS_use_shared_block_and_blob_cache, FLAGS_blob_cache_size, - FLAGS_blob_cache_numshardbits, FLAGS_prepopulate_blob_cache); - } else { - fprintf(stdout, "Integrated BlobDB: blob cache disabled\n"); - } - #ifndef ROCKSDB_LITE if (FLAGS_readonly && FLAGS_transaction_db) { fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
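Finally, a hypothetical db_bench invocation wiring the new `-charge_blob_cache` flag together with the existing blob cache flags (values are illustrative; the blob and block caches must be separate, and the blob cache capacity must not exceed the block cache capacity, for the charge to pass the new ValidateOptions checks).

```bash
./db_bench --benchmarks=fillrandom,readrandom \
  --enable_blob_files=true \
  --use_blob_cache=true \
  --use_shared_block_and_blob_cache=false \
  --blob_cache_size=$((64 << 20)) \
  --blob_cache_numshardbits=6 \
  --prepopulate_blob_cache=1 \
  --cache_size=$((128 << 20)) \
  --charge_blob_cache=true
```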