mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 07:30:54 +00:00
6fbe96baf8
Summary: This diff introduces RangeDelAggregator, which takes ownership of iterators provided to it via AddTombstones(). The tombstones are organized in a two-level map (snapshot stripe -> begin key -> tombstone). Tombstone creation avoids data copy by holding Slices returned by the iterator, which remain valid thanks to pinning. For compaction, we create a hierarchical range tombstone iterator with structure matching the iterator over compaction input data. An aggregator based on that iterator is used by CompactionIterator to determine which keys are covered by range tombstones. In case of merge operand, the same aggregator is used by MergeHelper. Upon finishing each file in the compaction, relevant range tombstones are added to the output file's range tombstone metablock and file boundaries are updated accordingly. To check whether a key is covered by range tombstone, RangeDelAggregator::ShouldDelete() considers tombstones in the key's snapshot stripe. When this function is used outside of compaction, it also checks newer stripes, which can contain covering tombstones. Currently the intra-stripe check involves a linear scan; however, in the future we plan to collapse ranges within a stripe such that binary search can be used. RangeDelAggregator::AddToBuilder() adds all range tombstones in the table's key-range to a new table's range tombstone meta-block. Since range tombstones may fall in the gap between files, we may need to extend some files' key-ranges. The strategy is (1) first file extends as far left as possible and other files do not extend left, (2) all files extend right until either the start of the next file or the end of the last range tombstone in the gap, whichever comes first. One other notable change is adding release/move semantics to ScopedArenaIterator such that it can be used to transfer ownership of an arena-allocated iterator, similar to how unique_ptr is used for malloc'd data. Depends on D61473 Test Plan: compaction_iterator_test, mock_table, end-to-end tests in D63927 Reviewers: sdong, IslamAbdelRahman, wanning, yhchiang, lightmark Reviewed By: lightmark Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D62205
291 lines
11 KiB
C++
291 lines
11 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#include <algorithm>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "db/merge_helper.h"
|
|
#include "rocksdb/comparator.h"
|
|
#include "util/coding.h"
|
|
#include "util/testharness.h"
|
|
#include "util/testutil.h"
|
|
#include "utilities/merge_operators.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class MergeHelperTest : public testing::Test {
|
|
public:
|
|
MergeHelperTest() { env_ = Env::Default(); }
|
|
|
|
~MergeHelperTest() = default;
|
|
|
|
Status Run(SequenceNumber stop_before, bool at_bottom,
|
|
SequenceNumber latest_snapshot = 0) {
|
|
iter_.reset(new test::VectorIterator(ks_, vs_));
|
|
iter_->SeekToFirst();
|
|
merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(),
|
|
merge_op_.get(), filter_.get(), nullptr,
|
|
2U, false, latest_snapshot));
|
|
return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
|
|
stop_before, at_bottom);
|
|
}
|
|
|
|
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
|
|
const ValueType& t, const std::string& val,
|
|
bool corrupt = false) {
|
|
InternalKey ikey(user_key, seq, t);
|
|
if (corrupt) {
|
|
test::CorruptKeyType(&ikey);
|
|
}
|
|
ks_.push_back(ikey.Encode().ToString());
|
|
vs_.push_back(val);
|
|
}
|
|
|
|
Env* env_;
|
|
std::unique_ptr<test::VectorIterator> iter_;
|
|
std::shared_ptr<MergeOperator> merge_op_;
|
|
std::unique_ptr<MergeHelper> merge_helper_;
|
|
std::vector<std::string> ks_;
|
|
std::vector<std::string> vs_;
|
|
std::unique_ptr<test::FilterNumber> filter_;
|
|
};
|
|
|
|
// If MergeHelper encounters a new key on the last level, we know that
|
|
// the key has no more history and it can merge keys.
|
|
TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 20, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("b", 10, kTypeMerge, test::EncodeInt(4U)); // <- iter_ after merge
|
|
|
|
ASSERT_TRUE(Run(0, true).ok());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging with a value results in a successful merge.
|
|
TEST_F(MergeHelperTest, MergeValue) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); // <- iter_ after merge
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(0, false).ok());
|
|
ASSERT_EQ(ks_[3], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(8U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging stops before a snapshot.
|
|
TEST_F(MergeHelperTest, SnapshotBeforeValue) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(3U)); // <- iter_ after merge
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U));
|
|
AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(31, true).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// MergeHelper preserves the operand stack for merge operators that
|
|
// cannot do a partial merge.
|
|
TEST_F(MergeHelperTest, NoPartialMerge) {
|
|
merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, "v2");
|
|
AddKeyVal("a", 40, kTypeMerge, "v"); // <- iter_ after merge
|
|
AddKeyVal("a", 30, kTypeMerge, "v");
|
|
|
|
ASSERT_TRUE(Run(31, true).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ("v", merge_helper_->values()[0]);
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]);
|
|
ASSERT_EQ("v2", merge_helper_->values()[1]);
|
|
ASSERT_EQ(2U, merge_helper_->keys().size());
|
|
ASSERT_EQ(2U, merge_helper_->values().size());
|
|
}
|
|
|
|
// A single operand can not be merged.
|
|
TEST_F(MergeHelperTest, SingleOperand) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(31, true).IsMergeInProgress());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(1U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// Merging with a deletion turns the deletion into a value
|
|
TEST_F(MergeHelperTest, MergeDeletion) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 20, kTypeDeletion, "");
|
|
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(3U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// The merge helper stops upon encountering a corrupt key
|
|
TEST_F(MergeHelperTest, CorruptKey) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(1U));
|
|
// Corrupt key
|
|
AddKeyVal("a", 20, kTypeDeletion, "", true); // <- iter_ after merge
|
|
|
|
ASSERT_TRUE(Run(15, false).IsMergeInProgress());
|
|
ASSERT_EQ(ks_[2], iter_->key());
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]);
|
|
ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]);
|
|
ASSERT_EQ(1U, merge_helper_->keys().size());
|
|
ASSERT_EQ(1U, merge_helper_->values().size());
|
|
}
|
|
|
|
// The compaction filter is called on every merge operand
|
|
TEST_F(MergeHelperTest, FilterMergeOperands) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 25, kTypeValue, test::EncodeInt(1U));
|
|
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_EQ(test::KeyStr("a", 30, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(8U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
}
|
|
|
|
TEST_F(MergeHelperTest, FilterAllMergeOperands) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
|
|
|
|
// filtered out all
|
|
ASSERT_TRUE(Run(15, false).ok());
|
|
ASSERT_FALSE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// we have one operand that will survive because it's a delete
|
|
AddKeyVal("a", 24, kTypeDeletion, test::EncodeInt(5U));
|
|
AddKeyVal("b", 23, kTypeValue, test::EncodeInt(5U));
|
|
ASSERT_TRUE(Run(15, true).ok());
|
|
merge_output_iter = MergeOutputIterator(merge_helper_.get());
|
|
ASSERT_TRUE(iter_->Valid());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// when all merge operands are filtered out, we leave the iterator pointing to
|
|
// the Put/Delete that survived
|
|
ASSERT_EQ(test::KeyStr("a", 24, kTypeDeletion), iter_->key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(5U), iter_->value().ToString());
|
|
}
|
|
|
|
// Make sure that merge operands are filtered at the beginning
|
|
TEST_F(MergeHelperTest, FilterFirstMergeOperand) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); // Filtered
|
|
AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); // next user key
|
|
|
|
ASSERT_OK(Run(15, true));
|
|
ASSERT_TRUE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
// sequence number is 29 here, because the first merge operand got filtered
|
|
// out
|
|
ASSERT_EQ(test::KeyStr("a", 29, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(6U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
|
|
// make sure that we're passing user keys into the filter
|
|
ASSERT_EQ("a", filter_->last_merge_operand_key());
|
|
}
|
|
|
|
// Make sure that merge operands are not filtered out if there's a snapshot
|
|
// pointing at them
|
|
TEST_F(MergeHelperTest, DontFilterMergeOperandsBeforeSnapshotTest) {
|
|
merge_op_ = MergeOperators::CreateUInt64AddOperator();
|
|
filter_.reset(new test::FilterNumber(5U));
|
|
|
|
AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U));
|
|
AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U));
|
|
AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U));
|
|
AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U));
|
|
AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U));
|
|
|
|
ASSERT_OK(Run(15, true, 32));
|
|
ASSERT_TRUE(iter_->Valid());
|
|
MergeOutputIterator merge_output_iter(merge_helper_.get());
|
|
merge_output_iter.SeekToFirst();
|
|
ASSERT_EQ(test::KeyStr("a", 31, kTypeValue),
|
|
merge_output_iter.key().ToString());
|
|
ASSERT_EQ(test::EncodeInt(26U), merge_output_iter.value().ToString());
|
|
merge_output_iter.Next();
|
|
ASSERT_FALSE(merge_output_iter.Valid());
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|