From f706a9c199118fdf57031d57ebb975764bcebcd3 Mon Sep 17 00:00:00 2001 From: Bo Wang Date: Wed, 23 Feb 2022 16:06:27 -0800 Subject: [PATCH] Add a secondary cache implementation based on LRUCache 1 (#9518) Summary: **Summary:** RocksDB uses a block cache to reduce IO and make queries more efficient. The block cache is based on the LRU algorithm (LRUCache) and keeps objects containing uncompressed data, such as Block, ParsedFullFilterBlock etc. It allows the user to configure a second level cache (rocksdb::SecondaryCache) to extend the primary block cache by holding items evicted from it. Some of the major RocksDB users, like MyRocks, use direct IO and would like to use a primary block cache for uncompressed data and a secondary cache for compressed data. The latter allows us to mitigate the loss of the Linux page cache due to direct IO. This PR includes a concrete implementation of rocksdb::SecondaryCache that integrates with compression libraries such as LZ4 and implements an LRU cache to hold compressed blocks. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9518 Test Plan: In this PR, the lru_secondary_cache_test.cc includes the following tests: 1. The unit tests for the secondary cache with either compression or no compression, such as basic tests, fails tests. 2. The integration tests with both primary cache and this secondary cache . **Follow Up:** 1. Statistics (e.g. compression ratio) will be added in another PR. 2. Once this implementation is ready, I will do some shadow testing and benchmarking with UDB to measure the impact. Reviewed By: anand1976 Differential Revision: D34430930 Pulled By: gitbw95 fbshipit-source-id: 218d78b672a2f914856d8a90ff32f2f5b5043ded --- CMakeLists.txt | 2 + Makefile | 3 + TARGETS | 8 + cache/lru_secondary_cache.cc | 168 +++++++++ cache/lru_secondary_cache.h | 85 +++++ cache/lru_secondary_cache_test.cc | 597 ++++++++++++++++++++++++++++++ include/rocksdb/cache.h | 48 +++ src.mk | 4 +- tools/db_bench_tool.cc | 50 +++ 9 files changed, 964 insertions(+), 1 deletion(-) create mode 100644 cache/lru_secondary_cache.cc create mode 100644 cache/lru_secondary_cache.h create mode 100644 cache/lru_secondary_cache_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 4ac7b5628e..c159300a28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -627,6 +627,7 @@ set(SOURCES cache/cache_reservation_manager.cc cache/clock_cache.cc cache/lru_cache.cc + cache/lru_secondary_cache.cc cache/sharded_cache.cc db/arena_wrapped_db_iter.cc db/blob/blob_fetcher.cc @@ -1154,6 +1155,7 @@ if(WITH_TESTS) cache/cache_reservation_manager_test.cc cache/cache_test.cc cache/lru_cache_test.cc + cache/lru_secondary_cache_test.cc db/blob/blob_counting_iterator_test.cc db/blob/blob_file_addition_test.cc db/blob/blob_file_builder_test.cc diff --git a/Makefile b/Makefile index 7aee1b5b80..0559b7d8a9 100644 --- a/Makefile +++ b/Makefile @@ -1897,6 +1897,9 @@ statistics_test: $(OBJ_DIR)/monitoring/statistics_test.o $(TEST_LIBRARY) $(LIBRA stats_history_test: $(OBJ_DIR)/monitoring/stats_history_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +lru_secondary_cache_test: $(OBJ_DIR)/cache/lru_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + lru_cache_test: $(OBJ_DIR)/cache/lru_cache_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 6de0298717..494eb45c0b 100644 --- a/TARGETS +++ b/TARGETS @@ -15,6 +15,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", + "cache/lru_secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_fetcher.cc", @@ -331,6 +332,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", + "cache/lru_secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_fetcher.cc", @@ -9096,6 +9098,12 @@ cpp_unittest_wrapper(name="lru_cache_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="lru_secondary_cache_test", + srcs=["cache/lru_secondary_cache_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="manual_compaction_test", srcs=["db/manual_compaction_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/cache/lru_secondary_cache.cc b/cache/lru_secondary_cache.cc new file mode 100644 index 0000000000..7ab06f19eb --- /dev/null +++ b/cache/lru_secondary_cache.cc @@ -0,0 +1,168 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/lru_secondary_cache.h" + +#include + +#include "memory/memory_allocator.h" +#include "util/compression.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +void DeletionCallback(const Slice& /*key*/, void* obj) { + delete reinterpret_cast(obj); + obj = nullptr; +} + +} // namespace + +LRUSecondaryCache::LRUSecondaryCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr memory_allocator, bool use_adaptive_mutex, + CacheMetadataChargePolicy metadata_charge_policy, + CompressionType compression_type, uint32_t compress_format_version) + : cache_options_(capacity, num_shard_bits, strict_capacity_limit, + high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, + metadata_charge_policy, compression_type, + compress_format_version) { + cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, + high_pri_pool_ratio, memory_allocator, + use_adaptive_mutex, metadata_charge_policy); +} + +LRUSecondaryCache::~LRUSecondaryCache() { cache_.reset(); } + +std::unique_ptr LRUSecondaryCache::Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) { + std::unique_ptr handle; + Cache::Handle* lru_handle = cache_->Lookup(key); + if (lru_handle == nullptr) { + return handle; + } + + CacheAllocationPtr* ptr = + reinterpret_cast(cache_->Value(lru_handle)); + void* value = nullptr; + size_t charge = 0; + Status s; + + if (cache_options_.compression_type == kNoCompression) { + s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); + } else { + UncompressionContext uncompression_context(cache_options_.compression_type); + UncompressionInfo uncompression_info(uncompression_context, + UncompressionDict::GetEmptyDict(), + cache_options_.compression_type); + + size_t uncompressed_size = 0; + CacheAllocationPtr uncompressed; + uncompressed = UncompressData( + uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), + &uncompressed_size, cache_options_.compress_format_version, + cache_options_.memory_allocator.get()); + + if (!uncompressed) { + cache_->Release(lru_handle, true); + return handle; + } + s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge); + } + + if (!s.ok()) { + cache_->Release(lru_handle, true); + return handle; + } + + handle.reset(new LRUSecondaryCacheResultHandle(value, charge)); + cache_->Release(lru_handle); + return handle; +} + +Status LRUSecondaryCache::Insert(const Slice& key, void* value, + const Cache::CacheItemHelper* helper) { + size_t size = (*helper->size_cb)(value); + CacheAllocationPtr ptr = + AllocateBlock(size, cache_options_.memory_allocator.get()); + + Status s = (*helper->saveto_cb)(value, 0, size, ptr.get()); + if (!s.ok()) { + return s; + } + Slice val(ptr.get(), size); + + std::string compressed_val; + if (cache_options_.compression_type != kNoCompression) { + CompressionOptions compression_opts; + CompressionContext compression_context(cache_options_.compression_type); + uint64_t sample_for_compression = 0; + CompressionInfo compression_info( + compression_opts, compression_context, CompressionDict::GetEmptyDict(), + cache_options_.compression_type, sample_for_compression); + + bool success = + CompressData(val, compression_info, + cache_options_.compress_format_version, &compressed_val); + + if (!success) { + return Status::Corruption("Error compressing value."); + } + + val = Slice(compressed_val); + size = compressed_val.size(); + ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); + memcpy(ptr.get(), compressed_val.data(), size); + } + + CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); + + return cache_->Insert(key, buf, size, DeletionCallback); +} + +void LRUSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } + +std::string LRUSecondaryCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + ret.append(cache_->GetPrintableOptions()); + snprintf(buffer, kBufferSize, " compression_type : %s\n", + CompressionTypeToString(cache_options_.compression_type).c_str()); + ret.append(buffer); + snprintf(buffer, kBufferSize, " compression_type : %d\n", + cache_options_.compress_format_version); + ret.append(buffer); + return ret; +} + +std::shared_ptr NewLRUSecondaryCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr memory_allocator, bool use_adaptive_mutex, + CacheMetadataChargePolicy metadata_charge_policy, + CompressionType compression_type, uint32_t compress_format_version) { + return std::make_shared( + capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, + memory_allocator, use_adaptive_mutex, metadata_charge_policy, + compression_type, compress_format_version); +} + +std::shared_ptr NewLRUSecondaryCache( + const LRUSecondaryCacheOptions& opts) { + // The secondary_cache is disabled for this LRUCache instance. + assert(opts.secondary_cache == nullptr); + return NewLRUSecondaryCache( + opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, + opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex, + opts.metadata_charge_policy, opts.compression_type, + opts.compress_format_version); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/cache/lru_secondary_cache.h b/cache/lru_secondary_cache.h new file mode 100644 index 0000000000..8601f56adc --- /dev/null +++ b/cache/lru_secondary_cache.h @@ -0,0 +1,85 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include "cache/lru_cache.h" +#include "memory/memory_allocator.h" +#include "rocksdb/secondary_cache.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +class LRUSecondaryCacheResultHandle : public SecondaryCacheResultHandle { + public: + LRUSecondaryCacheResultHandle(void* value, size_t size) + : value_(value), size_(size) {} + virtual ~LRUSecondaryCacheResultHandle() override = default; + + LRUSecondaryCacheResultHandle(const LRUSecondaryCacheResultHandle&) = delete; + LRUSecondaryCacheResultHandle& operator=( + const LRUSecondaryCacheResultHandle&) = delete; + + bool IsReady() override { return true; } + + void Wait() override {} + + void* Value() override { return value_; } + + size_t Size() override { return size_; } + + private: + void* value_; + size_t size_; +}; + +// The LRUSecondaryCache is a concrete implementation of +// rocksdb::SecondaryCache. +// +// Users can also cast a pointer to it and call methods on +// it directly, especially custom methods that may be added +// in the future. For example - +// std::unique_ptr cache = +// NewLRUSecondaryCache(opts); +// static_cast(cache.get())->Erase(key); + +class LRUSecondaryCache : public SecondaryCache { + public: + LRUSecondaryCache( + size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio, + std::shared_ptr memory_allocator = nullptr, + bool use_adaptive_mutex = kDefaultToAdaptiveMutex, + CacheMetadataChargePolicy metadata_charge_policy = + kDontChargeCacheMetadata, + CompressionType compression_type = CompressionType::kLZ4Compression, + uint32_t compress_format_version = 2); + virtual ~LRUSecondaryCache() override; + + const char* Name() const override { return "LRUSecondaryCache"; } + + Status Insert(const Slice& key, void* value, + const Cache::CacheItemHelper* helper) override; + + std::unique_ptr Lookup( + const Slice& key, const Cache::CreateCallback& create_cb, + bool /*wait*/) override; + + void Erase(const Slice& key) override; + + void WaitAll(std::vector /*handles*/) override {} + + std::string GetPrintableOptions() const override; + + private: + std::shared_ptr cache_; + LRUSecondaryCacheOptions cache_options_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/cache/lru_secondary_cache_test.cc b/cache/lru_secondary_cache_test.cc new file mode 100644 index 0000000000..9cad4af0b8 --- /dev/null +++ b/cache/lru_secondary_cache_test.cc @@ -0,0 +1,597 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "cache/lru_secondary_cache.h" + +#include +#include + +#include "memory/jemalloc_nodump_allocator.h" +#include "memory/memory_allocator.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/compression.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +class LRUSecondaryCacheTest : public testing::Test { + public: + LRUSecondaryCacheTest() : fail_create_(false) {} + ~LRUSecondaryCacheTest() {} + + protected: + class TestItem { + public: + TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) { + memcpy(buf_.get(), buf, size); + } + ~TestItem() {} + + char* Buf() { return buf_.get(); } + size_t Size() { return size_; } + + private: + std::unique_ptr buf_; + size_t size_; + }; + + static size_t SizeCallback(void* obj) { + return reinterpret_cast(obj)->Size(); + } + + static Status SaveToCallback(void* from_obj, size_t from_offset, + size_t length, void* out) { + TestItem* item = reinterpret_cast(from_obj); + const char* buf = item->Buf(); + EXPECT_EQ(length, item->Size()); + EXPECT_EQ(from_offset, 0); + memcpy(out, buf, length); + return Status::OK(); + } + + static void DeletionCallback(const Slice& /*key*/, void* obj) { + delete reinterpret_cast(obj); + obj = nullptr; + } + + static Cache::CacheItemHelper helper_; + + static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/, + size_t /*size*/, void* /*out*/) { + return Status::NotSupported(); + } + + static Cache::CacheItemHelper helper_fail_; + + Cache::CreateCallback test_item_creator = [&](const void* buf, size_t size, + void** out_obj, + size_t* charge) -> Status { + if (fail_create_) { + return Status::NotSupported(); + } + *out_obj = reinterpret_cast(new TestItem((char*)buf, size)); + *charge = size; + return Status::OK(); + }; + + void SetFailCreate(bool fail) { fail_create_ = fail; } + + void BasicTest(bool sec_cache_is_compressed, bool use_jemalloc) { + LRUSecondaryCacheOptions opts; + opts.capacity = 2048; + opts.num_shard_bits = 0; + opts.metadata_charge_policy = kDontChargeCacheMetadata; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + opts.compression_type = CompressionType::kNoCompression; + } + } else { + opts.compression_type = CompressionType::kNoCompression; + } + + if (use_jemalloc) { + JemallocAllocatorOptions jopts; + std::shared_ptr allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (s.ok()) { + opts.memory_allocator = allocator; + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } + std::shared_ptr cache = NewLRUSecondaryCache(opts); + + // Lookup an non-existent key. + std::unique_ptr handle0 = + cache->Lookup("k0", test_item_creator, true); + ASSERT_EQ(handle0, nullptr); + + Random rnd(301); + // Insert and Lookup the first item. + std::string str1; + test::CompressibleString(&rnd, 0.25, 1000, &str1); + TestItem item1(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1, nullptr); + // delete reinterpret_cast(handle1->Value()); + std::unique_ptr val1 = + std::unique_ptr(static_cast(handle1->Value())); + ASSERT_NE(val1, nullptr); + ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); + + // Insert and Lookup the second item. + std::string str2; + test::CompressibleString(&rnd, 0.5, 1000, &str2); + TestItem item2(str2.data(), str2.length()); + ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle2 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_NE(handle2, nullptr); + std::unique_ptr val2 = + std::unique_ptr(static_cast(handle2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + // Lookup the first item again to make sure it is still in the cache. + std::unique_ptr handle1_1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1_1, nullptr); + std::unique_ptr val1_1 = + std::unique_ptr(static_cast(handle1_1->Value())); + ASSERT_NE(val1_1, nullptr); + ASSERT_EQ(memcmp(val1_1->Buf(), item1.Buf(), item1.Size()), 0); + + std::vector handles = {handle1.get(), + handle2.get()}; + cache->WaitAll(handles); + + cache->Erase("k1"); + handle1 = cache->Lookup("k1", test_item_creator, true); + ASSERT_EQ(handle1, nullptr); + + cache.reset(); + } + + void FailsTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 1100; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr cache = + NewLRUSecondaryCache(secondary_cache_opts); + + // Insert and Lookup the first item. + Random rnd(301); + std::string str1(rnd.RandomString(1000)); + TestItem item1(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1, nullptr); + std::unique_ptr val1 = + std::unique_ptr(static_cast(handle1->Value())); + ASSERT_NE(val1, nullptr); + ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); + + // Insert and Lookup the second item. + std::string str2(rnd.RandomString(200)); + TestItem item2(str2.data(), str2.length()); + // k1 is evicted. + ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1_1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_EQ(handle1_1, nullptr); + std::unique_ptr handle2 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_NE(handle2, nullptr); + std::unique_ptr val2 = + std::unique_ptr(static_cast(handle2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + // Create Fails. + SetFailCreate(true); + std::unique_ptr handle2_1 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_EQ(handle2_1, nullptr); + + // Save Fails. + std::string str3 = rnd.RandomString(10); + TestItem item3(str3.data(), str3.length()); + ASSERT_NOK( + cache->Insert("k3", &item3, &LRUSecondaryCacheTest::helper_fail_)); + + cache.reset(); + } + + void BasicIntegrationTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2300; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr, + kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + lru_cache_opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(lru_cache_opts); + std::shared_ptr stats = CreateDBStatistics(); + + Random rnd(301); + + std::string str1 = rnd.RandomString(1010); + std::string str1_clone{str1}; + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // After Insert, lru cache contains k2 and secondary cache contains k1. + ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + std::string str3 = rnd.RandomString(1020); + TestItem* item3 = new TestItem(str3.data(), str3.length()); + // After Insert, lru cache contains k3 and secondary cache contains k1 and + // k2 + ASSERT_OK(cache->Insert("k3", item3, &LRUSecondaryCacheTest::helper_, + str3.length())); + + Cache::Handle* handle; + handle = + cache->Lookup("k3", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_NE(handle, nullptr); + TestItem* val3 = static_cast(cache->Value(handle)); + ASSERT_NE(val3, nullptr); + ASSERT_EQ(memcmp(val3->Buf(), item3->Buf(), item3->Size()), 0); + cache->Release(handle); + + // Lookup an non-existent key. + handle = + cache->Lookup("k0", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_EQ(handle, nullptr); + + // This Lookup should promote k1 and demote k3, so k2 is evicted from the + // secondary cache. The lru cache contains k1 and secondary cache contains + // k3. item1 was Free(), so it cannot be compared against the item1. + handle = + cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_NE(handle, nullptr); + TestItem* val1_1 = static_cast(cache->Value(handle)); + ASSERT_NE(val1_1, nullptr); + ASSERT_EQ(memcmp(val1_1->Buf(), str1_clone.data(), str1_clone.size()), 0); + cache->Release(handle); + + handle = + cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_EQ(handle, nullptr); + + cache.reset(); + secondary_cache.reset(); + } + + void BasicIntegrationFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + auto item1 = + std::unique_ptr(new TestItem(str1.data(), str1.length())); + ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); + ASSERT_OK(cache->Insert("k1", item1.get(), &LRUSecondaryCacheTest::helper_, + str1.length())); + item1.release(); // Appease clang-analyze "potential memory leak" + + Cache::Handle* handle; + handle = cache->Lookup("k2", nullptr, test_item_creator, + Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, false); + ASSERT_EQ(handle, nullptr); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationSaveFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_, + str1.length())); + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_, + str2.length())); + + Cache::Handle* handle; + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 demotion would have failed + handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 didn't get promoted, k2 should still be in cache + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationCreateFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + Cache::Handle* handle; + SetFailCreate(true); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 creation would have failed + handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 didn't get promoted, k2 should still be in cache + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationFullCapacityTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr, + kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + Cache::Handle* handle; + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + // k1 promotion should fail due to the block cache being at capacity, + // but the lookup should still succeed + Cache::Handle* handle2; + handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle2, nullptr); + // Since k1 didn't get inserted, k2 should still be in cache + cache->Release(handle); + cache->Release(handle2); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + private: + bool fail_create_; +}; + +Cache::CacheItemHelper LRUSecondaryCacheTest::helper_( + LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback, + LRUSecondaryCacheTest::DeletionCallback); + +Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_( + LRUSecondaryCacheTest::SizeCallback, + LRUSecondaryCacheTest::SaveToCallbackFail, + LRUSecondaryCacheTest::DeletionCallback); + +TEST_F(LRUSecondaryCacheTest, BasicTestWithNoCompression) { + BasicTest(false, false); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndNoCompression) { + BasicTest(false, true); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithCompression) { + BasicTest(true, false); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndCompression) { + BasicTest(true, true); +} + +TEST_F(LRUSecondaryCacheTest, FailsTestWithNoCompression) { FailsTest(false); } + +TEST_F(LRUSecondaryCacheTest, FailsTestWithCompression) { FailsTest(true); } + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithNoCompression) { + BasicIntegrationTest(false); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithCompression) { + BasicIntegrationTest(true); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithNoCompression) { + BasicIntegrationFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithCompression) { + BasicIntegrationFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) { + IntegrationSaveFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithCompression) { + IntegrationSaveFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithNoCompression) { + IntegrationCreateFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithCompression) { + IntegrationCreateFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithNoCompression) { + IntegrationFullCapacityTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithCompression) { + IntegrationFullCapacityTest(true); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 75fbea0e8c..4fa6e44e8c 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -27,6 +27,7 @@ #include #include +#include "rocksdb/compression_type.h" #include "rocksdb/memory_allocator.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" @@ -127,6 +128,53 @@ extern std::shared_ptr NewLRUCache( extern std::shared_ptr NewLRUCache(const LRUCacheOptions& cache_opts); +// EXPERIMENTAL +// Options structure for configuring a SecondaryCache instance based on +// LRUCache. The LRUCacheOptions.secondary_cache is not used and +// should not be set. +struct LRUSecondaryCacheOptions : LRUCacheOptions { + // The compression method (if any) that is used to compress data. + CompressionType compression_type = CompressionType::kLZ4Compression; + + // compress_format_version can have two values: + // compress_format_version == 1 -- decompressed size is not included in the + // block header. + // compress_format_version == 2 -- decompressed size is included in the block + // header in varint32 format. + uint32_t compress_format_version = 2; + + LRUSecondaryCacheOptions() {} + LRUSecondaryCacheOptions( + size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, + double _high_pri_pool_ratio, + std::shared_ptr _memory_allocator = nullptr, + bool _use_adaptive_mutex = kDefaultToAdaptiveMutex, + CacheMetadataChargePolicy _metadata_charge_policy = + kDefaultCacheMetadataChargePolicy, + CompressionType _compression_type = CompressionType::kLZ4Compression, + uint32_t _compress_format_version = 2) + : LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit, + _high_pri_pool_ratio, std::move(_memory_allocator), + _use_adaptive_mutex, _metadata_charge_policy), + compression_type(_compression_type), + compress_format_version(_compress_format_version) {} +}; + +// EXPERIMENTAL +// Create a new Secondary Cache that is implemented on top of LRUCache. +extern std::shared_ptr NewLRUSecondaryCache( + size_t capacity, int num_shard_bits = -1, + bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.5, + std::shared_ptr memory_allocator = nullptr, + bool use_adaptive_mutex = kDefaultToAdaptiveMutex, + CacheMetadataChargePolicy metadata_charge_policy = + kDefaultCacheMetadataChargePolicy, + CompressionType compression_type = CompressionType::kLZ4Compression, + uint32_t compress_format_version = 2); + +extern std::shared_ptr NewLRUSecondaryCache( + const LRUSecondaryCacheOptions& opts); + // Similar to NewLRUCache, but create a cache based on CLOCK algorithm with // better concurrent performance in some cases. See util/clock_cache.cc for // more detail. diff --git a/src.mk b/src.mk index bbdb8ae94f..8e9517aeb4 100644 --- a/src.mk +++ b/src.mk @@ -6,6 +6,7 @@ LIB_SOURCES = \ cache/cache_reservation_manager.cc \ cache/clock_cache.cc \ cache/lru_cache.cc \ + cache/lru_secondary_cache.cc \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ db/blob/blob_fetcher.cc \ @@ -398,8 +399,9 @@ BENCH_MAIN_SOURCES = \ TEST_MAIN_SOURCES = \ cache/cache_test.cc \ - cache/cache_reservation_manager_test.cc \ + cache/cache_reservation_manager_test.cc \ cache/lru_cache_test.cc \ + cache/lru_secondary_cache_test.cc \ db/blob/blob_counting_iterator_test.cc \ db/blob/blob_file_addition_test.cc \ db/blob/blob_file_builder_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 4d5cd5b05e..dec6e75201 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -556,6 +556,38 @@ DEFINE_double(cache_high_pri_pool_ratio, 0.0, DEFINE_bool(use_clock_cache, false, "Replace default LRU block cache with clock cache."); +DEFINE_bool(use_lru_secondary_cache, false, + "Use the LRUSecondaryCache as the secondary cache."); + +DEFINE_int64(lru_secondary_cache_size, 8 << 20, // 8MB + "Number of bytes to use as a cache of data"); + +DEFINE_int32(lru_secondary_cache_numshardbits, 6, + "Number of shards for the block cache" + " is 2 ** lru_secondary_cache_numshardbits." + " Negative means use default settings." + " This is applied only if FLAGS_cache_size is non-negative."); + +DEFINE_double(lru_secondary_cache_high_pri_pool_ratio, 0.0, + "Ratio of block cache reserve for high pri blocks. " + "If > 0.0, we also enable " + "cache_index_and_filter_blocks_with_high_priority."); + +DEFINE_string(lru_secondary_cache_compression_type, "lz4", + "The compression algorithm to use for large " + "values stored in LRUSecondaryCache."); +static enum ROCKSDB_NAMESPACE::CompressionType + FLAGS_lru_secondary_cache_compression_type_e = + ROCKSDB_NAMESPACE::kLZ4Compression; + +DEFINE_uint32( + lru_secondary_cache_compress_format_version, 2, + "compress_format_version can have two values: " + "compress_format_version == 1 -- decompressed size is not included" + " in the block header." + "compress_format_version == 2 -- decompressed size is included" + " in the block header in varint32 format."); + DEFINE_int64(simcache_size, -1, "Number of bytes to use as a simcache of " "uncompressed data. Nagative value disables simcache."); @@ -2791,6 +2823,21 @@ class Benchmark { opts.secondary_cache = secondary_cache; } #endif // ROCKSDB_LITE + + if (FLAGS_use_lru_secondary_cache) { + LRUSecondaryCacheOptions secondary_cache_opts; + secondary_cache_opts.capacity = FLAGS_lru_secondary_cache_size; + secondary_cache_opts.num_shard_bits = + FLAGS_lru_secondary_cache_numshardbits; + secondary_cache_opts.high_pri_pool_ratio = + FLAGS_lru_secondary_cache_high_pri_pool_ratio; + secondary_cache_opts.compression_type = + FLAGS_lru_secondary_cache_compression_type_e; + secondary_cache_opts.compress_format_version = + FLAGS_lru_secondary_cache_compress_format_version; + opts.secondary_cache = NewLRUSecondaryCache(secondary_cache_opts); + } + return NewLRUCache(opts); } } @@ -7961,6 +8008,9 @@ int db_bench_tool(int argc, char** argv) { FLAGS_wal_compression_e = StringToCompressionType(FLAGS_wal_compression.c_str()); + FLAGS_lru_secondary_cache_compression_type_e = StringToCompressionType( + FLAGS_lru_secondary_cache_compression_type.c_str()); + #ifndef ROCKSDB_LITE // Stacked BlobDB FLAGS_blob_db_compression_type_e =