Add support for C bindings to the compaction V2 filter mechanism.

Test Plan: make c_test && ./c_test

Some fixes after merge.
This commit is contained in:
Spencer Kimball 2014-08-06 14:06:41 -04:00
parent 1242bfcad7
commit c1f588af71
4 changed files with 309 additions and 48 deletions

147
db/c.cc
View file

@ -36,6 +36,9 @@ using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter;
using rocksdb::CompactionFilterFactory;
using rocksdb::CompactionFilterV2;
using rocksdb::CompactionFilterFactoryV2;
using rocksdb::CompactionFilterContext;
using rocksdb::CompactionOptionsFIFO;
using rocksdb::Comparator;
using rocksdb::CompressionType;
@ -154,6 +157,104 @@ struct rocksdb_compactionfilterfactory_t : public CompactionFilterFactory {
virtual const char* Name() const { return (*name_)(state_); }
};
struct rocksdb_compactionfilterv2_t : public CompactionFilterV2 {
void* state_;
void (*destructor_)(void*);
const char* (*name_)(void*);
void (*filter_)(void*, int level, size_t num_keys,
const char* const* keys_list, const size_t* keys_list_sizes,
const char* const* existing_values_list, const size_t* existing_values_list_sizes,
char** new_values_list, size_t* new_values_list_sizes,
unsigned char* to_delete_list);
virtual ~rocksdb_compactionfilterv2_t() {
(*destructor_)(state_);
}
virtual const char* Name() const {
return (*name_)(state_);
}
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed) const {
// Make a vector pointing to the underlying key data.
size_t num_keys = keys.size();
std::vector<const char*> keys_list(num_keys);
std::vector<size_t> keys_list_sizes(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
keys_list[i] = keys[i].data();
keys_list_sizes[i] = keys[i].size();
}
// Make a vector pointing to the underlying value data.
std::vector<const char*> existing_values_list(num_keys);
std::vector<size_t> existing_values_list_sizes(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
existing_values_list[i] = existing_values[i].data();
existing_values_list_sizes[i] = existing_values[i].size();
}
// Make a vector which will accept newly-allocated char* arrays
// which we will take ownership of and assign to strings in new_values.
new_values->clear();
std::vector<char*> new_values_list(num_keys);
std::vector<size_t> new_values_list_sizes(num_keys);
// Resize values_changed to hold all keys.
values_changed->resize(num_keys);
// Make a vector for bools indicating a value should be deleted
// on compaction (true) or maintained (false).
std::vector<unsigned char> to_delete_list(num_keys);
(*filter_)(
state_, level, num_keys, &keys_list[0], &keys_list_sizes[0],
&existing_values_list[0], &existing_values_list_sizes[0],
&new_values_list[0], &new_values_list_sizes[0], &to_delete_list[0]);
// Now, we transfer any changed values, setting values_changed and
// initializing new_values in the event a value changed.
std::vector<bool> to_delete(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
to_delete[i] = to_delete_list[i];
(*values_changed)[i] = new_values_list[i] != nullptr;
if ((*values_changed)[i]) {
new_values->push_back(std::string(new_values_list[i], new_values_list_sizes[i]));
free(new_values_list[i]);
}
}
return to_delete;
}
};
struct rocksdb_compactionfilterfactoryv2_t : public CompactionFilterFactoryV2 {
void* state_;
void (*destructor_)(void*);
const char* (*name_)(void*);
rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2_)(
const rocksdb_compactionfiltercontext_t* context);
rocksdb_compactionfilterfactoryv2_t(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) {
}
virtual ~rocksdb_compactionfilterfactoryv2_t() {
(*destructor_)(state_);
}
virtual const char* Name() const {
return (*name_)(state_);
}
virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
const CompactionFilterContext& context) {
struct rocksdb_compactionfiltercontext_t c_context;
c_context.rep.is_full_compaction = context.is_full_compaction;
c_context.rep.is_manual_compaction = context.is_manual_compaction;
return std::unique_ptr<CompactionFilterV2>(
(*create_compaction_filter_v2_)(&c_context));
}
};
struct rocksdb_comparator_t : public Comparator {
void* state_;
void (*destructor_)(void*);
@ -1004,6 +1105,12 @@ void rocksdb_options_set_merge_operator(
opt->rep.merge_operator = std::shared_ptr<MergeOperator>(merge_operator);
}
void rocksdb_options_set_compaction_filter_factory_v2(
rocksdb_options_t* opt,
rocksdb_compactionfilterfactoryv2_t* compaction_filter_factory_v2) {
opt->rep.compaction_filter_factory_v2 = std::shared_ptr<CompactionFilterFactoryV2>(compaction_filter_factory_v2);
}
void rocksdb_options_set_filter_policy(
rocksdb_options_t* opt,
rocksdb_filterpolicy_t* policy) {
@ -1550,6 +1657,46 @@ void rocksdb_compactionfilterfactory_destroy(
delete factory;
}
rocksdb_compactionfilterv2_t* rocksdb_compactionfilterv2_create(
void* state,
void (*destructor)(void*),
void (*filter)(void*, int level, size_t num_keys,
const char* const* keys_list, const size_t* keys_list_sizes,
const char* const* existing_values_list, const size_t* existing_values_list_sizes,
char** new_values_list, size_t* new_values_list_sizes,
unsigned char* to_delete_list),
const char* (*name)(void*)) {
rocksdb_compactionfilterv2_t* result = new rocksdb_compactionfilterv2_t;
result->state_ = state;
result->destructor_ = destructor;
result->filter_ = filter;
result->name_ = name;
return result;
}
void rocksdb_compactionfilterv2_destroy(rocksdb_compactionfilterv2_t* filter) {
delete filter;
}
rocksdb_compactionfilterfactoryv2_t* rocksdb_compactionfilterfactoryv2_create(
void* state,
rocksdb_slicetransform_t* prefix_extractor,
void (*destructor)(void*),
rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2)(
const rocksdb_compactionfiltercontext_t* context),
const char* (*name)(void*)) {
rocksdb_compactionfilterfactoryv2_t* result = new rocksdb_compactionfilterfactoryv2_t(prefix_extractor);
result->state_ = state;
result->destructor_ = destructor;
result->create_compaction_filter_v2_ = create_compaction_filter_v2;
result->name_ = name;
return result;
}
void rocksdb_compactionfilterfactoryv2_destroy(rocksdb_compactionfilterfactoryv2_t* factory) {
delete factory;
}
rocksdb_comparator_t* rocksdb_comparator_create(
void* state,
void (*destructor)(void*),

View file

@ -225,6 +225,54 @@ static rocksdb_t* CheckCompaction(rocksdb_t* db, rocksdb_options_t* options,
return db;
}
// Custom compaction filter V2.
static void CompactionFilterV2Destroy(void* arg) { }
static const char* CompactionFilterV2Name(void* arg) {
return "TestCompactionFilterV2";
}
static void CompactionFilterV2Filter(
void* arg, int level, size_t num_keys,
const char* const* keys_list, const size_t* keys_list_sizes,
const char* const* existing_values_list, const size_t* existing_values_list_sizes,
char** new_values_list, size_t* new_values_list_sizes,
unsigned char* to_delete_list) {
size_t i;
for (i = 0; i < num_keys; i++) {
// If any value is "gc", it's removed.
if (existing_values_list_sizes[i] == 2 && memcmp(existing_values_list[i], "gc", 2) == 0) {
to_delete_list[i] = 1;
} else if (existing_values_list_sizes[i] == 6 && memcmp(existing_values_list[i], "gc all", 6) == 0) {
// If any value is "gc all", all keys are removed.
size_t j;
for (j = 0; j < num_keys; j++) {
to_delete_list[j] = 1;
}
return;
} else if (existing_values_list_sizes[i] == 6 && memcmp(existing_values_list[i], "change", 6) == 0) {
// If value is "change", set changed value to "changed".
size_t len;
len = strlen("changed");
new_values_list[i] = malloc(len);
memcpy(new_values_list[i], "changed", len);
new_values_list_sizes[i] = len;
} else {
// Otherwise, no keys are removed.
}
}
}
// Custom compaction filter factory V2.
static void CompactionFilterFactoryV2Destroy(void* arg) { }
static const char* CompactionFilterFactoryV2Name(void* arg) {
return "TestCompactionFilterFactoryV2";
}
static rocksdb_compactionfilterv2_t* CompactionFilterFactoryV2Create(
const rocksdb_compactionfiltercontext_t* context) {
return rocksdb_compactionfilterv2_create(
NULL, CompactionFilterV2Destroy, CompactionFilterV2Filter,
CompactionFilterV2Name);
}
// Custom merge operator
static void MergeOperatorDestroy(void* arg) { }
static const char* MergeOperatorName(void* arg) {
@ -531,6 +579,47 @@ int main(int argc, char** argv) {
rocksdb_options_destroy(options);
}
StartPhase("compaction_filter_v2");
{
rocksdb_compactionfilterfactoryv2_t* factory;
rocksdb_slicetransform_t* prefix_extractor;
prefix_extractor = rocksdb_slicetransform_create_fixed_prefix(3);
factory = rocksdb_compactionfilterfactoryv2_create(
NULL, prefix_extractor, CompactionFilterFactoryV2Destroy,
CompactionFilterFactoryV2Create, CompactionFilterFactoryV2Name);
// Create new database
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_set_compaction_filter_factory_v2(options, factory);
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
// Only foo2 is GC'd, foo3 is changed.
rocksdb_put(db, woptions, "foo1", 4, "no gc", 5, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "foo2", 4, "gc", 2, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "foo3", 4, "change", 6, &err);
CheckNoError(err);
// All bars are GC'd.
rocksdb_put(db, woptions, "bar1", 4, "no gc", 5, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "bar2", 4, "gc all", 6, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "bar3", 4, "no gc", 5, &err);
CheckNoError(err);
// Compact the DB to garbage collect.
rocksdb_compact_range(db, NULL, 0, NULL, 0);
// Verify foo entries.
CheckGet(db, roptions, "foo1", "no gc");
CheckGet(db, roptions, "foo2", NULL);
CheckGet(db, roptions, "foo3", "changed");
// Verify bar entries were all deleted.
CheckGet(db, roptions, "bar1", NULL);
CheckGet(db, roptions, "bar2", NULL);
CheckGet(db, roptions, "bar3", NULL);
}
StartPhase("merge_operator");
{
rocksdb_mergeoperator_t* merge_operator;

View file

@ -138,8 +138,6 @@ struct DBImpl::CompactionState {
return context;
}
std::vector<Slice> key_buf_;
std::vector<Slice> existing_value_buf_;
std::vector<std::string> key_str_buf_;
std::vector<std::string> existing_value_str_buf_;
// new_value_buf_ will only be appended if a value changes
@ -149,12 +147,7 @@ struct DBImpl::CompactionState {
std::vector<bool> value_changed_buf_;
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
std::vector<bool> to_delete_buf_;
// buffer for the parsed internal keys, the string buffer is backed
// by key_str_buf_
std::vector<ParsedInternalKey> ikey_buf_;
std::vector<Slice> other_key_buf_;
std::vector<Slice> other_value_buf_;
std::vector<std::string> other_key_str_buf_;
std::vector<std::string> other_value_str_buf_;
@ -168,12 +161,6 @@ struct DBImpl::CompactionState {
void BufferKeyValueSlices(const Slice& key, const Slice& value) {
key_str_buf_.emplace_back(key.ToString());
existing_value_str_buf_.emplace_back(value.ToString());
key_buf_.emplace_back(Slice(key_str_buf_.back()));
existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back()));
ParsedInternalKey ikey;
ParseInternalKey(key_buf_.back(), &ikey);
ikey_buf_.emplace_back(ikey);
}
// Buffers the kv-pair that will not be run through compaction filter V2
@ -181,8 +168,6 @@ struct DBImpl::CompactionState {
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
other_key_str_buf_.emplace_back(key.ToString());
other_value_str_buf_.emplace_back(value.ToString());
other_key_buf_.emplace_back(Slice(other_key_str_buf_.back()));
other_value_buf_.emplace_back(Slice(other_value_str_buf_.back()));
}
// Add a kv-pair to the combined buffer
@ -196,24 +181,24 @@ struct DBImpl::CompactionState {
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
size_t i = 0;
size_t j = 0;
size_t total_size = key_buf_.size() + other_key_buf_.size();
size_t total_size = key_str_buf_.size() + other_key_str_buf_.size();
combined_key_buf_.reserve(total_size);
combined_value_buf_.reserve(total_size);
while (i + j < total_size) {
int comp_res = 0;
if (i < key_buf_.size() && j < other_key_buf_.size()) {
comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]);
} else if (i >= key_buf_.size() && j < other_key_buf_.size()) {
if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) {
comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]);
} else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) {
comp_res = 1;
} else if (j >= other_key_buf_.size() && i < key_buf_.size()) {
} else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) {
comp_res = -1;
}
if (comp_res > 0) {
AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]);
AddToCombinedKeyValueSlices(other_key_str_buf_[j], other_value_str_buf_[j]);
j++;
} else if (comp_res < 0) {
AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]);
AddToCombinedKeyValueSlices(key_str_buf_[i], existing_value_str_buf_[i]);
i++;
}
}
@ -221,29 +206,19 @@ struct DBImpl::CompactionState {
void CleanupBatchBuffer() {
to_delete_buf_.clear();
key_buf_.clear();
existing_value_buf_.clear();
key_str_buf_.clear();
existing_value_str_buf_.clear();
new_value_buf_.clear();
value_changed_buf_.clear();
ikey_buf_.clear();
to_delete_buf_.shrink_to_fit();
key_buf_.shrink_to_fit();
existing_value_buf_.shrink_to_fit();
key_str_buf_.shrink_to_fit();
existing_value_str_buf_.shrink_to_fit();
new_value_buf_.shrink_to_fit();
value_changed_buf_.shrink_to_fit();
ikey_buf_.shrink_to_fit();
other_key_buf_.clear();
other_value_buf_.clear();
other_key_str_buf_.clear();
other_value_str_buf_.clear();
other_key_buf_.shrink_to_fit();
other_value_buf_.shrink_to_fit();
other_key_str_buf_.shrink_to_fit();
other_value_str_buf_.shrink_to_fit();
}
@ -2862,9 +2837,22 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
return;
}
// Assemble slice vectors for user keys and existing values.
// We also keep traack of our parsed internal key structs because
// we may need to access the sequence number in the event that
// keys are garbage collected during the filter process.
std::vector<ParsedInternalKey> ikey_buf;
std::vector<Slice> user_key_buf;
for (const auto& key : compact->ikey_buf_) {
user_key_buf.emplace_back(key.user_key);
std::vector<Slice> existing_value_buf;
for (const auto& key : compact->key_str_buf_) {
ParsedInternalKey ikey;
ParseInternalKey(Slice(key), &ikey);
ikey_buf.emplace_back(ikey);
user_key_buf.emplace_back(ikey.user_key);
}
for (const auto& value : compact->existing_value_str_buf_) {
existing_value_buf.emplace_back(Slice(value));
}
// If the user has specified a compaction filter and the sequence
@ -2874,16 +2862,16 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
// the entry with a delete marker.
compact->to_delete_buf_ = compaction_filter_v2->Filter(
compact->compaction->level(),
user_key_buf, compact->existing_value_buf_,
user_key_buf, existing_value_buf,
&compact->new_value_buf_,
&compact->value_changed_buf_);
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
// kv-pairs in this compaction run needs to be deleted.
assert(compact->to_delete_buf_.size() ==
compact->key_buf_.size());
compact->key_str_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->existing_value_buf_.size());
compact->existing_value_str_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->value_changed_buf_.size());
@ -2894,15 +2882,15 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
// the Slice buffer points to the updated buffer
UpdateInternalKey(&compact->key_str_buf_[i][0],
compact->key_str_buf_[i].size(),
compact->ikey_buf_[i].sequence,
ikey_buf[i].sequence,
kTypeDeletion);
// no value associated with delete
compact->existing_value_buf_[i].clear();
compact->existing_value_str_buf_[i].clear();
RecordTick(stats_, COMPACTION_KEY_DROP_USER);
} else if (compact->value_changed_buf_[i]) {
compact->existing_value_buf_[i] =
Slice(compact->new_value_buf_[new_value_idx++]);
compact->existing_value_str_buf_[i] =
compact->new_value_buf_[new_value_idx++];
}
} // for
}
@ -3031,7 +3019,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact->key_buf_.size() > 0) {
if (compact->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->cur_prefix_ = key_prefix.ToString();
@ -3074,7 +3062,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact->key_buf_.size() > 0) {
if (compact->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
@ -3097,7 +3085,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
} // done processing all prefix batches
// finish the last batch
if (compact->key_buf_.size() > 0) {
if (compact->key_str_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

View file

@ -61,6 +61,10 @@ typedef struct rocksdb_compactionfiltercontext_t
rocksdb_compactionfiltercontext_t;
typedef struct rocksdb_compactionfilterfactory_t
rocksdb_compactionfilterfactory_t;
typedef struct rocksdb_compactionfilterv2_t
rocksdb_compactionfilterv2_t;
typedef struct rocksdb_compactionfilterfactoryv2_t
rocksdb_compactionfilterfactoryv2_t;
typedef struct rocksdb_comparator_t rocksdb_comparator_t;
typedef struct rocksdb_env_t rocksdb_env_t;
typedef struct rocksdb_fifo_compaction_options_t rocksdb_fifo_compaction_options_t;
@ -359,10 +363,14 @@ extern void rocksdb_options_set_compaction_filter(
rocksdb_compactionfilter_t*);
extern void rocksdb_options_set_compaction_filter_factory(
rocksdb_options_t*, rocksdb_compactionfilterfactory_t*);
extern void rocksdb_options_set_compaction_filter_factory_v2(
rocksdb_options_t*,
rocksdb_compactionfilterfactoryv2_t*);
extern void rocksdb_options_set_comparator(
rocksdb_options_t*,
rocksdb_comparator_t*);
extern void rocksdb_options_set_merge_operator(rocksdb_options_t*,
extern void rocksdb_options_set_merge_operator(
rocksdb_options_t*,
rocksdb_mergeoperator_t*);
extern void rocksdb_options_set_compression_per_level(
rocksdb_options_t* opt,
@ -571,6 +579,35 @@ extern rocksdb_compactionfilterfactory_t*
extern void rocksdb_compactionfilterfactory_destroy(
rocksdb_compactionfilterfactory_t*);
/* Compaction Filter V2 */
extern rocksdb_compactionfilterv2_t* rocksdb_compactionfilterv2_create(
void* state,
void (*destructor)(void*),
// num_keys specifies the number of array entries in every *list parameter.
// New values added to the new_values_list should be malloc'd and will be
// freed by the caller. Specify true in the to_delete_list to remove an
// entry during compaction; false to keep it.
void (*filter)(
void*, int level, size_t num_keys,
const char* const* keys_list, const size_t* keys_list_sizes,
const char* const* existing_values_list, const size_t* existing_values_list_sizes,
char** new_values_list, size_t* new_values_list_sizes,
unsigned char* to_delete_list),
const char* (*name)(void*));
extern void rocksdb_compactionfilterv2_destroy(rocksdb_compactionfilterv2_t*);
/* Compaction Filter Factory V2 */
extern rocksdb_compactionfilterfactoryv2_t* rocksdb_compactionfilterfactoryv2_create(
void* state,
rocksdb_slicetransform_t* prefix_extractor,
void (*destructor)(void*),
rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2)(
const rocksdb_compactionfiltercontext_t* context),
const char* (*name)(void*));
extern void rocksdb_compactionfilterfactoryv2_destroy(rocksdb_compactionfilterfactoryv2_t*);
/* Comparator */
extern rocksdb_comparator_t* rocksdb_comparator_create(