Add a clipping internal iterator (#8327)

Summary:
Logically, subcompactions process a key range [start, end); however, in the
current implementation, the `CompactionIterator` for any given subcompaction
keeps processing key-values until it actually outputs a key that is out of
range, and that key is then discarded. Instead, the patch introduces a new
type of internal iterator called `ClippingIterator`, which wraps another
internal iterator and "clips" its range of key-values so that any KVs
returned are strictly within the [start, end) interval. This eliminates a
(minor) inefficiency by stopping subcompaction processing exactly at the
limit; the main motivation, however, is related to BlobDB: namely, we need
this to be able to measure the amount of garbage generated by a subcompaction
precisely and to prevent off-by-one errors.
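
For illustration, a minimal usage sketch (not part of the patch; it reuses the
`test::VectorIterator` helper that the new unit test below also uses, and the
specific keys/values are made up for the example):

    // Clip an arbitrary InternalIterator to the user-key range ["key1", "key3").
    std::unique_ptr<InternalIterator> base(new test::VectorIterator(
        {"key0", "key1", "key2", "key3"}, {"v0", "v1", "v2", "v3"}));
    const Slice start("key1");
    const Slice end("key3");  // exclusive upper bound
    ClippingIterator clip(base.get(), &start, &end, BytewiseComparator());
    for (clip.SeekToFirst(); clip.Valid(); clip.Next()) {
      // Only key1 and key2 are visited; key0 and key3 are clipped away.
    }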

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8327

Test Plan: `make check`

Reviewed By: siying

Differential Revision: D28761541

Pulled By: ltamasi

fbshipit-source-id: ee0e7229f04edabbc7bed5adb51771fbdc287f69
Author: Levi Tamasi
Date: 2021-06-09 15:40:16 -07:00
Committed by: Facebook GitHub Bot
Parent: 2f93a3b809
Commit: db325a5904
7 changed files with 583 additions and 19 deletions

CMakeLists.txt

@@ -1133,6 +1133,7 @@ if(WITH_TESTS)
db/blob/db_blob_index_test.cc
db/column_family_test.cc
db/compact_files_test.cc
db/compaction/clipping_iterator_test.cc
db/compaction/compaction_job_stats_test.cc
db/compaction/compaction_job_test.cc
db/compaction/compaction_iterator_test.cc

Makefile

@@ -1866,6 +1866,10 @@ db_blob_corruption_test: $(OBJ_DIR)/db/blob/db_blob_corruption_test.o $(TEST_LIB
db_write_buffer_manager_test: $(OBJ_DIR)/db/db_write_buffer_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
clipping_iterator_test: $(OBJ_DIR)/db/compaction/clipping_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
PREFIX ?= /usr/local

TARGETS

@@ -1051,6 +1051,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"clipping_iterator_test",
"db/compaction/clipping_iterator_test.cc",
"parallel",
[],
[],
],
[
"coding_test",
"util/coding_test.cc",

db/compaction/clipping_iterator.h

@@ -0,0 +1,275 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cassert>
#include "table/internal_iterator.h"
namespace ROCKSDB_NAMESPACE {
// An internal iterator that wraps another one and ensures that any keys
// returned are strictly within a range [start, end). If the underlying
// iterator has already performed the bounds checking, it relies on that result;
// otherwise, it performs the necessary key comparisons itself. Both bounds
// are optional.
class ClippingIterator : public InternalIterator {
public:
ClippingIterator(InternalIterator* iter, const Slice* start, const Slice* end,
const Comparator* cmp)
: iter_(iter), start_(start), end_(end), cmp_(cmp), valid_(false) {
assert(iter_);
assert(cmp_);
assert(!start_ || !end_ || cmp_->Compare(*start_, *end_) <= 0);
UpdateAndEnforceBounds();
}
bool Valid() const override { return valid_; }
void SeekToFirst() override {
if (start_) {
iter_->Seek(*start_);
} else {
iter_->SeekToFirst();
}
UpdateAndEnforceUpperBound();
}
void SeekToLast() override {
if (end_) {
iter_->SeekForPrev(*end_);
// Upper bound is exclusive, so we need a key which is strictly smaller
if (iter_->Valid() && cmp_->Compare(iter_->key(), *end_) == 0) {
iter_->Prev();
}
} else {
iter_->SeekToLast();
}
UpdateAndEnforceLowerBound();
}
void Seek(const Slice& target) override {
if (start_ && cmp_->Compare(target, *start_) < 0) {
iter_->Seek(*start_);
UpdateAndEnforceUpperBound();
return;
}
if (end_ && cmp_->Compare(target, *end_) >= 0) {
valid_ = false;
return;
}
iter_->Seek(target);
UpdateAndEnforceUpperBound();
}
void SeekForPrev(const Slice& target) override {
if (start_ && cmp_->Compare(target, *start_) < 0) {
valid_ = false;
return;
}
if (end_ && cmp_->Compare(target, *end_) >= 0) {
iter_->SeekForPrev(*end_);
// Upper bound is exclusive, so we need a key which is strictly smaller
if (iter_->Valid() && cmp_->Compare(iter_->key(), *end_) == 0) {
iter_->Prev();
}
UpdateAndEnforceLowerBound();
return;
}
iter_->SeekForPrev(target);
UpdateAndEnforceLowerBound();
}
void Next() override {
assert(valid_);
iter_->Next();
UpdateAndEnforceUpperBound();
}
bool NextAndGetResult(IterateResult* result) override {
assert(valid_);
assert(result);
IterateResult res;
valid_ = iter_->NextAndGetResult(&res);
if (!valid_) {
return false;
}
if (end_) {
EnforceUpperBoundImpl(res.bound_check_result);
if (!valid_) {
return false;
}
}
res.bound_check_result = IterBoundCheck::kInbound;
*result = res;
return true;
}
void Prev() override {
assert(valid_);
iter_->Prev();
UpdateAndEnforceLowerBound();
}
Slice key() const override {
assert(valid_);
return iter_->key();
}
Slice user_key() const override {
assert(valid_);
return iter_->user_key();
}
Slice value() const override {
assert(valid_);
return iter_->value();
}
Status status() const override { return iter_->status(); }
bool PrepareValue() override {
assert(valid_);
if (iter_->PrepareValue()) {
return true;
}
assert(!iter_->Valid());
valid_ = false;
return false;
}
bool MayBeOutOfLowerBound() override {
assert(valid_);
return false;
}
IterBoundCheck UpperBoundCheckResult() override {
assert(valid_);
return IterBoundCheck::kInbound;
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
iter_->SetPinnedItersMgr(pinned_iters_mgr);
}
bool IsKeyPinned() const override {
assert(valid_);
return iter_->IsKeyPinned();
}
bool IsValuePinned() const override {
assert(valid_);
return iter_->IsValuePinned();
}
Status GetProperty(std::string prop_name, std::string* prop) override {
return iter_->GetProperty(prop_name, prop);
}
private:
void UpdateValid() {
assert(!iter_->Valid() || iter_->status().ok());
valid_ = iter_->Valid();
}
void EnforceUpperBoundImpl(IterBoundCheck bound_check_result) {
if (bound_check_result == IterBoundCheck::kInbound) {
return;
}
if (bound_check_result == IterBoundCheck::kOutOfBound) {
valid_ = false;
return;
}
assert(bound_check_result == IterBoundCheck::kUnknown);
if (cmp_->Compare(key(), *end_) >= 0) {
valid_ = false;
}
}
void EnforceUpperBound() {
if (!valid_) {
return;
}
if (!end_) {
return;
}
EnforceUpperBoundImpl(iter_->UpperBoundCheckResult());
}
void EnforceLowerBound() {
if (!valid_) {
return;
}
if (!start_) {
return;
}
if (!iter_->MayBeOutOfLowerBound()) {
return;
}
if (cmp_->Compare(key(), *start_) < 0) {
valid_ = false;
}
}
void AssertBounds() {
assert(!valid_ || !start_ || cmp_->Compare(key(), *start_) >= 0);
assert(!valid_ || !end_ || cmp_->Compare(key(), *end_) < 0);
}
void UpdateAndEnforceBounds() {
UpdateValid();
EnforceUpperBound();
EnforceLowerBound();
AssertBounds();
}
void UpdateAndEnforceUpperBound() {
UpdateValid();
EnforceUpperBound();
AssertBounds();
}
void UpdateAndEnforceLowerBound() {
UpdateValid();
EnforceLowerBound();
AssertBounds();
}
InternalIterator* iter_;
const Slice* start_;
const Slice* end_;
const Comparator* cmp_;
bool valid_;
};
} // namespace ROCKSDB_NAMESPACE

db/compaction/clipping_iterator_test.cc

@@ -0,0 +1,256 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/compaction/clipping_iterator.h"
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/comparator.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
// A vector iterator which does its own bounds checking. This is for testing the
// optimizations in the clipping iterator where we bypass the bounds checking if
// the input iterator has already performed it.
class BoundsCheckingVectorIterator : public test::VectorIterator {
public:
BoundsCheckingVectorIterator(const std::vector<std::string>& keys,
const std::vector<std::string>& values,
const Slice* start, const Slice* end,
const Comparator* cmp)
: VectorIterator(keys, values), start_(start), end_(end), cmp_(cmp) {
assert(cmp_);
}
bool NextAndGetResult(IterateResult* result) override {
assert(Valid());
assert(result);
Next();
if (!Valid()) {
return false;
}
result->key = key();
result->bound_check_result = UpperBoundCheckResult();
result->value_prepared = true;
return true;
}
bool MayBeOutOfLowerBound() override {
assert(Valid());
if (!start_) {
return false;
}
return cmp_->Compare(key(), *start_) < 0;
}
IterBoundCheck UpperBoundCheckResult() override {
assert(Valid());
if (!end_) {
return IterBoundCheck::kInbound;
}
return cmp_->Compare(key(), *end_) >= 0 ? IterBoundCheck::kOutOfBound
: IterBoundCheck::kInbound;
}
private:
const Slice* start_;
const Slice* end_;
const Comparator* cmp_;
};
class ClippingIteratorTest
: public ::testing::Test,
public ::testing::WithParamInterface<std::tuple<bool, size_t, size_t>> {};
TEST_P(ClippingIteratorTest, Clip) {
const std::vector<std::string> keys{"key0", "key1", "key2", "key3", "key4",
"key5", "key6", "key7", "key8", "key9"};
const std::vector<std::string> values{
"unused0", "value1", "value2", "value3", "unused4",
"unused5", "unused6", "unused7", "unused8", "unused9"};
assert(keys.size() == values.size());
// Note: the input always contains key1, key2, and key3; however, the clipping
// window is based on the test parameters: its left edge is a value in the
// range [0, 4], and its size is a value in the range [0, 5]
const std::vector<std::string> input_keys{keys[1], keys[2], keys[3]};
const std::vector<std::string> input_values{values[1], values[2], values[3]};
const bool use_bounds_checking_vec_it = std::get<0>(GetParam());
const size_t clip_start_idx = std::get<1>(GetParam());
const size_t clip_window_size = std::get<2>(GetParam());
const size_t clip_end_idx = clip_start_idx + clip_window_size;
const Slice start(keys[clip_start_idx]);
const Slice end(keys[clip_end_idx]);
std::unique_ptr<InternalIterator> input(
use_bounds_checking_vec_it
? new BoundsCheckingVectorIterator(input_keys, input_values, &start,
&end, BytewiseComparator())
: new test::VectorIterator(input_keys, input_values));
ClippingIterator clip(input.get(), &start, &end, BytewiseComparator());
// The range the clipping iterator should return values from. This is
// essentially the intersection of the input range [1, 4) and the clipping
// window [clip_start_idx, clip_end_idx)
const size_t data_start_idx =
std::max(clip_start_idx, static_cast<size_t>(1));
const size_t data_end_idx = std::min(clip_end_idx, static_cast<size_t>(4));
// Range is empty; all Seeks should fail
if (data_start_idx >= data_end_idx) {
clip.SeekToFirst();
ASSERT_FALSE(clip.Valid());
clip.SeekToLast();
ASSERT_FALSE(clip.Valid());
for (size_t i = 0; i < keys.size(); ++i) {
clip.Seek(keys[i]);
ASSERT_FALSE(clip.Valid());
clip.SeekForPrev(keys[i]);
ASSERT_FALSE(clip.Valid());
}
return;
}
// Range is non-empty; call SeekToFirst and iterate forward
clip.SeekToFirst();
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[data_start_idx]);
ASSERT_EQ(clip.value(), values[data_start_idx]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
for (size_t i = data_start_idx + 1; i < data_end_idx; ++i) {
clip.Next();
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[i]);
ASSERT_EQ(clip.value(), values[i]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
}
clip.Next();
ASSERT_FALSE(clip.Valid());
// Do it again using NextAndGetResult
clip.SeekToFirst();
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[data_start_idx]);
ASSERT_EQ(clip.value(), values[data_start_idx]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
for (size_t i = data_start_idx + 1; i < data_end_idx; ++i) {
IterateResult result;
ASSERT_TRUE(clip.NextAndGetResult(&result));
ASSERT_EQ(result.key, keys[i]);
ASSERT_EQ(result.bound_check_result, IterBoundCheck::kInbound);
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[i]);
ASSERT_EQ(clip.value(), values[i]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
}
IterateResult result;
ASSERT_FALSE(clip.NextAndGetResult(&result));
ASSERT_FALSE(clip.Valid());
// Call SeekToLast and iterate backward
clip.SeekToLast();
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[data_end_idx - 1]);
ASSERT_EQ(clip.value(), values[data_end_idx - 1]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
for (size_t i = data_end_idx - 2; i >= data_start_idx; --i) {
clip.Prev();
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[i]);
ASSERT_EQ(clip.value(), values[i]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
}
clip.Prev();
ASSERT_FALSE(clip.Valid());
// Call Seek/SeekForPrev for all keys; Seek should return the smallest key
// which is >= the target; SeekForPrev should return the largest key which is
// <= the target
for (size_t i = 0; i < keys.size(); ++i) {
clip.Seek(keys[i]);
if (i < data_start_idx) {
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[data_start_idx]);
ASSERT_EQ(clip.value(), values[data_start_idx]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
} else if (i < data_end_idx) {
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[i]);
ASSERT_EQ(clip.value(), values[i]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
} else {
ASSERT_FALSE(clip.Valid());
}
clip.SeekForPrev(keys[i]);
if (i < data_start_idx) {
ASSERT_FALSE(clip.Valid());
} else if (i < data_end_idx) {
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[i]);
ASSERT_EQ(clip.value(), values[i]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
} else {
ASSERT_TRUE(clip.Valid());
ASSERT_EQ(clip.key(), keys[data_end_idx - 1]);
ASSERT_EQ(clip.value(), values[data_end_idx - 1]);
ASSERT_FALSE(clip.MayBeOutOfLowerBound());
ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound);
}
}
}
INSTANTIATE_TEST_CASE_P(
ClippingIteratorTest, ClippingIteratorTest,
::testing::Combine(
::testing::Bool(),
::testing::Range(static_cast<size_t>(0), static_cast<size_t>(5)),
::testing::Range(static_cast<size_t>(0), static_cast<size_t>(6))));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

db/compaction/compaction_job.cc

@@ -23,6 +23,7 @@
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_builder.h"
#include "db/builder.h"
#include "db/compaction/clipping_iterator.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@@ -1086,6 +1087,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_);
const Slice* const start = sub_compact->start;
const Slice* const end = sub_compact->end;
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
@@ -1094,12 +1099,39 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;
read_options.iterate_lower_bound = start;
read_options.iterate_upper_bound = end;
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(
std::unique_ptr<InternalIterator> raw_input(
versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));
InternalIterator* input = raw_input.get();
IterKey start_ikey;
IterKey end_ikey;
Slice start_slice;
Slice end_slice;
if (start) {
start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
start_slice = start_ikey.GetInternalKey();
}
if (end) {
end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek);
end_slice = end_ikey.GetInternalKey();
}
std::unique_ptr<InternalIterator> clip;
if (start || end) {
clip.reset(new ClippingIterator(
raw_input.get(), start ? &start_slice : nullptr,
end ? &end_slice : nullptr, &cfd->internal_comparator()));
input = clip.get();
}
input->SeekToFirst();
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -1154,21 +1186,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
reinterpret_cast<void*>(
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
Slice* start = sub_compact->start;
Slice* end = sub_compact->end;
if (start != nullptr) {
IterKey start_iter;
start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
input->Seek(start_iter.GetInternalKey());
} else {
input->SeekToFirst();
}
Status status;
const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
input, cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
@@ -1199,12 +1221,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
const Slice& key = c_iter->key();
const Slice& value = c_iter->value();
// If an end key (exclusive) is specified, check if the current key is
// >= than it and exit if it is because the iterator is out of its range
if (end != nullptr &&
cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break;
}
assert(!end ||
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
if (c_iter_stats.num_input_records % kRecordStatsEvery ==
kRecordStatsEvery - 1) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
@@ -1391,7 +1410,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
sub_compact->c_iter.reset();
input.reset();
clip.reset();
raw_input.reset();
sub_compact->status = status;
}
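
Since the hunks above are shown without add/remove markers, here is a condensed
sketch of the new wiring in `ProcessKeyValueCompaction` (same names as in the
patch; the explanatory comments are this summary's, not the patch's):

    // The merged input of the subcompaction, before clipping.
    std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
        read_options, sub_compact->compaction, &range_del_agg,
        file_options_for_read_));
    InternalIterator* input = raw_input.get();

    // User-key bounds of the subcompaction; either may be null.
    const Slice* const start = sub_compact->start;
    const Slice* const end = sub_compact->end;

    // Convert the user-key bounds into internal-key bounds. kMaxSequenceNumber
    // sorts first among entries with the same user key, so [start_slice,
    // end_slice) covers exactly the user keys in [start, end).
    IterKey start_ikey, end_ikey;
    Slice start_slice, end_slice;
    if (start) {
      start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
      start_slice = start_ikey.GetInternalKey();
    }
    if (end) {
      end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek);
      end_slice = end_ikey.GetInternalKey();
    }

    // Wrap the input only if there is something to clip; the CompactionIterator
    // then consumes `input` and never sees out-of-range keys, which is why the
    // old "Seek to start / break once past end" logic can be removed.
    std::unique_ptr<InternalIterator> clip;
    if (start || end) {
      clip.reset(new ClippingIterator(raw_input.get(),
                                      start ? &start_slice : nullptr,
                                      end ? &end_slice : nullptr,
                                      &cfd->internal_comparator()));
      input = clip.get();
    }
    input->SeekToFirst();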

src.mk

@@ -388,6 +388,7 @@ TEST_MAIN_SOURCES = \
db/blob/db_blob_index_test.cc \
db/column_family_test.cc \
db/compact_files_test.cc \
db/compaction/clipping_iterator_test.cc \
db/compaction/compaction_iterator_test.cc \
db/compaction/compaction_job_test.cc \
db/compaction/compaction_job_stats_test.cc \