rocksdb/cache/compressed_secondary_cache.cc
Changyu Bi c2aad555c3 Add CompressionOptions::checksum for enabling ZSTD checksum (#11666)
Summary:
Optionally enable the zstd checksum flag (d857369028/lib/zstd.h (L428)) to detect corruption during decompression. The main changes are in compression.h:
* Users can set `CompressionOptions::checksum` to true to enable this feature.
* We enable this feature in ZSTD by setting the checksum flag on the ZSTD compression context (`ZSTD_CCtx`).
* Compression now goes through `ZSTD_compress2()`, since it supports frame parameters such as the checksum flag. The compression level is also set on the compression context (see the sketch after this list).
* Added error handling during decompression to propagate error messages from ZSTD.
* Updated the microbenchmark to measure the read performance impact.
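
A minimal sketch (not the RocksDB implementation itself) of the ZSTD usage described above, assuming a ZSTD version with the advanced API (v1.4.0+); the function names and buffer handling are illustrative:
```
#include <zstd.h>

#include <stdexcept>
#include <string>

// Compress with the frame checksum enabled. The checksum flag and the
// compression level are both set on the ZSTD_CCtx, and compression goes
// through ZSTD_compress2(), which honors these context parameters.
std::string CompressWithChecksum(const std::string& input, int level) {
  ZSTD_CCtx* cctx = ZSTD_createCCtx();
  ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, level);
  ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
  std::string out(ZSTD_compressBound(input.size()), '\0');
  size_t n =
      ZSTD_compress2(cctx, &out[0], out.size(), input.data(), input.size());
  ZSTD_freeCCtx(cctx);
  if (ZSTD_isError(n)) {
    throw std::runtime_error(ZSTD_getErrorName(n));
  }
  out.resize(n);
  return out;
}

// Decompress and surface ZSTD's error message. When the frame carries a
// checksum, ZSTD_decompress() verifies it and reports a mismatch as an
// error, which is how corruption is detected during decompression.
std::string DecompressChecked(const std::string& frame, size_t raw_size) {
  std::string out(raw_size, '\0');
  size_t n = ZSTD_decompress(&out[0], out.size(), frame.data(), frame.size());
  if (ZSTD_isError(n)) {
    throw std::runtime_error(ZSTD_getErrorName(n));  // propagate ZSTD's message
  }
  out.resize(n);
  return out;
}
```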

Regarding compatibility, existing compression decoders should continue to work with data created by the new compression API `ZSTD_compress2()`: https://github.com/facebook/zstd/issues/3711.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11666

Test Plan:
* Existing unit tests for zstd compression
* Add unit test `DBTest2.ZSTDChecksum` to test the corruption case
* Manually tested that compression levels, parallel compression, dictionary compression, and index compression all work with the new ZSTD_compress2() API.
* Manually tested with `sst_dump --command=recompress` that different compression levels and dictionary compression settings all work.
* Manually tested compiling with older versions of ZSTD: v1.3.8, v1.1.0, v0.6.2.
* Perf impact: based on public benchmark data (http://fastcompression.blogspot.com/2019/03/presenting-xxh3.html for checksum speed and https://github.com/facebook/zstd#benchmarks for decompression speed), if decompression runs at 1700 MB/s and checksum computation at 70000 MB/s, checksumming adds roughly 1700/70000 ≈ 2.4% to decompression time. Compression is slower, so the relative cost of checksumming there should be even less noticeable.
* Microbench:
```
TEST_TMPDIR=/dev/shm ./branch_db_basic_bench --benchmark_filter=DBGet/comp_style:0/max_data:1048576/per_key_size:256/enable_statistics:0/negative_query:0/enable_filter:0/mmap:0/compression_type:7/compression_checksum:1/no_blockcache:1/iterations:10000/threads:1 --benchmark_repetitions=100

Min out of 100 runs:
Main:
10390 10436 10456 10484 10499 10535 10544 10545 10565 10568

After this PR, checksum=false
10285 10397 10503 10508 10515 10557 10562 10635 10640 10660

After this PR, checksum=true
10827 10876 10925 10949 10971 11052 11061 11063 11100 11109
```
* db_bench:
```
Write perf
TEST_TMPDIR=/dev/shm/ ./db_bench_ichecksum --benchmarks=fillseq[-X10] --compression_type=zstd --num=10000000 --compression_checksum=..

[FillSeq checksum=0]
fillseq [AVG    10 runs] : 281635 (± 31711) ops/sec;   31.2 (± 3.5) MB/sec
fillseq [MEDIAN 10 runs] : 294027 ops/sec;   32.5 MB/sec

[FillSeq checksum=1]
fillseq [AVG    10 runs] : 286961 (± 34700) ops/sec;   31.7 (± 3.8) MB/sec
fillseq [MEDIAN 10 runs] : 283278 ops/sec;   31.3 MB/sec

Read perf
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=readrandom[-X20] --num=100000000 --reads=1000000 --use_existing_db=true --readonly=1

[Readrandom checksum=1]
readrandom [AVG    20 runs] : 360928 (± 3579) ops/sec;    4.0 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 362468 ops/sec;    4.0 MB/sec

[Readrandom checksum=0]
readrandom [AVG    20 runs] : 380365 (± 2384) ops/sec;    4.2 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 379800 ops/sec;    4.2 MB/sec

Compression
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=compress[-X20] --compression_type=zstd --num=100000000 --compression_checksum=1

checksum=1
compress [AVG    20 runs] : 54074 (± 634) ops/sec;  211.2 (± 2.5) MB/sec
compress [MEDIAN 20 runs] : 54396 ops/sec;  212.5 MB/sec

checksum=0
compress [AVG    20 runs] : 54598 (± 393) ops/sec;  213.3 (± 1.5) MB/sec
compress [MEDIAN 20 runs] : 54592 ops/sec;  213.3 MB/sec

Decompression:
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=uncompress[-X20] --compression_type=zstd --compression_checksum=1

checksum = 0
uncompress [AVG    20 runs] : 167499 (± 962) ops/sec;  654.3 (± 3.8) MB/sec
uncompress [MEDIAN 20 runs] : 167210 ops/sec;  653.2 MB/sec
checksum = 1
uncompress [AVG    20 runs] : 167980 (± 924) ops/sec;  656.2 (± 3.6) MB/sec
uncompress [MEDIAN 20 runs] : 168465 ops/sec;  658.1 MB/sec
```

Reviewed By: ajkr

Differential Revision: D48019378

Pulled By: cbi42

fbshipit-source-id: 674120c6e1853c2ced1436ac8138559d0204feba
2023-08-18 15:01:59 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/compressed_secondary_cache.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "util/compression.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {

CompressedSecondaryCache::CompressedSecondaryCache(
    const CompressedSecondaryCacheOptions& opts)
    : cache_(opts.LRUCacheOptions::MakeSharedCache()),
      cache_options_(opts),
      cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
          std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
              cache_))) {}

CompressedSecondaryCache::~CompressedSecondaryCache() {
  assert(cache_res_mgr_->GetTotalReservedCacheSize() == 0);
}
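
// Lookup() flow: probe the underlying LRU cache for `key`. A null value is a
// dummy entry (no data stored yet). Otherwise, reassemble the value from its
// chunk list when custom split/merge is enabled, uncompress it unless
// compression is disabled or the entry's role is exempt, and pass the bytes
// to the caller's create_cb. If `advise_erase` is set, the compressed copy is
// erased and replaced by a zero-charge dummy handle; otherwise it stays and
// kept_in_sec_cache is set to true.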
std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
    const Slice& key, const Cache::CacheItemHelper* helper,
    Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase,
    bool& kept_in_sec_cache) {
  assert(helper);
  std::unique_ptr<SecondaryCacheResultHandle> handle;
  kept_in_sec_cache = false;
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    return nullptr;
  }

  void* handle_value = cache_->Value(lru_handle);
  if (handle_value == nullptr) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
    return nullptr;
  }

  CacheAllocationPtr* ptr{nullptr};
  CacheAllocationPtr merged_value;
  size_t handle_value_charge{0};
  if (cache_options_.enable_custom_split_merge) {
    CacheValueChunk* value_chunk_ptr =
        reinterpret_cast<CacheValueChunk*>(handle_value);
    merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge);
    ptr = &merged_value;
  } else {
    ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
    handle_value_charge = cache_->GetCharge(lru_handle);
  }
  MemoryAllocator* allocator = cache_options_.memory_allocator.get();

  Status s;
  Cache::ObjectPtr value{nullptr};
  size_t charge{0};
  if (cache_options_.compression_type == kNoCompression ||
      cache_options_.do_not_compress_roles.Contains(helper->role)) {
    s = helper->create_cb(Slice(ptr->get(), handle_value_charge),
                          create_context, allocator, &value, &charge);
  } else {
    UncompressionContext uncompression_context(
        cache_options_.compression_type);
    UncompressionInfo uncompression_info(uncompression_context,
                                         UncompressionDict::GetEmptyDict(),
                                         cache_options_.compression_type);

    size_t uncompressed_size{0};
    CacheAllocationPtr uncompressed = UncompressData(
        uncompression_info, (char*)ptr->get(), handle_value_charge,
        &uncompressed_size, cache_options_.compress_format_version, allocator);

    if (!uncompressed) {
      cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
      return nullptr;
    }
    s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size),
                          create_context, allocator, &value, &charge);
  }

  if (!s.ok()) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    return nullptr;
  }

  if (advise_erase) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    // Insert a dummy handle.
    cache_
        ->Insert(key, /*obj=*/nullptr,
                 GetHelper(cache_options_.enable_custom_split_merge),
                 /*charge=*/0)
        .PermitUncheckedError();
  } else {
    kept_in_sec_cache = true;
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }
  handle.reset(new CompressedSecondaryCacheResultHandle(value, charge));
  return handle;
}
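
// Insert() flow: unless `force_insert` is set, the first insertion of a key
// only places a zero-charge dummy handle; the data is stored for real only
// when the key is inserted again (i.e. it has been evicted from the primary
// cache more than once). The value is serialized via the helper's
// size_cb/saveto_cb, optionally compressed, and stored either as a single
// CacheAllocationPtr or as a chunk list when custom split/merge is enabled.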
Status CompressedSecondaryCache::Insert(const Slice& key,
                                        Cache::ObjectPtr value,
                                        const Cache::CacheItemHelper* helper,
                                        bool force_insert) {
  if (value == nullptr) {
    return Status::InvalidArgument();
  }

  auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
  if (!force_insert) {
    Cache::Handle* lru_handle = cache_->Lookup(key);
    if (lru_handle == nullptr) {
      PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
      // Insert a dummy handle if the handle is evicted for the first time.
      return cache_->Insert(key, /*obj=*/nullptr, internal_helper,
                            /*charge=*/0);
    } else {
      cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
    }
  }

  size_t size = (*helper->size_cb)(value);
  CacheAllocationPtr ptr =
      AllocateBlock(size, cache_options_.memory_allocator.get());

  Status s = (*helper->saveto_cb)(value, 0, size, ptr.get());
  if (!s.ok()) {
    return s;
  }
  Slice val(ptr.get(), size);

  std::string compressed_val;
  if (cache_options_.compression_type != kNoCompression &&
      !cache_options_.do_not_compress_roles.Contains(helper->role)) {
    PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
    CompressionOptions compression_opts;
    CompressionContext compression_context(cache_options_.compression_type,
                                           compression_opts);
    uint64_t sample_for_compression{0};
    CompressionInfo compression_info(
        compression_opts, compression_context, CompressionDict::GetEmptyDict(),
        cache_options_.compression_type, sample_for_compression);

    bool success =
        CompressData(val, compression_info,
                     cache_options_.compress_format_version, &compressed_val);

    if (!success) {
      return Status::Corruption("Error compressing value.");
    }

    val = Slice(compressed_val);
    size = compressed_val.size();
    PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size);

    if (!cache_options_.enable_custom_split_merge) {
      ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
      memcpy(ptr.get(), compressed_val.data(), size);
    }
  }

  PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
  if (cache_options_.enable_custom_split_merge) {
    size_t charge{0};
    CacheValueChunk* value_chunks_head =
        SplitValueIntoChunks(val, cache_options_.compression_type, charge);
    return cache_->Insert(key, value_chunks_head, internal_helper, charge);
  } else {
    CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
    return cache_->Insert(key, buf, internal_helper, size);
  }
}

void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }

Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
  MutexLock l(&capacity_mutex_);
  cache_options_.capacity = capacity;
  cache_->SetCapacity(capacity);
  return Status::OK();
}

Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
  MutexLock l(&capacity_mutex_);
  capacity = cache_options_.capacity;
  return Status::OK();
}

std::string CompressedSecondaryCache::GetPrintableOptions() const {
  std::string ret;
  ret.reserve(20000);
  const int kBufferSize{200};
  char buffer[kBufferSize];
  ret.append(cache_->GetPrintableOptions());
  snprintf(buffer, kBufferSize, " compression_type : %s\n",
           CompressionTypeToString(cache_options_.compression_type).c_str());
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " compress_format_version : %d\n",
           cache_options_.compress_format_version);
  ret.append(buffer);
  return ret;
}
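
// Split a value into a singly linked list of CacheValueChunk buffers whose
// allocation sizes are rounded down to known malloc bin sizes (to reduce
// allocator fragmentation). `charge` accumulates the total allocated bytes.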
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
                                               CompressionType compression_type,
                                               size_t& charge) {
  assert(!value.empty());
  const char* src_ptr = value.data();
  size_t src_size{value.size()};

  CacheValueChunk dummy_head = CacheValueChunk();
  CacheValueChunk* current_chunk = &dummy_head;
  // Do not split when value size is large or there is no compression.
  size_t predicted_chunk_size{0};
  size_t actual_chunk_size{0};
  size_t tmp_size{0};
  while (src_size > 0) {
    predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
    auto upper =
        std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
                         predicted_chunk_size);
    // Do not split when value size is too small, too large, close to a bin
    // size, or there is no compression.
    if (upper == malloc_bin_sizes_.begin() ||
        upper == malloc_bin_sizes_.end() ||
        *upper - predicted_chunk_size < malloc_bin_sizes_.front() ||
        compression_type == kNoCompression) {
      tmp_size = predicted_chunk_size;
    } else {
      tmp_size = *(--upper);
    }

    CacheValueChunk* new_chunk =
        reinterpret_cast<CacheValueChunk*>(new char[tmp_size]);
    current_chunk->next = new_chunk;
    current_chunk = current_chunk->next;
    actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
    memcpy(current_chunk->data, src_ptr, actual_chunk_size);

    current_chunk->size = actual_chunk_size;
    src_ptr += actual_chunk_size;
    src_size -= actual_chunk_size;
    charge += tmp_size;
  }
  current_chunk->next = nullptr;

  return dummy_head.next;
}
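
// Reassemble a chunk list produced by SplitValueIntoChunks() into one
// contiguous allocation; `charge` is set to the total payload size.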
CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(
    const void* chunks_head, size_t& charge) {
  const CacheValueChunk* head =
      reinterpret_cast<const CacheValueChunk*>(chunks_head);
  const CacheValueChunk* current_chunk = head;
  charge = 0;
  while (current_chunk != nullptr) {
    charge += current_chunk->size;
    current_chunk = current_chunk->next;
  }

  CacheAllocationPtr ptr =
      AllocateBlock(charge, cache_options_.memory_allocator.get());
  current_chunk = head;
  size_t pos{0};
  while (current_chunk != nullptr) {
    memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size);
    pos += current_chunk->size;
    current_chunk = current_chunk->next;
  }

  return ptr;
}
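
// GetHelper() returns the deleter used for entries in the underlying cache:
// one that walks and frees the chunk list when custom split/merge is enabled,
// and one that deletes the CacheAllocationPtr wrapper otherwise.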
const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
    bool enable_custom_split_merge) const {
  if (enable_custom_split_merge) {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
          CacheValueChunk* chunks_head = static_cast<CacheValueChunk*>(obj);
          while (chunks_head != nullptr) {
            CacheValueChunk* tmp_chunk = chunks_head;
            chunks_head = chunks_head->next;
            tmp_chunk->Free();
            obj = nullptr;
          };
        }};
    return &kHelper;
  } else {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
          delete static_cast<CacheAllocationPtr*>(obj);
          obj = nullptr;
        }};
    return &kHelper;
  }
}

std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
  return std::make_shared<CompressedSecondaryCache>(*this);
}
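
// Deflate()/Inflate() shift capacity between this secondary cache and its
// owner by adjusting a cache reservation: Deflate() reserves `decrease` more
// bytes (shrinking the space usable for compressed entries), while Inflate()
// releases `increase` bytes of that reservation.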
Status CompressedSecondaryCache::Deflate(size_t decrease) {
  return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true);
}

Status CompressedSecondaryCache::Inflate(size_t increase) {
  return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false);
}

}  // namespace ROCKSDB_NAMESPACE