Compaction filter support for BlobDB (#6850)

Summary:
Added compaction filter support for BlobDB non-TTL values. As in vanilla RocksDB, the user compaction filter applies to all key/value pairs of the compaction; for BlobDB it is applied to non-TTL values only. It honors `min_blob_size`, so changing a value's size may move it between inlined data and stored-in-blob data.
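
To make the change concrete, here is a minimal usage sketch (not part of this patch) showing a user compaction filter attached to a BlobDB instance; the header path and the `BlobDB::Open` signature are recalled from the codebase around this release and should be treated as assumptions, not as part of the diff.

#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/options.h"
#include "utilities/blob_db/blob_db.h"  // assumed header path for BlobDB

using ROCKSDB_NAMESPACE::CompactionFilter;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Status;
namespace blob_db = ROCKSDB_NAMESPACE::blob_db;

// Example filter: drops any value that starts with "stale:".
class DropStaleFilter : public CompactionFilter {
 public:
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& value,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return value.starts_with("stale:");  // returning true removes the key
  }
  const char* Name() const override { return "DropStaleFilter"; }
};

int main() {
  Options options;
  options.create_if_missing = true;
  DropStaleFilter filter;
  options.compaction_filter = &filter;  // previously rejected by BlobDB::Open

  blob_db::BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 1024;  // values of at least 1 KiB go to blob files

  blob_db::BlobDB* db = nullptr;
  Status s =
      blob_db::BlobDB::Open(options, bdb_options, "/tmp/blobdb_example", &db);
  if (!s.ok()) {
    return 1;
  }
  // The filter now runs during compaction over both inlined and blob-resident
  // non-TTL values; TTL values are still skipped.
  delete db;
  return 0;
}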
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6850

Reviewed By: siying

Differential Revision: D22263487

Pulled By: ltamasi

fbshipit-source-id: 8fc03f8cde2a5c831e63b436b3dbf1b7f90939e8
Burton Li 2020-06-29 17:30:04 -07:00 committed by Facebook GitHub Bot
parent 58547e533b
commit 5be2cb6948
12 changed files with 643 additions and 252 deletions


@ -19,6 +19,7 @@
* DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called.
* Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5.
* `BackupableDBOptions::new_naming_for_backup_files` is added. This option is true by default. When it is true, backup table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. When there is no `db_session_id` available in the table file, we use `file_size` as a fallback. Note that when this option is true, it comes into effect only when both `share_files_with_checksum` and `share_table_files` are true.
* Added compaction filter support for BlobDB non-TTL values. As in vanilla RocksDB, the user compaction filter applies to all key/value pairs of the compaction; for BlobDB it is applied to non-TTL values only. It honors `min_blob_size`, so changing a value's size may move it between inlined data and stored-in-blob data.
### Bug Fixes
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.


@ -182,7 +182,7 @@ void CompactionIterator::Next() {
PrepareOutput();
}
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
if (compaction_filter_ != nullptr &&
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
@ -225,14 +225,31 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
if (ikey_.type == kTypeBlobIndex) {
// value transfer from blob file to inlined data
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
} else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
if (ikey_.type == kTypeValue) {
// value transfer from inlined data to blob file
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kIOError) {
status_ =
Status::IOError("Failed to access blob during compaction filter");
return false;
}
}
return true;
}
void CompactionIterator::NextFromInput() {
@ -295,8 +312,9 @@ void CompactionIterator::NextFromInput() {
// Apply the compaction filter to the first committed version of the user
// key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
if (current_key_committed_ &&
!InvokeFilterIfNeeded(&need_skip, &skip_until)) {
break;
}
} else {
// Update the current key to reflect the new sequence number/type without
@ -316,8 +334,9 @@ void CompactionIterator::NextFromInput() {
current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the
// user key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
if (current_key_committed_ &&
!InvokeFilterIfNeeded(&need_skip, &skip_until)) {
break;
}
}
}


@ -121,7 +121,8 @@ class CompactionIterator {
void PrepareOutput();
// Invoke compaction filter if needed.
void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
// Return true on success, false on failure (e.g., kIOError).
bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.


@ -45,6 +45,8 @@ class CompactionFilter {
kRemove,
kChangeValue,
kRemoveAndSkipUntil,
kChangeBlobIndex, // used internally by BlobDB.
kIOError, // used internally by BlobDB.
};
enum class BlobDecision { kKeep, kChangeValue, kCorruption, kIOError };
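
For context, a hedged sketch (not part of this diff) of how the decisions divide between user code and BlobDB: a user-written filter still returns only the pre-existing decisions, while `kChangeBlobIndex` and `kIOError` are produced internally by BlobDB's wrapper filter when it re-encodes a changed value as a blob index or fails to read a blob.

#include <cctype>
#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"

class UppercaseFilter : public ROCKSDB_NAMESPACE::CompactionFilter {
 public:
  Decision FilterV2(int /*level*/, const ROCKSDB_NAMESPACE::Slice& /*key*/,
                    ValueType value_type,
                    const ROCKSDB_NAMESPACE::Slice& existing_value,
                    std::string* new_value,
                    std::string* /*skip_until*/) const override {
    if (value_type != ValueType::kValue) {
      return Decision::kKeep;
    }
    // Rewrite the value in upper case.
    new_value->assign(existing_value.data(), existing_value.size());
    for (char& c : *new_value) {
      c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    }
    // BlobDB may internally turn this into kChangeBlobIndex if the new value
    // is at least min_blob_size, or into kIOError on a blob read/write failure.
    return Decision::kChangeValue;
  }
  const char* Name() const override { return "UppercaseFilter"; }
};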


@ -6,18 +6,40 @@
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_compaction_filter.h"
#include "db/dbformat.h"
#include <cinttypes>
#include "db/dbformat.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() {
if (blob_file_) {
CloseAndRegisterNewBlobFile();
}
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
}
CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
int /*level*/, const Slice& key, ValueType value_type, const Slice& value,
std::string* /*new_value*/, std::string* /*skip_until*/) const {
int level, const Slice& key, ValueType value_type, const Slice& value,
std::string* new_value, std::string* skip_until) const {
const CompactionFilter* ucf = user_comp_filter();
if (value_type != kBlobIndex) {
return Decision::kKeep;
if (ucf == nullptr) {
return Decision::kKeep;
}
// Apply user compaction filter for inlined data.
CompactionFilter::Decision decision =
ucf->FilterV2(level, key, value_type, value, new_value, skip_until);
if (decision == Decision::kChangeValue) {
return HandleValueChange(key, new_value);
}
return decision;
}
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value);
@ -44,26 +66,82 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
// Hack: Internal key is passed to BlobIndexCompactionFilter for it to
// get sequence number.
ParsedInternalKey ikey;
bool ok = ParseInternalKey(key, &ikey);
if (!ParseInternalKey(key, &ikey)) {
assert(false);
return Decision::kKeep;
}
// Remove keys that could have been removed by the last FIFO eviction.
// If there is an error while parsing the key, ignore it and continue.
if (ok && ikey.sequence < context_.fifo_eviction_seq) {
if (ikey.sequence < context_.fifo_eviction_seq) {
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;
}
}
// Apply user compaction filter for all non-TTL blob data.
if (ucf != nullptr && !blob_index.HasTTL()) {
// Hack: Internal key is passed to BlobIndexCompactionFilter for it to
// get sequence number.
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
assert(false);
return Decision::kKeep;
}
// Read value from blob file.
PinnableSlice blob;
CompressionType compression_type = kNoCompression;
constexpr bool need_decompress = true;
if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress,
&compression_type)) {
return Decision::kIOError;
}
CompactionFilter::Decision decision = ucf->FilterV2(
level, ikey.user_key, kValue, blob, new_value, skip_until);
if (decision == Decision::kChangeValue) {
return HandleValueChange(ikey.user_key, new_value);
}
return decision;
}
return Decision::kKeep;
}
BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
if (blob_file_) {
CloseAndRegisterNewBlobFile();
CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange(
const Slice& key, std::string* new_value) const {
BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) {
// Keep new_value inlined.
return Decision::kChangeValue;
}
if (!OpenNewBlobFileIfNeeded()) {
return Decision::kIOError;
}
Slice new_blob_value(*new_value);
std::string compression_output;
if (blob_db_impl->bdb_options_.compression != kNoCompression) {
new_blob_value =
blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output);
}
uint64_t new_blob_file_number = 0;
uint64_t new_blob_offset = 0;
if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number,
&new_blob_offset)) {
return Decision::kIOError;
}
if (!CloseAndRegisterNewBlobFileIfNeeded()) {
return Decision::kIOError;
}
BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
new_blob_value.size(),
blob_db_impl->bdb_options_.compression);
return Decision::kChangeBlobIndex;
}
assert(context_gc_.blob_db_impl);
BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
assert(context().blob_db_impl);
ROCKS_LOG_INFO(context_gc_.blob_db_impl->db_options_.info_log,
ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log,
"GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64
" bytes), relocated %" PRIu64 " blobs (%" PRIu64
" bytes), created %" PRIu64 " new blob file(s)",
@ -80,12 +158,164 @@ BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError());
}
bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const {
if (blob_file_) {
assert(writer_);
return true;
}
return false;
}
bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const {
if (IsBlobFileOpened()) {
return true;
}
BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
const Status s = blob_db_impl->CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(), "GC", &blob_file_, &writer_);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error opening new blob file during GC, status: %s",
s.ToString().c_str());
blob_file_.reset();
writer_.reset();
return false;
}
assert(blob_file_);
assert(writer_);
return true;
}
bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile(
const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
bool need_decompress, CompressionType* compression_type) const {
BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
Status s = blob_db_impl->GetRawBlobFromFile(
key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
blob, compression_type);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error reading blob during GC, key: %s (%s), status: %s",
key.ToString(/* output_hex */ true).c_str(),
blob_index.DebugString(/* output_hex */ true).c_str(),
s.ToString().c_str());
return false;
}
if (need_decompress && *compression_type != kNoCompression) {
s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob);
if (!s.ok()) {
ROCKS_LOG_ERROR(
blob_db_impl->db_options_.info_log,
"Uncompression error during blob read from file: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s status: '%s'",
blob_index.file_number(), blob_index.offset(), blob_index.size(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
return false;
}
}
return true;
}
bool BlobIndexCompactionFilterBase::WriteBlobToNewFile(
const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const {
TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile");
assert(new_blob_file_number);
assert(new_blob_offset);
assert(blob_file_);
*new_blob_file_number = blob_file_->BlobFileNumber();
assert(writer_);
uint64_t new_key_offset = 0;
const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
new_blob_offset);
if (!s.ok()) {
const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
ROCKS_LOG_ERROR(
blob_db_impl->db_options_.info_log,
"Error writing blob to new file %s during GC, key: %s, status: %s",
blob_file_->PathName().c_str(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
return false;
}
const uint64_t new_size =
BlobLogRecord::kHeaderSize + key.size() + blob.size();
blob_file_->BlobRecordAdded(new_size);
BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
blob_db_impl->total_blob_size_ += new_size;
return true;
}
bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded()
const {
const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
return true;
}
return CloseAndRegisterNewBlobFile();
}
bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const {
BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
Status s;
{
WriteLock wl(&blob_db_impl->mutex_);
s = blob_db_impl->CloseBlobFile(blob_file_);
// Note: we delay registering the new blob file until it's closed to
// prevent FIFO eviction from processing it during the GC run.
blob_db_impl->RegisterBlobFile(blob_file_);
}
assert(blob_file_->Immutable());
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error closing new blob file %s during GC, status: %s",
blob_file_->PathName().c_str(), s.ToString().c_str());
}
blob_file_.reset();
return s.ok();
}
CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
const Slice& key, const Slice& existing_value,
std::string* new_value) const {
assert(new_value);
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
const BlobDBImpl* const blob_db_impl = context().blob_db_impl;
(void)blob_db_impl;
assert(blob_db_impl);
@ -125,7 +355,7 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
PinnableSlice blob;
CompressionType compression_type = kNoCompression;
if (!ReadBlobFromOldFile(key, blob_index, &blob, &compression_type)) {
if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) {
gc_stats_.SetError();
return BlobDecision::kIOError;
}
@ -151,139 +381,30 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
}
bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
if (blob_file_) {
assert(writer_);
if (IsBlobFileOpened()) {
return true;
}
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
const Status s = blob_db_impl->CreateBlobFileAndWriter(
/* has_ttl */ false, ExpirationRange(), "GC", &blob_file_, &writer_);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error opening new blob file during GC, status: %s",
s.ToString().c_str());
return false;
bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded();
if (result) {
gc_stats_.AddNewFile();
}
assert(blob_file_);
assert(writer_);
gc_stats_.AddNewFile();
return true;
return result;
}
bool BlobIndexCompactionFilterGC::ReadBlobFromOldFile(
const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
CompressionType* compression_type) const {
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
const Status s = blob_db_impl->GetRawBlobFromFile(
key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
blob, compression_type);
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error reading blob during GC, key: %s (%s), status: %s",
key.ToString(/* output_hex */ true).c_str(),
blob_index.DebugString(/* output_hex */ true).c_str(),
s.ToString().c_str());
return false;
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory(
const CompactionFilter::Context& context) const {
std::unique_ptr<CompactionFilter> user_comp_filter_from_factory;
if (user_comp_filter_factory_) {
user_comp_filter_from_factory =
user_comp_filter_factory_->CreateCompactionFilter(context);
}
return true;
}
bool BlobIndexCompactionFilterGC::WriteBlobToNewFile(
const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const {
assert(new_blob_file_number);
assert(new_blob_offset);
assert(blob_file_);
*new_blob_file_number = blob_file_->BlobFileNumber();
assert(writer_);
uint64_t new_key_offset = 0;
const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
new_blob_offset);
if (!s.ok()) {
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
ROCKS_LOG_ERROR(
blob_db_impl->db_options_.info_log,
"Error writing blob to new file %s during GC, key: %s, status: %s",
blob_file_->PathName().c_str(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
return false;
}
const uint64_t new_size =
BlobLogRecord::kHeaderSize + key.size() + blob.size();
blob_file_->BlobRecordAdded(new_size);
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
blob_db_impl->total_blob_size_ += new_size;
return true;
}
bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFileIfNeeded() const {
const BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
return true;
}
return CloseAndRegisterNewBlobFile();
}
bool BlobIndexCompactionFilterGC::CloseAndRegisterNewBlobFile() const {
BlobDBImpl* const blob_db_impl = context_gc_.blob_db_impl;
assert(blob_db_impl);
assert(blob_file_);
Status s;
{
WriteLock wl(&blob_db_impl->mutex_);
s = blob_db_impl->CloseBlobFile(blob_file_);
// Note: we delay registering the new blob file until it's closed to
// prevent FIFO eviction from processing it during the GC run.
blob_db_impl->RegisterBlobFile(blob_file_);
}
assert(blob_file_->Immutable());
blob_file_.reset();
if (!s.ok()) {
ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
"Error closing new blob file %s during GC, status: %s",
blob_file_->PathName().c_str(), s.ToString().c_str());
return false;
}
return true;
return user_comp_filter_from_factory;
}
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
const CompactionFilter::Context& _context) {
assert(env());
int64_t current_time = 0;
@ -298,13 +419,17 @@ BlobIndexCompactionFilterFactory::CreateCompactionFilter(
BlobCompactionContext context;
blob_db_impl()->GetCompactionContext(&context);
std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
CreateUserCompactionFilterFromFactory(_context);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
std::move(context), current_time, statistics()));
std::move(context), user_comp_filter(),
std::move(user_comp_filter_from_factory), current_time, statistics()));
}
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
const CompactionFilter::Context& _context) {
assert(env());
int64_t current_time = 0;
@ -320,8 +445,12 @@ BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
BlobCompactionContextGC context_gc;
blob_db_impl()->GetCompactionContext(&context, &context_gc);
std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
CreateUserCompactionFilterFromFactory(_context);
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
std::move(context), std::move(context_gc), current_time, statistics()));
std::move(context), std::move(context_gc), user_comp_filter(),
std::move(user_comp_filter_from_factory), current_time, statistics()));
}
} // namespace blob_db
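
To summarize the flow this file introduces, here is an illustrative, simplified restatement (not part of the patch) of the size-driven re-encoding performed after the user filter changes a value; real types such as BlobDBImpl and the blob writer are replaced by plain stand-ins.

#include <cstdint>
#include <string>

// Stand-in for CompactionFilter::Decision, limited to the cases used here.
enum class Decision { kChangeValue, kChangeBlobIndex, kIOError };

// Mirrors BlobIndexCompactionFilterBase::HandleValueChange(): once the user
// filter has produced *new_value, its size relative to min_blob_size decides
// whether the record stays inlined or is rewritten into a blob file.
Decision HandleValueChangeSketch(std::string* new_value, uint64_t min_blob_size,
                                 bool blob_write_succeeded) {
  if (new_value->size() < min_blob_size) {
    return Decision::kChangeValue;  // keep the (possibly shrunken) value inlined
  }
  if (!blob_write_succeeded) {
    return Decision::kIOError;  // opening/writing/closing the blob file failed
  }
  // In the real code the value is (optionally compressed and) appended to an
  // open blob file, and *new_value is replaced by an encoded BlobIndex.
  new_value->assign("<encoded blob index>");
  return Decision::kChangeBlobIndex;
}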


@ -13,11 +13,13 @@
#include "rocksdb/env.h"
#include "utilities/blob_db/blob_db_gc_stats.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
namespace ROCKSDB_NAMESPACE {
namespace blob_db {
struct BlobCompactionContext {
BlobDBImpl* blob_db_impl = nullptr;
uint64_t next_file_number = 0;
std::unordered_set<uint64_t> current_blob_files;
SequenceNumber fifo_eviction_seq = 0;
@ -25,41 +27,59 @@ struct BlobCompactionContext {
};
struct BlobCompactionContextGC {
BlobDBImpl* blob_db_impl = nullptr;
uint64_t cutoff_file_number = 0;
};
// Compaction filter that deletes expired blob indexes from the base DB.
// Comes in two varieties, one for the non-GC case and one for the GC case.
class BlobIndexCompactionFilterBase : public CompactionFilter {
class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
public:
BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
uint64_t current_time, Statistics* stats)
: context_(std::move(context)),
BlobIndexCompactionFilterBase(
BlobCompactionContext&& _context,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
context_(std::move(_context)),
current_time_(current_time),
statistics_(stats) {}
~BlobIndexCompactionFilterBase() override {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
}
~BlobIndexCompactionFilterBase() override;
// Filter expired blob indexes regardless of snapshots.
bool IgnoreSnapshots() const override { return true; }
Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
const Slice& value, std::string* /*new_value*/,
std::string* /*skip_until*/) const override;
Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& value, std::string* new_value,
std::string* skip_until) const override;
protected:
bool IsBlobFileOpened() const;
virtual bool OpenNewBlobFileIfNeeded() const;
bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
PinnableSlice* blob, bool need_decompress,
CompressionType* compression_type) const;
bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const;
bool CloseAndRegisterNewBlobFileIfNeeded() const;
bool CloseAndRegisterNewBlobFile() const;
Statistics* statistics() const { return statistics_; }
const BlobCompactionContext& context() const { return context_; }
private:
Decision HandleValueChange(const Slice& key, std::string* new_value) const;
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
mutable std::shared_ptr<BlobFile> blob_file_;
mutable std::shared_ptr<Writer> writer_;
// It is safe not to use std::atomic since the compaction filter, created
// from a compaction filter factory, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
@ -70,20 +90,28 @@ class BlobIndexCompactionFilterBase : public CompactionFilter {
class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilter(BlobCompactionContext&& context,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
}
BlobIndexCompactionFilter(
BlobCompactionContext&& _context,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
std::move(_user_comp_filter_from_factory),
current_time, stats) {}
const char* Name() const override { return "BlobIndexCompactionFilter"; }
};
class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
BlobCompactionContextGC&& context_gc,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
BlobIndexCompactionFilterGC(
BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
std::move(_user_comp_filter_from_factory),
current_time, stats),
context_gc_(std::move(context_gc)) {}
~BlobIndexCompactionFilterGC() override;
@ -94,20 +122,10 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
std::string* new_value) const override;
private:
bool OpenNewBlobFileIfNeeded() const;
bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
PinnableSlice* blob,
CompressionType* compression_type) const;
bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const;
bool CloseAndRegisterNewBlobFileIfNeeded() const;
bool CloseAndRegisterNewBlobFile() const;
bool OpenNewBlobFileIfNeeded() const override;
private:
BlobCompactionContextGC context_gc_;
mutable std::shared_ptr<BlobFile> blob_file_;
mutable std::shared_ptr<Writer> writer_;
mutable BlobDBGarbageCollectionStats gc_stats_;
};
@ -117,50 +135,63 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {}
: blob_db_impl_(_blob_db_impl),
env_(_env),
statistics_(_statistics),
user_comp_filter_(_cf_options.compaction_filter),
user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
protected:
std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
const CompactionFilter::Context& context) const;
BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
Env* env() const { return env_; }
Statistics* statistics() const { return statistics_; }
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
BlobDBImpl* blob_db_impl_;
Env* env_;
Statistics* statistics_;
const CompactionFilter* user_comp_filter_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
class BlobIndexCompactionFilterFactory
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
}
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
_statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override;
const CompactionFilter::Context& context) override;
};
class BlobIndexCompactionFilterFactoryGC
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
}
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
_statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactoryGC";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override;
const CompactionFilter::Context& context) override;
};
} // namespace blob_db


@ -137,11 +137,6 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
return Status::NotSupported("No blob directory in options");
}
if (cf_options_.compaction_filter != nullptr ||
cf_options_.compaction_filter_factory != nullptr) {
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
if (bdb_options_.garbage_collection_cutoff < 0.0 ||
bdb_options_.garbage_collection_cutoff > 1.0) {
return Status::InvalidArgument(
@ -169,6 +164,12 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
if ((cf_options_.compaction_filter != nullptr ||
cf_options_.compaction_filter_factory != nullptr)) {
ROCKS_LOG_INFO(db_options_.info_log,
"BlobDB only support compaction filter on non-TTL values.");
}
// Open blob directory.
s = env_->CreateDirIfMissing(blob_dir_);
if (!s.ok()) {
@ -194,15 +195,18 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
if (bdb_options_.enable_garbage_collection) {
db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
statistics_);
std::make_shared<BlobIndexCompactionFilterFactoryGC>(
this, env_, cf_options_, statistics_);
} else {
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
statistics_);
std::make_shared<BlobIndexCompactionFilterFactory>(
this, env_, cf_options_, statistics_);
}
// Reset the user compaction filter after building it into the compaction filter factory.
cf_options_.compaction_filter = nullptr;
// Open base db.
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
@ -811,6 +815,7 @@ Status BlobDBImpl::CreateBlobFileAndWriter(
bool has_ttl, const ExpirationRange& expiration_range,
const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
std::shared_ptr<Writer>* writer) {
TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
assert(has_ttl == (expiration_range.first || expiration_range.second));
assert(blob_file);
assert(writer);
@ -1137,6 +1142,31 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
return *compression_output;
}
Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
CompressionType compression_type,
PinnableSlice* value_output) const {
if (compression_type != kNoCompression) {
BlockContents contents;
auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
Status s = UncompressBlockContentsForCompressionType(
info, compressed_value.data(), compressed_value.size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
if (!s.ok()) {
return Status::Corruption("Unable to decompress blob.");
}
}
value_output->PinSelf(contents.data);
}
return Status::OK();
}
Status BlobDBImpl::CompactFiles(
const CompactionOptions& compact_options,
const std::vector<std::string>& input_file_names, const int output_level,
@ -1165,10 +1195,10 @@ Status BlobDBImpl::CompactFiles(
return s;
}
void BlobDBImpl::GetCompactionContextCommon(
BlobCompactionContext* context) const {
void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
assert(context);
context->blob_db_impl = this;
context->next_file_number = next_file_number_.load();
context->current_blob_files.clear();
for (auto& p : blob_files_) {
@ -1193,8 +1223,6 @@ void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
ReadLock l(&mutex_);
GetCompactionContextCommon(context);
context_gc->blob_db_impl = this;
if (!live_imm_non_ttl_blob_files_.empty()) {
auto it = live_imm_non_ttl_blob_files_.begin();
std::advance(it, bdb_options_.garbage_collection_cutoff *
@ -1418,20 +1446,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
}
if (compression_type != kNoCompression) {
BlockContents contents;
auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
s = UncompressBlockContentsForCompressionType(
info, value->data(), value->size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
}
s = DecompressSlice(*value, compression_type, value);
if (!s.ok()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
@ -1442,11 +1457,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
blob_index.file_number(), blob_index.offset(), blob_index.size(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
}
return Status::Corruption("Unable to uncompress blob.");
return s;
}
value->PinSelf(contents.data);
}
return Status::OK();
@ -1706,6 +1718,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
}
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
assert(bfile);
assert(!bfile->Immutable());
assert(!bfile->Obsolete());


@ -71,6 +71,7 @@ class BlobDBImpl : public BlobDB {
friend class BlobDBIterator;
friend class BlobDBListener;
friend class BlobDBListenerGC;
friend class BlobIndexCompactionFilterBase;
friend class BlobIndexCompactionFilterGC;
public:
@ -168,7 +169,7 @@ class BlobDBImpl : public BlobDB {
// Common part of the two GetCompactionContext methods below.
// REQUIRES: read lock on mutex_
void GetCompactionContextCommon(BlobCompactionContext* context) const;
void GetCompactionContextCommon(BlobCompactionContext* context);
void GetCompactionContext(BlobCompactionContext* context);
void GetCompactionContext(BlobCompactionContext* context,
@ -232,6 +233,10 @@ class BlobDBImpl : public BlobDB {
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;
Status DecompressSlice(const Slice& compressed_value,
CompressionType compression_type,
PinnableSlice* value_output) const;
// Close a file by appending a footer, and removes file from open files list.
// REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
// and the blob file's mutex_. If called on a blob file which is visible only


@ -1111,27 +1111,187 @@ TEST_F(BlobDBTest, InlineSmallValues) {
ASSERT_TRUE(ttl_file->HasTTL());
}
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
class TestCompactionFilter : public CompactionFilter {
const char *Name() const override { return "TestCompactionFilter"; }
TEST_F(BlobDBTest, UserCompactionFilter) {
class CustomerFilter : public CompactionFilter {
public:
bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
std::string *new_value, bool *value_changed) const override {
*value_changed = false;
// changing value size to test value transitions between inlined data
// and stored-in-blob data
if (value.size() % 4 == 1) {
*new_value = value.ToString();
// double size by duplicating value
*new_value += *new_value;
*value_changed = true;
return false;
} else if (value.size() % 3 == 1) {
*new_value = value.ToString();
// truncate value to half its size
*new_value = new_value->substr(0, new_value->size() / 2);
*value_changed = true;
return false;
} else if (value.size() % 2 == 1) {
return true;
}
return false;
}
bool IgnoreSnapshots() const override { return true; }
const char *Name() const override { return "CustomerFilter"; }
};
class TestCompactionFilterFactory : public CompactionFilterFactory {
const char *Name() const override { return "TestCompactionFilterFactory"; }
class CustomerFilterFactory : public CompactionFilterFactory {
const char *Name() const override { return "CustomerFilterFactory"; }
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context & /*context*/) override {
return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
return std::unique_ptr<CompactionFilter>(new CustomerFilter());
}
};
for (int i = 0; i < 2; i++) {
constexpr size_t kNumPuts = 1 << 10;
// Generate both inlined and blob value
constexpr uint64_t kMinValueSize = 1 << 6;
constexpr uint64_t kMaxValueSize = 1 << 8;
constexpr uint64_t kMinBlobSize = 1 << 7;
static_assert(kMinValueSize < kMinBlobSize, "");
static_assert(kMaxValueSize > kMinBlobSize, "");
BlobDBOptions bdb_options;
bdb_options.min_blob_size = kMinBlobSize;
bdb_options.blob_file_size = kMaxValueSize * 10;
bdb_options.disable_background_tasks = true;
if (Snappy_Supported()) {
bdb_options.compression = CompressionType::kSnappyCompression;
}
// case_num == 0: Test user defined compaction filter
// case_num == 1: Test user defined compaction filter factory
for (int case_num = 0; case_num < 2; case_num++) {
Options options;
if (i == 0) {
options.compaction_filter = new TestCompactionFilter();
if (case_num == 0) {
options.compaction_filter = new CustomerFilter();
} else {
options.compaction_filter_factory.reset(
new TestCompactionFilterFactory());
options.compaction_filter_factory.reset(new CustomerFilterFactory());
}
ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
options.disable_auto_compactions = true;
options.env = mock_env_.get();
options.statistics = CreateDBStatistics();
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, std::string> data_after_compact;
Random rnd(301);
uint64_t value_size = kMinValueSize;
int drop_record = 0;
for (size_t i = 0; i < kNumPuts; ++i) {
std::ostringstream oss;
oss << "key" << std::setw(4) << std::setfill('0') << i;
const std::string key(oss.str());
const std::string value(
test::RandomHumanReadableString(&rnd, (int)value_size));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
if (value.length() % 4 == 1) {
data_after_compact[key] = value + value;
} else if (value.length() % 3 == 1) {
data_after_compact[key] = value.substr(0, value.size() / 2);
} else if (value.length() % 2 == 1) {
++drop_record;
} else {
data_after_compact[key] = value;
}
if (++value_size > kMaxValueSize) {
value_size = kMinValueSize;
}
}
// Verify full data set
VerifyDB(data);
// Apply the compaction filter to the records.
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Verify data after compaction matches the expected post-filter data set.
VerifyDB(data_after_compact);
ASSERT_EQ(drop_record,
options.statistics->getTickerCount(COMPACTION_KEY_DROP_USER));
delete options.compaction_filter;
Destroy();
}
}
// Test the user compaction filter when there is an IO error on blob data.
TEST_F(BlobDBTest, UserCompactionFilter_BlobIOError) {
class CustomerFilter : public CompactionFilter {
public:
bool Filter(int /*level*/, const Slice & /*key*/, const Slice &value,
std::string *new_value, bool *value_changed) const override {
*new_value = value.ToString() + "_new";
*value_changed = true;
return false;
}
bool IgnoreSnapshots() const override { return true; }
const char *Name() const override { return "CustomerFilter"; }
};
constexpr size_t kNumPuts = 100;
constexpr int kValueSize = 100;
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = kValueSize * 10;
bdb_options.disable_background_tasks = true;
bdb_options.compression = CompressionType::kNoCompression;
std::vector<std::string> io_failure_cases = {
"BlobDBImpl::CreateBlobFileAndWriter",
"BlobIndexCompactionFilterBase::WriteBlobToNewFile",
"BlobDBImpl::CloseBlobFile"};
for (size_t case_num = 0; case_num < io_failure_cases.size(); case_num++) {
Options options;
options.compaction_filter = new CustomerFilter();
options.disable_auto_compactions = true;
options.env = fault_injection_env_.get();
options.statistics = CreateDBStatistics();
Open(bdb_options, options);
std::map<std::string, std::string> data;
Random rnd(301);
for (size_t i = 0; i < kNumPuts; ++i) {
std::ostringstream oss;
oss << "key" << std::setw(4) << std::setfill('0') << i;
const std::string key(oss.str());
const std::string value(
test::RandomHumanReadableString(&rnd, kValueSize));
const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
ASSERT_OK(Put(key, value));
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
data[key] = value;
}
// Verify full data set
VerifyDB(data);
SyncPoint::GetInstance()->SetCallBack(
io_failure_cases[case_num], [&](void * /*arg*/) {
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
});
SyncPoint::GetInstance()->EnableProcessing();
auto s = blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_TRUE(s.IsIOError());
// Verify full data set after compaction failure
VerifyDB(data);
// Reactivate file system to allow test to close DB.
fault_injection_env_->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks();
delete options.compaction_filter;
Destroy();
}
}
@ -1925,7 +2085,7 @@ TEST_F(BlobDBTest, ShutdownWait) {
{"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
});
// Force all tasks to be scheduled immediately.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
SyncPoint::GetInstance()->SetCallBack(
"TimeQueue::Add:item.end", [&](void *arg) {
std::chrono::steady_clock::time_point *tp =
static_cast<std::chrono::steady_clock::time_point *>(arg);
@ -1933,7 +2093,7 @@ TEST_F(BlobDBTest, ShutdownWait) {
std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
SyncPoint::GetInstance()->SetCallBack(
"BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
// Sleep 3 ms to increase the chance of data race.
// We've synced up the code so that EvictExpiredFiles()

View File

@ -27,6 +27,7 @@ class BlobFile {
friend class BlobDBImpl;
friend struct BlobFileComparator;
friend struct BlobFileComparatorTTL;
friend class BlobIndexCompactionFilterBase;
friend class BlobIndexCompactionFilterGC;
private:


@ -0,0 +1,37 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
#include "rocksdb/compaction_filter.h"
namespace ROCKSDB_NAMESPACE {
// Abstract base class for building a layered compaction filter on top of a
// user compaction filter.
// See BlobIndexCompactionFilter or TtlCompactionFilter for basic usage.
class LayeredCompactionFilterBase : public CompactionFilter {
public:
LayeredCompactionFilterBase(
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
: user_comp_filter_(_user_comp_filter),
user_comp_filter_from_factory_(
std::move(_user_comp_filter_from_factory)) {
if (!user_comp_filter_) {
user_comp_filter_ = user_comp_filter_from_factory_.get();
}
}
// Return a pointer to user compaction filter
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
const CompactionFilter* user_comp_filter_;
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
};
} // namespace ROCKSDB_NAMESPACE


@ -17,6 +17,7 @@
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/utility_db.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
#ifdef _WIN32
// Windows API macro interference
@ -148,23 +149,16 @@ class TtlIterator : public Iterator {
Iterator* iter_;
};
class TtlCompactionFilter : public CompactionFilter {
class TtlCompactionFilter : public LayeredCompactionFilterBase {
public:
TtlCompactionFilter(
int32_t ttl, Env* env, const CompactionFilter* user_comp_filter,
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr)
: ttl_(ttl),
env_(env),
user_comp_filter_(user_comp_filter),
user_comp_filter_from_factory_(
std::move(user_comp_filter_from_factory)) {
// Unlike the merge operator, compaction filter is necessary for TTL, hence
// this would be called even if user doesn't specify any compaction-filter
if (!user_comp_filter_) {
user_comp_filter_ = user_comp_filter_from_factory_.get();
}
}
TtlCompactionFilter(int32_t ttl, Env* env,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
env_(env) {}
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
@ -172,14 +166,14 @@ class TtlCompactionFilter : public CompactionFilter {
if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) {
return true;
}
if (user_comp_filter_ == nullptr) {
if (user_comp_filter() == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
@ -195,8 +189,6 @@ class TtlCompactionFilter : public CompactionFilter {
private:
int32_t ttl_;
Env* env_;
const CompactionFilter* user_comp_filter_;
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
};
class TtlCompactionFilterFactory : public CompactionFilterFactory {