mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 20:43:57 +00:00
c2aad555c3
Summary:
Optionally enable zstd checksum flag (d857369028/lib/zstd.h (L428)
) to detect corruption during decompression. Main changes are in compression.h:
* User can set CompressionOptions::checksum to true to enable this feature.
* We enable this feature in ZSTD by setting the checksum flag in ZSTD compression context: `ZSTD_CCtx`.
* Uses `ZSTD_compress2()` to do compression since it supports frame parameter like the checksum flag. Compression level is also set in compression context as a flag.
* Error handling during decompression to propagate error message from ZSTD.
* Updated microbench to test read performance impact.
About compatibility, the current compression decoders should continue to work with the data created by the new compression API `ZSTD_compress2()`: https://github.com/facebook/zstd/issues/3711.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11666
Test Plan:
* Existing unit tests for zstd compression
* Add unit test `DBTest2.ZSTDChecksum` to test the corruption case
* Manually tested that compression levels, parallel compression, dictionary compression, index compression all work with the new ZSTD_compress2() API.
* Manually tested with `sst_dump --command=recompress` that different compression levels and dictionary compression settings all work.
* Manually tested compiling with older versions of ZSTD: v1.3.8, v1.1.0, v0.6.2.
* Perf impact: from public benchmark data: http://fastcompression.blogspot.com/2019/03/presenting-xxh3.html for checksum and https://github.com/facebook/zstd#benchmarks, if decompression is 1700MB/s and checksum computation is 70000MB/s, checksum computation is an additional ~2.4% time for decompression. Compression is slower and checksumming should be less noticeable.
* Microbench:
```
TEST_TMPDIR=/dev/shm ./branch_db_basic_bench --benchmark_filter=DBGet/comp_style:0/max_data:1048576/per_key_size:256/enable_statistics:0/negative_query:0/enable_filter:0/mmap:0/compression_type:7/compression_checksum:1/no_blockcache:1/iterations:10000/threads:1 --benchmark_repetitions=100
Min out of 100 runs:
Main:
10390 10436 10456 10484 10499 10535 10544 10545 10565 10568
After this PR, checksum=false
10285 10397 10503 10508 10515 10557 10562 10635 10640 10660
After this PR, checksum=true
10827 10876 10925 10949 10971 11052 11061 11063 11100 11109
```
* db_bench:
```
Write perf
TEST_TMPDIR=/dev/shm/ ./db_bench_ichecksum --benchmarks=fillseq[-X10] --compression_type=zstd --num=10000000 --compression_checksum=..
[FillSeq checksum=0]
fillseq [AVG 10 runs] : 281635 (± 31711) ops/sec; 31.2 (± 3.5) MB/sec
fillseq [MEDIAN 10 runs] : 294027 ops/sec; 32.5 MB/sec
[FillSeq checksum=1]
fillseq [AVG 10 runs] : 286961 (± 34700) ops/sec; 31.7 (± 3.8) MB/sec
fillseq [MEDIAN 10 runs] : 283278 ops/sec; 31.3 MB/sec
Read perf
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=readrandom[-X20] --num=100000000 --reads=1000000 --use_existing_db=true --readonly=1
[Readrandom checksum=1]
readrandom [AVG 20 runs] : 360928 (± 3579) ops/sec; 4.0 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 362468 ops/sec; 4.0 MB/sec
[Readrandom checksum=0]
readrandom [AVG 20 runs] : 380365 (± 2384) ops/sec; 4.2 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 379800 ops/sec; 4.2 MB/sec
Compression
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=compress[-X20] --compression_type=zstd --num=100000000 --compression_checksum=1
checksum=1
compress [AVG 20 runs] : 54074 (± 634) ops/sec; 211.2 (± 2.5) MB/sec
compress [MEDIAN 20 runs] : 54396 ops/sec; 212.5 MB/sec
checksum=0
compress [AVG 20 runs] : 54598 (± 393) ops/sec; 213.3 (± 1.5) MB/sec
compress [MEDIAN 20 runs] : 54592 ops/sec; 213.3 MB/sec
Decompression:
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=uncompress[-X20] --compression_type=zstd --compression_checksum=1
checksum = 0
uncompress [AVG 20 runs] : 167499 (± 962) ops/sec; 654.3 (± 3.8) MB/sec
uncompress [MEDIAN 20 runs] : 167210 ops/sec; 653.2 MB/sec
checksum = 1
uncompress [AVG 20 runs] : 167980 (± 924) ops/sec; 656.2 (± 3.6) MB/sec
uncompress [MEDIAN 20 runs] : 168465 ops/sec; 658.1 MB/sec
```
Reviewed By: ajkr
Differential Revision: D48019378
Pulled By: cbi42
fbshipit-source-id: 674120c6e1853c2ced1436ac8138559d0204feba
123 lines
3.6 KiB
C++
123 lines
3.6 KiB
C++
// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "util/compression.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
StreamingCompress* StreamingCompress::Create(CompressionType compression_type,
|
|
const CompressionOptions& opts,
|
|
uint32_t compress_format_version,
|
|
size_t max_output_len) {
|
|
switch (compression_type) {
|
|
case kZSTD: {
|
|
if (!ZSTD_Streaming_Supported()) {
|
|
return nullptr;
|
|
}
|
|
return new ZSTDStreamingCompress(opts, compress_format_version,
|
|
max_output_len);
|
|
}
|
|
default:
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
StreamingUncompress* StreamingUncompress::Create(
|
|
CompressionType compression_type, uint32_t compress_format_version,
|
|
size_t max_output_len) {
|
|
switch (compression_type) {
|
|
case kZSTD: {
|
|
if (!ZSTD_Streaming_Supported()) {
|
|
return nullptr;
|
|
}
|
|
return new ZSTDStreamingUncompress(compress_format_version,
|
|
max_output_len);
|
|
}
|
|
default:
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
|
|
char* output, size_t* output_pos) {
|
|
assert(input != nullptr && output != nullptr && output_pos != nullptr);
|
|
*output_pos = 0;
|
|
// Don't need to compress an empty input
|
|
if (input_size == 0) {
|
|
return 0;
|
|
}
|
|
#ifndef ZSTD_ADVANCED
|
|
(void)input;
|
|
(void)input_size;
|
|
(void)output;
|
|
return -1;
|
|
#else
|
|
if (input_buffer_.src == nullptr || input_buffer_.src != input) {
|
|
// New input
|
|
// Catch errors where the previous input was not fully decompressed.
|
|
assert(input_buffer_.pos == input_buffer_.size);
|
|
input_buffer_ = {input, input_size, /*pos=*/0};
|
|
} else if (input_buffer_.src == input) {
|
|
// Same input, not fully compressed.
|
|
}
|
|
ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
|
|
const size_t remaining =
|
|
ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end);
|
|
if (ZSTD_isError(remaining)) {
|
|
// Failure
|
|
Reset();
|
|
return -1;
|
|
}
|
|
// Success
|
|
*output_pos = output_buffer.pos;
|
|
return (int)remaining;
|
|
#endif
|
|
}
|
|
|
|
void ZSTDStreamingCompress::Reset() {
|
|
#ifdef ZSTD_ADVANCED
|
|
ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
|
|
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
|
|
#endif
|
|
}
|
|
|
|
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
|
|
char* output, size_t* output_pos) {
|
|
assert(output != nullptr && output_pos != nullptr);
|
|
*output_pos = 0;
|
|
// Don't need to uncompress an empty input
|
|
if (input_size == 0) {
|
|
return 0;
|
|
}
|
|
#ifdef ZSTD_ADVANCED
|
|
if (input) {
|
|
// New input
|
|
input_buffer_ = {input, input_size, /*pos=*/0};
|
|
}
|
|
ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
|
|
size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_);
|
|
if (ZSTD_isError(ret)) {
|
|
Reset();
|
|
return -1;
|
|
}
|
|
*output_pos = output_buffer.pos;
|
|
return (int)(input_buffer_.size - input_buffer_.pos);
|
|
#else
|
|
(void)input;
|
|
(void)input_size;
|
|
(void)output;
|
|
return -1;
|
|
#endif
|
|
}
|
|
|
|
void ZSTDStreamingUncompress::Reset() {
|
|
#ifdef ZSTD_ADVANCED
|
|
ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
|
|
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
|
|
#endif
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|