Split cache to minimize internal fragmentation (#10287)

Summary:
### **Summary:**
To minimize the internal fragmentation caused by the variable size of the compressed blocks, the original block is split according to the jemalloc bin size in `Insert()` and then merged back in `Lookup()`.  Based on the analysis of the results of the following tests, from the overall internal fragmentation perspective, this PR does mitigate the internal fragmentation issue.

_Do more myshadow tests with the latest commit. I finished several myshadow AB Testing and the results are promising. For the config of 4GB primary cache and 3GB secondary cache, Jemalloc resident stats shows consistently ~0.15GB memory saving; the allocated and active stats show similar memory savings. The CPU usage is almost the same before and after this PR._

To evaluate the issue of memory fragmentations and the benefits of this PR, I conducted two sets of local tests as follows.

**T1**
Keys:       16 bytes each (+ 0 bytes user-defined timestamp)
Values:     100 bytes each (50 bytes after compression)
Entries:    90000000
RawSize:    9956.4 MB (estimated)
FileSize:   5664.8 MB (estimated)

| Test Name | Primary Cache Size (MB) | Compressed Secondary Cache Size (MB) |
| - | - | - |
| T1_3 | 4000 | 4000 |
| T1_4 | 2000 | 3000 |

Populate the DB:
./db_bench --benchmarks=fillrandom --num=90000000 -db=/mem_fragmentation/db_bench_1
Overwrite it to a stable state:
./db_bench --benchmarks=overwrite --num=90000000 -use_existing_db -db=/mem_fragmentation/db_bench_1

Run read tests with differnt cache setting:
T1_3:
MALLOC_CONF="prof:true,prof_stats:true" ../rocksdb/db_bench --benchmarks=seekrandom  --threads=16 --num=90000000 -use_existing_db --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=4000000000 -compressed_secondary_cache_size=4000000000 -use_compressed_secondary_cache -db=/mem_fragmentation/db_bench_1 --print_malloc_stats=true > ~/temp/mem_frag/20220710/jemalloc_stats_json_T1_3_20220710 -duration=1800 &

T1_4:
MALLOC_CONF="prof:true,prof_stats:true" ../rocksdb/db_bench --benchmarks=seekrandom  --threads=16 --num=90000000 -use_existing_db --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=2000000000 -compressed_secondary_cache_size=3000000000 -use_compressed_secondary_cache -db=/mem_fragmentation/db_bench_1 --print_malloc_stats=true > ~/temp/mem_frag/20220710/jemalloc_stats_json_T1_4_20220710 -duration=1800 &

For T1_3 and T1_4, I also conducted the tests before and after this PR. The following table show the important jemalloc stats.

| Test Name | T1_3 | T1_3 after mem defrag | T1_4 | T1_4 after mem defrag |
| - | - | - | - | - |
| allocated (MB)  | 8728 | 8076 | 5518 | 5043 |
| available (MB)  | 8753 | 8092 | 5536 | 5051 |
| external fragmentation rate  | 0.003 | 0.002 | 0.003 | 0.0016 |
| resident (MB)  | 8956 | 8365 | 5655 | 5235 |

**T2**
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     256 bytes each (128 bytes after compression)
Entries:    40000000
RawSize:    10986.3 MB (estimated)
FileSize:   6103.5 MB (estimated)

| Test Name | Primary Cache Size (MB) | Compressed Secondary Cache Size (MB) |
| - | - | - |
| T2_3 | 4000 | 4000 |
| T2_4 | 2000 | 3000 |

Create DB (10GB):
./db_bench -benchmarks=fillrandom -use_direct_reads=true -num=40000000 -key_size=32 -value_size=256 -db=/mem_fragmentation/db_bench_2
Overwrite it to a stable state:
./db_bench --benchmarks=overwrite --num=40000000 -use_existing_db -key_size=32 -value_size=256 -db=/mem_fragmentation/db_bench_2

Run read tests with differnt cache setting:
T2_3:
MALLOC_CONF="prof:true,prof_stats:true" ./db_bench  --benchmarks="mixgraph" -use_direct_io_for_flush_and_compaction=true -use_direct_reads=true -cache_size=4000000000 -compressed_secondary_cache_size=4000000000 -use_compressed_secondary_cache -keyrange_dist_a=14.18 -keyrange_dist_b=-2.917 -keyrange_dist_c=0.0164 -keyrange_dist_d=-0.08082 -keyrange_num=30 -value_k=0.2615 -value_sigma=25.45 -iter_k=2.517 -iter_sigma=14.236 -mix_get_ratio=0.85 -mix_put_ratio=0.14 -mix_seek_ratio=0.01 -sine_mix_rate_interval_milliseconds=5000 -sine_a=1000 -sine_b=0.000073 -sine_d=400000 -reads=80000000 -num=40000000 -key_size=32 -value_size=256 -use_existing_db=true -db=/mem_fragmentation/db_bench_2 --print_malloc_stats=true > ~/temp/mem_frag/jemalloc_stats_T2_3 -duration=1800  &

T2_4:
MALLOC_CONF="prof:true,prof_stats:true" ./db_bench  --benchmarks="mixgraph" -use_direct_io_for_flush_and_compaction=true -use_direct_reads=true -cache_size=2000000000 -compressed_secondary_cache_size=3000000000 -use_compressed_secondary_cache -keyrange_dist_a=14.18 -keyrange_dist_b=-2.917 -keyrange_dist_c=0.0164 -keyrange_dist_d=-0.08082 -keyrange_num=30 -value_k=0.2615 -value_sigma=25.45 -iter_k=2.517 -iter_sigma=14.236 -mix_get_ratio=0.85 -mix_put_ratio=0.14 -mix_seek_ratio=0.01 -sine_mix_rate_interval_milliseconds=5000 -sine_a=1000 -sine_b=0.000073 -sine_d=400000 -reads=80000000 -num=40000000 -key_size=32 -value_size=256 -use_existing_db=true -db=/mem_fragmentation/db_bench_2 --print_malloc_stats=true > ~/temp/mem_frag/jemalloc_stats_T2_4 -duration=1800  &

For T2_3 and T2_4, I also conducted the tests before and after this PR. The following table show the important jemalloc stats.

| Test Name |  T2_3 | T2_3 after mem defrag | T2_4 | T2_4 after mem defrag |
| -  | - | - | - | - |
| allocated (MB)  | 8425 | 8093 | 5426 | 5149 |
| available (MB)  | 8489 | 8138 | 5435 | 5158 |
| external fragmentation rate  | 0.008 | 0.0055 | 0.0017 | 0.0017 |
| resident (MB)  | 8676 | 8392 | 5541 | 5321 |

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10287

Test Plan: Unit tests.

Reviewed By: anand1976

Differential Revision: D37743362

Pulled By: gitbw95

fbshipit-source-id: 0010c5af08addeacc5ebbc4ffe5be882fb1d38ad
This commit is contained in:
Bo Wang 2022-08-02 15:28:11 -07:00 committed by Facebook GitHub Bot
parent bef3127b00
commit 87b82f28a1
4 changed files with 329 additions and 67 deletions

View File

@ -23,6 +23,7 @@
### Behavior Change ### Behavior Change
* Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery. * Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery.
* To minimize the internal fragmentation caused by the variable size of the compressed blocks in `CompressedSecondaryCache`, the original block is split according to the jemalloc bin size in `Insert()` and then merged back in `Lookup()`.
* PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env.
* Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.

View File

@ -5,6 +5,8 @@
#include "cache/compressed_secondary_cache.h" #include "cache/compressed_secondary_cache.h"
#include <algorithm>
#include <cstdint>
#include <memory> #include <memory>
#include "memory/memory_allocator.h" #include "memory/memory_allocator.h"
@ -13,15 +15,6 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace {
void DeletionCallback(const Slice& /*key*/, void* obj) {
delete reinterpret_cast<CacheAllocationPtr*>(obj);
obj = nullptr;
}
} // namespace
CompressedSecondaryCache::CompressedSecondaryCache( CompressedSecondaryCache::CompressedSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit, size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio,
@ -49,25 +42,28 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
return handle; return handle;
} }
CacheAllocationPtr* ptr = CacheValueChunk* handle_value =
reinterpret_cast<CacheAllocationPtr*>(cache_->Value(lru_handle)); reinterpret_cast<CacheValueChunk*>(cache_->Value(lru_handle));
void* value = nullptr; size_t handle_value_charge{0};
size_t charge = 0; CacheAllocationPtr merged_value =
Status s; MergeChunksIntoValue(handle_value, handle_value_charge);
Status s;
void* value{nullptr};
size_t charge{0};
if (cache_options_.compression_type == kNoCompression) { if (cache_options_.compression_type == kNoCompression) {
s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); s = create_cb(merged_value.get(), handle_value_charge, &value, &charge);
} else { } else {
UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context, UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(), UncompressionDict::GetEmptyDict(),
cache_options_.compression_type); cache_options_.compression_type);
size_t uncompressed_size = 0; size_t uncompressed_size{0};
CacheAllocationPtr uncompressed; CacheAllocationPtr uncompressed;
uncompressed = UncompressData( uncompressed = UncompressData(uncompression_info, (char*)merged_value.get(),
uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), handle_value_charge, &uncompressed_size,
&uncompressed_size, cache_options_.compress_format_version, cache_options_.compress_format_version,
cache_options_.memory_allocator.get()); cache_options_.memory_allocator.get());
if (!uncompressed) { if (!uncompressed) {
@ -104,7 +100,7 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
if (cache_options_.compression_type != kNoCompression) { if (cache_options_.compression_type != kNoCompression) {
CompressionOptions compression_opts; CompressionOptions compression_opts;
CompressionContext compression_context(cache_options_.compression_type); CompressionContext compression_context(cache_options_.compression_type);
uint64_t sample_for_compression = 0; uint64_t sample_for_compression{0};
CompressionInfo compression_info( CompressionInfo compression_info(
compression_opts, compression_context, CompressionDict::GetEmptyDict(), compression_opts, compression_context, CompressionDict::GetEmptyDict(),
cache_options_.compression_type, sample_for_compression); cache_options_.compression_type, sample_for_compression);
@ -118,14 +114,12 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
} }
val = Slice(compressed_val); val = Slice(compressed_val);
size = compressed_val.size();
ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
memcpy(ptr.get(), compressed_val.data(), size);
} }
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); size_t charge{0};
CacheValueChunk* value_chunks_head =
return cache_->Insert(key, buf, size, DeletionCallback); SplitValueIntoChunks(val, cache_options_.compression_type, charge);
return cache_->Insert(key, value_chunks_head, charge, DeletionCallback);
} }
void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
@ -133,7 +127,7 @@ void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
std::string CompressedSecondaryCache::GetPrintableOptions() const { std::string CompressedSecondaryCache::GetPrintableOptions() const {
std::string ret; std::string ret;
ret.reserve(20000); ret.reserve(20000);
const int kBufferSize = 200; const int kBufferSize{200};
char buffer[kBufferSize]; char buffer[kBufferSize];
ret.append(cache_->GetPrintableOptions()); ret.append(cache_->GetPrintableOptions());
snprintf(buffer, kBufferSize, " compression_type : %s\n", snprintf(buffer, kBufferSize, " compression_type : %s\n",
@ -145,6 +139,87 @@ std::string CompressedSecondaryCache::GetPrintableOptions() const {
return ret; return ret;
} }
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(
const Slice& value, const CompressionType compression_type,
size_t& charge) {
assert(!value.empty());
const char* src_ptr = value.data();
size_t src_size{value.size()};
CacheValueChunk dummy_head = CacheValueChunk();
CacheValueChunk* current_chunk = &dummy_head;
CacheAllocationPtr ptr;
// Do not split when value size is large or there is no compression.
size_t predicted_chunk_size{0};
size_t actual_chunk_size{0};
size_t tmp_size{0};
while (src_size > 0) {
predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
auto upper =
std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
predicted_chunk_size);
// Do not split when value size is too small, too large, close to a bin
// size, or there is no compression.
if (upper == malloc_bin_sizes_.begin() ||
upper == malloc_bin_sizes_.end() ||
*upper - predicted_chunk_size < malloc_bin_sizes_.front() ||
compression_type == kNoCompression) {
tmp_size = predicted_chunk_size;
} else {
tmp_size = *(--upper);
}
ptr = AllocateBlock(tmp_size, cache_options_.memory_allocator.get());
current_chunk->next = reinterpret_cast<CacheValueChunk*>(ptr.release());
current_chunk = current_chunk->next;
actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
memcpy(current_chunk->data, src_ptr, actual_chunk_size);
current_chunk->size = actual_chunk_size;
src_ptr += actual_chunk_size;
src_size -= actual_chunk_size;
charge += tmp_size;
}
current_chunk->next = nullptr;
return dummy_head.next;
}
CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(
const void* chunks_head, size_t& charge) {
const CacheValueChunk* head =
reinterpret_cast<const CacheValueChunk*>(chunks_head);
const CacheValueChunk* current_chunk = head;
charge = 0;
while (current_chunk != nullptr) {
charge += current_chunk->size;
current_chunk = current_chunk->next;
}
CacheAllocationPtr ptr =
AllocateBlock(charge, cache_options_.memory_allocator.get());
current_chunk = head;
size_t pos{0};
while (current_chunk != nullptr) {
memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size);
pos += current_chunk->size;
current_chunk = current_chunk->next;
}
return ptr;
}
void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/,
void* obj) {
CacheValueChunk* chunks_head = reinterpret_cast<CacheValueChunk*>(obj);
while (chunks_head != nullptr) {
CacheValueChunk* tmp_chunk = chunks_head;
chunks_head = chunks_head->next;
tmp_chunk->Free();
}
obj = nullptr;
}
std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache( std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit, size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio,

View File

@ -5,6 +5,8 @@
#pragma once #pragma once
#include <array>
#include <cstddef>
#include <memory> #include <memory>
#include "cache/lru_cache.h" #include "cache/lru_cache.h"
@ -58,7 +60,7 @@ class CompressedSecondaryCache : public SecondaryCache {
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr, std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
bool use_adaptive_mutex = kDefaultToAdaptiveMutex, bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy metadata_charge_policy = CacheMetadataChargePolicy metadata_charge_policy =
kDontChargeCacheMetadata, kDefaultCacheMetadataChargePolicy,
CompressionType compression_type = CompressionType::kLZ4Compression, CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2); uint32_t compress_format_version = 2);
virtual ~CompressedSecondaryCache() override; virtual ~CompressedSecondaryCache() override;
@ -79,6 +81,36 @@ class CompressedSecondaryCache : public SecondaryCache {
std::string GetPrintableOptions() const override; std::string GetPrintableOptions() const override;
private: private:
friend class CompressedSecondaryCacheTest;
static constexpr std::array<uint16_t, 33> malloc_bin_sizes_{
32, 64, 96, 128, 160, 192, 224, 256, 320, 384, 448,
512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072,
3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 32768};
struct CacheValueChunk {
// TODO try "CacheAllocationPtr next;".
CacheValueChunk* next;
size_t size;
// Beginning of the chunk data (MUST BE THE LAST FIELD IN THIS STRUCT!)
char data[1];
void Free() { delete[] reinterpret_cast<char*>(this); }
};
// Split value into chunks to better fit into jemalloc bins. The chunks
// are stored in CacheValueChunk and extra charge is needed for each chunk,
// so the cache charge is recalculated here.
CacheValueChunk* SplitValueIntoChunks(const Slice& value,
const CompressionType compression_type,
size_t& charge);
// After merging chunks, the extra charge for each chunk is removed, so
// the charge is recalculated.
CacheAllocationPtr MergeChunksIntoValue(const void* chunks_head,
size_t& charge);
// An implementation of Cache::DeleterFn.
static void DeletionCallback(const Slice& /*key*/, void* obj);
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
CompressedSecondaryCacheOptions cache_options_; CompressedSecondaryCacheOptions cache_options_;
}; };

View File

@ -7,9 +7,12 @@
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <iterator>
#include "cache/lru_cache.h"
#include "memory/jemalloc_nodump_allocator.h" #include "memory/jemalloc_nodump_allocator.h"
#include "memory/memory_allocator.h" #include "memory/memory_allocator.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
@ -136,7 +139,6 @@ class CompressedSecondaryCacheTest : public testing::Test {
CompressedSecondaryCacheOptions opts; CompressedSecondaryCacheOptions opts;
opts.capacity = 2048; opts.capacity = 2048;
opts.num_shard_bits = 0; opts.num_shard_bits = 0;
opts.metadata_charge_policy = kDontChargeCacheMetadata;
if (sec_cache_is_compressed) { if (sec_cache_is_compressed) {
if (!LZ4_Supported()) { if (!LZ4_Supported()) {
@ -162,6 +164,8 @@ class CompressedSecondaryCacheTest : public testing::Test {
} }
std::shared_ptr<SecondaryCache> sec_cache = std::shared_ptr<SecondaryCache> sec_cache =
NewCompressedSecondaryCache(opts); NewCompressedSecondaryCache(opts);
BasicTestHelper(sec_cache);
} }
void FailsTest(bool sec_cache_is_compressed) { void FailsTest(bool sec_cache_is_compressed) {
@ -177,7 +181,6 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.capacity = 1100; secondary_cache_opts.capacity = 1100;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> sec_cache = std::shared_ptr<SecondaryCache> sec_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
@ -235,34 +238,35 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.capacity = 2300; secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr, LRUCacheOptions lru_cache_opts(1300, 0, /*_strict_capacity_limit=*/false,
kDefaultToAdaptiveMutex, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata); kDefaultCacheMetadataChargePolicy);
lru_cache_opts.secondary_cache = secondary_cache; lru_cache_opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(lru_cache_opts); std::shared_ptr<Cache> cache = NewLRUCache(lru_cache_opts);
std::shared_ptr<Statistics> stats = CreateDBStatistics(); std::shared_ptr<Statistics> stats = CreateDBStatistics();
Random rnd(301); Random rnd(301);
std::string str1 = rnd.RandomString(1010); std::string str1;
test::CompressibleString(&rnd, 0.5, 1001, &str1);
std::string str1_clone{str1}; std::string str1_clone{str1};
TestItem* item1 = new TestItem(str1.data(), str1.length()); TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length())); str1.length()));
std::string str2 = rnd.RandomString(1020); std::string str2;
test::CompressibleString(&rnd, 0.5, 1012, &str2);
TestItem* item2 = new TestItem(str2.data(), str2.length()); TestItem* item2 = new TestItem(str2.data(), str2.length());
// After Insert, lru cache contains k2 and secondary cache contains k1. // After Insert, cache contains k2 and secondary cache contains k1.
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
str2.length())); str2.length()));
std::string str3 = rnd.RandomString(1020); std::string str3;
test::CompressibleString(&rnd, 0.5, 1024, &str3);
TestItem* item3 = new TestItem(str3.data(), str3.length()); TestItem* item3 = new TestItem(str3.data(), str3.length());
// After Insert, lru cache contains k3 and secondary cache contains k1 and // After Insert, cache contains k3 and secondary cache contains k1 and k2.
// k2
ASSERT_OK(cache->Insert("k3", item3, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k3", item3, &CompressedSecondaryCacheTest::helper_,
str3.length())); str3.length()));
@ -287,7 +291,6 @@ class CompressedSecondaryCacheTest : public testing::Test {
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true, test_item_creator, Cache::Priority::LOW, true,
stats.get()); stats.get());
ASSERT_NE(handle, nullptr); ASSERT_NE(handle, nullptr);
TestItem* val1_1 = static_cast<TestItem*>(cache->Value(handle)); TestItem* val1_1 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val1_1, nullptr); ASSERT_NE(val1_1, nullptr);
@ -316,19 +319,20 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.compression_type = CompressionType::kNoCompression; secondary_cache_opts.compression_type = CompressionType::kNoCompression;
} }
secondary_cache_opts.capacity = 2048; secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/false, 0.5,
kDontChargeCacheMetadata); nullptr, kDefaultToAdaptiveMutex,
kDefaultCacheMetadataChargePolicy);
opts.secondary_cache = secondary_cache; opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts); std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301); Random rnd(301);
std::string str1 = rnd.RandomString(1020); std::string str1;
test::CompressibleString(&rnd, 0.5, 1001, &str1);
auto item1 = auto item1 =
std::unique_ptr<TestItem>(new TestItem(str1.data(), str1.length())); std::unique_ptr<TestItem>(new TestItem(str1.data(), str1.length()));
ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length()));
@ -361,25 +365,28 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.compression_type = CompressionType::kNoCompression; secondary_cache_opts.compression_type = CompressionType::kNoCompression;
} }
secondary_cache_opts.capacity = 2048; secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/false, 0.5,
kDontChargeCacheMetadata); nullptr, kDefaultToAdaptiveMutex,
kDefaultCacheMetadataChargePolicy);
opts.secondary_cache = secondary_cache; opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts); std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301); Random rnd(301);
std::string str1 = rnd.RandomString(1020); std::string str1;
test::CompressibleString(&rnd, 0.5, 1001, &str1);
TestItem* item1 = new TestItem(str1.data(), str1.length()); TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, ASSERT_OK(cache->Insert("k1", item1,
&CompressedSecondaryCacheTest::helper_fail_, &CompressedSecondaryCacheTest::helper_fail_,
str1.length())); str1.length()));
std::string str2 = rnd.RandomString(1020);
std::string str2;
test::CompressibleString(&rnd, 0.5, 1002, &str2);
TestItem* item2 = new TestItem(str2.data(), str2.length()); TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache. // k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, ASSERT_OK(cache->Insert("k2", item2,
@ -417,25 +424,27 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.compression_type = CompressionType::kNoCompression; secondary_cache_opts.compression_type = CompressionType::kNoCompression;
} }
secondary_cache_opts.capacity = 2048; secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/false, 0.5,
kDontChargeCacheMetadata); nullptr, kDefaultToAdaptiveMutex,
kDefaultCacheMetadataChargePolicy);
opts.secondary_cache = secondary_cache; opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts); std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301); Random rnd(301);
std::string str1 = rnd.RandomString(1020); std::string str1;
test::CompressibleString(&rnd, 0.5, 1001, &str1);
TestItem* item1 = new TestItem(str1.data(), str1.length()); TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length())); str1.length()));
std::string str2 = rnd.RandomString(1020); std::string str2;
test::CompressibleString(&rnd, 0.5, 1002, &str2);
TestItem* item2 = new TestItem(str2.data(), str2.length()); TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache. // k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
@ -473,24 +482,28 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.compression_type = CompressionType::kNoCompression; secondary_cache_opts.compression_type = CompressionType::kNoCompression;
} }
secondary_cache_opts.capacity = 2048; secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr, LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); kDefaultToAdaptiveMutex,
kDefaultCacheMetadataChargePolicy);
opts.secondary_cache = secondary_cache; opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts); std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301); Random rnd(301);
std::string str1 = rnd.RandomString(1020); std::string str1;
test::CompressibleString(&rnd, 0.5, 1001, &str1);
TestItem* item1 = new TestItem(str1.data(), str1.length()); TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length())); str1.length()));
std::string str2 = rnd.RandomString(1020);
std::string str2;
test::CompressibleString(&rnd, 0.5, 1002, &str2);
std::string str2_clone{str2};
TestItem* item2 = new TestItem(str2.data(), str2.length()); TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache. // k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
@ -501,8 +514,9 @@ class CompressedSecondaryCacheTest : public testing::Test {
test_item_creator, Cache::Priority::LOW, true); test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle2, nullptr); ASSERT_NE(handle2, nullptr);
cache->Release(handle2); cache->Release(handle2);
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed // k1 promotion should fail because cache is at capacity and
// strict_capacity_limit is true, but the lookup should still succeed.
Cache::Handle* handle1; Cache::Handle* handle1;
handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true); test_item_creator, Cache::Priority::LOW, true);
@ -519,6 +533,134 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache.reset(); secondary_cache.reset();
} }
void SplitValueIntoChunksTest() {
JemallocAllocatorOptions jopts;
std::shared_ptr<MemoryAllocator> allocator;
std::string msg;
if (JemallocNodumpAllocator::IsSupported(&msg)) {
Status s = NewJemallocNodumpAllocator(jopts, &allocator);
if (!s.ok()) {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
} else {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk;
std::unique_ptr<CompressedSecondaryCache> sec_cache =
std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5,
allocator);
Random rnd(301);
// 10000 = 8169 + 1769 + 62 , so there should be 3 chunks after split.
size_t str_size{10000};
std::string str = rnd.RandomString(static_cast<int>(str_size));
size_t charge{0};
CacheValueChunk* chunks_head =
sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge);
ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1));
CacheValueChunk* current_chunk = chunks_head;
ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1);
current_chunk = current_chunk->next;
ASSERT_EQ(current_chunk->size, 1792 - sizeof(CacheValueChunk) + 1);
current_chunk = current_chunk->next;
ASSERT_EQ(current_chunk->size, 62);
sec_cache->DeletionCallback("dummy", chunks_head);
}
void MergeChunksIntoValueTest() {
JemallocAllocatorOptions jopts;
std::shared_ptr<MemoryAllocator> allocator;
std::string msg;
if (JemallocNodumpAllocator::IsSupported(&msg)) {
Status s = NewJemallocNodumpAllocator(jopts, &allocator);
if (!s.ok()) {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
} else {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk;
Random rnd(301);
size_t size1{2048};
std::string str1 = rnd.RandomString(static_cast<int>(size1));
CacheAllocationPtr ptr =
AllocateBlock(sizeof(CacheValueChunk) - 1 + size1, allocator.get());
CacheValueChunk* current_chunk =
reinterpret_cast<CacheValueChunk*>(ptr.release());
CacheValueChunk* chunks_head = current_chunk;
memcpy(current_chunk->data, str1.data(), size1);
current_chunk->size = size1;
size_t size2{256};
std::string str2 = rnd.RandomString(static_cast<int>(size2));
ptr = AllocateBlock(sizeof(CacheValueChunk) - 1 + size2, allocator.get());
current_chunk->next = reinterpret_cast<CacheValueChunk*>(ptr.release());
current_chunk = current_chunk->next;
memcpy(current_chunk->data, str2.data(), size2);
current_chunk->size = size2;
size_t size3{31};
std::string str3 = rnd.RandomString(static_cast<int>(size3));
ptr = AllocateBlock(sizeof(CacheValueChunk) - 1 + size3, allocator.get());
current_chunk->next = reinterpret_cast<CacheValueChunk*>(ptr.release());
current_chunk = current_chunk->next;
memcpy(current_chunk->data, str3.data(), size3);
current_chunk->size = size3;
current_chunk->next = nullptr;
std::string str = str1 + str2 + str3;
std::unique_ptr<CompressedSecondaryCache> sec_cache =
std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5,
allocator);
size_t charge{0};
CacheAllocationPtr value =
sec_cache->MergeChunksIntoValue(chunks_head, charge);
ASSERT_EQ(charge, size1 + size2 + size3);
std::string value_str{value.get(), charge};
ASSERT_EQ(strcmp(value_str.data(), str.data()), 0);
sec_cache->DeletionCallback("dummy", chunks_head);
}
void SplictValueAndMergeChunksTest() {
JemallocAllocatorOptions jopts;
std::shared_ptr<MemoryAllocator> allocator;
std::string msg;
if (JemallocNodumpAllocator::IsSupported(&msg)) {
Status s = NewJemallocNodumpAllocator(jopts, &allocator);
if (!s.ok()) {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
} else {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk;
std::unique_ptr<CompressedSecondaryCache> sec_cache =
std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5,
allocator);
Random rnd(301);
// 10000 = 8169 + 1769 + 62 , so there should be 3 chunks after split.
size_t str_size{10000};
std::string str = rnd.RandomString(static_cast<int>(str_size));
size_t charge{0};
CacheValueChunk* chunks_head =
sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge);
ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1));
CacheAllocationPtr value =
sec_cache->MergeChunksIntoValue(chunks_head, charge);
ASSERT_EQ(charge, str_size);
std::string value_str{value.get(), charge};
ASSERT_EQ(strcmp(value_str.data(), str.data()), 0);
sec_cache->DeletionCallback("dummy", chunks_head);
}
private: private:
bool fail_create_; bool fail_create_;
}; };
@ -639,6 +781,18 @@ TEST_F(CompressedSecondaryCacheTest,
IntegrationFullCapacityTest(true); IntegrationFullCapacityTest(true);
} }
TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) {
SplitValueIntoChunksTest();
}
TEST_F(CompressedSecondaryCacheTest, MergeChunksIntoValueTest) {
MergeChunksIntoValueTest();
}
TEST_F(CompressedSecondaryCacheTest, SplictValueAndMergeChunksTest) {
SplictValueAndMergeChunksTest();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {