From 3b81df34bd906280633dab3c8ae03c072125ebc8 Mon Sep 17 00:00:00 2001 From: sumeet Date: Tue, 13 Aug 2013 10:56:20 -0700 Subject: [PATCH] Separate compaction filter for each compaction Summary: If we have same compaction filter for each compaction, application cannot know about the different compaction processes. Later on, we can put in more details in compaction filter for the application to consume and use it according to its needs. For e.g. In the universal compaction, we have a compaction process involving all the files while others don't involve all the files. Applications may want to collect some stats only when during full compaction. Test Plan: run existing unit tests Reviewers: haobo, dhruba Reviewed By: dhruba CC: xinyaohu, leveldb Differential Revision: https://reviews.facebook.net/D12057 --- db/db_impl.cc | 15 +++++++-- db/db_test.cc | 51 +++++++++++++++++++++++++---- include/leveldb/compaction_filter.h | 25 ++++++++++++++ include/leveldb/options.h | 11 +++++++ util/options.cc | 8 ++++- utilities/ttl/db_ttl.cc | 13 ++++++-- utilities/ttl/db_ttl.h | 38 +++++++++++++++++++-- utilities/ttl/ttl_test.cc | 29 ++++++++++++++-- 8 files changed, 174 insertions(+), 16 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index a91c58de02..6083746fa3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -158,6 +158,10 @@ Options SanitizeOptions(const std::string& dbname, if (result.soft_rate_limit > result.hard_rate_limit) { result.soft_rate_limit = result.hard_rate_limit; } + if (result.compaction_filter && + result.compaction_filter_factory->CreateCompactionFilter().get()) { + Log(result.info_log, "Both filter and factory specified. Using filter"); + } return result; } @@ -1784,6 +1788,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { MergeHelper merge(user_comparator(), options_.merge_operator, options_.info_log.get(), false /* internal key corruption is expected */); + auto compaction_filter = options_.compaction_filter; + std::unique_ptr compaction_filter_from_factory = nullptr; + if (!compaction_filter) { + compaction_filter_from_factory = std::move( + options_.compaction_filter_factory->CreateCompactionFilter()); + compaction_filter = compaction_filter_from_factory.get(); + } for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { @@ -1830,7 +1841,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { visible_in_snapshot = kMaxSequenceNumber; // apply the compaction filter to the first occurrence of the user key - if (options_.compaction_filter && + if (compaction_filter && ikey.type == kTypeValue && (visible_at_tip || ikey.sequence > latest_snapshot)) { // If the user has specified a compaction filter and the sequence @@ -1841,7 +1852,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { bool value_changed = false; compaction_filter_value.clear(); bool to_delete = - options_.compaction_filter->Filter(compact->compaction->level(), + compaction_filter->Filter(compact->compaction->level(), ikey.user_key, value, &compaction_filter_value, &value_changed); diff --git a/db/db_test.cc b/db/db_test.cc index 3a3e50cc5d..18f2e8832c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1698,12 +1698,52 @@ class ChangeFilter : public CompactionFilter { const int argv_; }; +class KeepFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new KeepFilter()); + } + + virtual const char* Name() const override { + return "KeepFilterFactory"; + } +}; + +class DeleteFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new DeleteFilter()); + } + + virtual const char* Name() const override { + return "DeleteFilterFactory"; + } +}; + +class ChangeFilterFactory : public CompactionFilterFactory { + public: + explicit ChangeFilterFactory(int argv) : argv_(argv) {} + + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new ChangeFilter(argv_)); + } + + virtual const char* Name() const override { + return "ChangeFilterFactory"; + } + + private: + const int argv_; +}; + TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto keep_filter = std::make_shared(); - options.compaction_filter = keep_filter.get(); + options.compaction_filter_factory = std::make_shared(); Reopen(&options); // Write 100K keys, these are written to a few files in L0. @@ -1778,8 +1818,7 @@ TEST(DBTest, CompactionFilter) { // create a new database with the compaction // filter in such a way that it deletes all keys - auto delete_filter = std::make_shared(); - options.compaction_filter = delete_filter.get(); + options.compaction_filter_factory = std::make_shared(); options.create_if_missing = true; DestroyAndReopen(&options); @@ -1843,8 +1882,8 @@ TEST(DBTest, CompactionFilterWithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto change_filter = std::make_shared(100); - options.compaction_filter = change_filter.get(); + options.compaction_filter_factory = + std::make_shared(100); Reopen(&options); // Write 100K+1 keys, these are written to a few files diff --git a/include/leveldb/compaction_filter.h b/include/leveldb/compaction_filter.h index 73614742e2..54a9b39ed2 100644 --- a/include/leveldb/compaction_filter.h +++ b/include/leveldb/compaction_filter.h @@ -41,6 +41,31 @@ class CompactionFilter { virtual const char* Name() const = 0; }; +// Each compaction will create a new CompactionFilter allowing the +// application to know about different campactions +class CompactionFilterFactory { + public: + virtual ~CompactionFilterFactory() { }; + virtual std::unique_ptr CreateCompactionFilter() = 0; + + // Returns a name that identifies this compaction filter factory. + virtual const char* Name() const = 0; +}; + +// Default implementaion of CompactionFilterFactory which does not +// return any filter +class DefaultCompactionFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(nullptr); + } + + virtual const char* Name() const override { + return "DefaultCompactionFilterFactory"; + } +}; + } // namespace leveldb #endif // STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 52b6a20581..30a8dc3285 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -25,6 +25,7 @@ class Logger; class MergeOperator; class Snapshot; class CompactionFilter; +class CompactionFilterFactory; using std::shared_ptr; @@ -84,8 +85,13 @@ struct Options { // Default: nullptr const MergeOperator* merge_operator; + // The client must provide compaction_filter_factory if it requires a new + // compaction filter to be used for different compaction processes // Allows an application to modify/delete a key-value during background // compaction. + // Ideally, client should specify only one of filter or factory. + // compaction_filter takes precedence over compaction_filter_factory if + // client specifies both. // Default: nullptr const CompactionFilter* compaction_filter; @@ -506,6 +512,11 @@ struct Options { // Default: a factory that provides a skip-list-based implementation of // MemTableRep. std::shared_ptr memtable_factory; + + // This is a factory that provides compaction filter objects which allow + // an application to modify/delete a key-value during background compaction. + // Default: a factory that doesn't provide any object + std::shared_ptr compaction_filter_factory; }; // Options that control read operations diff --git a/util/options.cc b/util/options.cc index 127fe81812..cc5653326e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -80,7 +80,11 @@ Options::Options() bytes_per_sync(0), compaction_style(kCompactionStyleLevel), filter_deletes(false), - memtable_factory(std::shared_ptr(new SkipListFactory)) { + memtable_factory(std::shared_ptr(new SkipListFactory)), + compaction_filter_factory( + std::shared_ptr( + new DefaultCompactionFilterFactory())) { + assert(memtable_factory.get() != nullptr); } @@ -96,6 +100,8 @@ Options::Dump(Logger* log) const merge_operator? merge_operator->Name() : "None"); Log(log," Options.compaction_filter: %s", compaction_filter? compaction_filter->Name() : "None"); + Log(log," Options.compaction_filter_factory: %s", + compaction_filter_factory->Name()); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 4b643239a9..0791cdc878 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -20,9 +20,16 @@ DBWithTTL::DBWithTTL(const int32_t ttl, ttl_(ttl) { Options options_to_open = options; - ttl_comp_filter_.reset(new TtlCompactionFilter(ttl, - options.compaction_filter)); - options_to_open.compaction_filter = ttl_comp_filter_.get(); + if (options.compaction_filter) { + ttl_comp_filter_.reset( + new TtlCompactionFilter(ttl, options.compaction_filter)); + options_to_open.compaction_filter = ttl_comp_filter_.get(); + } else { + options_to_open.compaction_filter_factory = + std::shared_ptr( + new TtlCompactionFilterFactory( + ttl, options.compaction_filter_factory)); + } if (options.merge_operator) { ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index dae0ba8490..dcc41bce56 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -176,11 +176,19 @@ class TtlIterator : public Iterator { class TtlCompactionFilter : public CompactionFilter { public: - TtlCompactionFilter(int32_t ttl, const CompactionFilter* comp_filter) + TtlCompactionFilter( + int32_t ttl, + const CompactionFilter* user_comp_filter, + std::unique_ptr + user_comp_filter_from_factory = nullptr) : ttl_(ttl), - user_comp_filter_(comp_filter) { + user_comp_filter_(user_comp_filter), + user_comp_filter_from_factory_(std::move(user_comp_filter_from_factory)) { // Unlike the merge operator, compaction filter is necessary for TTL, hence // this would be called even if user doesn't specify any compaction-filter + if (!user_comp_filter_) { + user_comp_filter_ = user_comp_filter_from_factory_.get(); + } } virtual bool Filter(int level, @@ -215,6 +223,32 @@ class TtlCompactionFilter : public CompactionFilter { private: int32_t ttl_; const CompactionFilter* user_comp_filter_; + std::unique_ptr user_comp_filter_from_factory_; +}; + +class TtlCompactionFilterFactory : public CompactionFilterFactory { + public: + TtlCompactionFilterFactory( + int32_t ttl, + std::shared_ptr comp_filter_factory) + : ttl_(ttl), + user_comp_filter_factory_(comp_filter_factory) { } + + virtual std::unique_ptr CreateCompactionFilter() { + return std::unique_ptr( + new TtlCompactionFilter( + ttl_, + nullptr, + std::move(user_comp_filter_factory_->CreateCompactionFilter()))); + } + + virtual const char* Name() const override { + return "TtlCompactionFilterFactory"; + } + + private: + int32_t ttl_; + std::shared_ptr user_comp_filter_factory_; }; class TtlMergeOperator : public MergeOperator { diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index ba232001cb..3d357e0cf9 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include "leveldb/compaction_filter.h" #include "utilities/utility_db.h" #include "util/testharness.h" @@ -54,8 +55,9 @@ class TtlTest { // Open with TestFilter compaction filter void OpenTtlWithTestCompaction(int32_t ttl) { - test_comp_filter_.reset(new TestFilter(kSampleSize_, kNewValue_)); - options_.compaction_filter = test_comp_filter_.get(); + options_.compaction_filter_factory = + std::shared_ptr( + new TestFilterFactory(kSampleSize_, kNewValue_)); OpenTtl(ttl); } @@ -252,6 +254,29 @@ class TtlTest { const std::string kNewValue_; }; + class TestFilterFactory : public CompactionFilterFactory { + public: + TestFilterFactory(const int64_t kSampleSize, const std::string kNewValue) + : kSampleSize_(kSampleSize), + kNewValue_(kNewValue) { + } + + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr( + new TestFilter(kSampleSize_, kNewValue_)); + } + + virtual const char* Name() const override { + return "TestFilterFactory"; + } + + private: + const int64_t kSampleSize_; + const std::string kNewValue_; + }; + + // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer const int64_t kSampleSize_ = 100;