mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-02 01:16:16 +00:00
Fix blob db compression bug
Summary: `CompressBlock()` will return the uncompressed slice (i.e. `Slice(value_unc)`) if compression ratio is not good enough. This is undesired. We need to always assign the compressed slice to `value`. Closes https://github.com/facebook/rocksdb/pull/2447 Differential Revision: D5244682 Pulled By: yiwu-arbug fbshipit-source-id: 6989dd8852c9622822ba9acec9beea02007dff09
This commit is contained in:
parent
7a380deff7
commit
ae8571f5c2
|
@ -897,15 +897,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
||||||
last_file_ = bfile;
|
last_file_ = bfile;
|
||||||
has_put_ = true;
|
has_put_ = true;
|
||||||
|
|
||||||
Slice value = value_unc;
|
|
||||||
std::string compression_output;
|
std::string compression_output;
|
||||||
if (impl_->bdb_options_.compression != kNoCompression) {
|
Slice value = impl_->GetCompressedSlice(value_unc, &compression_output);
|
||||||
CompressionType ct = impl_->bdb_options_.compression;
|
|
||||||
CompressionOptions compression_opts;
|
|
||||||
value = CompressBlock(value_unc, compression_opts, &ct,
|
|
||||||
kBlockBasedTableVersionFormat, Slice(),
|
|
||||||
&compression_output);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string headerbuf;
|
std::string headerbuf;
|
||||||
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
||||||
|
@ -1018,6 +1011,18 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
||||||
: -1);
|
: -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
||||||
|
std::string* compression_output) const {
|
||||||
|
if (bdb_options_.compression == kNoCompression) {
|
||||||
|
return raw;
|
||||||
|
}
|
||||||
|
CompressionType ct = bdb_options_.compression;
|
||||||
|
CompressionOptions compression_opts;
|
||||||
|
CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat,
|
||||||
|
Slice(), compression_output);
|
||||||
|
return *compression_output;
|
||||||
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
const Slice& value_unc, int32_t expiration) {
|
const Slice& value_unc, int32_t expiration) {
|
||||||
|
@ -1028,15 +1033,8 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
||||||
|
|
||||||
if (!bfile) return Status::NotFound("Blob file not found");
|
if (!bfile) return Status::NotFound("Blob file not found");
|
||||||
|
|
||||||
Slice value = value_unc;
|
|
||||||
std::string compression_output;
|
std::string compression_output;
|
||||||
if (bdb_options_.compression != kNoCompression) {
|
Slice value = GetCompressedSlice(value_unc, &compression_output);
|
||||||
CompressionType ct = bdb_options_.compression;
|
|
||||||
CompressionOptions compression_opts;
|
|
||||||
value = CompressBlock(value_unc, compression_opts, &ct,
|
|
||||||
kBlockBasedTableVersionFormat, Slice(),
|
|
||||||
&compression_output);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string headerbuf;
|
std::string headerbuf;
|
||||||
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
||||||
|
|
|
@ -212,6 +212,9 @@ class BlobDBImpl : public BlobDB {
|
||||||
const std::string& index_entry, std::string* value,
|
const std::string& index_entry, std::string* value,
|
||||||
SequenceNumber* sequence = nullptr);
|
SequenceNumber* sequence = nullptr);
|
||||||
|
|
||||||
|
Slice GetCompressedSlice(const Slice& raw,
|
||||||
|
std::string* compression_output) const;
|
||||||
|
|
||||||
// Just before flush starts acting on memtable files,
|
// Just before flush starts acting on memtable files,
|
||||||
// this handler is called.
|
// this handler is called.
|
||||||
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
|
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
|
||||||
|
|
|
@ -205,7 +205,7 @@ TEST_F(BlobDBTest, Override) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef SNAPPY
|
#ifdef SNAPPY
|
||||||
TEST_F(BlobDBTest, DISABLED_Compression) {
|
TEST_F(BlobDBTest, Compression) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptionsImpl bdb_options;
|
BlobDBOptionsImpl bdb_options;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
|
|
Loading…
Reference in a new issue