Make MergeOperator+CompactionFilter/Factory into Customizable Classes (#8481)

Summary:
- Changed MergeOperator, CompactionFilter, and CompactionFilterFactory into Customizable classes.
 - Added Options/Configurable/Object Registration for TTL and Cassandra variants
 - Changed the StringAppend MergeOperators to accept a string delimiter rather than a simple char.  Made the delimiter into a configurable option
 - Added tests for new functionality

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8481

Reviewed By: zhichao-cao

Differential Revision: D30136050

Pulled By: mrambacher

fbshipit-source-id: 271d1772835935b6773abaf018ee71e42f9491af
This commit is contained in:
mrambacher 2021-08-06 08:26:23 -07:00 committed by Facebook GitHub Bot
parent fd2079938d
commit d057e8326d
39 changed files with 1273 additions and 319 deletions

View File

@ -849,6 +849,7 @@ set(SOURCES
utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/compaction_filters.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/debug.cc
utilities/env_mirror.cc
@ -857,6 +858,7 @@ set(SOURCES
utilities/fault_injection_fs.cc
utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc
utilities/merge_operators.cc
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc

View File

@ -368,6 +368,7 @@ cpp_library(
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
"utilities/debug.cc",
@ -377,6 +378,7 @@ cpp_library(
"utilities/fault_injection_fs.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
@ -681,6 +683,7 @@ cpp_library(
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
"utilities/debug.cc",
@ -690,6 +693,7 @@ cpp_library(
"utilities/fault_injection_fs.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",

View File

@ -337,8 +337,9 @@ class PartialDeleteCompactionFilter : public CompactionFilter {
TEST_F(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
options.env = env_;
auto delete_comp_filter = PartialDeleteCompactionFilter();
options.compaction_filter = &delete_comp_filter;
std::unique_ptr<CompactionFilter> delete_comp_filter(
new PartialDeleteCompactionFilter());
options.compaction_filter = delete_comp_filter.get();
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);

View File

@ -13,6 +13,7 @@
#include <string>
#include <vector>
#include "rocksdb/customizable.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
@ -24,7 +25,7 @@ class SliceTransform;
// CompactionFilter allows an application to modify/delete a key-value during
// table file creation.
class CompactionFilter {
class CompactionFilter : public Customizable {
public:
enum ValueType {
kValue,
@ -59,6 +60,10 @@ class CompactionFilter {
};
virtual ~CompactionFilter() {}
static const char* Type() { return "CompactionFilter"; }
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& name,
const CompactionFilter** result);
// The table file creation process invokes this method before adding a kv to
// the table file. A return value of false indicates that the kv should be
@ -193,7 +198,7 @@ class CompactionFilter {
// Returns a name that identifies this `CompactionFilter`.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
const char* Name() const override = 0;
// Internal (BlobDB) use only. Do not override in application code.
virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; }
@ -214,9 +219,13 @@ class CompactionFilter {
// `CompactionFilter` according to `ShouldFilterTableFileCreation()`. This
// allows the application to know about the different ongoing threads of work
// and makes it unnecessary for `CompactionFilter` to provide thread-safety.
class CompactionFilterFactory {
class CompactionFilterFactory : public Customizable {
public:
virtual ~CompactionFilterFactory() {}
static const char* Type() { return "CompactionFilterFactory"; }
static Status CreateFromString(
const ConfigOptions& config_options, const std::string& name,
std::shared_ptr<CompactionFilterFactory>* result);
// Returns whether a thread creating table files for the specified `reason`
// should invoke `CreateCompactionFilter()` and pass KVs through the returned

View File

@ -87,7 +87,18 @@ class Customizable : public Configurable {
// @param name The name of the instance to find.
// Returns true if the class is an instance of the input name.
virtual bool IsInstanceOf(const std::string& name) const {
return name == Name();
if (name.empty()) {
return false;
} else if (name == Name()) {
return true;
} else {
const char* nickname = NickName();
if (nickname != nullptr && name == nickname) {
return true;
} else {
return false;
}
}
}
// Returns the named instance of the Customizable as a T*, or nullptr if not
@ -179,6 +190,10 @@ class Customizable : public Configurable {
virtual const Customizable* Inner() const { return nullptr; }
protected:
// Some classes have both a class name (e.g. PutOperator) and a nickname
// (e.g. put). Classes can override this method to return a
// nickname. Nicknames can be used by InstanceOf and object creation.
virtual const char* NickName() const { return ""; }
// Given a name (e.g. rocksdb.my.type.opt), returns the short name (opt)
std::string GetOptionName(const std::string& long_name) const override;
#ifndef ROCKSDB_LITE

View File

@ -10,6 +10,7 @@
#include <string>
#include <vector>
#include "rocksdb/customizable.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
@ -43,10 +44,13 @@ class Logger;
//
// Refer to rocksdb-merge wiki for more details and example implementations.
//
class MergeOperator {
class MergeOperator : public Customizable {
public:
virtual ~MergeOperator() {}
static const char* Type() { return "MergeOperator"; }
static Status CreateFromString(const ConfigOptions& opts,
const std::string& id,
std::shared_ptr<MergeOperator>* result);
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.

View File

@ -35,10 +35,7 @@ enum class OptionType {
kCompactionPri,
kSliceTransform,
kCompressionType,
kCompactionFilter,
kCompactionFilterFactory,
kCompactionStopStyle,
kMergeOperator,
kMemTableRepFactory,
kFilterPolicy,
kChecksumType,

View File

@ -15,6 +15,7 @@
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/configurable.h"
#include "rocksdb/convenience.h"
@ -656,30 +657,18 @@ static std::unordered_map<std::string, OptionTypeInfo>
}
}}},
{"compaction_filter",
{offset_of(&ImmutableCFOptions::compaction_filter),
OptionType::kCompactionFilter, OptionVerificationType::kByName,
OptionTypeFlags::kNone}},
OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
offset_of(&ImmutableCFOptions::compaction_filter),
OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"compaction_filter_factory",
{offset_of(&ImmutableCFOptions::compaction_filter_factory),
OptionType::kCompactionFilterFactory, OptionVerificationType::kByName,
OptionTypeFlags::kNone}},
OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
offset_of(&ImmutableCFOptions::compaction_filter_factory),
OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"merge_operator",
{offset_of(&ImmutableCFOptions::merge_operator),
OptionType::kMergeOperator,
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kCompareLoose,
// Parses the input value as a MergeOperator, updating the value
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto mop = static_cast<std::shared_ptr<MergeOperator>*>(addr);
Status status =
opts.registry->NewSharedObject<MergeOperator>(value, mop);
// Only support static comparator for now.
if (status.ok()) {
return status;
}
return Status::OK();
}}},
OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
offset_of(&ImmutableCFOptions::merge_operator),
OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kCompareLoose | OptionTypeFlags::kAllowNull)},
{"compaction_style",
{offset_of(&ImmutableCFOptions::compaction_style),
OptionType::kCompactionStyle, OptionVerificationType::kNormal,

View File

@ -29,6 +29,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
#ifndef GFLAGS
bool FLAGS_enable_print = false;
@ -944,6 +945,26 @@ static int RegisterTestObjects(ObjectLibrary& library,
static test::SimpleSuffixReverseComparator ssrc;
return &ssrc;
});
library.Register<MergeOperator>(
"Changling",
[](const std::string& uri, std::unique_ptr<MergeOperator>* guard,
std::string* /* errmsg */) {
guard->reset(new test::ChanglingMergeOperator(uri));
return guard->get();
});
library.Register<CompactionFilter>(
"Changling",
[](const std::string& uri, std::unique_ptr<CompactionFilter>* /*guard*/,
std::string* /* errmsg */) {
return new test::ChanglingCompactionFilter(uri);
});
library.Register<CompactionFilterFactory>(
"Changling", [](const std::string& uri,
std::unique_ptr<CompactionFilterFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new test::ChanglingCompactionFilterFactory(uri));
return guard->get();
});
return static_cast<int>(library.GetFactoryCount(&num_types));
}
@ -1136,6 +1157,58 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) {
}
}
TEST_F(LoadCustomizableTest, LoadMergeOperatorTest) {
std::shared_ptr<MergeOperator> result;
ASSERT_NOK(
MergeOperator::CreateFromString(config_options_, "Changling", &result));
ASSERT_OK(MergeOperator::CreateFromString(config_options_, "put", &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), "PutOperator");
if (RegisterTests("Test")) {
ASSERT_OK(
MergeOperator::CreateFromString(config_options_, "Changling", &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), "ChanglingMergeOperator");
}
}
TEST_F(LoadCustomizableTest, LoadCompactionFilterFactoryTest) {
std::shared_ptr<CompactionFilterFactory> result;
ASSERT_NOK(CompactionFilterFactory::CreateFromString(config_options_,
"Changling", &result));
if (RegisterTests("Test")) {
ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_,
"Changling", &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), "ChanglingCompactionFilterFactory");
}
}
TEST_F(LoadCustomizableTest, LoadCompactionFilterTest) {
const CompactionFilter* result = nullptr;
ASSERT_NOK(CompactionFilter::CreateFromString(config_options_, "Changling",
&result));
#ifndef ROCKSDB_LITE
ASSERT_OK(CompactionFilter::CreateFromString(
config_options_, RemoveEmptyValueCompactionFilter::kClassName(),
&result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), RemoveEmptyValueCompactionFilter::kClassName());
delete result;
result = nullptr;
if (RegisterTests("Test")) {
ASSERT_OK(CompactionFilter::CreateFromString(config_options_, "Changling",
&result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), "ChanglingCompactionFilter");
delete result;
}
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
TEST_F(LoadCustomizableTest, LoadEventListenerTest) {
std::shared_ptr<EventListener> result;

View File

@ -562,32 +562,12 @@ bool SerializeSingleOptionHelper(const void* opt_address,
: kNullptrString;
break;
}
case OptionType::kCompactionFilter: {
// it's a const pointer of const CompactionFilter*
const auto* ptr =
static_cast<const CompactionFilter* const*>(opt_address);
*value = *ptr ? (*ptr)->Name() : kNullptrString;
break;
}
case OptionType::kCompactionFilterFactory: {
const auto* ptr =
static_cast<const std::shared_ptr<CompactionFilterFactory>*>(
opt_address);
*value = ptr->get() ? ptr->get()->Name() : kNullptrString;
break;
}
case OptionType::kMemTableRepFactory: {
const auto* ptr =
static_cast<const std::shared_ptr<MemTableRepFactory>*>(opt_address);
*value = ptr->get() ? ptr->get()->Name() : kNullptrString;
break;
}
case OptionType::kMergeOperator: {
const auto* ptr =
static_cast<const std::shared_ptr<MergeOperator>*>(opt_address);
*value = ptr->get() ? ptr->get()->Name() : kNullptrString;
break;
}
case OptionType::kFilterPolicy: {
const auto* ptr =
static_cast<const std::shared_ptr<FilterPolicy>*>(opt_address);

View File

@ -30,6 +30,9 @@
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "utilities/merge_operators/bytesxor.h"
#include "utilities/merge_operators/sortlist.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
#ifndef GFLAGS
bool FLAGS_enable_print = false;
@ -395,13 +398,6 @@ TEST_F(OptionsTest, GetColumnFamilyOptionsFromStringTest) {
// MergeOperator from object registry
std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
std::string kMoName = bxo->Name();
ObjectLibrary::Default()->Register<MergeOperator>(
kMoName,
[](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
std::string* /* errmsg */) {
guard->reset(new BytesXOROperator());
return guard->get();
});
ASSERT_OK(GetColumnFamilyOptionsFromString(config_options, base_cf_opt,
"merge_operator=" + kMoName + ";",
@ -2281,14 +2277,6 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
// MergeOperator from object registry
std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
std::string kMoName = bxo->Name();
ObjectLibrary::Default()->Register<MergeOperator>(
kMoName,
[](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
std::string* /* errmsg */) {
guard->reset(new BytesXOROperator());
return guard->get();
});
ASSERT_OK(GetColumnFamilyOptionsFromString(
base_cf_opt, "merge_operator=" + kMoName + ";", &new_cf_opt));
ASSERT_EQ(kMoName, std::string(new_cf_opt.merge_operator->Name()));
@ -3096,8 +3084,8 @@ void VerifyCFPointerTypedOptions(
// change the name of merge operator back-and-forth
{
auto* merge_operator = dynamic_cast<test::ChanglingMergeOperator*>(
base_cf_opt->merge_operator.get());
auto* merge_operator = base_cf_opt->merge_operator
->CheckedCast<test::ChanglingMergeOperator>();
if (merge_operator != nullptr) {
name_buffer = merge_operator->Name();
// change the name and expect non-ok status
@ -3114,8 +3102,8 @@ void VerifyCFPointerTypedOptions(
// change the name of the compaction filter factory back-and-forth
{
auto* compaction_filter_factory =
dynamic_cast<test::ChanglingCompactionFilterFactory*>(
base_cf_opt->compaction_filter_factory.get());
base_cf_opt->compaction_filter_factory
->CheckedCast<test::ChanglingCompactionFilterFactory>();
if (compaction_filter_factory != nullptr) {
name_buffer = compaction_filter_factory->Name();
// change the name and expect non-ok status
@ -4186,6 +4174,90 @@ TEST_F(ConfigOptionsTest, EnvFromConfigOptions) {
delete mem_env;
}
TEST_F(ConfigOptionsTest, MergeOperatorFromString) {
ConfigOptions config_options;
std::shared_ptr<MergeOperator> merge_op;
ASSERT_OK(MergeOperator::CreateFromString(config_options, "put", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("put"));
ASSERT_STREQ(merge_op->Name(), "PutOperator");
ASSERT_OK(
MergeOperator::CreateFromString(config_options, "put_v1", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("PutOperator"));
ASSERT_OK(
MergeOperator::CreateFromString(config_options, "uint64add", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("uint64add"));
ASSERT_STREQ(merge_op->Name(), "UInt64AddOperator");
ASSERT_OK(MergeOperator::CreateFromString(config_options, "max", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("max"));
ASSERT_STREQ(merge_op->Name(), "MaxOperator");
ASSERT_OK(
MergeOperator::CreateFromString(config_options, "bytesxor", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("bytesxor"));
ASSERT_STREQ(merge_op->Name(), BytesXOROperator::kClassName());
ASSERT_OK(
MergeOperator::CreateFromString(config_options, "sortlist", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("sortlist"));
ASSERT_STREQ(merge_op->Name(), SortList::kClassName());
ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappend",
&merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
auto delimiter = merge_op->GetOptions<std::string>("Delimiter");
ASSERT_NE(delimiter, nullptr);
ASSERT_EQ(*delimiter, ",");
ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappendtest",
&merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
delimiter = merge_op->GetOptions<std::string>("Delimiter");
ASSERT_NE(delimiter, nullptr);
ASSERT_EQ(*delimiter, ",");
ASSERT_OK(MergeOperator::CreateFromString(
config_options, "id=stringappend; delimiter=||", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
delimiter = merge_op->GetOptions<std::string>("Delimiter");
ASSERT_NE(delimiter, nullptr);
ASSERT_EQ(*delimiter, "||");
ASSERT_OK(MergeOperator::CreateFromString(
config_options, "id=stringappendtest; delimiter=&&", &merge_op));
ASSERT_NE(merge_op, nullptr);
ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
delimiter = merge_op->GetOptions<std::string>("Delimiter");
ASSERT_NE(delimiter, nullptr);
ASSERT_EQ(*delimiter, "&&");
std::shared_ptr<MergeOperator> copy;
std::string mismatch;
std::string opts_str = merge_op->ToString(config_options);
ASSERT_OK(MergeOperator::CreateFromString(config_options, opts_str, &copy));
ASSERT_TRUE(merge_op->AreEquivalent(config_options, copy.get(), &mismatch));
ASSERT_NE(copy, nullptr);
delimiter = copy->GetOptions<std::string>("Delimiter");
ASSERT_NE(delimiter, nullptr);
ASSERT_EQ(*delimiter, "&&");
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

2
src.mk
View File

@ -232,6 +232,7 @@ LIB_SOURCES = \
utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
utilities/debug.cc \
@ -241,6 +242,7 @@ LIB_SOURCES = \
utilities/fault_injection_fs.cc \
utilities/leveldb_options/leveldb_options.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
utilities/merge_operators/sortlist.cc \

View File

@ -772,6 +772,15 @@ class ChanglingMergeOperator : public MergeOperator {
Logger* /*logger*/) const override {
return false;
}
static const char* kClassName() { return "ChanglingMergeOperator"; }
virtual bool IsInstanceOf(const std::string& id) const override {
if (id == kClassName()) {
return true;
} else {
return MergeOperator::IsInstanceOf(id);
}
}
virtual const char* Name() const override { return name_.c_str(); }
protected:
@ -796,6 +805,15 @@ class ChanglingCompactionFilter : public CompactionFilter {
return false;
}
static const char* kClassName() { return "ChanglingCompactionFilter"; }
virtual bool IsInstanceOf(const std::string& id) const override {
if (id == kClassName()) {
return true;
} else {
return CompactionFilter::IsInstanceOf(id);
}
}
const char* Name() const override { return name_.c_str(); }
private:
@ -821,6 +839,14 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
// Returns a name that identifies this compaction filter factory.
const char* Name() const override { return name_.c_str(); }
static const char* kClassName() { return "ChanglingCompactionFilterFactory"; }
virtual bool IsInstanceOf(const std::string& id) const override {
if (id == kClassName()) {
return true;
} else {
return CompactionFilterFactory::IsInstanceOf(id);
}
}
protected:
std::string name_;

View File

@ -3929,6 +3929,7 @@ class Benchmark {
void InitializeOptionsFromFlags(Options* opts) {
printf("Initializing RocksDB Options from command-line flags\n");
Options& options = *opts;
ConfigOptions config_options(options);
assert(db_.db == nullptr);
@ -4294,12 +4295,14 @@ class Benchmark {
options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
// merge operator options
options.merge_operator = MergeOperators::CreateFromStringId(
FLAGS_merge_operator);
if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
fprintf(stderr, "invalid merge operator: %s\n",
FLAGS_merge_operator.c_str());
exit(1);
if (!FLAGS_merge_operator.empty()) {
Status s = MergeOperator::CreateFromString(
config_options, FLAGS_merge_operator, &options.merge_operator);
if (!s.ok()) {
fprintf(stderr, "invalid merge operator[%s]: %s\n",
FLAGS_merge_operator.c_str(), s.ToString().c_str());
exit(1);
}
}
options.max_successive_merges = FLAGS_max_successive_merges;
options.report_bg_io_stats = FLAGS_report_bg_io_stats;

View File

@ -4,15 +4,35 @@
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/cassandra/cassandra_compaction_filter.h"
#include <string>
#include "rocksdb/slice.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/merge_operator.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
static std::unordered_map<std::string, OptionTypeInfo>
cassandra_filter_type_info = {
#ifndef ROCKSDB_LITE
{"purge_ttl_on_expiration",
{offsetof(struct CassandraOptions, purge_ttl_on_expiration),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"gc_grace_period_in_seconds",
{offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
#endif // ROCKSDB_LITE
};
const char* CassandraCompactionFilter::Name() const {
return "CassandraCompactionFilter";
CassandraCompactionFilter::CassandraCompactionFilter(
bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
: options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
RegisterOptions(&options_, &cassandra_filter_type_info);
}
CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
@ -23,12 +43,12 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());
RowValue compacted =
purge_ttl_on_expiration_
options_.purge_ttl_on_expiration
? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);
if (value_type == ValueType::kValue) {
compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds);
}
if(compacted.Empty()) {
@ -43,5 +63,48 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
return Decision::kKeep;
}
CassandraCompactionFilterFactory::CassandraCompactionFilterFactory(
bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
: options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
RegisterOptions(&options_, &cassandra_filter_type_info);
}
std::unique_ptr<CompactionFilter>
CassandraCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context&) {
std::unique_ptr<CompactionFilter> result(new CassandraCompactionFilter(
options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds));
return result;
}
#ifndef ROCKSDB_LITE
int RegisterCassandraObjects(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<MergeOperator>(
CassandraValueMergeOperator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /* errmsg */) {
guard->reset(new CassandraValueMergeOperator(0));
return guard->get();
});
library.Register<CompactionFilter>(
CassandraCompactionFilter::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilter>* /*guard */,
std::string* /* errmsg */) {
return new CassandraCompactionFilter(false, 0);
});
library.Register<CompactionFilterFactory>(
CassandraCompactionFilterFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilterFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new CassandraCompactionFilterFactory(false, 0));
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
#endif // ROCKSDB_LITE
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -5,8 +5,10 @@
#pragma once
#include <string>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"
#include "utilities/cassandra/cassandra_options.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
@ -25,18 +27,31 @@ namespace cassandra {
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds)
: purge_ttl_on_expiration_(purge_ttl_on_expiration),
gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
int32_t gc_grace_period_in_seconds);
static const char* kClassName() { return "CassandraCompactionFilter"; }
const char* Name() const override { return kClassName(); }
const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;
private:
bool purge_ttl_on_expiration_;
int32_t gc_grace_period_in_seconds_;
CassandraOptions options_;
};
class CassandraCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit CassandraCompactionFilterFactory(bool purge_ttl_on_expiration,
int32_t gc_grace_period_in_seconds);
~CassandraCompactionFilterFactory() override {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
static const char* kClassName() { return "CassandraCompactionFilterFactory"; }
const char* Name() const override { return kClassName(); }
private:
CassandraOptions options_;
};
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -6,9 +6,10 @@
#include <iostream>
#include "db/db_impl/db_impl.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/random.h"
@ -318,6 +319,99 @@ TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
ASSERT_FALSE(std::get<0>(store.Get("k1")));
}
#ifndef ROCKSDB_LITE
TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
ConfigOptions config_options;
std::shared_ptr<MergeOperator> mo;
config_options.ignore_unsupported_options = false;
ASSERT_NOK(MergeOperator::CreateFromString(
config_options, CassandraValueMergeOperator::kClassName(), &mo));
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
"cassandra");
ASSERT_OK(MergeOperator::CreateFromString(
config_options, CassandraValueMergeOperator::kClassName(), &mo));
ASSERT_NE(mo, nullptr);
ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
mo.reset();
ASSERT_OK(MergeOperator::CreateFromString(
config_options,
std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
CassandraValueMergeOperator::kClassName(),
&mo));
ASSERT_NE(mo, nullptr);
ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
const auto* opts = mo->GetOptions<CassandraOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
ASSERT_EQ(opts->operands_limit, 20);
}
TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
ConfigOptions config_options;
const CompactionFilter* filter = nullptr;
config_options.ignore_unsupported_options = false;
ASSERT_NOK(CompactionFilter::CreateFromString(
config_options, CassandraCompactionFilter::kClassName(), &filter));
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
"cassandra");
ASSERT_OK(CompactionFilter::CreateFromString(
config_options, CassandraCompactionFilter::kClassName(), &filter));
ASSERT_NE(filter, nullptr);
ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
delete filter;
filter = nullptr;
ASSERT_OK(CompactionFilter::CreateFromString(
config_options,
std::string(
"purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
CassandraCompactionFilter::kClassName(),
&filter));
ASSERT_NE(filter, nullptr);
ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
const auto* opts = filter->GetOptions<CassandraOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
ASSERT_TRUE(opts->purge_ttl_on_expiration);
delete filter;
}
TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
ConfigOptions config_options;
std::shared_ptr<CompactionFilterFactory> factory;
config_options.ignore_unsupported_options = false;
ASSERT_NOK(CompactionFilterFactory::CreateFromString(
config_options, CassandraCompactionFilterFactory::kClassName(),
&factory));
config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
"cassandra");
ASSERT_OK(CompactionFilterFactory::CreateFromString(
config_options, CassandraCompactionFilterFactory::kClassName(),
&factory));
ASSERT_NE(factory, nullptr);
ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
factory.reset();
ASSERT_OK(CompactionFilterFactory::CreateFromString(
config_options,
std::string(
"purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
CassandraCompactionFilterFactory::kClassName(),
&factory));
ASSERT_NE(factory, nullptr);
ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
const auto* opts = factory->GetOptions<CassandraOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
ASSERT_TRUE(opts->purge_ttl_on_expiration);
}
#endif // ROCKSDB_LITE
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,40 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cinttypes>
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class ObjectLibrary;
namespace cassandra {
struct CassandraOptions {
static const char* kName() { return "CassandraOptions"; }
CassandraOptions(int32_t _gc_grace_period_in_seconds, size_t _operands_limit,
bool _purge_ttl_on_expiration = false)
: operands_limit(_operands_limit),
gc_grace_period_in_seconds(_gc_grace_period_in_seconds),
purge_ttl_on_expiration(_purge_ttl_on_expiration) {}
// Limit on the number of merge operands.
size_t operands_limit;
// How long (in seconds) tombstoned data remains before it is purged
int32_t gc_grace_period_in_seconds;
// If is set to true, expired data will be directly purged.
// Otherwise expired data will be converted tombstones first,
// then be eventually removed after gc grace period. This value should
// only true if all writes have same ttl setting, otherwise it could bring old
// data back.
bool purge_ttl_on_expiration;
};
#ifndef ROCKSDB_LITE
extern "C" {
int RegisterCassandraObjects(ObjectLibrary& library, const std::string& arg);
} // extern "C"
#endif // ROCKSDB_LITE
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -5,16 +5,36 @@
#include "merge_operator.h"
#include <memory>
#include <assert.h>
#include "rocksdb/slice.h"
#include <memory>
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/cassandra/format.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
static std::unordered_map<std::string, OptionTypeInfo>
merge_operator_options_info = {
#ifndef ROCKSDB_LITE
{"gc_grace_period_in_seconds",
{offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"operands_limit",
{offsetof(struct CassandraOptions, operands_limit), OptionType::kSizeT,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
#endif // ROCKSDB_LITE
};
CassandraValueMergeOperator::CassandraValueMergeOperator(
int32_t gc_grace_period_in_seconds, size_t operands_limit)
: options_(gc_grace_period_in_seconds, operands_limit) {
RegisterOptions(&options_, &merge_operator_options_info);
}
// Implementation for the merge operation (merges two Cassandra values)
bool CassandraValueMergeOperator::FullMergeV2(
@ -34,7 +54,7 @@ bool CassandraValueMergeOperator::FullMergeV2(
}
RowValue merged = RowValue::Merge(std::move(row_values));
merged = merged.RemoveTombstones(gc_grace_period_in_seconds_);
merged = merged.RemoveTombstones(options_.gc_grace_period_in_seconds);
merge_out->new_value.reserve(merged.Size());
merged.Serialize(&(merge_out->new_value));
@ -58,10 +78,6 @@ bool CassandraValueMergeOperator::PartialMergeMulti(
return true;
}
const char* CassandraValueMergeOperator::Name() const {
return "CassandraValueMergeOperator";
}
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -6,6 +6,7 @@
#pragma once
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "utilities/cassandra/cassandra_options.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
@ -16,9 +17,7 @@ namespace cassandra {
class CassandraValueMergeOperator : public MergeOperator {
public:
explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
size_t operands_limit = 0)
: gc_grace_period_in_seconds_(gc_grace_period_in_seconds),
operands_limit_(operands_limit) {}
size_t operands_limit = 0);
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
@ -28,17 +27,18 @@ public:
std::string* new_value,
Logger* logger) const override;
virtual const char* Name() const override;
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "CassandraValueMergeOperator"; }
virtual bool AllowSingleOperand() const override { return true; }
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
return operands_limit_ > 0 && operands.size() >= operands_limit_;
return options_.operands_limit > 0 &&
operands.size() >= options_.operands_limit;
}
private:
int32_t gc_grace_period_in_seconds_;
size_t operands_limit_;
CassandraOptions options_;
};
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,56 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <memory>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
static int RegisterBuiltinCompactionFilters(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<CompactionFilter>(
RemoveEmptyValueCompactionFilter::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilter>* /*guard*/,
std::string* /*errmsg*/) {
return new RemoveEmptyValueCompactionFilter();
});
return 1;
}
#endif // ROCKSDB_LITE
Status CompactionFilter::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
const CompactionFilter** result) {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinCompactionFilters(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
CompactionFilter* filter = const_cast<CompactionFilter*>(*result);
Status status = LoadStaticObject<CompactionFilter>(config_options, value,
nullptr, &filter);
if (status.ok()) {
*result = const_cast<CompactionFilter*>(filter);
}
return status;
}
Status CompactionFilterFactory::CreateFromString(
const ConfigOptions& config_options, const std::string& value,
std::shared_ptr<CompactionFilterFactory>* result) {
// Currently there are no builtin CompactionFilterFactories.
// If any are introduced, they need to be registered here.
Status status = LoadSharedObject<CompactionFilterFactory>(
config_options, value, nullptr, result);
return status;
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -10,7 +10,7 @@
namespace ROCKSDB_NAMESPACE {
// Abstract base class for building layered compation filter on top of
// Abstract base class for building layered compaction filter on top of
// user compaction filter.
// See BlobIndexCompactionFilter or TtlCompactionFilter for a basic usage.
class LayeredCompactionFilterBase : public CompactionFilter {
@ -29,8 +29,12 @@ class LayeredCompactionFilterBase : public CompactionFilter {
// Return a pointer to user compaction filter
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
const Customizable* Inner() const override { return user_comp_filter_; }
protected:
const CompactionFilter* user_comp_filter_;
private:
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
};

View File

@ -12,10 +12,6 @@
namespace ROCKSDB_NAMESPACE {
const char* RemoveEmptyValueCompactionFilter::Name() const {
return "RemoveEmptyValueCompactionFilter";
}
bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
const Slice& /*key*/,
const Slice& existing_value,

View File

@ -16,12 +16,13 @@ namespace ROCKSDB_NAMESPACE {
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
public:
const char* Name() const override;
bool Filter(int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const override;
static const char* kClassName() { return "RemoveEmptyValueCompactionFilter"; }
const char* Name() const override { return kClassName(); }
bool Filter(int level, const Slice& key, const Slice& existing_value,
std::string* new_value, bool* value_changed) const override;
};
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

View File

@ -0,0 +1,125 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "utilities/merge_operators.h"
#include <memory>
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "utilities/merge_operators/bytesxor.h"
#include "utilities/merge_operators/sortlist.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace ROCKSDB_NAMESPACE {
static bool LoadMergeOperator(const std::string& id,
std::shared_ptr<MergeOperator>* result) {
bool success = true;
// TODO: Hook the "name" up to the actual Name() of the MergeOperators?
// Requires these classes be moved into a header file...
if (id == "put" || id == "PutOperator") {
*result = MergeOperators::CreatePutOperator();
} else if (id == "put_v1") {
*result = MergeOperators::CreateDeprecatedPutOperator();
} else if (id == "uint64add" || id == "UInt64AddOperator") {
*result = MergeOperators::CreateUInt64AddOperator();
} else if (id == "max" || id == "MaxOperator") {
*result = MergeOperators::CreateMaxOperator();
#ifdef ROCKSDB_LITE
// The remainder of the classes are handled by the ObjectRegistry in
// non-LITE mode
} else if (id == StringAppendOperator::kNickName() ||
id == StringAppendOperator::kClassName()) {
*result = MergeOperators::CreateStringAppendOperator();
} else if (id == StringAppendTESTOperator::kNickName() ||
id == StringAppendTESTOperator::kClassName()) {
*result = MergeOperators::CreateStringAppendTESTOperator();
} else if (id == BytesXOROperator::kNickName() ||
id == BytesXOROperator::kClassName()) {
*result = MergeOperators::CreateBytesXOROperator();
} else if (id == SortList::kNickName() || id == SortList::kClassName()) {
*result = MergeOperators::CreateSortOperator();
#endif // ROCKSDB_LITE
} else {
success = false;
}
return success;
}
#ifndef ROCKSDB_LITE
static int RegisterBuiltinMergeOperators(ObjectLibrary& library,
const std::string& /*arg*/) {
size_t num_types;
auto AsRegex = [](const std::string& name, const std::string& alt) {
std::string regex;
regex.append("(").append(name);
regex.append("|").append(alt).append(")");
return regex;
};
library.Register<MergeOperator>(
AsRegex(StringAppendOperator::kClassName(),
StringAppendOperator::kNickName()),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /*errmsg*/) {
guard->reset(new StringAppendOperator(","));
return guard->get();
});
library.Register<MergeOperator>(
AsRegex(StringAppendTESTOperator::kClassName(),
StringAppendTESTOperator::kNickName()),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /*errmsg*/) {
guard->reset(new StringAppendTESTOperator(","));
return guard->get();
});
library.Register<MergeOperator>(
AsRegex(SortList::kClassName(), SortList::kNickName()),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /*errmsg*/) {
guard->reset(new SortList());
return guard->get();
});
library.Register<MergeOperator>(
AsRegex(BytesXOROperator::kClassName(), BytesXOROperator::kNickName()),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /*errmsg*/) {
guard->reset(new BytesXOROperator());
return guard->get();
});
return static_cast<int>(library.GetFactoryCount(&num_types));
}
#endif // ROCKSDB_LITE
Status MergeOperator::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::shared_ptr<MergeOperator>* result) {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinMergeOperators(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
return LoadSharedObject<MergeOperator>(config_options, value,
LoadMergeOperator, result);
}
std::shared_ptr<MergeOperator> MergeOperators::CreateFromStringId(
const std::string& id) {
std::shared_ptr<MergeOperator> result;
Status s = MergeOperator::CreateFromString(ConfigOptions(), id, &result);
if (s.ok()) {
return result;
} else {
// Empty or unknown, just return nullptr
return nullptr;
}
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -28,30 +28,8 @@ class MergeOperators {
static std::shared_ptr<MergeOperator> CreateSortOperator();
// Will return a different merge operator depending on the string.
// TODO: Hook the "name" up to the actual Name() of the MergeOperators?
static std::shared_ptr<MergeOperator> CreateFromStringId(
const std::string& name) {
if (name == "put") {
return CreatePutOperator();
} else if (name == "put_v1") {
return CreateDeprecatedPutOperator();
} else if ( name == "uint64add") {
return CreateUInt64AddOperator();
} else if (name == "stringappend") {
return CreateStringAppendOperator();
} else if (name == "stringappendtest") {
return CreateStringAppendTESTOperator();
} else if (name == "max") {
return CreateMaxOperator();
} else if (name == "bytesxor") {
return CreateBytesXOROperator();
} else if (name == "sortlist") {
return CreateSortOperator();
} else {
// Empty or unknown, just return nullptr
return nullptr;
}
}
const std::string& name);
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -28,9 +28,11 @@ class BytesXOROperator : public AssociativeMergeOperator {
std::string* new_value,
Logger* logger) const override;
virtual const char* Name() const override {
return "BytesXOR";
}
static const char* kClassName() { return "BytesXOR"; }
static const char* kNickName() { return "bytesxor"; }
const char* NickName() const override { return kNickName(); }
const char* Name() const override { return kClassName(); }
void XOR(const Slice* existing_value, const Slice& value,
std::string* new_value) const;

View File

@ -64,7 +64,10 @@ class MaxOperator : public MergeOperator {
return true;
}
const char* Name() const override { return "MaxOperator"; }
static const char* kClassName() { return "MaxOperator"; }
static const char* kNickName() { return "max"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }
};
} // end of anonymous namespace

View File

@ -48,7 +48,10 @@ class PutOperator : public MergeOperator {
return true;
}
const char* Name() const override { return "PutOperator"; }
static const char* kClassName() { return "PutOperator"; }
static const char* kNickName() { return "put_v1"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }
};
class PutOperatorV2 : public PutOperator {
@ -67,6 +70,9 @@ class PutOperatorV2 : public PutOperator {
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}
static const char* kNickName() { return "put"; }
const char* NickName() const override { return kNickName(); }
};
} // end of anonymous namespace

View File

@ -49,8 +49,6 @@ bool SortList::PartialMergeMulti(const Slice& /*key*/,
return true;
}
const char* SortList::Name() const { return "MergeSortOperator"; }
void SortList::MakeVector(std::vector<int>& operand, Slice slice) const {
do {
const char* begin = slice.data_;

View File

@ -27,7 +27,11 @@ class SortList : public MergeOperator {
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const override;
const char* Name() const override;
static const char* kClassName() { return "MergeSortOperator"; }
static const char* kNickName() { return "sortlist"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }
void MakeVector(std::vector<int>& operand, Slice slice) const;

View File

@ -6,21 +6,36 @@
#include "stringappend.h"
#include <memory>
#include <assert.h>
#include "rocksdb/slice.h"
#include <memory>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
static std::unordered_map<std::string, OptionTypeInfo>
stringappend_merge_type_info = {
#ifndef ROCKSDB_LITE
{"delimiter",
{0, OptionType::kString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
#endif // ROCKSDB_LITE
};
} // namespace
// Constructor: also specify the delimiter character.
StringAppendOperator::StringAppendOperator(char delim_char)
: delim_(1, delim_char) {}
: delim_(1, delim_char) {
RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
}
StringAppendOperator::StringAppendOperator(const std::string& delim)
: delim_(delim) {}
: delim_(delim) {
RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
}
// Implementation for the merge operation (concatenates two strings)
bool StringAppendOperator::Merge(const Slice& /*key*/,
@ -46,9 +61,6 @@ bool StringAppendOperator::Merge(const Slice& /*key*/,
return true;
}
const char* StringAppendOperator::Name() const {
return "StringAppendOperator";
}
std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
return std::make_shared<StringAppendOperator>(',');

View File

@ -22,7 +22,10 @@ class StringAppendOperator : public AssociativeMergeOperator {
std::string* new_value,
Logger* logger) const override;
virtual const char* Name() const override;
static const char* kClassName() { return "StringAppendOperator"; }
static const char* kNickName() { return "stringappend"; }
virtual const char* Name() const override { return kClassName(); }
virtual const char* NickName() const override { return kNickName(); }
private:
std::string delim_; // The delimiter is inserted between elements

View File

@ -5,22 +5,38 @@
#include "stringappend2.h"
#include <memory>
#include <string>
#include <assert.h>
#include "rocksdb/slice.h"
#include <memory>
#include <string>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
static std::unordered_map<std::string, OptionTypeInfo>
stringappend2_merge_type_info = {
#ifndef ROCKSDB_LITE
{"delimiter",
{0, OptionType::kString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
#endif // ROCKSDB_LITE
};
} // namespace
// Constructor: also specify the delimiter character.
StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
: delim_(1, delim_char) {}
: delim_(1, delim_char) {
RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
}
StringAppendTESTOperator::StringAppendTESTOperator(const std::string& delim)
: delim_(delim) {}
: delim_(delim) {
RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
}
// Implementation for the merge operation (concatenates two strings)
bool StringAppendTESTOperator::FullMergeV2(
@ -37,6 +53,7 @@ bool StringAppendTESTOperator::FullMergeV2(
// Compute the space needed for the final result.
size_t numBytes = 0;
for (auto it = merge_in.operand_list.begin();
it != merge_in.operand_list.end(); ++it) {
numBytes += it->size() + delim_.size();
@ -107,11 +124,6 @@ bool StringAppendTESTOperator::_AssocPartialMergeMulti(
return true;
}
const char* StringAppendTESTOperator::Name() const {
return "StringAppendTESTOperator";
}
std::shared_ptr<MergeOperator>
MergeOperators::CreateStringAppendTESTOperator() {
return std::make_shared<StringAppendTESTOperator>(',');

View File

@ -34,7 +34,10 @@ class StringAppendTESTOperator : public MergeOperator {
std::string* new_value, Logger* logger) const
override;
virtual const char* Name() const override;
static const char* kClassName() { return "StringAppendTESTOperator"; }
static const char* kNickName() { return "stringappendtest"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }
private:
// A version of PartialMerge that actually performs "partial merging".

View File

@ -36,7 +36,10 @@ class UInt64AddOperator : public AssociativeMergeOperator {
return true; // Return true always since corruption will be treated as 0
}
const char* Name() const override { return "UInt64AddOperator"; }
static const char* kClassName() { return "UInt64AddOperator"; }
static const char* kNickName() { return "uint64add"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }
private:
// Takes the string and decodes it into a uint64_t

View File

@ -13,9 +13,146 @@
#include "rocksdb/iterator.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
{{"user_operator",
OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}};
TtlMergeOperator::TtlMergeOperator(
const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
: user_merge_op_(merge_op), clock_(clock) {
RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
}
bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
ROCKS_LOG_ERROR(merge_in.logger,
"Error: Could not remove timestamp from existing value.");
return false;
}
// Extract time-stamp from each operand to be passed to user_merge_op_
std::vector<Slice> operands_without_ts;
for (const auto& operand : merge_in.operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(merge_in.logger,
"Error: Could not remove timestamp from operand value.");
return false;
}
operands_without_ts.push_back(operand);
operands_without_ts.back().remove_suffix(ts_len);
}
// Apply the user merge operator (store result in *new_value)
bool good = true;
MergeOperationOutput user_merge_out(merge_out->new_value,
merge_out->existing_operand);
if (merge_in.existing_value) {
Slice existing_value_without_ts(merge_in.existing_value->data(),
merge_in.existing_value->size() - ts_len);
good = user_merge_op_->FullMergeV2(
MergeOperationInput(merge_in.key, &existing_value_without_ts,
operands_without_ts, merge_in.logger),
&user_merge_out);
} else {
good = user_merge_op_->FullMergeV2(
MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
merge_in.logger),
&user_merge_out);
}
// Return false if the user merge operator returned false
if (!good) {
return false;
}
if (merge_out->existing_operand.data()) {
merge_out->new_value.assign(merge_out->existing_operand.data(),
merge_out->existing_operand.size());
merge_out->existing_operand = Slice(nullptr, 0);
}
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
merge_in.logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, (int32_t)curtime);
merge_out->new_value.append(ts_string, ts_len);
return true;
}
}
bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
std::deque<Slice> operands_without_ts;
for (const auto& operand : operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
return false;
}
operands_without_ts.push_back(
Slice(operand.data(), operand.size() - ts_len));
}
// Apply the user partial-merge operator (store result in *new_value)
assert(new_value);
if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
logger)) {
return false;
}
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, (int32_t)curtime);
new_value->append(ts_string, ts_len);
return true;
}
}
Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
if (clock_ == nullptr) {
clock_ = config_options.env->GetSystemClock().get();
}
return MergeOperator::PrepareOptions(config_options);
}
Status TtlMergeOperator::ValidateOptions(
const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
if (user_merge_op_ == nullptr) {
return Status::InvalidArgument(
"UserMergeOperator required by TtlMergeOperator");
} else if (clock_ == nullptr) {
return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
} else {
return MergeOperator::ValidateOptions(db_opts, cf_opts);
}
}
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
SystemClock* clock) {
@ -34,6 +171,139 @@ void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
}
}
static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
{"ttl", {0, OptionType::kInt32T}},
};
static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
{"user_filter_factory",
OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
0, OptionVerificationType::kByNameAllowFromNull,
OptionTypeFlags::kNone)}};
static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
{"user_filter",
OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
TtlCompactionFilter::TtlCompactionFilter(
int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
clock_(clock) {
RegisterOptions("TTL", &ttl_, &ttl_type_info);
RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
}
bool TtlCompactionFilter::Filter(int level, const Slice& key,
const Slice& old_val, std::string* new_val,
bool* value_changed) const {
if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
return true;
}
if (user_comp_filter() == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
DBWithTTLImpl::kTSLength);
}
return false;
}
Status TtlCompactionFilter::PrepareOptions(
const ConfigOptions& config_options) {
if (clock_ == nullptr) {
clock_ = config_options.env->GetSystemClock().get();
}
return LayeredCompactionFilterBase::PrepareOptions(config_options);
}
Status TtlCompactionFilter::ValidateOptions(
const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
if (clock_ == nullptr) {
return Status::InvalidArgument(
"SystemClock required by TtlCompactionFilter");
} else {
return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
}
}
TtlCompactionFilterFactory::TtlCompactionFilterFactory(
int32_t ttl, SystemClock* clock,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
RegisterOptions("UserOptions", &user_comp_filter_factory_,
&ttl_cff_type_info);
RegisterOptions("TTL", &ttl_, &ttl_type_info);
}
std::unique_ptr<CompactionFilter>
TtlCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& context) {
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr;
if (user_comp_filter_factory_) {
user_comp_filter_from_factory =
user_comp_filter_factory_->CreateCompactionFilter(context);
}
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
}
Status TtlCompactionFilterFactory::PrepareOptions(
const ConfigOptions& config_options) {
if (clock_ == nullptr) {
clock_ = config_options.env->GetSystemClock().get();
}
return CompactionFilterFactory::PrepareOptions(config_options);
}
Status TtlCompactionFilterFactory::ValidateOptions(
const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
if (clock_ == nullptr) {
return Status::InvalidArgument(
"SystemClock required by TtlCompactionFilterFactory");
} else {
return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
}
}
int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
library.Register<MergeOperator>(
TtlMergeOperator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
std::string* /* errmsg */) {
guard->reset(new TtlMergeOperator(nullptr, nullptr));
return guard->get();
});
library.Register<CompactionFilterFactory>(
TtlCompactionFilterFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilterFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
return guard->get();
});
library.Register<CompactionFilter>(
TtlCompactionFilter::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CompactionFilter>* /*guard*/,
std::string* /* errmsg */) {
return new TtlCompactionFilter(0, nullptr, nullptr);
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
@ -68,9 +338,15 @@ Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
return s;
}
void DBWithTTLImpl::RegisterTtlClasses() {
static std::once_flag once;
std::call_once(once, [&]() {
ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
});
}
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
DBWithTTL** dbptr, int32_t ttl, bool read_only) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
@ -93,6 +369,7 @@ Status DBWithTTL::Open(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
const std::vector<int32_t>& ttls, bool read_only) {
DBWithTTLImpl::RegisterTtlClasses();
if (ttls.size() != column_families.size()) {
return Status::InvalidArgument(
"ttls size has to be the same as number of column families");
@ -128,6 +405,7 @@ Status DBWithTTL::Open(
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle, int ttl) {
RegisterTtlClasses();
ColumnFamilyOptions sanitized_options = options;
DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
GetEnv()->GetSystemClock().get());

View File

@ -25,12 +25,15 @@
#endif
namespace ROCKSDB_NAMESPACE {
struct ConfigOptions;
class ObjectLibrary;
class ObjectRegistry;
class DBWithTTLImpl : public DBWithTTL {
public:
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
SystemClock* clock);
static void RegisterTtlClasses();
explicit DBWithTTLImpl(DB* db);
virtual ~DBWithTTLImpl();
@ -155,37 +158,24 @@ class TtlCompactionFilter : public LayeredCompactionFilterBase {
TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
clock_(clock) {}
_user_comp_filter_from_factory = nullptr);
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
override {
if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
std::string* new_val, bool* value_changed) const override;
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "TtlCompactionFilter"; }
bool IsInstanceOf(const std::string& name) const override {
if (name == "Delete By TTL") {
return true;
} else {
return LayeredCompactionFilterBase::IsInstanceOf(name);
}
if (user_comp_filter() == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
new_val->append(
old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
DBWithTTLImpl::kTSLength);
}
return false;
}
virtual const char* Name() const override { return "Delete By TTL"; }
Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override;
private:
int32_t ttl_;
@ -196,30 +186,21 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
public:
TtlCompactionFilterFactory(
int32_t ttl, SystemClock* clock,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl),
clock_(clock),
user_comp_filter_factory_(comp_filter_factory) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr;
if (user_comp_filter_factory_) {
user_comp_filter_from_factory =
user_comp_filter_factory_->CreateCompactionFilter(context);
}
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
}
std::shared_ptr<CompactionFilterFactory> comp_filter_factory);
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
void SetTtl(int32_t ttl) {
ttl_ = ttl;
}
virtual const char* Name() const override {
return "TtlCompactionFilterFactory";
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "TtlCompactionFilterFactory"; }
Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override;
const Customizable* Inner() const override {
return user_comp_filter_factory_.get();
}
private:
@ -232,125 +213,38 @@ class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
SystemClock* clock)
: user_merge_op_(merge_op), clock_(clock) {
assert(merge_op);
assert(clock);
}
SystemClock* clock);
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
ROCKS_LOG_ERROR(merge_in.logger,
"Error: Could not remove timestamp from existing value.");
return false;
}
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
// Extract time-stamp from each operand to be passed to user_merge_op_
std::vector<Slice> operands_without_ts;
for (const auto& operand : merge_in.operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(
merge_in.logger,
"Error: Could not remove timestamp from operand value.");
return false;
}
operands_without_ts.push_back(operand);
operands_without_ts.back().remove_suffix(ts_len);
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const override;
// Apply the user merge operator (store result in *new_value)
bool good = true;
MergeOperationOutput user_merge_out(merge_out->new_value,
merge_out->existing_operand);
if (merge_in.existing_value) {
Slice existing_value_without_ts(merge_in.existing_value->data(),
merge_in.existing_value->size() - ts_len);
good = user_merge_op_->FullMergeV2(
MergeOperationInput(merge_in.key, &existing_value_without_ts,
operands_without_ts, merge_in.logger),
&user_merge_out);
} else {
good = user_merge_op_->FullMergeV2(
MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
merge_in.logger),
&user_merge_out);
}
static const char* kClassName() { return "TtlMergeOperator"; }
// Return false if the user merge operator returned false
if (!good) {
return false;
}
if (merge_out->existing_operand.data()) {
merge_out->new_value.assign(merge_out->existing_operand.data(),
merge_out->existing_operand.size());
merge_out->existing_operand = Slice(nullptr, 0);
}
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
merge_in.logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, (int32_t)curtime);
merge_out->new_value.append(ts_string, ts_len);
const char* Name() const override { return kClassName(); }
bool IsInstanceOf(const std::string& name) const override {
if (name == "Merge By TTL") {
return true;
}
}
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
std::deque<Slice> operands_without_ts;
for (const auto& operand : operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(logger,
"Error: Could not remove timestamp from value.");
return false;
}
operands_without_ts.push_back(
Slice(operand.data(), operand.size() - ts_len));
}
// Apply the user partial-merge operator (store result in *new_value)
assert(new_value);
if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
logger)) {
return false;
}
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, (int32_t)curtime);
new_value->append(ts_string, ts_len);
return true;
return MergeOperator::IsInstanceOf(name);
}
}
virtual const char* Name() const override { return "Merge By TTL"; }
Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override;
const Customizable* Inner() const override { return user_merge_op_.get(); }
private:
std::shared_ptr<MergeOperator> user_merge_op_;
SystemClock* clock_;
};
extern "C" {
int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/);
} // extern "C"
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -7,10 +7,16 @@
#include <map>
#include <memory>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/testharness.h"
#include "util/string_util.h"
#include "utilities/merge_operators/bytesxor.h"
#include "utilities/ttl/db_ttl_impl.h"
#ifndef OS_WIN
#include <unistd.h>
#endif
@ -719,6 +725,171 @@ TEST_F(TtlTest, DeleteRangeTest) {
CloseTtl();
}
class DummyFilter : public CompactionFilter {
public:
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override {
return false;
}
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DummyFilter"; }
};
class DummyFilterFactory : public CompactionFilterFactory {
public:
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DummyFilterFactory"; }
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context&) override {
std::unique_ptr<CompactionFilter> f(new DummyFilter());
return f;
}
};
static int RegisterTestObjects(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<CompactionFilter>(
"DummyFilter", [](const std::string& /*uri*/,
std::unique_ptr<CompactionFilter>* /*guard*/,
std::string* /* errmsg */) {
static DummyFilter dummy;
return &dummy;
});
library.Register<CompactionFilterFactory>(
"DummyFilterFactory", [](const std::string& /*uri*/,
std::unique_ptr<CompactionFilterFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new DummyFilterFactory());
return guard->get();
});
return 2;
}
class TtlOptionsTest : public testing::Test {
public:
TtlOptionsTest() {
config_options_.registry->AddLibrary("RegisterTtlObjects",
RegisterTtlObjects, "");
config_options_.registry->AddLibrary("RegisterTtlTestObjects",
RegisterTestObjects, "");
}
ConfigOptions config_options_;
};
TEST_F(TtlOptionsTest, LoadTtlCompactionFilter) {
const CompactionFilter* filter = nullptr;
ASSERT_OK(CompactionFilter::CreateFromString(
config_options_, TtlCompactionFilter::kClassName(), &filter));
ASSERT_NE(filter, nullptr);
ASSERT_STREQ(filter->Name(), TtlCompactionFilter::kClassName());
auto ttl = filter->GetOptions<int32_t>("TTL");
ASSERT_NE(ttl, nullptr);
ASSERT_EQ(*ttl, 0);
ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
delete filter;
filter = nullptr;
ASSERT_OK(CompactionFilter::CreateFromString(
config_options_, "id=TtlCompactionFilter; ttl=123", &filter));
ASSERT_NE(filter, nullptr);
ttl = filter->GetOptions<int32_t>("TTL");
ASSERT_NE(ttl, nullptr);
ASSERT_EQ(*ttl, 123);
ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
delete filter;
filter = nullptr;
ASSERT_OK(CompactionFilter::CreateFromString(
config_options_,
"id=TtlCompactionFilter; ttl=456; user_filter=DummyFilter;", &filter));
ASSERT_NE(filter, nullptr);
auto inner = filter->CheckedCast<DummyFilter>();
ASSERT_NE(inner, nullptr);
ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
std::string mismatch;
std::string opts_str = filter->ToString(config_options_);
const CompactionFilter* copy = nullptr;
ASSERT_OK(
CompactionFilter::CreateFromString(config_options_, opts_str, &copy));
ASSERT_TRUE(filter->AreEquivalent(config_options_, copy, &mismatch));
delete filter;
delete copy;
}
TEST_F(TtlOptionsTest, LoadTtlCompactionFilterFactory) {
std::shared_ptr<CompactionFilterFactory> cff;
ASSERT_OK(CompactionFilterFactory::CreateFromString(
config_options_, TtlCompactionFilterFactory::kClassName(), &cff));
ASSERT_NE(cff.get(), nullptr);
ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
auto ttl = cff->GetOptions<int32_t>("TTL");
ASSERT_NE(ttl, nullptr);
ASSERT_EQ(*ttl, 0);
ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
ASSERT_OK(CompactionFilterFactory::CreateFromString(
config_options_, "id=TtlCompactionFilterFactory; ttl=123", &cff));
ASSERT_NE(cff.get(), nullptr);
ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
ttl = cff->GetOptions<int32_t>("TTL");
ASSERT_NE(ttl, nullptr);
ASSERT_EQ(*ttl, 123);
ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
ASSERT_OK(CompactionFilterFactory::CreateFromString(
config_options_,
"id=TtlCompactionFilterFactory; ttl=456; "
"user_filter_factory=DummyFilterFactory;",
&cff));
ASSERT_NE(cff.get(), nullptr);
auto filter = cff->CreateCompactionFilter(CompactionFilter::Context());
ASSERT_NE(filter.get(), nullptr);
auto ttlf = filter->CheckedCast<TtlCompactionFilter>();
ASSERT_EQ(filter.get(), ttlf);
auto user = filter->CheckedCast<DummyFilter>();
ASSERT_NE(user, nullptr);
ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
std::string opts_str = cff->ToString(config_options_);
std::string mismatch;
std::shared_ptr<CompactionFilterFactory> copy;
ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, opts_str,
&copy));
ASSERT_TRUE(cff->AreEquivalent(config_options_, copy.get(), &mismatch));
}
TEST_F(TtlOptionsTest, LoadTtlMergeOperator) {
std::shared_ptr<MergeOperator> mo;
config_options_.invoke_prepare_options = false;
ASSERT_OK(MergeOperator::CreateFromString(
config_options_, TtlMergeOperator::kClassName(), &mo));
ASSERT_NE(mo.get(), nullptr);
ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
ASSERT_NOK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
config_options_.invoke_prepare_options = true;
ASSERT_OK(MergeOperator::CreateFromString(
config_options_, "id=TtlMergeOperator; user_operator=bytesxor", &mo));
ASSERT_NE(mo.get(), nullptr);
ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
ASSERT_OK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
auto ttl_mo = mo->CheckedCast<TtlMergeOperator>();
ASSERT_EQ(mo.get(), ttl_mo);
auto user = ttl_mo->CheckedCast<BytesXOROperator>();
ASSERT_NE(user, nullptr);
std::string mismatch;
std::string opts_str = mo->ToString(config_options_);
std::shared_ptr<MergeOperator> copy;
ASSERT_OK(MergeOperator::CreateFromString(config_options_, opts_str, &copy));
ASSERT_TRUE(mo->AreEquivalent(config_options_, copy.get(), &mismatch));
}
} // namespace ROCKSDB_NAMESPACE
// A black-box test for the ttl wrapper around rocksdb