diff --git a/CMakeLists.txt b/CMakeLists.txt index 4ac7b5628e..c159300a28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -627,6 +627,7 @@ set(SOURCES cache/cache_reservation_manager.cc cache/clock_cache.cc cache/lru_cache.cc + cache/lru_secondary_cache.cc cache/sharded_cache.cc db/arena_wrapped_db_iter.cc db/blob/blob_fetcher.cc @@ -1154,6 +1155,7 @@ if(WITH_TESTS) cache/cache_reservation_manager_test.cc cache/cache_test.cc cache/lru_cache_test.cc + cache/lru_secondary_cache_test.cc db/blob/blob_counting_iterator_test.cc db/blob/blob_file_addition_test.cc db/blob/blob_file_builder_test.cc diff --git a/Makefile b/Makefile index 7aee1b5b80..0559b7d8a9 100644 --- a/Makefile +++ b/Makefile @@ -1897,6 +1897,9 @@ statistics_test: $(OBJ_DIR)/monitoring/statistics_test.o $(TEST_LIBRARY) $(LIBRA stats_history_test: $(OBJ_DIR)/monitoring/stats_history_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +lru_secondary_cache_test: $(OBJ_DIR)/cache/lru_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + lru_cache_test: $(OBJ_DIR)/cache/lru_cache_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 6de0298717..494eb45c0b 100644 --- a/TARGETS +++ b/TARGETS @@ -15,6 +15,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", + "cache/lru_secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_fetcher.cc", @@ -331,6 +332,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/cache_reservation_manager.cc", "cache/clock_cache.cc", "cache/lru_cache.cc", + "cache/lru_secondary_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_fetcher.cc", @@ -9096,6 +9098,12 @@ cpp_unittest_wrapper(name="lru_cache_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="lru_secondary_cache_test", + srcs=["cache/lru_secondary_cache_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="manual_compaction_test", srcs=["db/manual_compaction_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/cache/lru_secondary_cache.cc b/cache/lru_secondary_cache.cc new file mode 100644 index 0000000000..7ab06f19eb --- /dev/null +++ b/cache/lru_secondary_cache.cc @@ -0,0 +1,168 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+
+#include "cache/lru_secondary_cache.h"
+
+#include <memory>
+
+#include "memory/memory_allocator.h"
+#include "util/compression.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+void DeletionCallback(const Slice& /*key*/, void* obj) {
+  delete reinterpret_cast<CacheAllocationPtr*>(obj);
+  obj = nullptr;
+}
+
+}  // namespace
+
+LRUSecondaryCache::LRUSecondaryCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
+    CacheMetadataChargePolicy metadata_charge_policy,
+    CompressionType compression_type, uint32_t compress_format_version)
+    : cache_options_(capacity, num_shard_bits, strict_capacity_limit,
+                     high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
+                     metadata_charge_policy, compression_type,
+                     compress_format_version) {
+  cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
+                       high_pri_pool_ratio, memory_allocator,
+                       use_adaptive_mutex, metadata_charge_policy);
+}
+
+LRUSecondaryCache::~LRUSecondaryCache() { cache_.reset(); }
+
+std::unique_ptr<SecondaryCacheResultHandle> LRUSecondaryCache::Lookup(
+    const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) {
+  std::unique_ptr<SecondaryCacheResultHandle> handle;
+  Cache::Handle* lru_handle = cache_->Lookup(key);
+  if (lru_handle == nullptr) {
+    return handle;
+  }
+
+  CacheAllocationPtr* ptr =
+      reinterpret_cast<CacheAllocationPtr*>(cache_->Value(lru_handle));
+  void* value = nullptr;
+  size_t charge = 0;
+  Status s;
+
+  if (cache_options_.compression_type == kNoCompression) {
+    s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge);
+  } else {
+    UncompressionContext uncompression_context(cache_options_.compression_type);
+    UncompressionInfo uncompression_info(uncompression_context,
+                                         UncompressionDict::GetEmptyDict(),
+                                         cache_options_.compression_type);
+
+    size_t uncompressed_size = 0;
+    CacheAllocationPtr uncompressed;
+    uncompressed = UncompressData(
+        uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle),
+        &uncompressed_size, cache_options_.compress_format_version,
+        cache_options_.memory_allocator.get());
+
+    if (!uncompressed) {
+      cache_->Release(lru_handle, true);
+      return handle;
+    }
+    s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge);
+  }
+
+  if (!s.ok()) {
+    cache_->Release(lru_handle, true);
+    return handle;
+  }
+
+  handle.reset(new LRUSecondaryCacheResultHandle(value, charge));
+  cache_->Release(lru_handle);
+  return handle;
+}
+
+Status LRUSecondaryCache::Insert(const Slice& key, void* value,
+                                 const Cache::CacheItemHelper* helper) {
+  size_t size = (*helper->size_cb)(value);
+  CacheAllocationPtr ptr =
+      AllocateBlock(size, cache_options_.memory_allocator.get());
+
+  Status s = (*helper->saveto_cb)(value, 0, size, ptr.get());
+  if (!s.ok()) {
+    return s;
+  }
+  Slice val(ptr.get(), size);
+
+  std::string compressed_val;
+  if (cache_options_.compression_type != kNoCompression) {
+    CompressionOptions compression_opts;
+    CompressionContext compression_context(cache_options_.compression_type);
+    uint64_t sample_for_compression = 0;
+    CompressionInfo compression_info(
+        compression_opts, compression_context, CompressionDict::GetEmptyDict(),
+        cache_options_.compression_type, sample_for_compression);
+
+    bool success =
+        CompressData(val, compression_info,
+                     cache_options_.compress_format_version, &compressed_val);
+
+    if (!success) {
+      return Status::Corruption("Error compressing value.");
+    }
+
+    val = Slice(compressed_val);
+    size = compressed_val.size();
+    ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
+    memcpy(ptr.get(), compressed_val.data(), size);
+  }
+
+  CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
+
+  return cache_->Insert(key, buf, size, DeletionCallback);
+}
+
+void LRUSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
+
+std::string LRUSecondaryCache::GetPrintableOptions() const {
+  std::string ret;
+  ret.reserve(20000);
+  const int kBufferSize = 200;
+  char buffer[kBufferSize];
+  ret.append(cache_->GetPrintableOptions());
+  snprintf(buffer, kBufferSize, "    compression_type : %s\n",
+           CompressionTypeToString(cache_options_.compression_type).c_str());
+  ret.append(buffer);
+  snprintf(buffer, kBufferSize, "    compress_format_version : %u\n",
+           cache_options_.compress_format_version);
+  ret.append(buffer);
+  return ret;
+}
+
+std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
+    CacheMetadataChargePolicy metadata_charge_policy,
+    CompressionType compression_type, uint32_t compress_format_version) {
+  return std::make_shared<LRUSecondaryCache>(
+      capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
+      memory_allocator, use_adaptive_mutex, metadata_charge_policy,
+      compression_type, compress_format_version);
+}
+
+std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
+    const LRUSecondaryCacheOptions& opts) {
+  // The secondary_cache is disabled for this LRUCache instance.
+  assert(opts.secondary_cache == nullptr);
+  return NewLRUSecondaryCache(
+      opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
+      opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex,
+      opts.metadata_charge_policy, opts.compression_type,
+      opts.compress_format_version);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/cache/lru_secondary_cache.h b/cache/lru_secondary_cache.h
new file mode 100644
index 0000000000..8601f56adc
--- /dev/null
+++ b/cache/lru_secondary_cache.h
@@ -0,0 +1,85 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <memory>
+
+#include "cache/lru_cache.h"
+#include "memory/memory_allocator.h"
+#include "rocksdb/secondary_cache.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "util/compression.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class LRUSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
+ public:
+  LRUSecondaryCacheResultHandle(void* value, size_t size)
+      : value_(value), size_(size) {}
+  virtual ~LRUSecondaryCacheResultHandle() override = default;
+
+  LRUSecondaryCacheResultHandle(const LRUSecondaryCacheResultHandle&) = delete;
+  LRUSecondaryCacheResultHandle& operator=(
+      const LRUSecondaryCacheResultHandle&) = delete;
+
+  bool IsReady() override { return true; }
+
+  void Wait() override {}
+
+  void* Value() override { return value_; }
+
+  size_t Size() override { return size_; }
+
+ private:
+  void* value_;
+  size_t size_;
+};
+
+// The LRUSecondaryCache is a concrete implementation of
+// rocksdb::SecondaryCache.
+//
+// Users can also cast a pointer to it and call methods on
+// it directly, especially custom methods that may be added
+// in the future.  For example -
+// std::unique_ptr<SecondaryCache> cache =
+//   NewLRUSecondaryCache(opts);
+// static_cast<LRUSecondaryCache*>(cache.get())->Erase(key);
+
+class LRUSecondaryCache : public SecondaryCache {
+ public:
+  LRUSecondaryCache(
+      size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+      double high_pri_pool_ratio,
+      std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
+      bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
+      CacheMetadataChargePolicy metadata_charge_policy =
+          kDontChargeCacheMetadata,
+      CompressionType compression_type = CompressionType::kLZ4Compression,
+      uint32_t compress_format_version = 2);
+  virtual ~LRUSecondaryCache() override;
+
+  const char* Name() const override { return "LRUSecondaryCache"; }
+
+  Status Insert(const Slice& key, void* value,
+                const Cache::CacheItemHelper* helper) override;
+
+  std::unique_ptr<SecondaryCacheResultHandle> Lookup(
+      const Slice& key, const Cache::CreateCallback& create_cb,
+      bool /*wait*/) override;
+
+  void Erase(const Slice& key) override;
+
+  void WaitAll(std::vector<SecondaryCacheResultHandle*> /*handles*/) override {}
+
+  std::string GetPrintableOptions() const override;
+
+ private:
+  std::shared_ptr<Cache> cache_;
+  LRUSecondaryCacheOptions cache_options_;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/cache/lru_secondary_cache_test.cc b/cache/lru_secondary_cache_test.cc
new file mode 100644
index 0000000000..9cad4af0b8
--- /dev/null
+++ b/cache/lru_secondary_cache_test.cc
@@ -0,0 +1,597 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "cache/lru_secondary_cache.h"
+
+#include <algorithm>
+#include <cstdint>
+
+#include "memory/jemalloc_nodump_allocator.h"
+#include "memory/memory_allocator.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "util/compression.h"
+#include "util/random.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class LRUSecondaryCacheTest : public testing::Test {
+ public:
+  LRUSecondaryCacheTest() : fail_create_(false) {}
+  ~LRUSecondaryCacheTest() {}
+
+ protected:
+  class TestItem {
+   public:
+    TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
+      memcpy(buf_.get(), buf, size);
+    }
+    ~TestItem() {}
+
+    char* Buf() { return buf_.get(); }
+    size_t Size() { return size_; }
+
+   private:
+    std::unique_ptr<char[]> buf_;
+    size_t size_;
+  };
+
+  static size_t SizeCallback(void* obj) {
+    return reinterpret_cast<TestItem*>(obj)->Size();
+  }
+
+  static Status SaveToCallback(void* from_obj, size_t from_offset,
+                               size_t length, void* out) {
+    TestItem* item = reinterpret_cast<TestItem*>(from_obj);
+    const char* buf = item->Buf();
+    EXPECT_EQ(length, item->Size());
+    EXPECT_EQ(from_offset, 0);
+    memcpy(out, buf, length);
+    return Status::OK();
+  }
+
+  static void DeletionCallback(const Slice& /*key*/, void* obj) {
+    delete reinterpret_cast<TestItem*>(obj);
+    obj = nullptr;
+  }
+
+  static Cache::CacheItemHelper helper_;
+
+  static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/,
+                                   size_t /*size*/, void* /*out*/) {
+    return Status::NotSupported();
+  }
+
+  static Cache::CacheItemHelper helper_fail_;
+
+  Cache::CreateCallback test_item_creator = [&](const void* buf, size_t size,
+                                                void** out_obj,
+                                                size_t* charge) -> Status {
+    if (fail_create_) {
+      return Status::NotSupported();
+    }
+    *out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size));
+    *charge = size;
+    return Status::OK();
+  };
+
+  void SetFailCreate(bool fail) { fail_create_ = fail; }
+
+  void BasicTest(bool
sec_cache_is_compressed, bool use_jemalloc) { + LRUSecondaryCacheOptions opts; + opts.capacity = 2048; + opts.num_shard_bits = 0; + opts.metadata_charge_policy = kDontChargeCacheMetadata; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + opts.compression_type = CompressionType::kNoCompression; + } + } else { + opts.compression_type = CompressionType::kNoCompression; + } + + if (use_jemalloc) { + JemallocAllocatorOptions jopts; + std::shared_ptr allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (s.ok()) { + opts.memory_allocator = allocator; + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } + std::shared_ptr cache = NewLRUSecondaryCache(opts); + + // Lookup an non-existent key. + std::unique_ptr handle0 = + cache->Lookup("k0", test_item_creator, true); + ASSERT_EQ(handle0, nullptr); + + Random rnd(301); + // Insert and Lookup the first item. + std::string str1; + test::CompressibleString(&rnd, 0.25, 1000, &str1); + TestItem item1(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1, nullptr); + // delete reinterpret_cast(handle1->Value()); + std::unique_ptr val1 = + std::unique_ptr(static_cast(handle1->Value())); + ASSERT_NE(val1, nullptr); + ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); + + // Insert and Lookup the second item. + std::string str2; + test::CompressibleString(&rnd, 0.5, 1000, &str2); + TestItem item2(str2.data(), str2.length()); + ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle2 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_NE(handle2, nullptr); + std::unique_ptr val2 = + std::unique_ptr(static_cast(handle2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + // Lookup the first item again to make sure it is still in the cache. + std::unique_ptr handle1_1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1_1, nullptr); + std::unique_ptr val1_1 = + std::unique_ptr(static_cast(handle1_1->Value())); + ASSERT_NE(val1_1, nullptr); + ASSERT_EQ(memcmp(val1_1->Buf(), item1.Buf(), item1.Size()), 0); + + std::vector handles = {handle1.get(), + handle2.get()}; + cache->WaitAll(handles); + + cache->Erase("k1"); + handle1 = cache->Lookup("k1", test_item_creator, true); + ASSERT_EQ(handle1, nullptr); + + cache.reset(); + } + + void FailsTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 1100; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr cache = + NewLRUSecondaryCache(secondary_cache_opts); + + // Insert and Lookup the first item. 
+ Random rnd(301); + std::string str1(rnd.RandomString(1000)); + TestItem item1(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_NE(handle1, nullptr); + std::unique_ptr val1 = + std::unique_ptr(static_cast(handle1->Value())); + ASSERT_NE(val1, nullptr); + ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); + + // Insert and Lookup the second item. + std::string str2(rnd.RandomString(200)); + TestItem item2(str2.data(), str2.length()); + // k1 is evicted. + ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_)); + std::unique_ptr handle1_1 = + cache->Lookup("k1", test_item_creator, true); + ASSERT_EQ(handle1_1, nullptr); + std::unique_ptr handle2 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_NE(handle2, nullptr); + std::unique_ptr val2 = + std::unique_ptr(static_cast(handle2->Value())); + ASSERT_NE(val2, nullptr); + ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + + // Create Fails. + SetFailCreate(true); + std::unique_ptr handle2_1 = + cache->Lookup("k2", test_item_creator, true); + ASSERT_EQ(handle2_1, nullptr); + + // Save Fails. + std::string str3 = rnd.RandomString(10); + TestItem item3(str3.data(), str3.length()); + ASSERT_NOK( + cache->Insert("k3", &item3, &LRUSecondaryCacheTest::helper_fail_)); + + cache.reset(); + } + + void BasicIntegrationTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2300; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr, + kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + lru_cache_opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(lru_cache_opts); + std::shared_ptr stats = CreateDBStatistics(); + + Random rnd(301); + + std::string str1 = rnd.RandomString(1010); + std::string str1_clone{str1}; + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // After Insert, lru cache contains k2 and secondary cache contains k1. 
+ ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + std::string str3 = rnd.RandomString(1020); + TestItem* item3 = new TestItem(str3.data(), str3.length()); + // After Insert, lru cache contains k3 and secondary cache contains k1 and + // k2 + ASSERT_OK(cache->Insert("k3", item3, &LRUSecondaryCacheTest::helper_, + str3.length())); + + Cache::Handle* handle; + handle = + cache->Lookup("k3", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_NE(handle, nullptr); + TestItem* val3 = static_cast(cache->Value(handle)); + ASSERT_NE(val3, nullptr); + ASSERT_EQ(memcmp(val3->Buf(), item3->Buf(), item3->Size()), 0); + cache->Release(handle); + + // Lookup an non-existent key. + handle = + cache->Lookup("k0", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_EQ(handle, nullptr); + + // This Lookup should promote k1 and demote k3, so k2 is evicted from the + // secondary cache. The lru cache contains k1 and secondary cache contains + // k3. item1 was Free(), so it cannot be compared against the item1. + handle = + cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_NE(handle, nullptr); + TestItem* val1_1 = static_cast(cache->Value(handle)); + ASSERT_NE(val1_1, nullptr); + ASSERT_EQ(memcmp(val1_1->Buf(), str1_clone.data(), str1_clone.size()), 0); + cache->Release(handle); + + handle = + cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator, + Cache::Priority::LOW, true, stats.get()); + ASSERT_EQ(handle, nullptr); + + cache.reset(); + secondary_cache.reset(); + } + + void BasicIntegrationFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + auto item1 = + std::unique_ptr(new TestItem(str1.data(), str1.length())); + ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); + ASSERT_OK(cache->Insert("k1", item1.get(), &LRUSecondaryCacheTest::helper_, + str1.length())); + item1.release(); // Appease clang-analyze "potential memory leak" + + Cache::Handle* handle; + handle = cache->Lookup("k2", nullptr, test_item_creator, + Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, false); + ASSERT_EQ(handle, nullptr); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationSaveFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = 
CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_, + str1.length())); + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. + ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_, + str2.length())); + + Cache::Handle* handle; + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 demotion would have failed + handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 didn't get promoted, k2 should still be in cache + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationCreateFailTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, + kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. 
+ ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + Cache::Handle* handle; + SetFailCreate(true); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + // This lookup should fail, since k1 creation would have failed + handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_EQ(handle, nullptr); + // Since k1 didn't get promoted, k2 should still be in cache + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + void IntegrationFullCapacityTest(bool sec_cache_is_compressed) { + LRUSecondaryCacheOptions secondary_cache_opts; + + if (sec_cache_is_compressed) { + if (!LZ4_Supported()) { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + } else { + secondary_cache_opts.compression_type = CompressionType::kNoCompression; + } + + secondary_cache_opts.capacity = 2048; + secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; + + std::shared_ptr secondary_cache = + NewLRUSecondaryCache(secondary_cache_opts); + + LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr, + kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + opts.secondary_cache = secondary_cache; + std::shared_ptr cache = NewLRUCache(opts); + + Random rnd(301); + std::string str1 = rnd.RandomString(1020); + TestItem* item1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_, + str1.length())); + std::string str2 = rnd.RandomString(1020); + TestItem* item2 = new TestItem(str2.data(), str2.length()); + // k1 should be demoted to the secondary cache. 
+ ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_, + str2.length())); + + Cache::Handle* handle; + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + // k1 promotion should fail due to the block cache being at capacity, + // but the lookup should still succeed + Cache::Handle* handle2; + handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle2, nullptr); + // Since k1 didn't get inserted, k2 should still be in cache + cache->Release(handle); + cache->Release(handle2); + handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true); + ASSERT_NE(handle, nullptr); + cache->Release(handle); + + cache.reset(); + secondary_cache.reset(); + } + + private: + bool fail_create_; +}; + +Cache::CacheItemHelper LRUSecondaryCacheTest::helper_( + LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback, + LRUSecondaryCacheTest::DeletionCallback); + +Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_( + LRUSecondaryCacheTest::SizeCallback, + LRUSecondaryCacheTest::SaveToCallbackFail, + LRUSecondaryCacheTest::DeletionCallback); + +TEST_F(LRUSecondaryCacheTest, BasicTestWithNoCompression) { + BasicTest(false, false); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndNoCompression) { + BasicTest(false, true); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithCompression) { + BasicTest(true, false); +} + +TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndCompression) { + BasicTest(true, true); +} + +TEST_F(LRUSecondaryCacheTest, FailsTestWithNoCompression) { FailsTest(false); } + +TEST_F(LRUSecondaryCacheTest, FailsTestWithCompression) { FailsTest(true); } + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithNoCompression) { + BasicIntegrationTest(false); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithCompression) { + BasicIntegrationTest(true); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithNoCompression) { + BasicIntegrationFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithCompression) { + BasicIntegrationFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) { + IntegrationSaveFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithCompression) { + IntegrationSaveFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithNoCompression) { + IntegrationCreateFailTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithCompression) { + IntegrationCreateFailTest(true); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithNoCompression) { + IntegrationFullCapacityTest(false); +} + +TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithCompression) { + IntegrationFullCapacityTest(true); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 75fbea0e8c..4fa6e44e8c 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -27,6 +27,7 @@ #include #include +#include "rocksdb/compression_type.h" #include "rocksdb/memory_allocator.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" @@ -127,6 +128,53 @@ extern std::shared_ptr NewLRUCache( extern std::shared_ptr 
NewLRUCache(const LRUCacheOptions& cache_opts);
 
+// EXPERIMENTAL
+// Options structure for configuring a SecondaryCache instance based on
+// LRUCache. The LRUCacheOptions.secondary_cache is not used and
+// should not be set.
+struct LRUSecondaryCacheOptions : LRUCacheOptions {
+  // The compression method (if any) that is used to compress data.
+  CompressionType compression_type = CompressionType::kLZ4Compression;
+
+  // compress_format_version can have two values:
+  // compress_format_version == 1 -- decompressed size is not included in the
+  // block header.
+  // compress_format_version == 2 -- decompressed size is included in the
+  // block header in varint32 format.
+  uint32_t compress_format_version = 2;
+
+  LRUSecondaryCacheOptions() {}
+  LRUSecondaryCacheOptions(
+      size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
+      double _high_pri_pool_ratio,
+      std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr,
+      bool _use_adaptive_mutex = kDefaultToAdaptiveMutex,
+      CacheMetadataChargePolicy _metadata_charge_policy =
+          kDefaultCacheMetadataChargePolicy,
+      CompressionType _compression_type = CompressionType::kLZ4Compression,
+      uint32_t _compress_format_version = 2)
+      : LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit,
+                        _high_pri_pool_ratio, std::move(_memory_allocator),
+                        _use_adaptive_mutex, _metadata_charge_policy),
+        compression_type(_compression_type),
+        compress_format_version(_compress_format_version) {}
+};
+
+// EXPERIMENTAL
+// Create a new Secondary Cache that is implemented on top of LRUCache.
+extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
+    size_t capacity, int num_shard_bits = -1,
+    bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.5,
+    std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
+    bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
+    CacheMetadataChargePolicy metadata_charge_policy =
+        kDefaultCacheMetadataChargePolicy,
+    CompressionType compression_type = CompressionType::kLZ4Compression,
+    uint32_t compress_format_version = 2);
+
+extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
+    const LRUSecondaryCacheOptions& opts);
+
 // Similar to NewLRUCache, but create a cache based on CLOCK algorithm with
 // better concurrent performance in some cases. See util/clock_cache.cc for
 // more detail.
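Reviewer note: the intended wiring of this new API, as exercised by the db_bench changes below, is to construct the secondary cache first and attach it to the primary block cache through LRUCacheOptions::secondary_cache. A minimal sketch follows; the capacities and compression settings are illustrative only, not recommendations from this patch.

```cpp
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

using namespace ROCKSDB_NAMESPACE;

Options MakeOptionsWithSecondaryCache() {
  // Compressed secondary cache backed by an LRUCache (hypothetical sizes).
  LRUSecondaryCacheOptions sec_opts;
  sec_opts.capacity = 64 << 20;  // 64 MB
  sec_opts.num_shard_bits = 6;
  sec_opts.compression_type = CompressionType::kLZ4Compression;
  sec_opts.compress_format_version = 2;

  // Primary (uncompressed) block cache; blocks evicted from it are demoted
  // into the secondary cache and promoted back on a secondary-cache hit.
  LRUCacheOptions cache_opts(8 << 20 /* capacity */, 6 /* num_shard_bits */,
                             false /* strict_capacity_limit */,
                             0.5 /* high_pri_pool_ratio */);
  cache_opts.secondary_cache = NewLRUSecondaryCache(sec_opts);

  BlockBasedTableOptions table_opts;
  table_opts.block_cache = NewLRUCache(cache_opts);

  Options options;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));
  return options;
}
```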
diff --git a/src.mk b/src.mk index bbdb8ae94f..8e9517aeb4 100644 --- a/src.mk +++ b/src.mk @@ -6,6 +6,7 @@ LIB_SOURCES = \ cache/cache_reservation_manager.cc \ cache/clock_cache.cc \ cache/lru_cache.cc \ + cache/lru_secondary_cache.cc \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ db/blob/blob_fetcher.cc \ @@ -398,8 +399,9 @@ BENCH_MAIN_SOURCES = \ TEST_MAIN_SOURCES = \ cache/cache_test.cc \ - cache/cache_reservation_manager_test.cc \ + cache/cache_reservation_manager_test.cc \ cache/lru_cache_test.cc \ + cache/lru_secondary_cache_test.cc \ db/blob/blob_counting_iterator_test.cc \ db/blob/blob_file_addition_test.cc \ db/blob/blob_file_builder_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 4d5cd5b05e..dec6e75201 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -556,6 +556,38 @@ DEFINE_double(cache_high_pri_pool_ratio, 0.0, DEFINE_bool(use_clock_cache, false, "Replace default LRU block cache with clock cache."); +DEFINE_bool(use_lru_secondary_cache, false, + "Use the LRUSecondaryCache as the secondary cache."); + +DEFINE_int64(lru_secondary_cache_size, 8 << 20, // 8MB + "Number of bytes to use as a cache of data"); + +DEFINE_int32(lru_secondary_cache_numshardbits, 6, + "Number of shards for the block cache" + " is 2 ** lru_secondary_cache_numshardbits." + " Negative means use default settings." + " This is applied only if FLAGS_cache_size is non-negative."); + +DEFINE_double(lru_secondary_cache_high_pri_pool_ratio, 0.0, + "Ratio of block cache reserve for high pri blocks. " + "If > 0.0, we also enable " + "cache_index_and_filter_blocks_with_high_priority."); + +DEFINE_string(lru_secondary_cache_compression_type, "lz4", + "The compression algorithm to use for large " + "values stored in LRUSecondaryCache."); +static enum ROCKSDB_NAMESPACE::CompressionType + FLAGS_lru_secondary_cache_compression_type_e = + ROCKSDB_NAMESPACE::kLZ4Compression; + +DEFINE_uint32( + lru_secondary_cache_compress_format_version, 2, + "compress_format_version can have two values: " + "compress_format_version == 1 -- decompressed size is not included" + " in the block header." + "compress_format_version == 2 -- decompressed size is included" + " in the block header in varint32 format."); + DEFINE_int64(simcache_size, -1, "Number of bytes to use as a simcache of " "uncompressed data. Nagative value disables simcache."); @@ -2791,6 +2823,21 @@ class Benchmark { opts.secondary_cache = secondary_cache; } #endif // ROCKSDB_LITE + + if (FLAGS_use_lru_secondary_cache) { + LRUSecondaryCacheOptions secondary_cache_opts; + secondary_cache_opts.capacity = FLAGS_lru_secondary_cache_size; + secondary_cache_opts.num_shard_bits = + FLAGS_lru_secondary_cache_numshardbits; + secondary_cache_opts.high_pri_pool_ratio = + FLAGS_lru_secondary_cache_high_pri_pool_ratio; + secondary_cache_opts.compression_type = + FLAGS_lru_secondary_cache_compression_type_e; + secondary_cache_opts.compress_format_version = + FLAGS_lru_secondary_cache_compress_format_version; + opts.secondary_cache = NewLRUSecondaryCache(secondary_cache_opts); + } + return NewLRUCache(opts); } } @@ -7961,6 +8008,9 @@ int db_bench_tool(int argc, char** argv) { FLAGS_wal_compression_e = StringToCompressionType(FLAGS_wal_compression.c_str()); + FLAGS_lru_secondary_cache_compression_type_e = StringToCompressionType( + FLAGS_lru_secondary_cache_compression_type.c_str()); + #ifndef ROCKSDB_LITE // Stacked BlobDB FLAGS_blob_db_compression_type_e =