mirror of https://github.com/facebook/rocksdb.git
Remove stale entries from L0 files when UDT is not persisted (#13035)
Summary: When user-defined timestamps are not persisted, currently we replace the actual timestamp with min timestamp after an entry is output from compaction iterator. Compaction iterator won't be able to help with removing stale entries this way. This PR adds a wrapper iterator `TimestampStrippingIterator` for `MemTableIterator` that does the min timestamp replacement at the memtable iteration step. It is used by flush and can help remove stale entries from landing in L0 files. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13035 Test Plan: Added unit test Reviewed By: pdillinger, cbi42 Differential Revision: D63423682 Pulled By: jowlyzhang fbshipit-source-id: 087dcc9cee97b9ea51b8d2b88dc91c2984d54e55
This commit is contained in:
parent
f7237e3395
commit
8592517c89
|
@ -206,10 +206,6 @@ Status BuildTable(
|
||||||
/*compaction=*/nullptr, compaction_filter.get(),
|
/*compaction=*/nullptr, compaction_filter.get(),
|
||||||
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
|
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
|
||||||
|
|
||||||
const size_t ts_sz = ucmp->timestamp_size();
|
|
||||||
const bool logical_strip_timestamp =
|
|
||||||
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;
|
|
||||||
|
|
||||||
SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber;
|
SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber;
|
||||||
std::string key_after_flush_buf;
|
std::string key_after_flush_buf;
|
||||||
std::string value_buf;
|
std::string value_buf;
|
||||||
|
@ -222,16 +218,6 @@ Status BuildTable(
|
||||||
Slice key_after_flush = key_after_flush_buf;
|
Slice key_after_flush = key_after_flush_buf;
|
||||||
Slice value_after_flush = value;
|
Slice value_after_flush = value;
|
||||||
|
|
||||||
// If user defined timestamps will be stripped from user key after flush,
|
|
||||||
// the in memory version of the key act logically the same as one with a
|
|
||||||
// minimum timestamp. We update the timestamp here so file boundary and
|
|
||||||
// output validator, block builder all see the effect of the stripping.
|
|
||||||
if (logical_strip_timestamp) {
|
|
||||||
key_after_flush_buf.clear();
|
|
||||||
ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
|
|
||||||
key_after_flush = key_after_flush_buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ikey.type == kTypeValuePreferredSeqno) {
|
if (ikey.type == kTypeValuePreferredSeqno) {
|
||||||
auto [unpacked_value, unix_write_time] =
|
auto [unpacked_value, unix_write_time] =
|
||||||
ParsePackedValueWithWriteTime(value);
|
ParsePackedValueWithWriteTime(value);
|
||||||
|
@ -291,11 +277,7 @@ Status BuildTable(
|
||||||
Slice last_tombstone_start_user_key{};
|
Slice last_tombstone_start_user_key{};
|
||||||
for (range_del_it->SeekToFirst(); range_del_it->Valid();
|
for (range_del_it->SeekToFirst(); range_del_it->Valid();
|
||||||
range_del_it->Next()) {
|
range_del_it->Next()) {
|
||||||
// When user timestamp should not be persisted, we logically strip a
|
auto tombstone = range_del_it->Tombstone();
|
||||||
// range tombstone's start and end key's timestamp (replace it with min
|
|
||||||
// timestamp) before passing them along to table builder and to update
|
|
||||||
// file boundaries.
|
|
||||||
auto tombstone = range_del_it->Tombstone(logical_strip_timestamp);
|
|
||||||
std::pair<InternalKey, Slice> kv = tombstone.Serialize();
|
std::pair<InternalKey, Slice> kv = tombstone.Serialize();
|
||||||
builder->Add(kv.first.Encode(), kv.second);
|
builder->Add(kv.first.Encode(), kv.second);
|
||||||
InternalKey tombstone_end = tombstone.SerializeEndKey();
|
InternalKey tombstone_end = tombstone.SerializeEndKey();
|
||||||
|
|
|
@ -3876,6 +3876,91 @@ TEST_F(ManualFlushSkipRetainUDTTest, ManualFlush) {
|
||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ManualFlushSkipRetainUDTTest, FlushRemovesStaleEntries) {
|
||||||
|
column_family_options_.max_write_buffer_number = 4;
|
||||||
|
Open();
|
||||||
|
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
|
||||||
|
|
||||||
|
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
|
||||||
|
ColumnFamilyData* cfd =
|
||||||
|
static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
|
||||||
|
for (int version = 0; version < 100; version++) {
|
||||||
|
if (version == 50) {
|
||||||
|
ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_SwitchMemtable(cfd));
|
||||||
|
}
|
||||||
|
ASSERT_OK(
|
||||||
|
Put(0, "foo", EncodeAsUint64(version), "v" + std::to_string(version)));
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(Flush(0));
|
||||||
|
TablePropertiesCollection tables_properties;
|
||||||
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&tables_properties));
|
||||||
|
ASSERT_EQ(1, tables_properties.size());
|
||||||
|
std::shared_ptr<const TableProperties> table_properties =
|
||||||
|
tables_properties.begin()->second;
|
||||||
|
ASSERT_EQ(1, table_properties->num_entries);
|
||||||
|
ASSERT_EQ(0, table_properties->num_deletions);
|
||||||
|
ASSERT_EQ(0, table_properties->num_range_deletions);
|
||||||
|
CheckEffectiveCutoffTime(100);
|
||||||
|
CheckAutomaticFlushRetainUDT(101);
|
||||||
|
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ManualFlushSkipRetainUDTTest, RangeDeletionFlushRemovesStaleEntries) {
|
||||||
|
column_family_options_.max_write_buffer_number = 4;
|
||||||
|
Open();
|
||||||
|
// TODO(yuzhangyu): a non 0 full history ts low is needed for this garbage
|
||||||
|
// collection to kick in. This doesn't work well for the very first flush of
|
||||||
|
// the column family. Not a big issue, but would be nice to improve this.
|
||||||
|
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(9)));
|
||||||
|
|
||||||
|
for (int i = 10; i < 100; i++) {
|
||||||
|
ASSERT_OK(Put(0, "foo" + std::to_string(i), EncodeAsUint64(i),
|
||||||
|
"val" + std::to_string(i)));
|
||||||
|
if (i % 2 == 1) {
|
||||||
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), "foo" + std::to_string(i - 1),
|
||||||
|
"foo" + std::to_string(i), EncodeAsUint64(i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(Flush(0));
|
||||||
|
CheckEffectiveCutoffTime(100);
|
||||||
|
std::string read_ts = EncodeAsUint64(100);
|
||||||
|
std::string min_ts = EncodeAsUint64(0);
|
||||||
|
ReadOptions ropts;
|
||||||
|
Slice read_ts_slice = read_ts;
|
||||||
|
std::string value;
|
||||||
|
ropts.timestamp = &read_ts_slice;
|
||||||
|
{
|
||||||
|
Iterator* iter = db_->NewIterator(ropts);
|
||||||
|
iter->SeekToFirst();
|
||||||
|
int i = 11;
|
||||||
|
while (iter->Valid()) {
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("foo" + std::to_string(i), iter->key());
|
||||||
|
ASSERT_EQ("val" + std::to_string(i), iter->value());
|
||||||
|
ASSERT_EQ(min_ts, iter->timestamp());
|
||||||
|
iter->Next();
|
||||||
|
i += 2;
|
||||||
|
}
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
delete iter;
|
||||||
|
}
|
||||||
|
TablePropertiesCollection tables_properties;
|
||||||
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&tables_properties));
|
||||||
|
ASSERT_EQ(1, tables_properties.size());
|
||||||
|
std::shared_ptr<const TableProperties> table_properties =
|
||||||
|
tables_properties.begin()->second;
|
||||||
|
// 45 point data + 45 range deletions. 45 obsolete point data are garbage
|
||||||
|
// collected.
|
||||||
|
ASSERT_EQ(90, table_properties->num_entries);
|
||||||
|
ASSERT_EQ(45, table_properties->num_deletions);
|
||||||
|
ASSERT_EQ(45, table_properties->num_range_deletions);
|
||||||
|
|
||||||
|
Close();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ManualFlushSkipRetainUDTTest, ManualCompaction) {
|
TEST_F(ManualFlushSkipRetainUDTTest, ManualCompaction) {
|
||||||
Open();
|
Open();
|
||||||
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
|
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));
|
||||||
|
|
|
@ -1667,9 +1667,18 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||||
Arena arena;
|
Arena arena;
|
||||||
Status s;
|
Status s;
|
||||||
TableProperties table_properties;
|
TableProperties table_properties;
|
||||||
|
const auto* ucmp = cfd->internal_comparator().user_comparator();
|
||||||
|
assert(ucmp);
|
||||||
|
const size_t ts_sz = ucmp->timestamp_size();
|
||||||
|
const bool logical_strip_timestamp =
|
||||||
|
ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps;
|
||||||
{
|
{
|
||||||
ScopedArenaPtr<InternalIterator> iter(
|
ScopedArenaPtr<InternalIterator> iter(
|
||||||
mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
|
logical_strip_timestamp
|
||||||
|
? mem->NewTimestampStrippingIterator(
|
||||||
|
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
|
||||||
|
/*prefix_extractor=*/nullptr, ts_sz)
|
||||||
|
: mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
|
||||||
/*prefix_extractor=*/nullptr));
|
/*prefix_extractor=*/nullptr));
|
||||||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
||||||
"[%s] [WriteLevel0TableForRecovery]"
|
"[%s] [WriteLevel0TableForRecovery]"
|
||||||
|
@ -1705,10 +1714,13 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||||
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||||
range_del_iters;
|
range_del_iters;
|
||||||
auto range_del_iter =
|
auto range_del_iter =
|
||||||
// This is called during recovery, where a live memtable is flushed
|
logical_strip_timestamp
|
||||||
// directly. In this case, no fragmented tombstone list is cached in
|
? mem->NewTimestampStrippingRangeTombstoneIterator(
|
||||||
// this memtable yet.
|
ro, kMaxSequenceNumber, ts_sz)
|
||||||
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
|
// This is called during recovery, where a live memtable is
|
||||||
|
// flushed directly. In this case, no fragmented tombstone list is
|
||||||
|
// cached in this memtable yet.
|
||||||
|
: mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
|
||||||
false /* immutable_memtable */);
|
false /* immutable_memtable */);
|
||||||
if (range_del_iter != nullptr) {
|
if (range_del_iter != nullptr) {
|
||||||
range_del_iters.emplace_back(range_del_iter);
|
range_del_iters.emplace_back(range_del_iter);
|
||||||
|
@ -1795,9 +1807,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||||
|
|
||||||
// For UDT in memtable only feature, move up the cutoff timestamp whenever
|
// For UDT in memtable only feature, move up the cutoff timestamp whenever
|
||||||
// a flush happens.
|
// a flush happens.
|
||||||
const Comparator* ucmp = cfd->user_comparator();
|
if (logical_strip_timestamp) {
|
||||||
size_t ts_sz = ucmp->timestamp_size();
|
|
||||||
if (ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps) {
|
|
||||||
Slice mem_newest_udt = mem->GetNewestUDT();
|
Slice mem_newest_udt = mem->GetNewestUDT();
|
||||||
std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
|
std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
|
||||||
if (full_history_ts_low.empty() ||
|
if (full_history_ts_low.empty() ||
|
||||||
|
|
|
@ -1027,22 +1027,13 @@ struct RangeTombstone {
|
||||||
// User-defined timestamp is enabled, `sk` and `ek` should be user key
|
// User-defined timestamp is enabled, `sk` and `ek` should be user key
|
||||||
// with timestamp, `ts` will replace the timestamps in `sk` and
|
// with timestamp, `ts` will replace the timestamps in `sk` and
|
||||||
// `ek`.
|
// `ek`.
|
||||||
// When `logical_strip_timestamp` is true, the timestamps in `sk` and `ek`
|
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts) : seq_(sn) {
|
||||||
// will be replaced with min timestamp.
|
|
||||||
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts,
|
|
||||||
bool logical_strip_timestamp)
|
|
||||||
: seq_(sn) {
|
|
||||||
const size_t ts_sz = ts.size();
|
const size_t ts_sz = ts.size();
|
||||||
assert(ts_sz > 0);
|
assert(ts_sz > 0);
|
||||||
pinned_start_key_.reserve(sk.size());
|
pinned_start_key_.reserve(sk.size());
|
||||||
pinned_end_key_.reserve(ek.size());
|
pinned_end_key_.reserve(ek.size());
|
||||||
if (logical_strip_timestamp) {
|
|
||||||
AppendUserKeyWithMinTimestamp(&pinned_start_key_, sk, ts_sz);
|
|
||||||
AppendUserKeyWithMinTimestamp(&pinned_end_key_, ek, ts_sz);
|
|
||||||
} else {
|
|
||||||
AppendUserKeyWithDifferentTimestamp(&pinned_start_key_, sk, ts);
|
AppendUserKeyWithDifferentTimestamp(&pinned_start_key_, sk, ts);
|
||||||
AppendUserKeyWithDifferentTimestamp(&pinned_end_key_, ek, ts);
|
AppendUserKeyWithDifferentTimestamp(&pinned_end_key_, ek, ts);
|
||||||
}
|
|
||||||
start_key_ = pinned_start_key_;
|
start_key_ = pinned_start_key_;
|
||||||
end_key_ = pinned_end_key_;
|
end_key_ = pinned_end_key_;
|
||||||
ts_ = Slice(pinned_start_key_.data() + sk.size() - ts_sz, ts_sz);
|
ts_ = Slice(pinned_start_key_.data() + sk.size() - ts_sz, ts_sz);
|
||||||
|
|
|
@ -858,6 +858,12 @@ Status FlushJob::WriteLevel0Table() {
|
||||||
meta_.temperature = mutable_cf_options_.default_write_temperature;
|
meta_.temperature = mutable_cf_options_.default_write_temperature;
|
||||||
file_options_.temperature = meta_.temperature;
|
file_options_.temperature = meta_.temperature;
|
||||||
|
|
||||||
|
const auto* ucmp = cfd_->internal_comparator().user_comparator();
|
||||||
|
assert(ucmp);
|
||||||
|
const size_t ts_sz = ucmp->timestamp_size();
|
||||||
|
const bool logical_strip_timestamp =
|
||||||
|
ts_sz > 0 && !cfd_->ioptions()->persist_user_defined_timestamps;
|
||||||
|
|
||||||
std::vector<BlobFileAddition> blob_file_additions;
|
std::vector<BlobFileAddition> blob_file_additions;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -893,10 +899,21 @@ Status FlushJob::WriteLevel0Table() {
|
||||||
db_options_.info_log,
|
db_options_.info_log,
|
||||||
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
||||||
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
||||||
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
|
if (logical_strip_timestamp) {
|
||||||
&arena, /*prefix_extractor=*/nullptr));
|
memtables.push_back(m->NewTimestampStrippingIterator(
|
||||||
auto* range_del_iter = m->NewRangeTombstoneIterator(
|
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
|
||||||
ro, kMaxSequenceNumber, true /* immutable_memtable */);
|
/*prefix_extractor=*/nullptr, ts_sz));
|
||||||
|
} else {
|
||||||
|
memtables.push_back(
|
||||||
|
m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
|
||||||
|
/*prefix_extractor=*/nullptr));
|
||||||
|
}
|
||||||
|
auto* range_del_iter =
|
||||||
|
logical_strip_timestamp
|
||||||
|
? m->NewTimestampStrippingRangeTombstoneIterator(
|
||||||
|
ro, kMaxSequenceNumber, ts_sz)
|
||||||
|
: m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
|
||||||
|
true /* immutable_memtable */);
|
||||||
if (range_del_iter != nullptr) {
|
if (range_del_iter != nullptr) {
|
||||||
range_del_iters.emplace_back(range_del_iter);
|
range_del_iters.emplace_back(range_del_iter);
|
||||||
}
|
}
|
||||||
|
|
|
@ -874,10 +874,16 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
|
||||||
expected_full_history_ts_low = full_history_ts_low;
|
expected_full_history_ts_low = full_history_ts_low;
|
||||||
}
|
}
|
||||||
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
|
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
|
||||||
|
if (!persist_udt_) {
|
||||||
|
InternalKey largest(largest_key, curr_seq_ - 1, ValueType::kTypeValue);
|
||||||
|
CheckFileMetaData(cfd, smallest, largest, &fmeta);
|
||||||
|
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
|
||||||
|
} else {
|
||||||
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
|
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
|
||||||
CheckFileMetaData(cfd, smallest, largest, &fmeta);
|
CheckFileMetaData(cfd, smallest, largest, &fmeta);
|
||||||
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
|
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
job_context.Clean();
|
job_context.Clean();
|
||||||
ASSERT_TRUE(to_delete.empty());
|
ASSERT_TRUE(to_delete.empty());
|
||||||
}
|
}
|
||||||
|
|
153
db/memtable.cc
153
db/memtable.cc
|
@ -613,6 +613,135 @@ InternalIterator* MemTable::NewIterator(
|
||||||
seqno_to_time_mapping, arena, prefix_extractor);
|
seqno_to_time_mapping, arena, prefix_extractor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// An iterator wrapper that wraps a MemTableIterator and logically strips each
|
||||||
|
// key's user-defined timestamp.
|
||||||
|
class TimestampStrippingIterator : public InternalIterator {
|
||||||
|
public:
|
||||||
|
TimestampStrippingIterator(
|
||||||
|
MemTableIterator::Kind kind, const MemTable& memtable,
|
||||||
|
const ReadOptions& read_options,
|
||||||
|
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
|
||||||
|
const SliceTransform* cf_prefix_extractor, size_t ts_sz)
|
||||||
|
: arena_mode_(arena != nullptr), kind_(kind), ts_sz_(ts_sz) {
|
||||||
|
assert(ts_sz_ != 0);
|
||||||
|
void* mem = arena ? arena->AllocateAligned(sizeof(MemTableIterator)) :
|
||||||
|
operator new(sizeof(MemTableIterator));
|
||||||
|
iter_ = new (mem)
|
||||||
|
MemTableIterator(kind, memtable, read_options, seqno_to_time_mapping,
|
||||||
|
arena, cf_prefix_extractor);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No copying allowed
|
||||||
|
TimestampStrippingIterator(const TimestampStrippingIterator&) = delete;
|
||||||
|
void operator=(const TimestampStrippingIterator&) = delete;
|
||||||
|
|
||||||
|
~TimestampStrippingIterator() override {
|
||||||
|
if (arena_mode_) {
|
||||||
|
iter_->~MemTableIterator();
|
||||||
|
} else {
|
||||||
|
delete iter_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
|
||||||
|
iter_->SetPinnedItersMgr(pinned_iters_mgr);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Valid() const override { return iter_->Valid(); }
|
||||||
|
void Seek(const Slice& k) override {
|
||||||
|
iter_->Seek(k);
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
void SeekForPrev(const Slice& k) override {
|
||||||
|
iter_->SeekForPrev(k);
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
void SeekToFirst() override {
|
||||||
|
iter_->SeekToFirst();
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
void SeekToLast() override {
|
||||||
|
iter_->SeekToLast();
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
void Next() override {
|
||||||
|
iter_->Next();
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
bool NextAndGetResult(IterateResult* result) override {
|
||||||
|
iter_->Next();
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
bool is_valid = Valid();
|
||||||
|
if (is_valid) {
|
||||||
|
result->key = key();
|
||||||
|
result->bound_check_result = IterBoundCheck::kUnknown;
|
||||||
|
result->value_prepared = true;
|
||||||
|
}
|
||||||
|
return is_valid;
|
||||||
|
}
|
||||||
|
void Prev() override {
|
||||||
|
iter_->Prev();
|
||||||
|
UpdateKeyAndValueBuffer();
|
||||||
|
}
|
||||||
|
Slice key() const override {
|
||||||
|
assert(Valid());
|
||||||
|
return key_buf_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t write_unix_time() const override { return iter_->write_unix_time(); }
|
||||||
|
Slice value() const override {
|
||||||
|
if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
|
||||||
|
return value_buf_;
|
||||||
|
}
|
||||||
|
return iter_->value();
|
||||||
|
}
|
||||||
|
Status status() const override { return iter_->status(); }
|
||||||
|
bool IsKeyPinned() const override {
|
||||||
|
// Key is only in a buffer that is updated in each iteration.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
bool IsValuePinned() const override {
|
||||||
|
if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return iter_->IsValuePinned();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void UpdateKeyAndValueBuffer() {
|
||||||
|
key_buf_.clear();
|
||||||
|
if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
|
||||||
|
value_buf_.clear();
|
||||||
|
}
|
||||||
|
if (!Valid()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Slice original_key = iter_->key();
|
||||||
|
ReplaceInternalKeyWithMinTimestamp(&key_buf_, original_key, ts_sz_);
|
||||||
|
if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
|
||||||
|
Slice original_value = iter_->value();
|
||||||
|
AppendUserKeyWithMinTimestamp(&value_buf_, original_value, ts_sz_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bool arena_mode_;
|
||||||
|
MemTableIterator::Kind kind_;
|
||||||
|
size_t ts_sz_;
|
||||||
|
MemTableIterator* iter_;
|
||||||
|
std::string key_buf_;
|
||||||
|
std::string value_buf_;
|
||||||
|
};
|
||||||
|
|
||||||
|
InternalIterator* MemTable::NewTimestampStrippingIterator(
|
||||||
|
const ReadOptions& read_options,
|
||||||
|
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
|
||||||
|
const SliceTransform* prefix_extractor, size_t ts_sz) {
|
||||||
|
assert(arena != nullptr);
|
||||||
|
auto mem = arena->AllocateAligned(sizeof(TimestampStrippingIterator));
|
||||||
|
return new (mem) TimestampStrippingIterator(
|
||||||
|
MemTableIterator::kPointEntries, *this, read_options,
|
||||||
|
seqno_to_time_mapping, arena, prefix_extractor, ts_sz);
|
||||||
|
}
|
||||||
|
|
||||||
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
|
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
|
||||||
const ReadOptions& read_options, SequenceNumber read_seq,
|
const ReadOptions& read_options, SequenceNumber read_seq,
|
||||||
bool immutable_memtable) {
|
bool immutable_memtable) {
|
||||||
|
@ -624,6 +753,30 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
|
||||||
immutable_memtable);
|
immutable_memtable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FragmentedRangeTombstoneIterator*
|
||||||
|
MemTable::NewTimestampStrippingRangeTombstoneIterator(
|
||||||
|
const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz) {
|
||||||
|
if (read_options.ignore_range_deletions ||
|
||||||
|
is_range_del_table_empty_.load(std::memory_order_relaxed)) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
if (!timestamp_stripping_fragmented_range_tombstone_list_) {
|
||||||
|
// TODO: plumb Env::IOActivity, Env::IOPriority
|
||||||
|
auto* unfragmented_iter = new TimestampStrippingIterator(
|
||||||
|
MemTableIterator::kRangeDelEntries, *this, ReadOptions(),
|
||||||
|
/*seqno_to_time_mapping*/ nullptr, /* arena */ nullptr,
|
||||||
|
/* prefix_extractor */ nullptr, ts_sz);
|
||||||
|
|
||||||
|
timestamp_stripping_fragmented_range_tombstone_list_ =
|
||||||
|
std::make_unique<FragmentedRangeTombstoneList>(
|
||||||
|
std::unique_ptr<InternalIterator>(unfragmented_iter),
|
||||||
|
comparator_.comparator);
|
||||||
|
}
|
||||||
|
return new FragmentedRangeTombstoneIterator(
|
||||||
|
timestamp_stripping_fragmented_range_tombstone_list_.get(),
|
||||||
|
comparator_.comparator, read_seq, read_options.timestamp);
|
||||||
|
}
|
||||||
|
|
||||||
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
|
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
|
||||||
const ReadOptions& read_options, SequenceNumber read_seq,
|
const ReadOptions& read_options, SequenceNumber read_seq,
|
||||||
bool immutable_memtable) {
|
bool immutable_memtable) {
|
||||||
|
|
|
@ -213,6 +213,14 @@ class MemTable {
|
||||||
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
|
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
|
||||||
const SliceTransform* prefix_extractor);
|
const SliceTransform* prefix_extractor);
|
||||||
|
|
||||||
|
// Returns an iterator that wraps a MemTableIterator and logically strips the
|
||||||
|
// user-defined timestamp of each key. This API is only used by flush when
|
||||||
|
// user-defined timestamps in MemTable only feature is enabled.
|
||||||
|
InternalIterator* NewTimestampStrippingIterator(
|
||||||
|
const ReadOptions& read_options,
|
||||||
|
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
|
||||||
|
const SliceTransform* prefix_extractor, size_t ts_sz);
|
||||||
|
|
||||||
// Returns an iterator that yields the range tombstones of the memtable.
|
// Returns an iterator that yields the range tombstones of the memtable.
|
||||||
// The caller must ensure that the underlying MemTable remains live
|
// The caller must ensure that the underlying MemTable remains live
|
||||||
// while the returned iterator is live.
|
// while the returned iterator is live.
|
||||||
|
@ -227,6 +235,13 @@ class MemTable {
|
||||||
const ReadOptions& read_options, SequenceNumber read_seq,
|
const ReadOptions& read_options, SequenceNumber read_seq,
|
||||||
bool immutable_memtable);
|
bool immutable_memtable);
|
||||||
|
|
||||||
|
// Returns an iterator that yields the range tombstones of the memtable and
|
||||||
|
// logically strips the user-defined timestamp of each key (including start
|
||||||
|
// key, and end key). This API is only used by flush when user-defined
|
||||||
|
// timestamps in MemTable only feature is enabled.
|
||||||
|
FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
|
||||||
|
const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz);
|
||||||
|
|
||||||
Status VerifyEncodedEntry(Slice encoded,
|
Status VerifyEncodedEntry(Slice encoded,
|
||||||
const ProtectionInfoKVOS64& kv_prot_info);
|
const ProtectionInfoKVOS64& kv_prot_info);
|
||||||
|
|
||||||
|
@ -704,6 +719,12 @@ class MemTable {
|
||||||
std::unique_ptr<FragmentedRangeTombstoneList>
|
std::unique_ptr<FragmentedRangeTombstoneList>
|
||||||
fragmented_range_tombstone_list_;
|
fragmented_range_tombstone_list_;
|
||||||
|
|
||||||
|
// The fragmented range tombstone of this memtable with all keys' user-defined
|
||||||
|
// timestamps logically stripped. This is constructed and used by flush when
|
||||||
|
// user-defined timestamps in memtable only feature is enabled.
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneList>
|
||||||
|
timestamp_stripping_fragmented_range_tombstone_list_;
|
||||||
|
|
||||||
// makes sure there is a single range tombstone writer to invalidate cache
|
// makes sure there is a single range tombstone writer to invalidate cache
|
||||||
std::mutex range_del_mutex_;
|
std::mutex range_del_mutex_;
|
||||||
CoreLocalArray<std::shared_ptr<FragmentedRangeTombstoneListCache>>
|
CoreLocalArray<std::shared_ptr<FragmentedRangeTombstoneListCache>>
|
||||||
|
|
|
@ -197,11 +197,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
|
||||||
pinned_seq_pos_ = tombstones_->seq_end();
|
pinned_seq_pos_ = tombstones_->seq_end();
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeTombstone Tombstone(bool logical_strip_timestamp = false) const {
|
RangeTombstone Tombstone() const {
|
||||||
assert(Valid());
|
assert(Valid());
|
||||||
if (icmp_->user_comparator()->timestamp_size()) {
|
if (icmp_->user_comparator()->timestamp_size()) {
|
||||||
return RangeTombstone(start_key(), end_key(), seq(), timestamp(),
|
return RangeTombstone(start_key(), end_key(), seq(), timestamp());
|
||||||
logical_strip_timestamp);
|
|
||||||
}
|
}
|
||||||
return RangeTombstone(start_key(), end_key(), seq());
|
return RangeTombstone(start_key(), end_key(), seq());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue