Remove stale entries from L0 files when UDT is not persisted (#13035)

Summary:
When user-defined timestamps are not persisted, we currently replace the actual timestamp with the min timestamp only after an entry is output from the compaction iterator. Because the replacement happens that late, the compaction iterator cannot help remove stale entries. This PR adds a wrapper iterator `TimestampStrippingIterator` for `MemTableIterator` that does the min timestamp replacement at the memtable iteration step. It is used by flush and helps keep stale entries from landing in L0 files.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13035

Test Plan: Added unit test

Reviewed By: pdillinger, cbi42

Differential Revision: D63423682

Pulled By: jowlyzhang

fbshipit-source-id: 087dcc9cee97b9ea51b8d2b88dc91c2984d54e55
This commit is contained in:
Yu Zhang 2024-10-14 12:28:35 -07:00
parent 965e813f0a
commit 67ce298735
9 changed files with 315 additions and 51 deletions

View file

@ -206,10 +206,6 @@ Status BuildTable(
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
const size_t ts_sz = ucmp->timestamp_size();
const bool logical_strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;
SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber;
std::string key_after_flush_buf;
std::string value_buf;
@ -222,16 +218,6 @@ Status BuildTable(
Slice key_after_flush = key_after_flush_buf;
Slice value_after_flush = value;
// If user defined timestamps will be stripped from user key after flush,
// the in memory version of the key act logically the same as one with a
// minimum timestamp. We update the timestamp here so file boundary and
// output validator, block builder all see the effect of the stripping.
if (logical_strip_timestamp) {
key_after_flush_buf.clear();
ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
key_after_flush = key_after_flush_buf;
}
if (ikey.type == kTypeValuePreferredSeqno) {
auto [unpacked_value, unix_write_time] =
ParsePackedValueWithWriteTime(value);
@ -291,11 +277,7 @@ Status BuildTable(
Slice last_tombstone_start_user_key{};
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
// When user timestamp should not be persisted, we logically strip a
// range tombstone's start and end key's timestamp (replace it with min
// timestamp) before passing them along to table builder and to update
// file boundaries.
auto tombstone = range_del_it->Tombstone(logical_strip_timestamp);
auto tombstone = range_del_it->Tombstone();
std::pair<InternalKey, Slice> kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey();

View file

@ -3870,6 +3870,91 @@ TEST_F(ManualFlushSkipRetainUDTTest, ManualFlush) {
Close();
}
// Writes 100 versions of the single key "foo" (switching memtables halfway
// through), flushes, and verifies that the one resulting table holds exactly
// one entry and no deletions of any kind.
TEST_F(ManualFlushSkipRetainUDTTest, FlushRemovesStaleEntries) {
  constexpr int kNumVersions = 100;
  constexpr int kSwitchMemtableAt = 50;

  column_family_options_.max_write_buffer_number = 4;
  Open();
  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));

  auto* default_cfh_impl = static_cast_with_check<ColumnFamilyHandleImpl>(
      db_->DefaultColumnFamily());
  ColumnFamilyData* default_cfd = default_cfh_impl->cfd();

  for (int ver = 0; ver < kNumVersions; ++ver) {
    // Split the versions across two memtables so the flush has to merge them.
    if (ver == kSwitchMemtableAt) {
      auto* db_impl = static_cast_with_check<DBImpl>(db_);
      ASSERT_OK(db_impl->TEST_SwitchMemtable(default_cfd));
    }
    const std::string write_ts = EncodeAsUint64(ver);
    const std::string payload = "v" + std::to_string(ver);
    ASSERT_OK(Put(0, "foo", write_ts, payload));
  }

  ASSERT_OK(Flush(0));

  // Exactly one table file is expected, containing a single point entry.
  TablePropertiesCollection all_props;
  ASSERT_OK(db_->GetPropertiesOfAllTables(&all_props));
  ASSERT_EQ(1, all_props.size());
  std::shared_ptr<const TableProperties> props = all_props.begin()->second;
  ASSERT_EQ(1, props->num_entries);
  ASSERT_EQ(0, props->num_deletions);
  ASSERT_EQ(0, props->num_range_deletions);

  CheckEffectiveCutoffTime(100);
  CheckAutomaticFlushRetainUDT(101);

  Close();
}
// Writes keys "foo10".."foo99" with timestamps 10..99. For every odd i a range
// deletion [foo(i-1), foo(i)) is issued at timestamp i, which covers the
// even-numbered key written just before it. After a flush the test verifies:
//   * only the 45 odd-numbered keys are visible, each reporting the minimum
//     timestamp, and
//   * the single output table holds 45 point entries plus 45 range deletions.
TEST_F(ManualFlushSkipRetainUDTTest, RangeDeletionFlushRemovesStaleEntries) {
  column_family_options_.max_write_buffer_number = 4;
  Open();
  // TODO(yuzhangyu): a non 0 full history ts low is needed for this garbage
  // collection to kick in. This doesn't work well for the very first flush of
  // the column family. Not a big issue, but would be nice to improve this.
  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(9)));

  for (int i = 10; i < 100; i++) {
    ASSERT_OK(Put(0, "foo" + std::to_string(i), EncodeAsUint64(i),
                  "val" + std::to_string(i)));
    if (i % 2 == 1) {
      ASSERT_OK(db_->DeleteRange(WriteOptions(), "foo" + std::to_string(i - 1),
                                 "foo" + std::to_string(i), EncodeAsUint64(i)));
    }
  }

  ASSERT_OK(Flush(0));
  CheckEffectiveCutoffTime(100);

  std::string read_ts = EncodeAsUint64(100);
  std::string min_ts = EncodeAsUint64(0);
  ReadOptions ropts;
  Slice read_ts_slice = read_ts;
  ropts.timestamp = &read_ts_slice;
  {
    // Owned by a unique_ptr so the iterator is released even when one of the
    // ASSERT_* macros below returns early from the test body.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ropts));
    int i = 11;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_EQ("foo" + std::to_string(i), iter->key());
      ASSERT_EQ("val" + std::to_string(i), iter->value());
      ASSERT_EQ(min_ts, iter->timestamp());
      i += 2;
    }
    ASSERT_OK(iter->status());
    // foo11, foo13, ..., foo99 must all have been visited; without this check
    // an iterator that yields too few keys would pass the loop vacuously.
    ASSERT_EQ(101, i);
  }

  TablePropertiesCollection tables_properties;
  ASSERT_OK(db_->GetPropertiesOfAllTables(&tables_properties));
  ASSERT_EQ(1, tables_properties.size());
  std::shared_ptr<const TableProperties> table_properties =
      tables_properties.begin()->second;
  // 45 point data + 45 range deletions. 45 obsolete point data are garbage
  // collected.
  ASSERT_EQ(90, table_properties->num_entries);
  ASSERT_EQ(45, table_properties->num_deletions);
  ASSERT_EQ(45, table_properties->num_range_deletions);

  Close();
}
TEST_F(ManualFlushSkipRetainUDTTest, ManualCompaction) {
Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(0)));

View file

@ -1667,10 +1667,19 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
Arena arena;
Status s;
TableProperties table_properties;
const auto* ucmp = cfd->internal_comparator().user_comparator();
assert(ucmp);
const size_t ts_sz = ucmp->timestamp_size();
const bool logical_strip_timestamp =
ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps;
{
ScopedArenaPtr<InternalIterator> iter(
mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
logical_strip_timestamp
? mem->NewTimestampStrippingIterator(
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr, ts_sz)
: mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
" Level-0 table #%" PRIu64 ": started",
@ -1705,11 +1714,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter =
// This is called during recovery, where a live memtable is flushed
// directly. In this case, no fragmented tombstone list is cached in
// this memtable yet.
mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
false /* immutable_memtable */);
logical_strip_timestamp
? mem->NewTimestampStrippingRangeTombstoneIterator(
ro, kMaxSequenceNumber, ts_sz)
// This is called during recovery, where a live memtable is
// flushed directly. In this case, no fragmented tombstone list is
// cached in this memtable yet.
: mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
false /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
@ -1795,9 +1807,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
// For UDT in memtable only feature, move up the cutoff timestamp whenever
// a flush happens.
const Comparator* ucmp = cfd->user_comparator();
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps) {
if (logical_strip_timestamp) {
Slice mem_newest_udt = mem->GetNewestUDT();
std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
if (full_history_ts_low.empty() ||

View file

@ -1027,22 +1027,13 @@ struct RangeTombstone {
// User-defined timestamp is enabled, `sk` and `ek` should be user key
// with timestamp, `ts` will replace the timestamps in `sk` and
// `ek`.
// When `logical_strip_timestamp` is true, the timestamps in `sk` and `ek`
// will be replaced with min timestamp.
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts,
bool logical_strip_timestamp)
: seq_(sn) {
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts) : seq_(sn) {
const size_t ts_sz = ts.size();
assert(ts_sz > 0);
pinned_start_key_.reserve(sk.size());
pinned_end_key_.reserve(ek.size());
if (logical_strip_timestamp) {
AppendUserKeyWithMinTimestamp(&pinned_start_key_, sk, ts_sz);
AppendUserKeyWithMinTimestamp(&pinned_end_key_, ek, ts_sz);
} else {
AppendUserKeyWithDifferentTimestamp(&pinned_start_key_, sk, ts);
AppendUserKeyWithDifferentTimestamp(&pinned_end_key_, ek, ts);
}
AppendUserKeyWithDifferentTimestamp(&pinned_start_key_, sk, ts);
AppendUserKeyWithDifferentTimestamp(&pinned_end_key_, ek, ts);
start_key_ = pinned_start_key_;
end_key_ = pinned_end_key_;
ts_ = Slice(pinned_start_key_.data() + sk.size() - ts_sz, ts_sz);

View file

@ -858,6 +858,12 @@ Status FlushJob::WriteLevel0Table() {
meta_.temperature = mutable_cf_options_.default_write_temperature;
file_options_.temperature = meta_.temperature;
const auto* ucmp = cfd_->internal_comparator().user_comparator();
assert(ucmp);
const size_t ts_sz = ucmp->timestamp_size();
const bool logical_strip_timestamp =
ts_sz > 0 && !cfd_->ioptions()->persist_user_defined_timestamps;
std::vector<BlobFileAddition> blob_file_additions;
{
@ -893,10 +899,21 @@ Status FlushJob::WriteLevel0Table() {
db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr));
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (logical_strip_timestamp) {
memtables.push_back(m->NewTimestampStrippingIterator(
ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr, ts_sz));
} else {
memtables.push_back(
m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
}
auto* range_del_iter =
logical_strip_timestamp
? m->NewTimestampStrippingRangeTombstoneIterator(
ro, kMaxSequenceNumber, ts_sz)
: m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
true /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}

View file

@ -874,9 +874,15 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
expected_full_history_ts_low = full_history_ts_low;
}
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
CheckFileMetaData(cfd, smallest, largest, &fmeta);
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
if (!persist_udt_) {
InternalKey largest(largest_key, curr_seq_ - 1, ValueType::kTypeValue);
CheckFileMetaData(cfd, smallest, largest, &fmeta);
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
} else {
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
CheckFileMetaData(cfd, smallest, largest, &fmeta);
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
}
}
job_context.Clean();
ASSERT_TRUE(to_delete.empty());

View file

@ -613,6 +613,135 @@ InternalIterator* MemTable::NewIterator(
seqno_to_time_mapping, arena, prefix_extractor);
}
// An iterator wrapper that wraps a MemTableIterator and logically strips each
// key's user-defined timestamp by replacing it with the minimum timestamp.
//
// The stripped key is materialized into an internal buffer after every
// positioning call, so the Slice returned by key() is only good until the
// iterator is moved again. For range deletion entries (kRangeDelEntries) the
// value is rewritten the same way into its own buffer; for point entries the
// value is forwarded untouched from the wrapped iterator.
class TimestampStrippingIterator : public InternalIterator {
 public:
  // Builds the wrapped MemTableIterator from the given arguments.
  //
  // `arena` may be nullptr. When it is non-null the wrapped iterator is
  // placement-constructed in arena memory and only destructed (never freed)
  // by this wrapper; otherwise it is heap allocated and deleted on
  // destruction. `ts_sz` is the user-defined timestamp size and must be
  // non-zero.
  TimestampStrippingIterator(
      MemTableIterator::Kind kind, const MemTable& memtable,
      const ReadOptions& read_options,
      UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
      const SliceTransform* cf_prefix_extractor, size_t ts_sz)
      : arena_mode_(arena != nullptr), kind_(kind), ts_sz_(ts_sz) {
    assert(ts_sz_ != 0);
    if (arena_mode_) {
      void* mem = arena->AllocateAligned(sizeof(MemTableIterator));
      iter_ = new (mem)
          MemTableIterator(kind, memtable, read_options, seqno_to_time_mapping,
                           arena, cf_prefix_extractor);
    } else {
      // A plain new-expression so that the `delete` in the destructor is its
      // exact counterpart.
      iter_ = new MemTableIterator(kind, memtable, read_options,
                                   seqno_to_time_mapping, arena,
                                   cf_prefix_extractor);
    }
  }

  // No copying allowed
  TimestampStrippingIterator(const TimestampStrippingIterator&) = delete;
  void operator=(const TimestampStrippingIterator&) = delete;

  ~TimestampStrippingIterator() override {
    if (arena_mode_) {
      // The storage came from the arena; run the destructor only.
      iter_->~MemTableIterator();
    } else {
      delete iter_;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    iter_->SetPinnedItersMgr(pinned_iters_mgr);
  }

  bool Valid() const override { return iter_->Valid(); }

  // Every positioning call forwards to the wrapped iterator and then refreshes
  // the stripped key (and, for range deletions, value) buffers.
  void Seek(const Slice& k) override {
    iter_->Seek(k);
    UpdateKeyAndValueBuffer();
  }

  void SeekForPrev(const Slice& k) override {
    iter_->SeekForPrev(k);
    UpdateKeyAndValueBuffer();
  }

  void SeekToFirst() override {
    iter_->SeekToFirst();
    UpdateKeyAndValueBuffer();
  }

  void SeekToLast() override {
    iter_->SeekToLast();
    UpdateKeyAndValueBuffer();
  }

  void Next() override {
    iter_->Next();
    UpdateKeyAndValueBuffer();
  }

  // Advances like Next() and, when the new position is valid, fills `result`
  // with the stripped key. Returns whether the new position is valid.
  bool NextAndGetResult(IterateResult* result) override {
    Next();
    const bool is_valid = Valid();
    if (is_valid) {
      result->key = key();
      result->bound_check_result = IterBoundCheck::kUnknown;
      result->value_prepared = true;
    }
    return is_valid;
  }

  void Prev() override {
    iter_->Prev();
    UpdateKeyAndValueBuffer();
  }

  // Returns the current key with its timestamp replaced by the min timestamp.
  Slice key() const override {
    assert(Valid());
    return key_buf_;
  }

  uint64_t write_unix_time() const override { return iter_->write_unix_time(); }

  // Returns the buffered, timestamp-stripped value for range deletion entries
  // and the wrapped iterator's value otherwise.
  Slice value() const override {
    assert(Valid());
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
      return value_buf_;
    }
    return iter_->value();
  }

  Status status() const override { return iter_->status(); }

  bool IsKeyPinned() const override {
    // Key is only in a buffer that is updated in each iteration.
    return false;
  }

  bool IsValuePinned() const override {
    // Range deletion values live in `value_buf_`, which is rewritten on every
    // move, so they cannot be reported as pinned.
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
      return false;
    }
    return iter_->IsValuePinned();
  }

 private:
  // Clears the buffers and, if the wrapped iterator is positioned on an entry,
  // refills them with the timestamp-stripped key (and, for range deletion
  // entries, the timestamp-stripped value).
  void UpdateKeyAndValueBuffer() {
    const bool is_range_del =
        kind_ == MemTableIterator::Kind::kRangeDelEntries;
    key_buf_.clear();
    if (is_range_del) {
      value_buf_.clear();
    }
    if (!Valid()) {
      return;
    }
    Slice original_key = iter_->key();
    ReplaceInternalKeyWithMinTimestamp(&key_buf_, original_key, ts_sz_);
    if (is_range_del) {
      Slice original_value = iter_->value();
      AppendUserKeyWithMinTimestamp(&value_buf_, original_value, ts_sz_);
    }
  }

  // True when `iter_` lives in arena memory and must not be freed here.
  bool arena_mode_;
  MemTableIterator::Kind kind_;
  // Size in bytes of the user-defined timestamp; always non-zero.
  size_t ts_sz_;
  MemTableIterator* iter_ = nullptr;
  std::string key_buf_;
  // Only used when `kind_` is kRangeDelEntries.
  std::string value_buf_;
};
// Creates a point-entry TimestampStrippingIterator over this memtable. The
// iterator is constructed in memory obtained from `arena`, which must not be
// null.
InternalIterator* MemTable::NewTimestampStrippingIterator(
    const ReadOptions& read_options,
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
    const SliceTransform* prefix_extractor, size_t ts_sz) {
  assert(arena != nullptr);
  void* const storage =
      arena->AllocateAligned(sizeof(TimestampStrippingIterator));
  InternalIterator* const stripping_iter = new (storage)
      TimestampStrippingIterator(MemTableIterator::kPointEntries, *this,
                                 read_options, seqno_to_time_mapping, arena,
                                 prefix_extractor, ts_sz);
  return stripping_iter;
}
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable) {
@ -624,6 +753,30 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
immutable_memtable);
}
// Returns a heap-allocated iterator over this memtable's range tombstones with
// user-defined timestamps stripped, or nullptr when range deletions are to be
// ignored or the memtable has none. The stripped fragmented tombstone list is
// built on the first call and reused by later ones.
FragmentedRangeTombstoneIterator*
MemTable::NewTimestampStrippingRangeTombstoneIterator(
    const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz) {
  if (read_options.ignore_range_deletions) {
    return nullptr;
  }
  if (is_range_del_table_empty_.load(std::memory_order_relaxed)) {
    return nullptr;
  }

  auto& stripped_list = timestamp_stripping_fragmented_range_tombstone_list_;
  if (stripped_list == nullptr) {
    // TODO: plumb Env::IOActivity, Env::IOPriority
    std::unique_ptr<InternalIterator> raw_tombstones(
        new TimestampStrippingIterator(
            MemTableIterator::kRangeDelEntries, *this, ReadOptions(),
            /*seqno_to_time_mapping*/ nullptr, /* arena */ nullptr,
            /* prefix_extractor */ nullptr, ts_sz));
    stripped_list = std::make_unique<FragmentedRangeTombstoneList>(
        std::move(raw_tombstones), comparator_.comparator);
  }

  return new FragmentedRangeTombstoneIterator(
      stripped_list.get(), comparator_.comparator, read_seq,
      read_options.timestamp);
}
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable) {

View file

@ -213,6 +213,14 @@ class MemTable {
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor);
// Returns an iterator that wraps a MemTableIterator and logically strips the
// user-defined timestamp of each key. This API is only used by flush when
// user-defined timestamps in MemTable only feature is enabled.
//
// `arena` must not be null: the returned iterator is constructed in memory
// allocated from it. `ts_sz` is the size of the user-defined timestamp and
// must be non-zero.
InternalIterator* NewTimestampStrippingIterator(
const ReadOptions& read_options,
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
const SliceTransform* prefix_extractor, size_t ts_sz);
// Returns an iterator that yields the range tombstones of the memtable.
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live.
@ -227,6 +235,13 @@ class MemTable {
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable);
// Returns an iterator that yields the range tombstones of the memtable and
// logically strips the user-defined timestamp of each key (including start
// key, and end key). This API is only used by flush when user-defined
// timestamps in MemTable only feature is enabled.
//
// Returns nullptr if `read_options.ignore_range_deletions` is set or the
// memtable contains no range deletions. Otherwise the result is allocated
// with `new`; the flush callers take ownership by storing it in a
// std::unique_ptr.
FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz);
Status VerifyEncodedEntry(Slice encoded,
const ProtectionInfoKVOS64& kv_prot_info);
@ -704,6 +719,12 @@ class MemTable {
std::unique_ptr<FragmentedRangeTombstoneList>
fragmented_range_tombstone_list_;
// The fragmented range tombstone of this memtable with all keys' user-defined
// timestamps logically stripped. This is constructed and used by flush when
// user-defined timestamps in memtable only feature is enabled.
// Built lazily on the first call to
// NewTimestampStrippingRangeTombstoneIterator() and reused afterwards.
// NOTE(review): no lock is taken around that lazy construction; presumably
// this relies on flush being the only caller -- confirm.
std::unique_ptr<FragmentedRangeTombstoneList>
timestamp_stripping_fragmented_range_tombstone_list_;
// makes sure there is a single range tombstone writer to invalidate cache
std::mutex range_del_mutex_;
CoreLocalArray<std::shared_ptr<FragmentedRangeTombstoneListCache>>

View file

@ -197,11 +197,10 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
pinned_seq_pos_ = tombstones_->seq_end();
}
RangeTombstone Tombstone(bool logical_strip_timestamp = false) const {
RangeTombstone Tombstone() const {
assert(Valid());
if (icmp_->user_comparator()->timestamp_size()) {
return RangeTombstone(start_key(), end_key(), seq(), timestamp(),
logical_strip_timestamp);
return RangeTombstone(start_key(), end_key(), seq(), timestamp());
}
return RangeTombstone(start_key(), end_key(), seq());
}