From e079d562af6446c082e78a92fc355c1ccfb07ad5 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Mon, 21 Nov 2022 16:17:36 -0800 Subject: [PATCH] Add a SecondaryCache::InsertSaved() API, use in CacheDumper impl (#10945) Summary: Can simplify some ugly code in cache_dump_load_impl.cc by having an API in SecondaryCache that can directly consume persisted data. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10945 Test Plan: existing tests for CacheDumper, added basic unit test Reviewed By: anand1976 Differential Revision: D41231497 Pulled By: pdillinger fbshipit-source-id: b8ec993ef7d3e7efd68aae8602fd3f858da58068 --- CMakeLists.txt | 1 + HISTORY.md | 1 + TARGETS | 2 + cache/lru_cache_test.cc | 23 ++++++++-- cache/secondary_cache.cc | 32 +++++++++++++ include/rocksdb/secondary_cache.h | 32 ++++++++++--- src.mk | 1 + utilities/cache_dump_load_impl.cc | 74 +++---------------------------- utilities/cache_dump_load_impl.h | 4 +- 9 files changed, 92 insertions(+), 78 deletions(-) create mode 100644 cache/secondary_cache.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index f7173e9fe8..1c716874a3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -654,6 +654,7 @@ set(SOURCES cache/clock_cache.cc cache/compressed_secondary_cache.cc cache/lru_cache.cc + cache/secondary_cache.cc cache/sharded_cache.cc db/arena_wrapped_db_iter.cc db/blob/blob_contents.cc diff --git a/HISTORY.md b/HISTORY.md index 97fa172d5c..8afd86a20b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -21,6 +21,7 @@ ### Public API Changes * Marked `block_cache_compressed` as a deprecated feature. Use SecondaryCache instead. +* Added a `SecondaryCache::InsertSaved()` API, with default implementation depending on `Insert()`. Some implementations might need to add a custom implementation of `InsertSaved()`. (Details in API comments.) ## 7.8.0 (10/22/2022) ### New Features diff --git a/TARGETS b/TARGETS index 4f63f75d5a..762afa3516 100644 --- a/TARGETS +++ b/TARGETS @@ -17,6 +17,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "cache/clock_cache.cc", "cache/compressed_secondary_cache.cc", "cache/lru_cache.cc", + "cache/secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_contents.cc", @@ -356,6 +357,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/clock_cache.cc", "cache/compressed_secondary_cache.cc", "cache/lru_cache.cc", + "cache/secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_contents.cc", diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 4721980cc5..7904a196d6 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -1132,14 +1132,19 @@ TEST_F(LRUCacheSecondaryCacheTest, BasicTest) { nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); std::shared_ptr secondary_cache = - std::make_shared(2048); + std::make_shared(4096); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); std::shared_ptr stats = CreateDBStatistics(); CacheKey k1 = CacheKey::CreateUniqueForCacheLifetime(cache.get()); CacheKey k2 = CacheKey::CreateUniqueForCacheLifetime(cache.get()); + CacheKey k3 = CacheKey::CreateUniqueForCacheLifetime(cache.get()); Random rnd(301); + // Start with warming k3 + std::string str3 = rnd.RandomString(1021); + ASSERT_OK(secondary_cache->InsertSaved(k3.AsSlice(), str3)); + std::string str1 = rnd.RandomString(1020); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert(k1.AsSlice(), item1, @@ -1156,15 +1161,27 @@ TEST_F(LRUCacheSecondaryCacheTest, BasicTest) { cache->Lookup(k2.AsSlice(), &LRUCacheSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); ASSERT_NE(handle, nullptr); + ASSERT_EQ(static_cast(cache->Value(handle))->Size(), str2.size()); cache->Release(handle); + // This lookup should promote k1 and demote k2 handle = cache->Lookup(k1.AsSlice(), &LRUCacheSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); ASSERT_NE(handle, nullptr); + ASSERT_EQ(static_cast(cache->Value(handle))->Size(), str1.size()); cache->Release(handle); - ASSERT_EQ(secondary_cache->num_inserts(), 2u); - ASSERT_EQ(secondary_cache->num_lookups(), 1u); + + // This lookup should promote k3 and demote k1 + handle = + cache->Lookup(k3.AsSlice(), &LRUCacheSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, stats.get()); + ASSERT_NE(handle, nullptr); + ASSERT_EQ(static_cast(cache->Value(handle))->Size(), str3.size()); + cache->Release(handle); + + ASSERT_EQ(secondary_cache->num_inserts(), 3u); + ASSERT_EQ(secondary_cache->num_lookups(), 2u); ASSERT_EQ(stats->getTickerCount(SECONDARY_CACHE_HITS), secondary_cache->num_lookups()); PerfContext perf_ctx = *get_perf_context(); diff --git a/cache/secondary_cache.cc b/cache/secondary_cache.cc new file mode 100644 index 0000000000..84352db71b --- /dev/null +++ b/cache/secondary_cache.cc @@ -0,0 +1,32 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/secondary_cache.h" + +#include "cache/cache_entry_roles.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +size_t SliceSize(void* obj) { return static_cast(obj)->size(); } + +Status SliceSaveTo(void* from_obj, size_t from_offset, size_t length, + void* out) { + const Slice& slice = *static_cast(from_obj); + std::memcpy(out, slice.data() + from_offset, length); + return Status::OK(); +} + +} // namespace + +Status SecondaryCache::InsertSaved(const Slice& key, const Slice& saved) { + static Cache::CacheItemHelper helper{ + &SliceSize, &SliceSaveTo, GetNoopDeleterForRole()}; + // NOTE: depends on Insert() being synchronous, not keeping pointer `&saved` + return Insert(key, const_cast(&saved), &helper); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h index 3e637efb3f..a6a8c8b1d5 100644 --- a/include/rocksdb/secondary_cache.h +++ b/include/rocksdb/secondary_cache.h @@ -56,15 +56,37 @@ class SecondaryCache : public Customizable { const std::string& id, std::shared_ptr* result); - // Insert the given value into this cache. The value is not written - // directly. Rather, the SaveToCallback provided by helper_cb will be - // used to extract the persistable data in value, which will be written + // Insert the given value into this cache. Ownership of `value` is + // transferred to the callee, who is reponsible for deleting the value + // with helper->del_cb if del_cb is not nullptr. Unlike Cache::Insert(), + // the callee is responsible for such cleanup even in case of non-OK + // Status. + // Typically, the value is not saved directly but the implementation + // uses the SaveToCallback provided by helper to extract value's + // persistable data (typically uncompressed block), which will be written // to this tier. The implementation may or may not write it to cache - // depending on the admission control policy, even if the return status is - // success. + // depending on the admission control policy, even if the return status + // is success (OK). + // + // If the implementation is asynchronous or otherwise uses `value` after + // the call returns, then InsertSaved() must be overridden not to rely on + // Insert(). For example, there could be a "holding area" in memory where + // Lookup() might return the same parsed value back. But more typically, if + // the implementation only uses `value` for getting persistable data during + // the call, then the default implementation of `InsertSaved()` suffices. virtual Status Insert(const Slice& key, void* value, const Cache::CacheItemHelper* helper) = 0; + // Insert a value from its saved/persistable data (typically uncompressed + // block), as if generated by SaveToCallback/SizeCallback. This can be used + // in "warming up" the cache from some auxiliary source, and like Insert() + // may or may not write it to cache depending on the admission control + // policy, even if the return status is success. + // + // The default implementation assumes synchronous, non-escaping Insert(), + // wherein `value` is not used after return of Insert(). See Insert(). + virtual Status InsertSaved(const Slice& key, const Slice& saved); + // Lookup the data for the given key in this cache. The create_cb // will be used to create the object. The handle returned may not be // ready yet, unless wait=true, in which case Lookup() will block until diff --git a/src.mk b/src.mk index f00278ec04..0269fe1d33 100644 --- a/src.mk +++ b/src.mk @@ -8,6 +8,7 @@ LIB_SOURCES = \ cache/clock_cache.cc \ cache/lru_cache.cc \ cache/compressed_secondary_cache.cc \ + cache/secondary_cache.cc \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ db/blob/blob_contents.cc \ diff --git a/utilities/cache_dump_load_impl.cc b/utilities/cache_dump_load_impl.cc index c0ce8dede9..2b9f2a29d2 100644 --- a/utilities/cache_dump_load_impl.cc +++ b/utilities/cache_dump_load_impl.cc @@ -280,7 +280,7 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { // Step 3: read out the rest of the blocks from the reader. The loop will stop // either I/O status is not ok or we reach to the the end. - while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) { + while (io_s.ok()) { dump_unit.reset(); data.clear(); // read the content and store in the dump_unit @@ -288,74 +288,14 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { if (!io_s.ok()) { break; } + if (dump_unit.type == CacheDumpUnitType::kFooter) { + break; + } // Create the uncompressed_block based on the information in the dump_unit // (There is no block trailer here compatible with block-based SST file.) - BlockContents uncompressed_block( - Slice(static_cast(dump_unit.value), dump_unit.value_len)); - Cache::CacheItemHelper* helper = nullptr; - Statistics* statistics = nullptr; - Status s = Status::OK(); - // according to the block type, get the helper callback function and create - // the corresponding block - switch (dump_unit.type) { - case CacheDumpUnitType::kFilter: { - helper = BlocklikeTraits::GetCacheItemHelper( - BlockType::kFilter); - std::unique_ptr block_holder; - block_holder.reset(BlocklikeTraits::Create( - std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, - statistics, false, toptions_.filter_policy.get())); - if (helper != nullptr) { - s = secondary_cache_->Insert(dump_unit.key, - (void*)(block_holder.get()), helper); - } - break; - } - case CacheDumpUnitType::kData: { - helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kData); - std::unique_ptr block_holder; - block_holder.reset(BlocklikeTraits::Create( - std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, - statistics, false, toptions_.filter_policy.get())); - if (helper != nullptr) { - s = secondary_cache_->Insert(dump_unit.key, - (void*)(block_holder.get()), helper); - } - break; - } - case CacheDumpUnitType::kIndex: { - helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kIndex); - std::unique_ptr block_holder; - block_holder.reset(BlocklikeTraits::Create( - std::move(uncompressed_block), 0, statistics, false, - toptions_.filter_policy.get())); - if (helper != nullptr) { - s = secondary_cache_->Insert(dump_unit.key, - (void*)(block_holder.get()), helper); - } - break; - } - case CacheDumpUnitType::kFilterMetaBlock: { - helper = BlocklikeTraits::GetCacheItemHelper( - BlockType::kFilterPartitionIndex); - std::unique_ptr block_holder; - block_holder.reset(BlocklikeTraits::Create( - std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, - statistics, false, toptions_.filter_policy.get())); - if (helper != nullptr) { - s = secondary_cache_->Insert(dump_unit.key, - (void*)(block_holder.get()), helper); - } - break; - } - case CacheDumpUnitType::kFooter: - break; - case CacheDumpUnitType::kDeprecatedFilterBlock: - // Obsolete - break; - default: - continue; - } + Slice content = + Slice(static_cast(dump_unit.value), dump_unit.value_len); + Status s = secondary_cache_->InsertSaved(dump_unit.key, content); if (!s.ok()) { io_s = status_to_io_status(std::move(s)); } diff --git a/utilities/cache_dump_load_impl.h b/utilities/cache_dump_load_impl.h index f45b3360b1..9ca1ff45aa 100644 --- a/utilities/cache_dump_load_impl.h +++ b/utilities/cache_dump_load_impl.h @@ -128,11 +128,10 @@ class CacheDumperImpl : public CacheDumper { class CacheDumpedLoaderImpl : public CacheDumpedLoader { public: CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options, - const BlockBasedTableOptions& toptions, + const BlockBasedTableOptions& /*toptions*/, const std::shared_ptr& secondary_cache, std::unique_ptr&& reader) : options_(dump_options), - toptions_(toptions), secondary_cache_(secondary_cache), reader_(std::move(reader)) {} ~CacheDumpedLoaderImpl() {} @@ -145,7 +144,6 @@ class CacheDumpedLoaderImpl : public CacheDumpedLoader { IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit); CacheDumpOptions options_; - const BlockBasedTableOptions& toptions_; std::shared_ptr secondary_cache_; std::unique_ptr reader_; UnorderedMap role_map_;