Add `CompressionOptions::checksum` for enabling ZSTD checksum (#11666)

Summary:
Optionally enable the zstd checksum flag (https://github.com/facebook/zstd/blob/d857369028d997c92ff1f1861a4d7f679a125464/lib/zstd.h#L428) to detect corruption during decompression. Main changes are in `compression.h`:
* Users can set `CompressionOptions::checksum` to true to enable this feature.
* We enable the feature in ZSTD by setting the checksum flag on the ZSTD compression context (`ZSTD_CCtx`).
* Compression now uses `ZSTD_compress2()`, since it supports frame parameters such as the checksum flag. The compression level is also set on the compression context as a parameter.
* Added error handling during decompression to propagate error messages from ZSTD.
* Updated the microbenchmark to measure the read performance impact.

Regarding compatibility, existing decompressors should continue to work with data created by the new compression API `ZSTD_compress2()`: https://github.com/facebook/zstd/issues/3711.
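For illustration, a minimal sketch of how a user opts in (the DB path is hypothetical; the flag requires a RocksDB build linked against ZSTD v1.4.0+, where the advanced API is available, and is ignored otherwise):

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Minimal sketch: open a DB that writes ZSTD-compressed blocks with
// per-frame checksums. "/tmp/zstd_checksum_db" is an illustrative path.
// The flag only takes effect when RocksDB is built against ZSTD v1.4.0+
// (ZSTD_ADVANCED); otherwise it is ignored.
int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compression = rocksdb::kZSTD;
  options.compression_opts.checksum = true;  // new option from this PR

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/zstd_checksum_db", &db);
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  delete db;
  return 0;
}
```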

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11666

Test Plan:
* Existing unit tests for zstd compression
* Added unit test `DBTest2.ZSTDChecksum` to test the corruption case.
* Manually tested that compression levels, parallel compression, dictionary compression, and index compression all work with the new `ZSTD_compress2()` API.
* Manually tested with `sst_dump --command=recompress` that different compression levels and dictionary compression settings all work.
* Manually tested compiling with older versions of ZSTD: v1.3.8, v1.1.0, v0.6.2.
* Perf impact: from public benchmark data (http://fastcompression.blogspot.com/2019/03/presenting-xxh3.html for checksum speed, https://github.com/facebook/zstd#benchmarks for decompression speed): if decompression runs at 1700 MB/s and checksum computation at 70000 MB/s, checksumming adds roughly 1700 / 70000 ≈ 2.4% to decompression time. Compression itself is much slower than decompression, so the checksum overhead should be even less noticeable there.
* Microbench:
```
TEST_TMPDIR=/dev/shm ./branch_db_basic_bench --benchmark_filter=DBGet/comp_style:0/max_data:1048576/per_key_size:256/enable_statistics:0/negative_query:0/enable_filter:0/mmap:0/compression_type:7/compression_checksum:1/no_blockcache:1/iterations:10000/threads:1 --benchmark_repetitions=100

Min out of 100 runs:
Main:
10390 10436 10456 10484 10499 10535 10544 10545 10565 10568

After this PR, checksum=false
10285 10397 10503 10508 10515 10557 10562 10635 10640 10660

After this PR, checksum=true
10827 10876 10925 10949 10971 11052 11061 11063 11100 11109
```
* db_bench:
```
Write perf
TEST_TMPDIR=/dev/shm/ ./db_bench_ichecksum --benchmarks=fillseq[-X10] --compression_type=zstd --num=10000000 --compression_checksum=..

[FillSeq checksum=0]
fillseq [AVG    10 runs] : 281635 (± 31711) ops/sec;   31.2 (± 3.5) MB/sec
fillseq [MEDIAN 10 runs] : 294027 ops/sec;   32.5 MB/sec

[FillSeq checksum=1]
fillseq [AVG    10 runs] : 286961 (± 34700) ops/sec;   31.7 (± 3.8) MB/sec
fillseq [MEDIAN 10 runs] : 283278 ops/sec;   31.3 MB/sec

Read perf
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=readrandom[-X20] --num=100000000 --reads=1000000 --use_existing_db=true --readonly=1

[Readrandom checksum=1]
readrandom [AVG    20 runs] : 360928 (± 3579) ops/sec;    4.0 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 362468 ops/sec;    4.0 MB/sec

[Readrandom checksum=0]
readrandom [AVG    20 runs] : 380365 (± 2384) ops/sec;    4.2 (± 0.0) MB/sec
readrandom [MEDIAN 20 runs] : 379800 ops/sec;    4.2 MB/sec

Compression
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=compress[-X20] --compression_type=zstd --num=100000000 --compression_checksum=1

checksum=1
compress [AVG    20 runs] : 54074 (± 634) ops/sec;  211.2 (± 2.5) MB/sec
compress [MEDIAN 20 runs] : 54396 ops/sec;  212.5 MB/sec

checksum=0
compress [AVG    20 runs] : 54598 (± 393) ops/sec;  213.3 (± 1.5) MB/sec
compress [MEDIAN 20 runs] : 54592 ops/sec;  213.3 MB/sec

Decompression:
TEST_TMPDIR=/dev/shm ./db_bench_ichecksum --benchmarks=uncompress[-X20] --compression_type=zstd --compression_checksum=1

checksum = 0
uncompress [AVG    20 runs] : 167499 (± 962) ops/sec;  654.3 (± 3.8) MB/sec
uncompress [MEDIAN 20 runs] : 167210 ops/sec;  653.2 MB/sec
checksum = 1
uncompress [AVG    20 runs] : 167980 (± 924) ops/sec;  656.2 (± 3.6) MB/sec
uncompress [MEDIAN 20 runs] : 168465 ops/sec;  658.1 MB/sec
```

Reviewed By: ajkr

Differential Revision: D48019378

Pulled By: cbi42

fbshipit-source-id: 674120c6e1853c2ced1436ac8138559d0204feba
Changyu Bi, 2023-08-18 15:01:59 -07:00 (committed by Facebook GitHub Bot)
commit c2aad555c3, parent 0fa0c97d3e
21 changed files with 253 additions and 65 deletions

```diff
@@ -143,7 +143,8 @@ Status CompressedSecondaryCache::Insert(const Slice& key,
       !cache_options_.do_not_compress_roles.Contains(helper->role)) {
     PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
     CompressionOptions compression_opts;
-    CompressionContext compression_context(cache_options_.compression_type);
+    CompressionContext compression_context(cache_options_.compression_type,
+                                           compression_opts);
     uint64_t sample_for_compression{0};
     CompressionInfo compression_info(
         compression_opts, compression_context, CompressionDict::GetEmptyDict(),
```

```diff
@@ -261,7 +261,7 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
   // TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
   CompressionOptions opts;
-  CompressionContext context(blob_compression_type_);
+  CompressionContext context(blob_compression_type_, opts);
   constexpr uint64_t sample_for_compression = 0;
   CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
```

```diff
@@ -406,7 +406,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
   ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1);
   CompressionOptions opts;
-  CompressionContext context(kSnappyCompression);
+  CompressionContext context(kSnappyCompression, opts);
   constexpr uint64_t sample_for_compression = 0;
   CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
```

```diff
@@ -74,7 +74,7 @@ void WriteBlobFile(const ImmutableOptions& immutable_options,
     }
   } else {
     CompressionOptions opts;
-    CompressionContext context(compression);
+    CompressionContext context(compression, opts);
     constexpr uint64_t sample_for_compression = 0;
     CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                          compression, sample_for_compression);
```

```diff
@@ -76,7 +76,7 @@ void WriteBlobFile(const ImmutableOptions& immutable_options,
     }
   } else {
     CompressionOptions opts;
-    CompressionContext context(compression);
+    CompressionContext context(compression, opts);
     constexpr uint64_t sample_for_compression = 0;
     CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                          compression, sample_for_compression);
```

```diff
@@ -7670,6 +7670,41 @@ TEST_F(DBTest2, GetLatestSeqAndTsForKey) {
   ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0));
 }

+#if defined(ZSTD_ADVANCED)
+TEST_F(DBTest2, ZSTDChecksum) {
+  // Verify that corruption during decompression is caught.
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.compression = kZSTD;
+  options.compression_opts.max_compressed_bytes_per_kb = 1024;
+  options.compression_opts.checksum = true;
+  DestroyAndReopen(options);
+  Random rnd(33);
+  ASSERT_OK(Put(Key(0), rnd.RandomString(4 << 10)));
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
+      [&](void* arg) {
+        std::string* output = static_cast<std::string*>(arg);
+        // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames
+        // Checksum is the last 4 bytes, corrupting that part in unit test is
+        // more controllable.
+        output->data()[output->size() - 1]++;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Flush());
+  PinnableSlice val;
+  Status s = Get(Key(0), &val);
+  ASSERT_TRUE(s.IsCorruption());
+
+  // Corruption caught during flush.
+  options.paranoid_file_checks = true;
+  DestroyAndReopen(options);
+  ASSERT_OK(Put(Key(0), rnd.RandomString(4 << 10)));
+  s = Flush();
+  ASSERT_TRUE(s.IsCorruption());
+}
+#endif
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
```

```diff
@@ -229,6 +229,7 @@ DECLARE_int32(compression_zstd_max_train_bytes);
 DECLARE_int32(compression_parallel_threads);
 DECLARE_uint64(compression_max_dict_buffer_bytes);
 DECLARE_bool(compression_use_zstd_dict_trainer);
+DECLARE_bool(compression_checksum);
 DECLARE_string(checksum_type);
 DECLARE_string(env_uri);
 DECLARE_string(fs_uri);
```

```diff
@@ -845,6 +845,9 @@ DEFINE_bool(
     "ZSTD 1.4.5+ is required. If ZSTD 1.4.5+ is not linked with the binary, "
     "this flag will have the default value true.");

+DEFINE_bool(compression_checksum, false,
+            "Turn on zstd's checksum feature for detecting corruption.");
+
 DEFINE_string(bottommost_compression_type, "disable",
               "Algorithm to use to compress bottommost level of the database. "
               "\"disable\" means disabling the feature");
```

```diff
@@ -3206,6 +3206,9 @@ void InitializeOptionsFromFlags(
         "cannot be used because ZSTD 1.4.5+ is not linked with the binary."
         " zstd dictionary trainer will be used.\n");
   }
+  if (FLAGS_compression_checksum) {
+    options.compression_opts.checksum = true;
+  }
   options.max_manifest_file_size = FLAGS_max_manifest_file_size;
   options.inplace_update_support = FLAGS_in_place_update;
   options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
```

```diff
@@ -181,6 +181,14 @@ struct CompressionOptions {
   // compressed by less than 12.5% (minimum ratio of 1.143:1).
   int max_compressed_bytes_per_kb = 1024 * 7 / 8;

+  // ZSTD only.
+  // Enable compression algorithm's checksum feature.
+  // (https://github.com/facebook/zstd/blob/d857369028d997c92ff1f1861a4d7f679a125464/lib/zstd.h#L428)
+  // Each compressed frame will have a 32-bit checksum attached. The checksum
+  // is computed from the uncompressed data and can be verified during
+  // decompression.
+  bool checksum = false;
+
   // A convenience function for setting max_compressed_bytes_per_kb based on a
   // minimum acceptable compression ratio (uncompressed size over compressed
   // size).
```
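Because the new field is registered with `OptionTypeFlags::kMutable` (see the `OptionTypeInfo` hunk further down), it should also be adjustable on a live DB through the generic string-based options path; a hedged sketch, assuming an already-open DB:

```cpp
#include <cassert>

#include "rocksdb/db.h"

// Hedged sketch: flip the new flag at runtime via DB::SetOptions().
// "compression_opts" accepts the same {key=value;...} syntax exercised by
// the settable-options test in this diff; only the listed fields change.
void EnableZstdChecksumAtRuntime(rocksdb::DB* db) {
  rocksdb::Status s =
      db->SetOptions({{"compression_opts", "{checksum=true;}"}});
  assert(s.ok());  // affects blocks written by future flushes/compactions
}
```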

```diff
@@ -538,6 +538,23 @@ static void ManualFlushArguments(benchmark::internal::Benchmark* b) {
 BENCHMARK(ManualFlush)->Iterations(1)->Apply(ManualFlushArguments);

+// Copied from test_util.cc to not depend on rocksdb_test_lib
+// when building microbench binaries.
+static Slice CompressibleString(Random* rnd, double compressed_fraction,
+                                int len, std::string* dst) {
+  int raw = static_cast<int>(len * compressed_fraction);
+  if (raw < 1) raw = 1;
+  std::string raw_data = rnd->RandomBinaryString(raw);
+
+  // Duplicate the random data until we have filled "len" bytes
+  dst->clear();
+  while (dst->size() < (unsigned int)len) {
+    dst->append(raw_data);
+  }
+  dst->resize(len);
+  return Slice(*dst);
+}
+
 static void DBGet(benchmark::State& state) {
   auto compaction_style = static_cast<CompactionStyle>(state.range(0));
   uint64_t max_data = state.range(1);
@@ -546,6 +563,9 @@ static void DBGet(benchmark::State& state) {
   bool negative_query = state.range(4);
   bool enable_filter = state.range(5);
   bool mmap = state.range(6);
+  auto compression_type = static_cast<CompressionType>(state.range(7));
+  bool compression_checksum = static_cast<bool>(state.range(8));
+  bool no_blockcache = state.range(9);
   uint64_t key_num = max_data / per_key_size;

   // setup DB
@@ -568,6 +588,13 @@ static void DBGet(benchmark::State& state) {
     table_options.no_block_cache = true;
     table_options.block_restart_interval = 1;
   }
+  options.compression = compression_type;
+  options.compression_opts.checksum = compression_checksum;
+  if (no_blockcache) {
+    table_options.no_block_cache = true;
+  } else {
+    table_options.block_cache = NewLRUCache(100 << 20);
+  }
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));

   auto rnd = Random(301 + state.thread_index());
@@ -581,9 +608,10 @@ static void DBGet(benchmark::State& state) {
   // number.
   auto wo = WriteOptions();
   wo.disableWAL = true;
+  std::string val;
   for (uint64_t i = 0; i < key_num; i++) {
-    Status s = db->Put(wo, kg_seq.Next(),
-                       rnd.RandomString(static_cast<int>(per_key_size)));
+    CompressibleString(&rnd, 0.5, static_cast<int>(per_key_size), &val);
+    Status s = db->Put(wo, kg_seq.Next(), val);
     if (!s.ok()) {
       state.SkipWithError(s.ToString().c_str());
     }
@@ -641,14 +669,23 @@ static void DBGet(benchmark::State& state) {
 static void DBGetArguments(benchmark::internal::Benchmark* b) {
   for (int comp_style : {kCompactionStyleLevel, kCompactionStyleUniversal,
                          kCompactionStyleFIFO}) {
-    for (int64_t max_data : {128l << 20, 512l << 20}) {
+    for (int64_t max_data : {1l << 20, 128l << 20, 512l << 20}) {
       for (int64_t per_key_size : {256, 1024}) {
         for (bool enable_statistics : {false, true}) {
           for (bool negative_query : {false, true}) {
             for (bool enable_filter : {false, true}) {
               for (bool mmap : {false, true}) {
-                b->Args({comp_style, max_data, per_key_size, enable_statistics,
-                         negative_query, enable_filter, mmap});
+                for (int compression_type :
+                     {kNoCompression /* 0x0 */, kZSTD /* 0x7 */}) {
+                  for (bool compression_checksum : {false, true}) {
+                    for (bool no_blockcache : {false, true}) {
+                      b->Args({comp_style, max_data, per_key_size,
+                               enable_statistics, negative_query, enable_filter,
+                               mmap, compression_type, compression_checksum,
+                               no_blockcache});
+                    }
+                  }
+                }
               }
             }
           }
@@ -657,11 +694,13 @@ static void DBGetArguments(benchmark::internal::Benchmark* b) {
     }
   }
   b->ArgNames({"comp_style", "max_data", "per_key_size", "enable_statistics",
-               "negative_query", "enable_filter", "mmap"});
+               "negative_query", "enable_filter", "mmap", "compression_type",
+               "compression_checksum", "no_blockcache"});
 }

-BENCHMARK(DBGet)->Threads(1)->Apply(DBGetArguments);
-BENCHMARK(DBGet)->Threads(8)->Apply(DBGetArguments);
+static const uint64_t DBGetNum = 10000l;
+BENCHMARK(DBGet)->Threads(1)->Iterations(DBGetNum)->Apply(DBGetArguments);
+BENCHMARK(DBGet)->Threads(8)->Iterations(DBGetNum / 8)->Apply(DBGetArguments);

 static void SimpleGetWithPerfContext(benchmark::State& state) {
   // setup DB
```

```diff
@@ -173,6 +173,9 @@ static std::unordered_map<std::string, OptionTypeInfo>
         {offsetof(struct CompressionOptions, use_zstd_dict_trainer),
          OptionType::kBoolean, OptionVerificationType::kNormal,
          OptionTypeFlags::kMutable}},
+       {"checksum",
+        {offsetof(struct CompressionOptions, checksum), OptionType::kBoolean,
+         OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
 };

 static std::unordered_map<std::string, OptionTypeInfo>
```

```diff
@@ -504,11 +504,12 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       "compression=kNoCompression;"
       "compression_opts={max_dict_buffer_bytes=5;use_zstd_dict_trainer=true;"
       "enabled=false;parallel_threads=6;zstd_max_train_bytes=7;strategy=8;max_"
-      "dict_bytes=9;level=10;window_bits=11;max_compressed_bytes_per_kb=987;};"
+      "dict_bytes=9;level=10;window_bits=11;max_compressed_bytes_per_kb=987;"
+      "checksum=true};"
       "bottommost_compression_opts={max_dict_buffer_bytes=4;use_zstd_dict_"
       "trainer=true;enabled=true;parallel_threads=5;zstd_max_train_bytes=6;"
       "strategy=7;max_dict_bytes=8;level=9;window_bits=10;max_compressed_bytes_"
-      "per_kb=876;};"
+      "per_kb=876;checksum=true};"
       "bottommost_compression=kDisableCompressionOption;"
       "level0_stop_writes_trigger=33;"
       "num_levels=99;"
```

```diff
@@ -138,8 +138,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
   if (sampled_output_fast && (LZ4_Supported() || Snappy_Supported())) {
     CompressionType c =
         LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
-    CompressionContext context(c);
     CompressionOptions options;
+    CompressionContext context(c, options);
     CompressionInfo info_tmp(options, context,
                              CompressionDict::GetEmptyDict(), c,
                              info.SampleForCompression());
@@ -152,8 +152,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
   // Sampling with a slow but high-compression algorithm
   if (sampled_output_slow && (ZSTD_Supported() || Zlib_Supported())) {
     CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
-    CompressionContext context(c);
     CompressionOptions options;
+    CompressionContext context(c, options);
     CompressionInfo info_tmp(options, context,
                              CompressionDict::GetEmptyDict(), c,
                              info.SampleForCompression());
@@ -525,8 +525,10 @@ struct BlockBasedTableBuilder::Rep {
       compression_dict_buffer_cache_res_mgr = nullptr;
     }
     assert(compression_ctxs.size() >= compression_opts.parallel_threads);
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
-      compression_ctxs[i].reset(new CompressionContext(compression_type));
+      compression_ctxs[i].reset(
+          new CompressionContext(compression_type, compression_opts));
     }
     if (table_options.index_type ==
         BlockBasedTableOptions::kTwoLevelIndexSearch) {
@@ -1145,6 +1147,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
       return;
     }
+    TEST_SYNC_POINT_CALLBACK(
+        "BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
+        &r->compressed_output);
     WriteMaybeCompressedBlock(block_contents, type, handle, block_type,
                               &uncompressed_block_data);
     r->compressed_output.clear();
```

```diff
@@ -645,19 +645,25 @@ Status UncompressBlockData(const UncompressionInfo& uncompression_info,
   StopWatchNano timer(ioptions.clock,
                       ShouldReportDetailedTime(ioptions.env, ioptions.stats));
   size_t uncompressed_size = 0;
-  CacheAllocationPtr ubuf =
-      UncompressData(uncompression_info, data, size, &uncompressed_size,
-                     GetCompressFormatForVersion(format_version), allocator);
+  const char* error_msg = nullptr;
+  CacheAllocationPtr ubuf = UncompressData(
+      uncompression_info, data, size, &uncompressed_size,
+      GetCompressFormatForVersion(format_version), allocator, &error_msg);
   if (!ubuf) {
     if (!CompressionTypeSupported(uncompression_info.type())) {
-      return Status::NotSupported(
+      ret = Status::NotSupported(
           "Unsupported compression method for this build",
           CompressionTypeToString(uncompression_info.type()));
     } else {
-      return Status::Corruption(
-          "Corrupted compressed block contents",
-          CompressionTypeToString(uncompression_info.type()));
+      std::ostringstream oss;
+      oss << "Corrupted compressed block contents";
+      if (error_msg) {
+        oss << ": " << error_msg;
+      }
+      ret = Status::Corruption(
+          oss.str(), CompressionTypeToString(uncompression_info.type()));
     }
+    return ret;
   }

   *out_contents = BlockContents(std::move(ubuf), uncompressed_size);
```

```diff
@@ -2834,7 +2834,7 @@ class Benchmark {
     std::string input_str(len, 'y');
     std::string compressed;
     CompressionOptions opts;
-    CompressionContext context(FLAGS_compression_type_e);
+    CompressionContext context(FLAGS_compression_type_e, opts);
     CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                          FLAGS_compression_type_e,
                          FLAGS_sample_for_compression);
@@ -4002,7 +4002,8 @@ class Benchmark {
     bool ok = true;
     std::string compressed;
     CompressionOptions opts;
-    CompressionContext context(FLAGS_compression_type_e);
+    opts.level = FLAGS_compression_level;
+    CompressionContext context(FLAGS_compression_type_e, opts);
     CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                          FLAGS_compression_type_e,
                          FLAGS_sample_for_compression);
@@ -4031,8 +4032,10 @@ class Benchmark {
     Slice input = gen.Generate(FLAGS_block_size);
     std::string compressed;
-    CompressionContext compression_ctx(FLAGS_compression_type_e);
     CompressionOptions compression_opts;
+    compression_opts.level = FLAGS_compression_level;
+    CompressionContext compression_ctx(FLAGS_compression_type_e,
+                                       compression_opts);
     CompressionInfo compression_info(
         compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
         FLAGS_compression_type_e, FLAGS_sample_for_compression);
```

```diff
@@ -65,6 +65,7 @@ default_params = {
     "compression_parallel_threads": 1,
     "compression_max_dict_buffer_bytes": lambda: (1 << random.randint(0, 40)) - 1,
     "compression_use_zstd_dict_trainer": lambda: random.randint(0, 1),
+    "compression_checksum": lambda: random.randint(0, 1),
     "clear_column_family_one_in": 0,
     "compact_files_one_in": 1000000,
     "compact_range_one_in": 1000000,
```

```diff
@@ -0,0 +1 @@
+* Add a new compression option `CompressionOptions::checksum` for enabling ZSTD's checksum feature to detect corruption during decompression.
```

```diff
@@ -48,7 +48,7 @@ int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
   if (input_size == 0) {
     return 0;
   }
-#ifndef ZSTD_STREAMING
+#ifndef ZSTD_ADVANCED
   (void)input;
   (void)input_size;
   (void)output;
@@ -77,7 +77,7 @@ int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
 }

 void ZSTDStreamingCompress::Reset() {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
   ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
   input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
 #endif
@@ -91,7 +91,7 @@ int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
   if (input_size == 0) {
     return 0;
   }
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
   if (input) {
     // New input
     input_buffer_ = {input, input_size, /*pos=*/0};
@@ -113,7 +113,7 @@ int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
 }

 void ZSTDStreamingUncompress::Reset() {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
   ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
   input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
 #endif
```

```diff
@@ -53,8 +53,11 @@
 #include <zdict.h>
 #endif  // ZSTD_VERSION_NUMBER >= 10103

-// v1.4.0+
+// ZSTD_Compress2(), ZSTD_compressStream2() and frame parameters all belong to
+// advanced APIs and require v1.4.0+.
+// https://github.com/facebook/zstd/blob/eb9f881eb810f2242f1ef36b3f3e7014eecb8fa6/lib/zstd.h#L297C40-L297C45
 #if ZSTD_VERSION_NUMBER >= 10400
-#define ZSTD_STREAMING
+#define ZSTD_ADVANCED
 #endif  // ZSTD_VERSION_NUMBER >= 10400

 namespace ROCKSDB_NAMESPACE {
 // Need this for the context allocation override
@@ -180,6 +183,9 @@ struct CompressionDict {
     if (level == CompressionOptions::kDefaultCompressionLevel) {
       // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
       // https://github.com/facebook/zstd/issues/1148
+      // TODO(cbi): ZSTD_CLEVEL_DEFAULT is exposed after
+      // https://github.com/facebook/zstd/pull/1174. Use ZSTD_CLEVEL_DEFAULT
+      // instead of hardcoding 3.
       level = 3;
     }
     // Should be safe (but slower) if below call fails as we'll use the
@@ -363,14 +369,43 @@ class CompressionContext {
  private:
 #if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
   ZSTD_CCtx* zstd_ctx_ = nullptr;
-  void CreateNativeContext(CompressionType type) {
-    if (type == kZSTD || type == kZSTDNotFinalCompression) {
+
+  ZSTD_CCtx* CreateZSTDContext() {
 #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
-      zstd_ctx_ =
-          ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
+    return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
 #else   // ROCKSDB_ZSTD_CUSTOM_MEM
-      zstd_ctx_ = ZSTD_createCCtx();
+    return ZSTD_createCCtx();
 #endif  // ROCKSDB_ZSTD_CUSTOM_MEM
+  }
+
+  void CreateNativeContext(CompressionType type, int level, bool checksum) {
+    if (type == kZSTD || type == kZSTDNotFinalCompression) {
+      zstd_ctx_ = CreateZSTDContext();
+#ifdef ZSTD_ADVANCED
+      if (level == CompressionOptions::kDefaultCompressionLevel) {
+        // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
+        // https://github.com/facebook/zstd/issues/1148
+        level = 3;
+      }
+      size_t err =
+          ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
+      if (ZSTD_isError(err)) {
+        assert(false);
+        ZSTD_freeCCtx(zstd_ctx_);
+        zstd_ctx_ = CreateZSTDContext();
+      }
+      if (checksum) {
+        err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
+        if (ZSTD_isError(err)) {
+          assert(false);
+          ZSTD_freeCCtx(zstd_ctx_);
+          zstd_ctx_ = CreateZSTDContext();
+        }
+      }
+#else
+      (void)level;
+      (void)checksum;
+#endif
     }
   }

   void DestroyNativeContext() {
@@ -388,12 +423,14 @@ class CompressionContext {
 #else   // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
  private:
-  void CreateNativeContext(CompressionType /* type */) {}
+  void CreateNativeContext(CompressionType /* type */, int /* level */,
+                           bool /* checksum */) {}
   void DestroyNativeContext() {}
 #endif  // ZSTD && (ZSTD_VERSION_NUMBER >= 500)

  public:
-  explicit CompressionContext(CompressionType type) {
-    CreateNativeContext(type);
+  explicit CompressionContext(CompressionType type,
+                              const CompressionOptions& options) {
+    CreateNativeContext(type, options.level, options.checksum);
   }
   ~CompressionContext() { DestroyNativeContext(); }
   CompressionContext(const CompressionContext&) = delete;
@@ -525,7 +562,7 @@ inline bool ZSTDNotFinal_Supported() {
 }

 inline bool ZSTD_Streaming_Supported() {
-#if defined(ZSTD) && defined(ZSTD_STREAMING)
+#if defined(ZSTD_ADVANCED)
   return true;
 #else
   return false;
@@ -1343,30 +1380,44 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
   size_t compressBound = ZSTD_compressBound(length);
   output->resize(static_cast<size_t>(output_header_len + compressBound));
   size_t outlen = 0;
-  int level;
-  if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
-    // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
-    // https://github.com/facebook/zstd/issues/1148
-    level = 3;
-  } else {
-    level = info.options().level;
-  }
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_CCtx* context = info.context().ZSTDPreallocCtx();
   assert(context != nullptr);
+#ifdef ZSTD_ADVANCED
+  if (info.dict().GetDigestedZstdCDict() != nullptr) {
+    ZSTD_CCtx_refCDict(context, info.dict().GetDigestedZstdCDict());
+  } else {
+    ZSTD_CCtx_loadDictionary(context, info.dict().GetRawDict().data(),
+                             info.dict().GetRawDict().size());
+  }
+  // Compression level is set in `context` during CreateNativeContext()
+  outlen = ZSTD_compress2(context, &(*output)[output_header_len], compressBound,
+                          input, length);
+#else   // ZSTD_ADVANCED
 #if ZSTD_VERSION_NUMBER >= 700  // v0.7.0+
   if (info.dict().GetDigestedZstdCDict() != nullptr) {
     outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len],
                                       compressBound, input, length,
                                       info.dict().GetDigestedZstdCDict());
   }
 #endif  // ZSTD_VERSION_NUMBER >= 700
+  // TODO (cbi): error handling for compression.
   if (outlen == 0) {
+    int level;
+    if (info.options().level == CompressionOptions::kDefaultCompressionLevel) {
+      // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
+      // https://github.com/facebook/zstd/issues/1148
+      level = 3;
+    } else {
+      level = info.options().level;
+    }
     outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
                                      compressBound, input, length,
                                      info.dict().GetRawDict().data(),
                                      info.dict().GetRawDict().size(), level);
   }
+#endif  // ZSTD_ADVANCED
 #else   // up to v0.4.x
   outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
                          length, level);
@@ -1387,17 +1438,28 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
+// @param error_message If not null, will be set if decompression fails.
+//
+// Returns nullptr if decompression fails.
 inline CacheAllocationPtr ZSTD_Uncompress(
     const UncompressionInfo& info, const char* input_data, size_t input_length,
-    size_t* uncompressed_size, MemoryAllocator* allocator = nullptr) {
+    size_t* uncompressed_size, MemoryAllocator* allocator = nullptr,
+    const char** error_message = nullptr) {
 #ifdef ZSTD
+  static const char* const kErrorDecodeOutputSize =
+      "Cannot decode output size.";
+  static const char* const kErrorOutputLenMismatch =
+      "Decompressed size does not match header.";
   uint32_t output_len = 0;
   if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
                                             &output_len)) {
+    if (error_message) {
+      *error_message = kErrorDecodeOutputSize;
+    }
     return nullptr;
   }

-  auto output = AllocateBlock(output_len, allocator);
+  CacheAllocationPtr output = AllocateBlock(output_len, allocator);
   size_t actual_output_length = 0;
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_DCtx* context = info.context().GetZSTDContext();
@@ -1407,19 +1469,31 @@ inline CacheAllocationPtr ZSTD_Uncompress(
     actual_output_length = ZSTD_decompress_usingDDict(
         context, output.get(), output_len, input_data, input_length,
         info.dict().GetDigestedZstdDDict());
-  }
+  } else {
 #endif  // ROCKSDB_ZSTD_DDICT
-  if (actual_output_length == 0) {
     actual_output_length = ZSTD_decompress_usingDict(
         context, output.get(), output_len, input_data, input_length,
         info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
+#ifdef ROCKSDB_ZSTD_DDICT
   }
+#endif  // ROCKSDB_ZSTD_DDICT
 #else   // up to v0.4.x
   (void)info;
   actual_output_length =
       ZSTD_decompress(output.get(), output_len, input_data, input_length);
 #endif  // ZSTD_VERSION_NUMBER >= 500
-  assert(actual_output_length == output_len);
+  if (ZSTD_isError(actual_output_length)) {
+    if (error_message) {
+      *error_message = ZSTD_getErrorName(actual_output_length);
+    }
+    return nullptr;
+  } else if (actual_output_length != output_len) {
+    if (error_message) {
+      *error_message = kErrorOutputLenMismatch;
+    }
+    return nullptr;
+  }
+
   *uncompressed_size = actual_output_length;
   return output;
 #else  // ZSTD
@@ -1428,6 +1502,7 @@ inline CacheAllocationPtr ZSTD_Uncompress(
   (void)input_length;
   (void)uncompressed_size;
   (void)allocator;
+  (void)error_message;
   return nullptr;
 #endif
 }
@@ -1530,6 +1605,7 @@ inline std::string ZSTD_FinalizeDictionary(
     return dict_data;
   }
 #else   // up to v1.4.4
+  assert(false);
   (void)samples;
   (void)sample_lens;
   (void)max_dict_bytes;
@@ -1589,7 +1665,8 @@ inline bool CompressData(const Slice& raw,
 inline CacheAllocationPtr UncompressData(
     const UncompressionInfo& uncompression_info, const char* data, size_t n,
     size_t* uncompressed_size, uint32_t compress_format_version,
-    MemoryAllocator* allocator = nullptr) {
+    MemoryAllocator* allocator = nullptr,
+    const char** error_message = nullptr) {
   switch (uncompression_info.type()) {
     case kSnappyCompression:
       return Snappy_Uncompress(data, n, uncompressed_size, allocator);
@@ -1609,8 +1686,9 @@ inline CacheAllocationPtr UncompressData(
       return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
     case kZSTD:
     case kZSTDNotFinalCompression:
+      // TODO(cbi): error message handling for other compression algorithms.
       return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
-                             allocator);
+                             allocator, error_message);
     default:
       return CacheAllocationPtr();
   }
@@ -1743,7 +1821,7 @@ class ZSTDStreamingCompress final : public StreamingCompress {
                         size_t max_output_len)
       : StreamingCompress(kZSTD, opts, compress_format_version,
                           max_output_len) {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
     cctx_ = ZSTD_createCCtx();
     // Each compressed frame will have a checksum
     ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
@@ -1752,14 +1830,14 @@ class ZSTDStreamingCompress final : public StreamingCompress {
 #endif
   }
   ~ZSTDStreamingCompress() override {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
     ZSTD_freeCCtx(cctx_);
 #endif
   }
   int Compress(const char* input, size_t input_size, char* output,
                size_t* output_pos) override;
   void Reset() override;
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
   ZSTD_CCtx* cctx_;
   ZSTD_inBuffer input_buffer_;
 #endif
@@ -1770,14 +1848,14 @@ class ZSTDStreamingUncompress final : public StreamingUncompress {
   explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
                                    size_t max_output_len)
       : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
     dctx_ = ZSTD_createDCtx();
     assert(dctx_ != nullptr);
     input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
 #endif
   }
   ~ZSTDStreamingUncompress() override {
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
     ZSTD_freeDCtx(dctx_);
 #endif
   }
@@ -1786,7 +1864,7 @@ class ZSTDStreamingUncompress final : public StreamingUncompress {
   void Reset() override;

  private:
-#ifdef ZSTD_STREAMING
+#ifdef ZSTD_ADVANCED
   ZSTD_DCtx* dctx_;
   ZSTD_inBuffer input_buffer_;
 #endif
```
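To make the mechanics above concrete, here is a standalone sketch, not RocksDB code, of the same pattern the new `CreateNativeContext()` / `ZSTD_compress2()` path uses (plain zstd C API, assumes zstd v1.4.0+): set the level and checksum flag on a `ZSTD_CCtx`, compress, then observe that flipping the frame's last byte, part of the 32-bit checksum, makes decompression fail.

```cpp
#include <zstd.h>

#include <cstdio>
#include <string>
#include <vector>

int main() {
  std::string input(4096, 'x');

  // Configure the context the way the new CreateNativeContext() does:
  // compression level and per-frame checksum as ZSTD_CCtx parameters.
  ZSTD_CCtx* cctx = ZSTD_createCCtx();
  ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3);
  ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);

  std::vector<char> compressed(ZSTD_compressBound(input.size()));
  size_t clen = ZSTD_compress2(cctx, compressed.data(), compressed.size(),
                               input.data(), input.size());
  ZSTD_freeCCtx(cctx);
  if (ZSTD_isError(clen)) return 1;

  // Corrupt the frame; the checksum is its last 4 bytes.
  compressed[clen - 1]++;

  std::vector<char> out(input.size());
  size_t dlen =
      ZSTD_decompress(out.data(), out.size(), compressed.data(), clen);
  if (ZSTD_isError(dlen)) {
    // Expected: a checksum-mismatch error, which RocksDB now surfaces as
    // Status::Corruption with ZSTD_getErrorName()'s text appended.
    std::printf("decompression failed: %s\n", ZSTD_getErrorName(dlen));
  }
  return 0;
}
```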

```diff
@@ -1148,7 +1148,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
   StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
   CompressionType type = bdb_options_.compression;
   CompressionOptions opts;
-  CompressionContext context(type);
+  CompressionContext context(type, opts);
   CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
                        0 /* sample_for_compression */);
   CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
```