From c1f588af71133483330564c058765e2503caa0fb Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Wed, 6 Aug 2014 14:06:41 -0400 Subject: [PATCH] Add support for C bindings to the compaction V2 filter mechanism. Test Plan: make c_test && ./c_test Some fixes after merge. --- db/c.cc | 147 ++++++++++++++++++++++++++++++++++++++++++++ db/c_test.c | 89 +++++++++++++++++++++++++++ db/db_impl.cc | 80 ++++++++++-------------- include/rocksdb/c.h | 41 +++++++++++- 4 files changed, 309 insertions(+), 48 deletions(-) diff --git a/db/c.cc b/db/c.cc index a1f7a8b0d7..fa609db7b3 100644 --- a/db/c.cc +++ b/db/c.cc @@ -36,6 +36,9 @@ using rocksdb::ColumnFamilyHandle; using rocksdb::ColumnFamilyOptions; using rocksdb::CompactionFilter; using rocksdb::CompactionFilterFactory; +using rocksdb::CompactionFilterV2; +using rocksdb::CompactionFilterFactoryV2; +using rocksdb::CompactionFilterContext; using rocksdb::CompactionOptionsFIFO; using rocksdb::Comparator; using rocksdb::CompressionType; @@ -154,6 +157,104 @@ struct rocksdb_compactionfilterfactory_t : public CompactionFilterFactory { virtual const char* Name() const { return (*name_)(state_); } }; +struct rocksdb_compactionfilterv2_t : public CompactionFilterV2 { + void* state_; + void (*destructor_)(void*); + const char* (*name_)(void*); + void (*filter_)(void*, int level, size_t num_keys, + const char* const* keys_list, const size_t* keys_list_sizes, + const char* const* existing_values_list, const size_t* existing_values_list_sizes, + char** new_values_list, size_t* new_values_list_sizes, + unsigned char* to_delete_list); + + virtual ~rocksdb_compactionfilterv2_t() { + (*destructor_)(state_); + } + + virtual const char* Name() const { + return (*name_)(state_); + } + + virtual std::vector Filter(int level, + const SliceVector& keys, + const SliceVector& existing_values, + std::vector* new_values, + std::vector* values_changed) const { + // Make a vector pointing to the underlying key data. + size_t num_keys = keys.size(); + std::vector keys_list(num_keys); + std::vector keys_list_sizes(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + keys_list[i] = keys[i].data(); + keys_list_sizes[i] = keys[i].size(); + } + // Make a vector pointing to the underlying value data. + std::vector existing_values_list(num_keys); + std::vector existing_values_list_sizes(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + existing_values_list[i] = existing_values[i].data(); + existing_values_list_sizes[i] = existing_values[i].size(); + } + // Make a vector which will accept newly-allocated char* arrays + // which we will take ownership of and assign to strings in new_values. + new_values->clear(); + std::vector new_values_list(num_keys); + std::vector new_values_list_sizes(num_keys); + // Resize values_changed to hold all keys. + values_changed->resize(num_keys); + // Make a vector for bools indicating a value should be deleted + // on compaction (true) or maintained (false). + std::vector to_delete_list(num_keys); + + (*filter_)( + state_, level, num_keys, &keys_list[0], &keys_list_sizes[0], + &existing_values_list[0], &existing_values_list_sizes[0], + &new_values_list[0], &new_values_list_sizes[0], &to_delete_list[0]); + + // Now, we transfer any changed values, setting values_changed and + // initializing new_values in the event a value changed. + std::vector to_delete(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + to_delete[i] = to_delete_list[i]; + (*values_changed)[i] = new_values_list[i] != nullptr; + if ((*values_changed)[i]) { + new_values->push_back(std::string(new_values_list[i], new_values_list_sizes[i])); + free(new_values_list[i]); + } + } + return to_delete; + } +}; + +struct rocksdb_compactionfilterfactoryv2_t : public CompactionFilterFactoryV2 { + void* state_; + void (*destructor_)(void*); + const char* (*name_)(void*); + rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2_)( + const rocksdb_compactionfiltercontext_t* context); + + rocksdb_compactionfilterfactoryv2_t(const SliceTransform* prefix_extractor) + : CompactionFilterFactoryV2(prefix_extractor) { + } + + virtual ~rocksdb_compactionfilterfactoryv2_t() { + (*destructor_)(state_); + } + + virtual const char* Name() const { + return (*name_)(state_); + } + + virtual std::unique_ptr CreateCompactionFilterV2( + const CompactionFilterContext& context) { + struct rocksdb_compactionfiltercontext_t c_context; + c_context.rep.is_full_compaction = context.is_full_compaction; + c_context.rep.is_manual_compaction = context.is_manual_compaction; + return std::unique_ptr( + (*create_compaction_filter_v2_)(&c_context)); + } +}; + struct rocksdb_comparator_t : public Comparator { void* state_; void (*destructor_)(void*); @@ -1004,6 +1105,12 @@ void rocksdb_options_set_merge_operator( opt->rep.merge_operator = std::shared_ptr(merge_operator); } +void rocksdb_options_set_compaction_filter_factory_v2( + rocksdb_options_t* opt, + rocksdb_compactionfilterfactoryv2_t* compaction_filter_factory_v2) { + opt->rep.compaction_filter_factory_v2 = std::shared_ptr(compaction_filter_factory_v2); +} + void rocksdb_options_set_filter_policy( rocksdb_options_t* opt, rocksdb_filterpolicy_t* policy) { @@ -1550,6 +1657,46 @@ void rocksdb_compactionfilterfactory_destroy( delete factory; } +rocksdb_compactionfilterv2_t* rocksdb_compactionfilterv2_create( + void* state, + void (*destructor)(void*), + void (*filter)(void*, int level, size_t num_keys, + const char* const* keys_list, const size_t* keys_list_sizes, + const char* const* existing_values_list, const size_t* existing_values_list_sizes, + char** new_values_list, size_t* new_values_list_sizes, + unsigned char* to_delete_list), + const char* (*name)(void*)) { + rocksdb_compactionfilterv2_t* result = new rocksdb_compactionfilterv2_t; + result->state_ = state; + result->destructor_ = destructor; + result->filter_ = filter; + result->name_ = name; + return result; +} + +void rocksdb_compactionfilterv2_destroy(rocksdb_compactionfilterv2_t* filter) { + delete filter; +} + +rocksdb_compactionfilterfactoryv2_t* rocksdb_compactionfilterfactoryv2_create( + void* state, + rocksdb_slicetransform_t* prefix_extractor, + void (*destructor)(void*), + rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2)( + const rocksdb_compactionfiltercontext_t* context), + const char* (*name)(void*)) { + rocksdb_compactionfilterfactoryv2_t* result = new rocksdb_compactionfilterfactoryv2_t(prefix_extractor); + result->state_ = state; + result->destructor_ = destructor; + result->create_compaction_filter_v2_ = create_compaction_filter_v2; + result->name_ = name; + return result; +} + +void rocksdb_compactionfilterfactoryv2_destroy(rocksdb_compactionfilterfactoryv2_t* factory) { + delete factory; +} + rocksdb_comparator_t* rocksdb_comparator_create( void* state, void (*destructor)(void*), diff --git a/db/c_test.c b/db/c_test.c index c900036ca8..664b3f1d82 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -225,6 +225,54 @@ static rocksdb_t* CheckCompaction(rocksdb_t* db, rocksdb_options_t* options, return db; } +// Custom compaction filter V2. +static void CompactionFilterV2Destroy(void* arg) { } +static const char* CompactionFilterV2Name(void* arg) { + return "TestCompactionFilterV2"; +} +static void CompactionFilterV2Filter( + void* arg, int level, size_t num_keys, + const char* const* keys_list, const size_t* keys_list_sizes, + const char* const* existing_values_list, const size_t* existing_values_list_sizes, + char** new_values_list, size_t* new_values_list_sizes, + unsigned char* to_delete_list) { + size_t i; + for (i = 0; i < num_keys; i++) { + // If any value is "gc", it's removed. + if (existing_values_list_sizes[i] == 2 && memcmp(existing_values_list[i], "gc", 2) == 0) { + to_delete_list[i] = 1; + } else if (existing_values_list_sizes[i] == 6 && memcmp(existing_values_list[i], "gc all", 6) == 0) { + // If any value is "gc all", all keys are removed. + size_t j; + for (j = 0; j < num_keys; j++) { + to_delete_list[j] = 1; + } + return; + } else if (existing_values_list_sizes[i] == 6 && memcmp(existing_values_list[i], "change", 6) == 0) { + // If value is "change", set changed value to "changed". + size_t len; + len = strlen("changed"); + new_values_list[i] = malloc(len); + memcpy(new_values_list[i], "changed", len); + new_values_list_sizes[i] = len; + } else { + // Otherwise, no keys are removed. + } + } +} + +// Custom compaction filter factory V2. +static void CompactionFilterFactoryV2Destroy(void* arg) { } +static const char* CompactionFilterFactoryV2Name(void* arg) { + return "TestCompactionFilterFactoryV2"; +} +static rocksdb_compactionfilterv2_t* CompactionFilterFactoryV2Create( + const rocksdb_compactionfiltercontext_t* context) { + return rocksdb_compactionfilterv2_create( + NULL, CompactionFilterV2Destroy, CompactionFilterV2Filter, + CompactionFilterV2Name); +} + // Custom merge operator static void MergeOperatorDestroy(void* arg) { } static const char* MergeOperatorName(void* arg) { @@ -531,6 +579,47 @@ int main(int argc, char** argv) { rocksdb_options_destroy(options); } + StartPhase("compaction_filter_v2"); + { + rocksdb_compactionfilterfactoryv2_t* factory; + rocksdb_slicetransform_t* prefix_extractor; + prefix_extractor = rocksdb_slicetransform_create_fixed_prefix(3); + factory = rocksdb_compactionfilterfactoryv2_create( + NULL, prefix_extractor, CompactionFilterFactoryV2Destroy, + CompactionFilterFactoryV2Create, CompactionFilterFactoryV2Name); + // Create new database + rocksdb_close(db); + rocksdb_destroy_db(options, dbname, &err); + rocksdb_options_set_compaction_filter_factory_v2(options, factory); + db = rocksdb_open(options, dbname, &err); + CheckNoError(err); + // Only foo2 is GC'd, foo3 is changed. + rocksdb_put(db, woptions, "foo1", 4, "no gc", 5, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "foo2", 4, "gc", 2, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "foo3", 4, "change", 6, &err); + CheckNoError(err); + // All bars are GC'd. + rocksdb_put(db, woptions, "bar1", 4, "no gc", 5, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "bar2", 4, "gc all", 6, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "bar3", 4, "no gc", 5, &err); + CheckNoError(err); + // Compact the DB to garbage collect. + rocksdb_compact_range(db, NULL, 0, NULL, 0); + + // Verify foo entries. + CheckGet(db, roptions, "foo1", "no gc"); + CheckGet(db, roptions, "foo2", NULL); + CheckGet(db, roptions, "foo3", "changed"); + // Verify bar entries were all deleted. + CheckGet(db, roptions, "bar1", NULL); + CheckGet(db, roptions, "bar2", NULL); + CheckGet(db, roptions, "bar3", NULL); + } + StartPhase("merge_operator"); { rocksdb_mergeoperator_t* merge_operator; diff --git a/db/db_impl.cc b/db/db_impl.cc index 3fedfac3e6..a16bda3a16 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -138,8 +138,6 @@ struct DBImpl::CompactionState { return context; } - std::vector key_buf_; - std::vector existing_value_buf_; std::vector key_str_buf_; std::vector existing_value_str_buf_; // new_value_buf_ will only be appended if a value changes @@ -149,12 +147,7 @@ struct DBImpl::CompactionState { std::vector value_changed_buf_; // to_delete_buf_[i] is true iff key_buf_[i] is deleted std::vector to_delete_buf_; - // buffer for the parsed internal keys, the string buffer is backed - // by key_str_buf_ - std::vector ikey_buf_; - std::vector other_key_buf_; - std::vector other_value_buf_; std::vector other_key_str_buf_; std::vector other_value_str_buf_; @@ -168,12 +161,6 @@ struct DBImpl::CompactionState { void BufferKeyValueSlices(const Slice& key, const Slice& value) { key_str_buf_.emplace_back(key.ToString()); existing_value_str_buf_.emplace_back(value.ToString()); - key_buf_.emplace_back(Slice(key_str_buf_.back())); - existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back())); - - ParsedInternalKey ikey; - ParseInternalKey(key_buf_.back(), &ikey); - ikey_buf_.emplace_back(ikey); } // Buffers the kv-pair that will not be run through compaction filter V2 @@ -181,8 +168,6 @@ struct DBImpl::CompactionState { void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) { other_key_str_buf_.emplace_back(key.ToString()); other_value_str_buf_.emplace_back(value.ToString()); - other_key_buf_.emplace_back(Slice(other_key_str_buf_.back())); - other_value_buf_.emplace_back(Slice(other_value_str_buf_.back())); } // Add a kv-pair to the combined buffer @@ -196,24 +181,24 @@ struct DBImpl::CompactionState { void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) { size_t i = 0; size_t j = 0; - size_t total_size = key_buf_.size() + other_key_buf_.size(); + size_t total_size = key_str_buf_.size() + other_key_str_buf_.size(); combined_key_buf_.reserve(total_size); combined_value_buf_.reserve(total_size); while (i + j < total_size) { int comp_res = 0; - if (i < key_buf_.size() && j < other_key_buf_.size()) { - comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]); - } else if (i >= key_buf_.size() && j < other_key_buf_.size()) { + if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) { + comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]); + } else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) { comp_res = 1; - } else if (j >= other_key_buf_.size() && i < key_buf_.size()) { + } else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) { comp_res = -1; } if (comp_res > 0) { - AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]); + AddToCombinedKeyValueSlices(other_key_str_buf_[j], other_value_str_buf_[j]); j++; } else if (comp_res < 0) { - AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]); + AddToCombinedKeyValueSlices(key_str_buf_[i], existing_value_str_buf_[i]); i++; } } @@ -221,29 +206,19 @@ struct DBImpl::CompactionState { void CleanupBatchBuffer() { to_delete_buf_.clear(); - key_buf_.clear(); - existing_value_buf_.clear(); key_str_buf_.clear(); existing_value_str_buf_.clear(); new_value_buf_.clear(); value_changed_buf_.clear(); - ikey_buf_.clear(); to_delete_buf_.shrink_to_fit(); - key_buf_.shrink_to_fit(); - existing_value_buf_.shrink_to_fit(); key_str_buf_.shrink_to_fit(); existing_value_str_buf_.shrink_to_fit(); new_value_buf_.shrink_to_fit(); value_changed_buf_.shrink_to_fit(); - ikey_buf_.shrink_to_fit(); - other_key_buf_.clear(); - other_value_buf_.clear(); other_key_str_buf_.clear(); other_value_str_buf_.clear(); - other_key_buf_.shrink_to_fit(); - other_value_buf_.shrink_to_fit(); other_key_str_buf_.shrink_to_fit(); other_value_str_buf_.shrink_to_fit(); } @@ -2862,9 +2837,22 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, return; } + // Assemble slice vectors for user keys and existing values. + // We also keep traack of our parsed internal key structs because + // we may need to access the sequence number in the event that + // keys are garbage collected during the filter process. + std::vector ikey_buf; std::vector user_key_buf; - for (const auto& key : compact->ikey_buf_) { - user_key_buf.emplace_back(key.user_key); + std::vector existing_value_buf; + + for (const auto& key : compact->key_str_buf_) { + ParsedInternalKey ikey; + ParseInternalKey(Slice(key), &ikey); + ikey_buf.emplace_back(ikey); + user_key_buf.emplace_back(ikey.user_key); + } + for (const auto& value : compact->existing_value_str_buf_) { + existing_value_buf.emplace_back(Slice(value)); } // If the user has specified a compaction filter and the sequence @@ -2874,16 +2862,16 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, // the entry with a delete marker. compact->to_delete_buf_ = compaction_filter_v2->Filter( compact->compaction->level(), - user_key_buf, compact->existing_value_buf_, + user_key_buf, existing_value_buf, &compact->new_value_buf_, &compact->value_changed_buf_); // new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all // kv-pairs in this compaction run needs to be deleted. assert(compact->to_delete_buf_.size() == - compact->key_buf_.size()); + compact->key_str_buf_.size()); assert(compact->to_delete_buf_.size() == - compact->existing_value_buf_.size()); + compact->existing_value_str_buf_.size()); assert(compact->to_delete_buf_.size() == compact->value_changed_buf_.size()); @@ -2893,16 +2881,16 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, // update the string buffer directly // the Slice buffer points to the updated buffer UpdateInternalKey(&compact->key_str_buf_[i][0], - compact->key_str_buf_[i].size(), - compact->ikey_buf_[i].sequence, - kTypeDeletion); + compact->key_str_buf_[i].size(), + ikey_buf[i].sequence, + kTypeDeletion); // no value associated with delete - compact->existing_value_buf_[i].clear(); + compact->existing_value_str_buf_[i].clear(); RecordTick(stats_, COMPACTION_KEY_DROP_USER); } else if (compact->value_changed_buf_[i]) { - compact->existing_value_buf_[i] = - Slice(compact->new_value_buf_[new_value_idx++]); + compact->existing_value_str_buf_[i] = + compact->new_value_buf_[new_value_idx++]; } } // for } @@ -3031,7 +3019,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } else { // Now prefix changes, this batch is done. // Call compaction filter on the buffered values to change the value - if (compact->key_buf_.size() > 0) { + if (compact->key_str_buf_.size() > 0) { CallCompactionFilterV2(compact, compaction_filter_v2); } compact->cur_prefix_ = key_prefix.ToString(); @@ -3074,7 +3062,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, backup_input->Next(); if (!backup_input->Valid()) { // If this is the single last value, we need to merge it. - if (compact->key_buf_.size() > 0) { + if (compact->key_str_buf_.size() > 0) { CallCompactionFilterV2(compact, compaction_filter_v2); } compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); @@ -3097,7 +3085,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } } // done processing all prefix batches // finish the last batch - if (compact->key_buf_.size() > 0) { + if (compact->key_str_buf_.size() > 0) { CallCompactionFilterV2(compact, compaction_filter_v2); } compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 0d994c51a5..f678933756 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -61,6 +61,10 @@ typedef struct rocksdb_compactionfiltercontext_t rocksdb_compactionfiltercontext_t; typedef struct rocksdb_compactionfilterfactory_t rocksdb_compactionfilterfactory_t; +typedef struct rocksdb_compactionfilterv2_t + rocksdb_compactionfilterv2_t; +typedef struct rocksdb_compactionfilterfactoryv2_t + rocksdb_compactionfilterfactoryv2_t; typedef struct rocksdb_comparator_t rocksdb_comparator_t; typedef struct rocksdb_env_t rocksdb_env_t; typedef struct rocksdb_fifo_compaction_options_t rocksdb_fifo_compaction_options_t; @@ -359,11 +363,15 @@ extern void rocksdb_options_set_compaction_filter( rocksdb_compactionfilter_t*); extern void rocksdb_options_set_compaction_filter_factory( rocksdb_options_t*, rocksdb_compactionfilterfactory_t*); +extern void rocksdb_options_set_compaction_filter_factory_v2( + rocksdb_options_t*, + rocksdb_compactionfilterfactoryv2_t*); extern void rocksdb_options_set_comparator( rocksdb_options_t*, rocksdb_comparator_t*); -extern void rocksdb_options_set_merge_operator(rocksdb_options_t*, - rocksdb_mergeoperator_t*); +extern void rocksdb_options_set_merge_operator( + rocksdb_options_t*, + rocksdb_mergeoperator_t*); extern void rocksdb_options_set_compression_per_level( rocksdb_options_t* opt, int* level_values, @@ -571,6 +579,35 @@ extern rocksdb_compactionfilterfactory_t* extern void rocksdb_compactionfilterfactory_destroy( rocksdb_compactionfilterfactory_t*); +/* Compaction Filter V2 */ + +extern rocksdb_compactionfilterv2_t* rocksdb_compactionfilterv2_create( + void* state, + void (*destructor)(void*), + // num_keys specifies the number of array entries in every *list parameter. + // New values added to the new_values_list should be malloc'd and will be + // freed by the caller. Specify true in the to_delete_list to remove an + // entry during compaction; false to keep it. + void (*filter)( + void*, int level, size_t num_keys, + const char* const* keys_list, const size_t* keys_list_sizes, + const char* const* existing_values_list, const size_t* existing_values_list_sizes, + char** new_values_list, size_t* new_values_list_sizes, + unsigned char* to_delete_list), + const char* (*name)(void*)); +extern void rocksdb_compactionfilterv2_destroy(rocksdb_compactionfilterv2_t*); + +/* Compaction Filter Factory V2 */ + +extern rocksdb_compactionfilterfactoryv2_t* rocksdb_compactionfilterfactoryv2_create( + void* state, + rocksdb_slicetransform_t* prefix_extractor, + void (*destructor)(void*), + rocksdb_compactionfilterv2_t* (*create_compaction_filter_v2)( + const rocksdb_compactionfiltercontext_t* context), + const char* (*name)(void*)); +extern void rocksdb_compactionfilterfactoryv2_destroy(rocksdb_compactionfilterfactoryv2_t*); + /* Comparator */ extern rocksdb_comparator_t* rocksdb_comparator_create(