Compaction Support for Range Deletion

Summary:
This diff introduces RangeDelAggregator, which takes ownership of iterators
provided to it via AddTombstones(). The tombstones are organized in a two-level
map (snapshot stripe -> begin key -> tombstone). Tombstone creation avoids data
copy by holding Slices returned by the iterator, which remain valid thanks to pinning.

For compaction, we create a hierarchical range tombstone iterator with structure
matching the iterator over compaction input data. An aggregator based on that
iterator is used by CompactionIterator to determine which keys are covered by
range tombstones. For merge operands, the same aggregator is used by
MergeHelper. Upon finishing each file in the compaction, relevant range tombstones
are added to the output file's range tombstone metablock and file boundaries are
updated accordingly.

To check whether a key is covered by a range tombstone, RangeDelAggregator::ShouldDelete()
considers tombstones in the key's snapshot stripe. When this function is used outside of
compaction, it also checks newer stripes, which can contain covering tombstones. Currently
the intra-stripe check involves a linear scan; however, in the future we plan to collapse ranges
within a stripe such that binary search can be used.

RangeDelAggregator::AddToBuilder() adds all range tombstones in the table's key-range
to a new table's range tombstone meta-block. Since range tombstones may fall in the gap
between files, we may need to extend some files' key-ranges. The strategy is (1) first file
extends as far left as possible and other files do not extend left, (2) all files extend right
until either the start of the next file or the end of the last range tombstone in the gap,
whichever comes first.

One other notable change is adding release/move semantics to ScopedArenaIterator
such that it can be used to transfer ownership of an arena-allocated iterator, similar to
how unique_ptr is used for malloc'd data.

Depends on D61473

Test Plan: compaction_iterator_test, mock_table, end-to-end tests in D63927

Reviewers: sdong, IslamAbdelRahman, wanning, yhchiang, lightmark

Reviewed By: lightmark

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D62205
This commit is contained in:
Andrew Kryczka 2016-10-18 12:04:56 -07:00
parent 257de78d9b
commit 6fbe96baf8
22 changed files with 494 additions and 84 deletions

View file

@ -272,6 +272,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/range_del_aggregator.cc
db/repair.cc
db/snapshot_impl.cc
db/table_cache.cc

View file

@ -112,16 +112,18 @@ Status BuildTable(
compression_opts, level);
}
std::unique_ptr<RangeDelAggregator> range_del_agg;
range_del_agg.reset(new RangeDelAggregator(internal_comparator, snapshots));
MergeHelper merge(env, internal_comparator.user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log,
mutable_cf_options.min_partial_merge_operands,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back());
CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
&merge, kMaxSequenceNumber, &snapshots,
earliest_write_conflict_snapshot, env,
true /* internal key corruption is not ok */);
CompactionIterator c_iter(
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();

View file

@ -14,8 +14,9 @@ CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, const Compaction* compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
@ -23,6 +24,7 @@ CompactionIterator::CompactionIterator(
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
compaction_(compaction),
compaction_filter_(compaction_filter),
log_buffer_(log_buffer),
@ -153,6 +155,7 @@ void CompactionIterator::NextFromInput() {
if (!has_current_user_key_ ||
!cmp_->Equal(ikey_.user_key, current_user_key_)) {
// First occurrence of this user key
// Copy key for output
key_ = current_key_.SetKey(key_, &ikey_);
current_user_key_ = ikey_.user_key;
has_current_user_key_ = true;
@ -321,7 +324,7 @@ void CompactionIterator::NextFromInput() {
}
} else {
// We are at the end of the input, could not parse the next key, or hit
// the next key. The iterator returns the single delete if the key
// a different key. The iterator returns the single delete if the key
// possibly exists beyond the current output level. We set
// has_current_user_key to false so that if the iterator is at the next
// key, we do not compare it again against the previous key at the next
@ -390,7 +393,8 @@ void CompactionIterator::NextFromInput() {
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_);
merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot,
bottommost_level_);
merge_out_iter_.SeekToFirst();
if (merge_out_iter_.Valid()) {
@ -416,7 +420,15 @@ void CompactionIterator::NextFromInput() {
pinned_iters_mgr_.ReleasePinnedData();
}
} else {
valid_ = true;
// 1. new user key -OR-
// 2. different snapshot stripe
bool should_delete =
range_del_agg_->ShouldDelete(key_, true /* for_compaction */);
if (should_delete) {
input_->Next();
} else {
valid_ = true;
}
}
}
}

View file

@ -14,6 +14,7 @@
#include "db/compaction.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "rocksdb/compaction_filter.h"
#include "util/log_buffer.h"
@ -47,6 +48,7 @@ class CompactionIterator {
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
@ -99,6 +101,7 @@ class CompactionIterator {
const SequenceNumber earliest_write_conflict_snapshot_;
Env* env_;
bool expect_valid_internal_key_;
RangeDelAggregator* range_del_agg_;
const Compaction* compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;

View file

@ -4,6 +4,10 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction_iterator.h"
#include <string>
#include <vector>
#include "util/testharness.h"
#include "util/testutil.h"
@ -11,33 +15,45 @@ namespace rocksdb {
class CompactionIteratorTest : public testing::Test {
public:
CompactionIteratorTest() : cmp_(BytewiseComparator()), snapshots_({}) {}
CompactionIteratorTest()
: cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
void InitIterators(const std::vector<std::string>& ks,
const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks,
const std::vector<std::string>& range_del_vs,
SequenceNumber last_sequence) {
std::unique_ptr<InternalIterator> range_del_iter(
new test::VectorIterator(range_del_ks, range_del_vs));
range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
range_del_agg_->AddTombstones(std::move(range_del_iter));
void InitIterator(const std::vector<std::string>& ks,
const std::vector<std::string>& vs,
SequenceNumber last_sequence) {
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr,
nullptr, 0U, false, 0));
iter_.reset(new test::VectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false));
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get()));
}
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
const Comparator* cmp_;
const InternalKeyComparator icmp_;
std::vector<SequenceNumber> snapshots_;
std::unique_ptr<MergeHelper> merge_helper_;
std::unique_ptr<test::VectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
std::unique_ptr<RangeDelAggregator> range_del_agg_;
};
// It is possible that the output of the compaction iterator is empty even if
// the input is not.
TEST_F(CompactionIteratorTest, EmptyResult) {
InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue)},
{"", "val"}, 5);
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue)},
{"", "val"}, {}, {}, 5);
c_iter_->SeekToFirst();
ASSERT_FALSE(c_iter_->Valid());
}
@ -45,10 +61,10 @@ TEST_F(CompactionIteratorTest, EmptyResult) {
// If there is a corruption after a single deletion, the corrupted key should
// be preserved.
TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue, true),
test::KeyStr("b", 10, kTypeValue)},
{"", "val", "val2"}, 10);
InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue, true),
test::KeyStr("b", 10, kTypeValue)},
{"", "val", "val2"}, {}, {}, 10);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
@ -63,6 +79,43 @@ TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
ASSERT_FALSE(c_iter_->Valid());
}
// A single range tombstone ["ma", "mz"] at seqnum 4, with no snapshots.
// Expected output: "morning"@5 (newer than the tombstone) and "night"@3
// (user key outside the tombstone's range); "morning"@2 must not be emitted.
TEST_F(CompactionIteratorTest, SimpleRangeDeletion) {
  const std::vector<std::string> data_keys{
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("morning", 2, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};
  const std::vector<std::string> data_values{"zao", "zao", "wan"};
  // Range tombstones are encoded as (start key @ seqnum) -> end key.
  const std::vector<std::string> tombstone_keys{
      test::KeyStr("ma", 4, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_end_keys{"mz"};
  InitIterators(data_keys, data_values, tombstone_keys, tombstone_end_keys, 5);

  const std::vector<std::string> expected_keys{
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};
  c_iter_->SeekToFirst();
  for (const auto& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_FALSE(c_iter_->Valid());
}
// One snapshot at seqnum 10 and a range tombstone ["ma", "mz"] at seqnum 28.
// Expected output: "morning"@5 and "night"@40; "morning"@15 and "night"@20
// must not be emitted.
TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
  AddSnapshot(10);

  // Range tombstones are encoded as (start key @ seqnum) -> end key.
  const std::vector<std::string> tombstone_keys{
      test::KeyStr("ma", 28, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_end_keys{"mz"};

  const std::vector<std::string> data_keys{
      test::KeyStr("morning", 15, kTypeValue),
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue),
      test::KeyStr("night", 20, kTypeValue)};
  const std::vector<std::string> data_values{"zao 15", "zao 5", "wan 40",
                                             "wan 20"};
  InitIterators(data_keys, data_values, tombstone_keys, tombstone_end_keys,
                40);

  const std::vector<std::string> expected_keys{
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue)};
  c_iter_->SeekToFirst();
  for (const auto& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_FALSE(c_iter_->Valid());
}
} // namespace rocksdb
int main(int argc, char** argv) {

View file

@ -658,8 +658,11 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
std::unique_ptr<InternalIterator> input(
versions_->MakeInputIterator(sub_compact->compaction));
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
std::unique_ptr<RangeDelAggregator> range_del_agg(
new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
sub_compact->compaction, range_del_agg.get()));
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@ -680,7 +683,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
}
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();
@ -737,7 +739,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
sub_compact->compaction, compaction_filter));
range_del_agg.get(), sub_compact->compaction, compaction_filter));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
const auto& c_iter_stats = c_iter->iter_stats();
@ -766,7 +768,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->ShouldStopBefore(
key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact);
status = FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get());
if (!status.ok()) {
break;
}
@ -843,6 +846,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
}
Status input_status = input->status();
c_iter->Next();
// Close output file if it is big enough
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
@ -850,14 +856,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// and 0.6MB instead of 1MB and 0.2MB)
if (sub_compact->current_output_file_size >=
sub_compact->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input->status(), sub_compact);
const Slice* next_key = nullptr;
if (c_iter->Valid()) {
next_key = &c_iter->key();
}
status = FinishCompactionOutputFile(input_status, sub_compact,
range_del_agg.get(), next_key);
if (sub_compact->outputs.size() == 1) {
// Use dictionary from first output file for compression of subsequent
// files.
sub_compact->compression_dict = std::move(compression_dict);
}
}
c_iter->Next();
}
sub_compact->num_input_records = c_iter_stats.num_input_records;
@ -884,8 +894,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
if (status.ok() && sub_compact->builder == nullptr &&
range_del_agg->ShouldAddTombstones(bottommost_level_)) {
status = OpenCompactionOutputFile(sub_compact);
}
if (status.ok() && sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact);
status = FinishCompactionOutputFile(input->status(), sub_compact,
range_del_agg.get());
}
if (status.ok()) {
status = input->status();
@ -936,7 +951,9 @@ void CompactionJob::RecordDroppedKeys(
}
Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact) {
const Status& input_status, SubcompactionState* sub_compact,
RangeDelAggregator* range_del_agg,
const Slice* next_table_min_key /* = nullptr */) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(sub_compact != nullptr);
@ -951,6 +968,16 @@ Status CompactionJob::FinishCompactionOutputFile(
// Check for iterator errors
Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
if (s.ok()) {
// For the first output table, include range tombstones before the min key
// boundary. For subsequent output tables, this is unnecessary because we
// extend each file's max key boundary up until the next file's min key when
// range tombstones fall in the gap.
range_del_agg->AddToBuilder(
sub_compact->builder.get(),
sub_compact->outputs.size() == 1 /* extend_before_min_key */,
next_table_min_key, meta, bottommost_level_);
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
if (s.ok()) {

View file

@ -25,6 +25,7 @@
#include "db/job_context.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/range_del_aggregator.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
@ -96,7 +97,9 @@ class CompactionJob {
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact);
SubcompactionState* sub_compact,
RangeDelAggregator* range_del_agg = nullptr,
const Slice* next_table_min_key = nullptr);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);

View file

@ -305,7 +305,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
dbfull()->TEST_WaitForCompact();
// Preloading iterator issues one table cache lookup and creates
// a new table reader. One file is created for flush and one for compaction.
// Compaction inputs make no table cache look-up.
// Compaction inputs make no table cache look-up for data iterators or
// range tombstone iterators since they're already cached.
ASSERT_EQ(num_table_cache_lookup, 2);
// Create new iterator for:
// (1) 1 for verifying flush results

View file

@ -507,23 +507,30 @@ struct RangeTombstone {
explicit RangeTombstone(Slice internal_key, Slice value) {
ParsedInternalKey parsed_key;
if (ParseInternalKey(internal_key, &parsed_key)) {
start_key_ = parsed_key.user_key;
seq_ = parsed_key.sequence;
end_key_ = value;
if (!ParseInternalKey(internal_key, &parsed_key)) {
assert(false);
}
start_key_ = parsed_key.user_key;
seq_ = parsed_key.sequence;
end_key_ = value;
}
// be careful to use Serialize(); InternalKey() allocates new memory
std::pair<InternalKey, Slice> Serialize() {
// be careful to use Serialize(), allocates new memory
std::pair<InternalKey, Slice> Serialize() const {
auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion);
Slice value = end_key_;
return std::make_pair(std::move(key), std::move(value));
}
InternalKey SerializeKey() {
// be careful to use SerializeKey(), allocates new memory
InternalKey SerializeKey() const {
return InternalKey(start_key_, seq_, kTypeRangeDeletion);
}
// be careful to use SerializeEndKey(), allocates new memory
InternalKey SerializeEndKey() const {
return InternalKey(end_key_, seq_, kTypeRangeDeletion);
}
};
} // namespace rocksdb

View file

@ -75,6 +75,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
// operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i.
Status MergeHelper::MergeUntil(InternalIterator* iter,
RangeDelAggregator* range_del_agg,
const SequenceNumber stop_before,
const bool at_bottom) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
@ -171,6 +172,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
} else {
// hit a merge
// => if there is a compaction filter, apply it.
// => check for range tombstones covering the operand
// => merge the operand into the front of the operands_ list
// if not filtered
// => then continue because we haven't yet seen a Put/Delete.
@ -183,8 +185,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// 1) it's included in one of the snapshots. in that case we *must* write
// it out, no matter what compaction filter says
// 2) it's not filtered by a compaction filter
if (ikey.sequence <= latest_snapshot_ ||
!FilterMerge(orig_ikey.user_key, value_slice)) {
if ((ikey.sequence <= latest_snapshot_ ||
!FilterMerge(orig_ikey.user_key, value_slice)) &&
(range_del_agg == nullptr ||
!range_del_agg->ShouldDelete(iter->key()))) {
if (original_key_is_iter) {
// this is just an optimization that saves us one memcpy
keys_.push_front(std::move(original_key));

View file

@ -12,6 +12,7 @@
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
@ -71,6 +72,7 @@ class MergeHelper {
// or - the end of iteration
// iter: (IN) points to the first merge type entry
// (OUT) points to the first entry not included in the merge process
// range_del_agg: (IN) filters merge operands covered by range tombstones.
// stop_before: (IN) a sequence number that merge should not cross.
// 0 means no restriction
// at_bottom: (IN) true if the iterator covers the bottom level, which means
@ -85,6 +87,7 @@ class MergeHelper {
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(InternalIterator* iter,
RangeDelAggregator* range_del_agg = nullptr,
const SequenceNumber stop_before = 0,
const bool at_bottom = false);

View file

@ -29,7 +29,8 @@ class MergeHelperTest : public testing::Test {
merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(),
merge_op_.get(), filter_.get(), nullptr,
2U, false, latest_snapshot));
return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom);
return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
stop_before, at_bottom);
}
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,

195
db/range_del_aggregator.cc Normal file
View file

@ -0,0 +1,195 @@
// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/range_del_aggregator.h"
#include <algorithm>
namespace rocksdb {
// Creates an aggregator with one (initially empty) tombstone map per snapshot
// stripe, plus a catch-all stripe keyed by kMaxSequenceNumber.
//
// icmp:      internal key comparator; its user comparator orders tombstone
//            start keys inside every stripe.
// snapshots: snapshot sequence numbers; each becomes the key of one stripe.
RangeDelAggregator::RangeDelAggregator(
    const InternalKeyComparator& icmp,
    const std::vector<SequenceNumber>& snapshots)
    : icmp_(icmp) {
  // Start pinning before any iterator is handed to AddTombstones().
  pinned_iters_mgr_.StartPinning();

  // Every stripe, including the catch-all one, must order start keys with the
  // user comparator. ShouldDelete() and AddToBuilder() stop scanning a stripe
  // early based on that ordering, so a default-constructed TombstoneMap (not
  // tied to icmp_'s user comparator) would be inconsistent with them.
  const stl_wrappers::LessOfComparator start_key_less(icmp_.user_comparator());
  for (auto snapshot : snapshots) {
    stripe_map_.emplace(snapshot, TombstoneMap(start_key_less));
  }
  // Data newer than any snapshot falls in this catch-all stripe
  stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap(start_key_less));
}
bool RangeDelAggregator::ShouldDelete(const Slice& internal_key,
bool for_compaction /* = false */) {
ParsedInternalKey parsed;
if (!ParseInternalKey(internal_key, &parsed)) {
assert(false);
}
assert(IsValueType(parsed.type));
// Starting point is the snapshot stripe in which the key lives, then need to
// search all earlier stripes too, unless it's for compaction.
for (auto stripe_map_iter = GetStripeMapIter(parsed.sequence);
stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
const auto& tombstone_map = stripe_map_iter->second;
for (const auto& start_key_and_tombstone : tombstone_map) {
const auto& tombstone = start_key_and_tombstone.second;
if (icmp_.user_comparator()->Compare(parsed.user_key,
tombstone.start_key_) < 0) {
break;
}
if (parsed.sequence < tombstone.seq_ &&
icmp_.user_comparator()->Compare(parsed.user_key,
tombstone.end_key_) <= 0) {
return true;
}
}
if (for_compaction) {
break;
}
}
return false;
}
bool RangeDelAggregator::ShouldAddTombstones(
bool bottommost_level /* = false */) {
auto stripe_map_iter = stripe_map_.begin();
assert(stripe_map_iter != stripe_map_.end());
if (bottommost_level) {
// For the bottommost level, keys covered by tombstones in the first
// (oldest) stripe have been compacted away, so the tombstones are obsolete.
++stripe_map_iter;
}
while (stripe_map_iter != stripe_map_.end()) {
if (!stripe_map_iter->second.empty()) {
return true;
}
++stripe_map_iter;
}
return false;
}
// Takes over an arena-allocated iterator and loads all of its tombstones.
void RangeDelAggregator::AddTombstones(ScopedArenaIterator input) {
  InternalIterator* const arena_iter = input.release();
  AddTombstones(arena_iter, true /* arena */);
}
void RangeDelAggregator::AddTombstones(
std::unique_ptr<InternalIterator> input) {
AddTombstones(input.release(), false /* arena */);
}
// Pins `input` and inserts every tombstone it yields into the stripe that its
// sequence number falls in.
//
// input: iterator whose keys/values decode as range tombstones (internal start
//        key -> end key). May be null, in which case nothing is added.
// arena: forwarded to the pinned iterators manager together with `input`.
void RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
  if (input == nullptr) {
    // NOTE(review): tolerating null assumes some callers may have no range
    // tombstone iterator to offer - confirm against the iterator producers.
    return;
  }
  // Pin first: the tombstones below hold Slices returned by the iterator, so
  // the iterator has to be kept by the manager.
  pinned_iters_mgr_.PinIterator(input, arena);
  for (input->SeekToFirst(); input->Valid(); input->Next()) {
    RangeTombstone tombstone(input->key(), input->value());
    auto& tombstone_map = GetStripeMapIter(tombstone.seq_)->second;
    tombstone_map.emplace(tombstone.start_key_.ToString(),
                          std::move(tombstone));
  }
}
// Returns the snapshot stripe that sequence number `seq` belongs to.
RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter(
    SequenceNumber seq) {
  // The stripe includes seqnum for the snapshot above and excludes seqnum for
  // the snapshot below, so the wanted stripe is the first one whose key is not
  // less than seq. For seq == 0 that is simply the first stripe.
  const auto stripe = stripe_map_.lower_bound(seq);
  // The catch-all stripe guarantees a match for any seq.
  assert(stripe != stripe_map_.end());
  return stripe;
}
// TODO(andrewkr): We should implement an iterator over range tombstones in our
// map. It'd enable compaction to open tables on-demand, i.e., only once range
// tombstones are known to be available, without the code duplication we have
// in ShouldAddTombstones(). It'll also allow us to move the table-modifying
// code into more coherent places: CompactionJob and BuildTable().
// Writes the relevant range tombstones into `builder` and widens the file
// boundaries / seqnum range recorded in `meta` to account for them.
//
// builder:               receives one (internal start key -> end key) entry
//                        per tombstone written.
// extend_before_min_key: when true, meta->smallest may be lowered to the
//                        smallest written tombstone start key; when false,
//                        tombstones ending before meta->smallest are skipped.
// next_table_min_key:    internal key of the first entry of the next output
//                        table, or nullptr if there is none. Tombstones
//                        starting after it are left for the next table, and
//                        meta->largest is capped at its user key.
// meta:                  file metadata whose smallest/largest keys and
//                        smallest/largest seqnos are updated in place.
// bottommost_level:      when true, the first (oldest) stripe is skipped.
void RangeDelAggregator::AddToBuilder(TableBuilder* builder,
                                      bool extend_before_min_key,
                                      const Slice* next_table_min_key,
                                      FileMetaData* meta,
                                      bool bottommost_level /* = false */) {
  auto stripe_map_iter = stripe_map_.begin();
  assert(stripe_map_iter != stripe_map_.end());
  if (bottommost_level) {
    // For the bottommost level, keys covered by tombstones in the first
    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
    ++stripe_map_iter;
  }

  const Comparator* ucmp = icmp_.user_comparator();
  // next_table_min_key is an internal key (it is compared with the internal
  // comparator below), so strip the seqnum/type trailer before handing it to
  // the user comparator.
  Slice next_table_min_user_key;
  if (next_table_min_key != nullptr) {
    next_table_min_user_key = ExtractUserKey(*next_table_min_key);
  }

  // Note the order in which tombstones are stored is insignificant since we
  // insert them into a std::map on the read path.
  for (; stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
    for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
      const auto& tombstone = start_key_and_tombstone.second;
      if (next_table_min_key != nullptr &&
          ucmp->Compare(next_table_min_user_key, tombstone.start_key_) < 0) {
        // Tombstones starting after next_table_min_key only need to be included
        // in the next table. The stripe is ordered by start key, so the rest of
        // it can be skipped as well.
        break;
      }
      if (!extend_before_min_key && meta->smallest.size() != 0 &&
          ucmp->Compare(tombstone.end_key_, meta->smallest.user_key()) < 0) {
        // Tombstones ending before this table's smallest key can conditionally
        // be excluded, e.g., when this table is a non-first compaction output,
        // we know such tombstones are included in the previous table. In that
        // case extend_before_min_key would be false.
        continue;
      }

      auto ikey_and_end_key = tombstone.Serialize();
      builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);

      // Consider every written tombstone, not only the first one: the first
      // tombstone of a later stripe may start before anything written from an
      // earlier stripe.
      if (extend_before_min_key &&
          (meta->smallest.size() == 0 ||
           icmp_.Compare(ikey_and_end_key.first, meta->smallest) < 0)) {
        meta->smallest = ikey_and_end_key.first;
      }

      auto end_ikey = tombstone.SerializeEndKey();
      if (meta->largest.size() == 0 ||
          icmp_.Compare(meta->largest, end_ikey) < 0) {
        if (next_table_min_key != nullptr &&
            icmp_.Compare(*next_table_min_key, end_ikey.Encode()) < 0) {
          // Pretend the largest key has the same user key as the min key in the
          // following table in order for files to appear key-space partitioned.
          // Choose highest seqnum so this file's largest comes before the next
          // file's smallest. The fake seqnum is OK because the read path's
          // file-picking code only considers the user key portion.
          //
          // Note Seek() also creates InternalKey with (user_key,
          // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
          // kTypeRangeDeletion (0xF), so the range tombstone comes before the
          // Seek() key in InternalKey's ordering. So Seek() will look in the
          // next file for the user key.
          meta->largest = InternalKey(next_table_min_user_key,
                                      kMaxSequenceNumber, kTypeRangeDeletion);
        } else {
          meta->largest = std::move(end_ikey);
        }
      }
      meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_);
      meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_);
    }
  }
}
} // namespace rocksdb

53
db/range_del_aggregator.h Normal file
View file

@ -0,0 +1,53 @@
// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <map>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "db/version_edit.h"
#include "include/rocksdb/comparator.h"
#include "include/rocksdb/types.h"
#include "table/internal_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "table/table_builder.h"
#include "util/kv_map.h"
namespace rocksdb {
// Collects range tombstones from iterators handed to AddTombstones() and
// answers whether a given key is covered by any of them. Tombstones are
// grouped by snapshot stripe and, within a stripe, ordered by start key.
class RangeDelAggregator {
 public:
  // icmp:      internal key comparator; its user comparator orders tombstone
  //            start keys.
  // snapshots: snapshot sequence numbers delimiting the stripes.
  RangeDelAggregator(const InternalKeyComparator& icmp,
                     const std::vector<SequenceNumber>& snapshots);

  // Returns true if `internal_key` is covered by a tombstone with a larger
  // sequence number. With for_compaction set, only the key's own snapshot
  // stripe is examined; otherwise newer stripes are examined as well.
  bool ShouldDelete(const Slice& internal_key, bool for_compaction = false);

  // Returns true if AddToBuilder() would have at least one stripe with
  // tombstones to consider. With bottommost_level set, the oldest stripe is
  // ignored.
  bool ShouldAddTombstones(bool bottommost_level = false);

  // Take ownership of `input` and load every tombstone it yields.
  void AddTombstones(ScopedArenaIterator input);
  void AddTombstones(std::unique_ptr<InternalIterator> input);

  // write tombstones covering a range to a table builder
  // usually don't add to a max-level table builder
  void AddToBuilder(TableBuilder* builder, bool extend_before_min_key,
                    const Slice* next_table_min_key, FileMetaData* meta,
                    bool bottommost_level = false);

 private:
  // Maps tombstone start key -> tombstone object. This is a multimap because
  // several tombstones in one stripe may share a start key (e.g. differing in
  // end key or seqnum); a unique-key map would silently drop all but the first
  // one inserted.
  typedef std::multimap<std::string, RangeTombstone,
                        stl_wrappers::LessOfComparator>
      TombstoneMap;
  // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
  // their seqnums are greater than the next smaller snapshot's seqnum.
  typedef std::map<SequenceNumber, TombstoneMap> StripeMap;

  void AddTombstones(InternalIterator* input, bool arena);
  StripeMap::iterator GetStripeMapIter(SequenceNumber seq);

  PinnedIteratorsManager pinned_iters_mgr_;
  StripeMap stripe_map_;
  const InternalKeyComparator icmp_;
};
} // namespace rocksdb

View file

@ -170,7 +170,10 @@ InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
bool for_compaction, Arena* arena, bool skip_filters, int level) {
bool for_compaction, Arena* arena, bool skip_filters, int level,
RangeDelAggregator* range_del_agg /* = nullptr */,
bool is_range_del_only /* = false */) {
assert(!is_range_del_only || range_del_agg != nullptr);
PERF_TIMER_GUARD(new_table_iterator_nanos);
if (table_reader_ptr != nullptr) {
@ -182,14 +185,18 @@ InternalIterator* TableCache::NewIterator(
size_t readahead = 0;
bool create_new_table_reader = false;
if (for_compaction) {
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;
// pointless to create a new table reader for range tombstones only since the
// reader isn't reused
if (!is_range_del_only) {
if (for_compaction) {
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;
}
} else {
readahead = options.readahead_size;
create_new_table_reader = readahead > 0;
}
} else {
readahead = options.readahead_size;
create_new_table_reader = readahead > 0;
}
if (create_new_table_reader) {
@ -216,23 +223,36 @@ InternalIterator* TableCache::NewIterator(
}
}
InternalIterator* result =
table_reader->NewIterator(options, arena, skip_filters);
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
} else if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (range_del_agg != nullptr) {
std::unique_ptr<InternalIterator> iter(
table_reader->NewRangeTombstoneIterator(options));
range_del_agg->AddTombstones(std::move(iter));
}
if (for_compaction) {
table_reader->SetupForCompaction();
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
}
InternalIterator* result = nullptr;
if (!is_range_del_only) {
result = table_reader->NewIterator(options, arena, skip_filters);
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
} else if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}
if (for_compaction) {
table_reader->SetupForCompaction();
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
}
} else {
assert(!create_new_table_reader);
// don't need the table reader at all since the iterator over the meta-block
// doesn't require it
if (handle != nullptr) {
UnrefEntry(cache_, handle);
}
}
return result;
}

View file

@ -15,6 +15,7 @@
#include <stdint.h>
#include "db/dbformat.h"
#include "db/range_del_aggregator.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
@ -47,12 +48,18 @@ class TableCache {
// returned iterator is live.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
// @param range_del_agg When non-nullptr, creates a range tombstone iterator
// over this file's meta-block and adds it to range_del_agg
// @param is_range_del_only When set, this function only adds a range
// tombstone iterator to range_del_agg and then returns nullptr. Requires
// range_del_agg to be non-nullptr.
InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr,
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false, int level = -1);
Arena* arena = nullptr, bool skip_filters = false, int level = -1,
RangeDelAggregator* range_del_agg = nullptr,
bool is_range_del_only = false);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until

View file

@ -497,7 +497,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
HistogramImpl* file_read_hist, bool for_compaction,
bool prefix_enabled, bool skip_filters, int level)
bool prefix_enabled, bool skip_filters, int level,
RangeDelAggregator* range_del_agg)
: TwoLevelIteratorState(prefix_enabled),
table_cache_(table_cache),
read_options_(read_options),
@ -506,7 +507,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
file_read_hist_(file_read_hist),
for_compaction_(for_compaction),
skip_filters_(skip_filters),
level_(level) {}
level_(level),
range_del_agg_(range_del_agg) {}
InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
if (meta_handle.size() != sizeof(FileDescriptor)) {
@ -518,7 +520,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, *fd,
nullptr /* don't need reference to table*/, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_, level_);
for_compaction_, nullptr /* arena */, skip_filters_, level_,
range_del_agg_, false /* is_range_del_only */);
}
}
@ -535,6 +538,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
bool for_compaction_;
bool skip_filters_;
int level_;
RangeDelAggregator* range_del_agg_;
};
// A wrapper of version builder which references the current version in
@ -826,13 +830,13 @@ void Version::AddIterators(const ReadOptions& read_options,
for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) {
if (storage_info_.LevelFilesBrief(level).num_files != 0) {
auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState));
auto* state = new (mem)
LevelFileIteratorState(cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr,
IsFilterSkipped(level), level);
auto* state = new (mem) LevelFileIteratorState(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level),
level, nullptr /* range_del_agg */);
mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
auto* first_level_iter = new (mem) LevelFileNumIterator(
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
@ -3343,7 +3347,8 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
}
}
InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
InternalIterator* VersionSet::MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg) {
auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums =
@ -3371,7 +3376,7 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
cfd->internal_comparator(), flevel->files[i].fd, nullptr,
nullptr, /* no per level latency histogram*/
true /* for_compaction */, nullptr /* arena */,
false /* skip_filters */, (int)which /* level */);
false /* skip_filters */, (int)which /* level */, range_del_agg);
}
} else {
// Create concatenating iterator for the files from this level
@ -3381,7 +3386,8 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
cfd->internal_comparator(),
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* prefix enabled */,
false /* skip_filters */, (int)which /* level */),
false /* skip_filters */, (int)which /* level */,
range_del_agg),
new LevelFileNumIterator(cfd->internal_comparator(),
c->input_levels(which)));
}

View file

@ -695,7 +695,8 @@ class VersionSet {
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
InternalIterator* MakeInputIterator(const Compaction* c);
InternalIterator* MakeInputIterator(const Compaction* c,
RangeDelAggregator* range_del_agg);
// Add all files listed in any live version to *live.
void AddLiveFiles(std::vector<FileDescriptor>* live_list);

1
src.mk
View file

@ -10,6 +10,7 @@ LIB_SOURCES = \
db/compaction_job.cc \
db/compaction_picker.cc \
db/convenience.cc \
db/range_del_aggregator.cc \
db/db_filesnapshot.cc \
db/dbformat.cc \
db/db_impl.cc \

View file

@ -86,13 +86,14 @@ class BlockBasedTableBuilder : public TableBuilder {
private:
bool ok() const { return status().ok(); }
// Call block's Finish() method and then write the finalize block contents to
// file.
// Call block's Finish() method
// and then write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
// Directly write block content to the file.
// Compress and write block content to the file.
void WriteBlock(const Slice& block_contents, BlockHandle* handle,
bool is_data_block);
// Directly write data to the file.
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
Status InsertBlockInCache(const Slice& block_contents,
const CompressionType type,

View file

@ -121,7 +121,10 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
size_t meta_bytes_buf_size = 0;
ParsedInternalKey internal_key;
ParseInternalKey(key, &internal_key);
if (!ParseInternalKey(key, &internal_key)) {
assert(false);
return;
}
if (internal_key.type == kTypeRangeDeletion) {
status_ = Status::NotSupported("Range deletion unsupported");
return;

View file

@ -40,10 +40,16 @@ class ScopedArenaIterator {
}
// Forwards member access to the held iterator; iter_ itself is left
// unchanged. No null check is performed here.
InternalIterator* operator->() { return iter_; }
InternalIterator* get() { return iter_; }
// Same as reset(iter): hands `iter` to this holder via reset().
// NOTE(review): presumably reset() disposes of any previously held
// iterator first -- confirm against reset()'s definition.
void set(InternalIterator* iter) { reset(iter); }
InternalIterator* get() { return iter_; }
// Hands the held iterator back to the caller and leaves this holder empty
// (iter_ == nullptr), so that the destructor's reset(nullptr) no longer acts
// on the returned iterator. Intended for transferring ownership of an
// arena-allocated iterator, in the same spirit as std::unique_ptr::release().
// Precondition: an iterator is currently held.
InternalIterator* release() {
  // Releasing an empty holder is a caller bug.
  assert(iter_ != nullptr);
  InternalIterator* const released = iter_;
  iter_ = nullptr;
  return released;
}
~ScopedArenaIterator() {
reset(nullptr);