mirror of https://github.com/facebook/rocksdb.git
Add support for range deletion when user timestamps are not persisted (#12254)
Summary:
For the user-defined timestamps in memtable only feature, some special handling for range deletion blocks is needed, since both the key (start_key) and the value (end_key) of a range tombstone can contain user-defined timestamps. The key is handled in the same way as the other data blocks in the block based table. This PR adds the special handling needed for the value (end_key) part. This includes:
1) On the write path, when L0 SST files are first created from flush, user-defined timestamps are removed from the end key of a range tombstone. In some places the timestamp is only logically removed (replaced with a min timestamp), because there is still logic involving the running comparator that expects a user key containing a timestamp. In the block based builder, it is eventually physically removed before being persisted in a block.
2) On the read path, when the range deletion block is read, we artificially pad a min timestamp onto the end key of a range tombstone in `BlockBasedTableReader`.
3) For the file boundary `FileMetaData.largest`, we artificially pad a max timestamp onto it if it contains a range deletion sentinel. Whenever a range deletion end_key is used to update file boundaries, the max timestamp is used instead of the range tombstone's actual timestamp, to mark the boundary as an exclusive end. d69628e6ce/db/dbformat.h (L923-L935)
This max timestamp is removed when the in-memory `FileMetaData.largest` is persisted into the Manifest, and padded back when the Manifest is read while handling the related `VersionEdit` in `VersionEditHandler`. A rough sketch of the padding follows below.
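The sketch below is a minimal, self-contained illustration of the padding in 2) and 3); it is not the RocksDB implementation (the real helpers, e.g. `PadInternalKeyWithMinTimestamp` / `PadInternalKeyWithMaxTimestamp`, operate on internal keys that carry an 8-byte sequence/type footer), and the helper names and the fixed 8-byte timestamp width here are assumptions for illustration only.

```cpp
// Sketch only: re-append a fixed-width timestamp onto a user key whose
// timestamp was not persisted. These helpers are NOT the RocksDB API.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

// Min timestamp (all zero bytes): used on the read path so a range tombstone
// end key read from the block acts logically the same as a key carrying a
// minimum timestamp.
std::string PadUserKeyWithMinTimestamp(const std::string& user_key,
                                       size_t ts_sz) {
  return user_key + std::string(ts_sz, '\x00');
}

// Max timestamp (all 0xff bytes): used for FileMetaData.largest when it comes
// from a range deletion end key, to mark the boundary as an exclusive end.
std::string PadUserKeyWithMaxTimestamp(const std::string& user_key,
                                       size_t ts_sz) {
  return user_key + std::string(ts_sz, '\xff');
}

int main() {
  constexpr size_t kTsSz = sizeof(uint64_t);  // assumed 8-byte timestamp
  const std::string end_key = "foo";  // end key persisted without a timestamp

  const std::string read_key = PadUserKeyWithMinTimestamp(end_key, kTsSz);
  const std::string boundary_key = PadUserKeyWithMaxTimestamp(end_key, kTsSz);

  std::cout << read_key.size() << " " << boundary_key.size() << "\n";  // 11 11
  return 0;
}
```

In the actual change the padding and stripping happen on internal keys, so the sequence number and type footer are preserved around the inserted timestamp bytes.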
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12254
Test Plan: Added unit test and enabled this feature combination's stress test.
Reviewed By: cbi42
Differential Revision: D52965527
Pulled By: jowlyzhang
fbshipit-source-id: e8315f8a2c5268e2ae0f7aec8012c266b86df985
parent 8829ba9fe1
commit 071a146fa0

@@ -210,7 +210,7 @@ Status BuildTable(
         /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

   const size_t ts_sz = ucmp->timestamp_size();
-  const bool strip_timestamp =
+  const bool logical_strip_timestamp =
       ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

   std::string key_after_flush_buf;
@@ -224,7 +224,7 @@ Status BuildTable(
       // the in memory version of the key act logically the same as one with a
       // minimum timestamp. We update the timestamp here so file boundary and
       // output validator, block builder all see the effect of the stripping.
-      if (strip_timestamp) {
+      if (logical_strip_timestamp) {
        key_after_flush_buf.clear();
        ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
        key_after_flush = key_after_flush_buf;
@@ -267,9 +267,12 @@ Status BuildTable(
       Slice last_tombstone_start_user_key{};
       for (range_del_it->SeekToFirst(); range_del_it->Valid();
            range_del_it->Next()) {
-        auto tombstone = range_del_it->Tombstone();
-        auto kv = tombstone.Serialize();
-        // TODO(yuzhangyu): handle range deletion for UDT in memtables only.
+        // When user timestamp should not be persisted, we logically strip a
+        // range tombstone's start and end key's timestamp (replace it with min
+        // timestamp) before passing them along to table builder and to update
+        // file boundaries.
+        auto tombstone = range_del_it->Tombstone(logical_strip_timestamp);
+        std::pair<InternalKey, Slice> kv = tombstone.Serialize();
         builder->Add(kv.first.Encode(), kv.second);
         InternalKey tombstone_end = tombstone.SerializeEndKey();
         meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
@@ -13,6 +13,7 @@
 #include <vector>

 #include "db/column_family.h"
+#include "db/dbformat.h"
 #include "logging/logging.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/sst_partitioner.h"
@@ -21,9 +22,6 @@

 namespace ROCKSDB_NAMESPACE {

-const uint64_t kRangeTombstoneSentinel =
-    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
-
 int sstableKeyCompare(const Comparator* uc, const Slice& a, const Slice& b) {
   auto c = uc->CompareWithoutTimestamp(ExtractUserKey(a), ExtractUserKey(b));
   if (c != 0) {
@@ -3515,7 +3515,7 @@ class HandleFileBoundariesTest
       : DBBasicTestWithTimestampBase("/handle_file_boundaries") {}
 };

-TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
+TEST_P(HandleFileBoundariesTest, ConfigurePersistUdtWithPut) {
   Options options = CurrentOptions();
   options.env = env_;
   // Write a timestamp that is not the min timestamp to help test the behavior
@@ -3539,7 +3539,7 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
   ASSERT_OK(
       db_->Put(WriteOptions(), largest_ukey_without_ts, write_ts, "val2"));

-  // Create a L0 SST file and its record is added to the Manfiest.
+  // Create a L0 SST file and its record is added to the Manifest.
   ASSERT_OK(Flush());
   Close();

@@ -3571,6 +3571,61 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
   Close();
 }

+TEST_P(HandleFileBoundariesTest, ConfigurePersistUdtWithRangeDelete) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  // Write a timestamp that is not the min/max timestamp to help test the
+  // behavior of flag `persist_user_defined_timestamps`.
+  std::string write_ts;
+  std::string min_ts;
+  std::string max_ts;
+  PutFixed64(&write_ts, 1);
+  PutFixed64(&min_ts, 0);
+  PutFixed64(&max_ts, std::numeric_limits<uint64_t>::max());
+  std::string smallest_ukey_without_ts = "bar";
+  std::string largest_ukey_without_ts = "foo";
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  bool persist_udt = test::ShouldPersistUDT(GetParam());
+  options.persist_user_defined_timestamps = persist_udt;
+  if (!persist_udt) {
+    options.allow_concurrent_memtable_write = false;
+  }
+  DestroyAndReopen(options);
+
+  ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                             smallest_ukey_without_ts, largest_ukey_without_ts,
+                             write_ts));
+
+  // Create a L0 SST file and its record is added to the Manifest.
+  ASSERT_OK(Flush());
+  Close();
+
+  options.create_if_missing = false;
+  // Reopen the DB and process manifest file.
+  Reopen(options);
+
+  std::vector<std::vector<FileMetaData>> level_to_files;
+  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+                                  &level_to_files);
+  ASSERT_GT(level_to_files.size(), 1);
+  // L0 only has one SST file.
+  ASSERT_EQ(level_to_files[0].size(), 1);
+  auto file_meta = level_to_files[0][0];
+  if (persist_udt) {
+    ASSERT_EQ(smallest_ukey_without_ts + write_ts,
+              file_meta.smallest.user_key());
+  } else {
+    ASSERT_EQ(smallest_ukey_without_ts + min_ts, file_meta.smallest.user_key());
+  }
+  // When right file boundary comes from range deletion, it uses max timestamp
+  // and a range deletion sentinel that uses the max sequence number to mark the
+  // end key exclusive. This is regardless of whether timestamp is persisted.
+  ASSERT_EQ(largest_ukey_without_ts + max_ts, file_meta.largest.user_key());
+  auto largest_footer = ExtractInternalKeyFooter(file_meta.largest.Encode());
+  ASSERT_EQ(largest_footer, kRangeTombstoneSentinel);
+  Close();
+}
+
 INSTANTIATE_TEST_CASE_P(
     ConfigurePersistUdt, HandleFileBoundariesTest,
     ::testing::Values(
@@ -4006,42 +4061,80 @@ TEST_F(DBBasicTestWithTimestamp,
   Close();
 }

-TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
+class DeleteRangeWithTimestampTableOptions
+    : public DBBasicTestWithTimestampBase,
+      public testing::WithParamInterface<
+          std::tuple<BlockBasedTableOptions::IndexType,
+                     test::UserDefinedTimestampTestMode>> {
+ public:
+  explicit DeleteRangeWithTimestampTableOptions()
+      : DBBasicTestWithTimestampBase(
+            "delete_range_with_timestamp_table_options") {}
+};
+
+INSTANTIATE_TEST_CASE_P(
+    Timestamp, DeleteRangeWithTimestampTableOptions,
+    testing::Combine(
+        testing::Values(
+            BlockBasedTableOptions::IndexType::kBinarySearch,
+            BlockBasedTableOptions::IndexType::kHashSearch,
+            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
+            BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey),
+        testing::Values(
+            test::UserDefinedTimestampTestMode::kNormal,
+            test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp)));
+
+TEST_P(DeleteRangeWithTimestampTableOptions, BasicReadAndIterate) {
   const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
   options.prefix_extractor.reset(NewFixedPrefixTransform(3));
   options.compression = kNoCompression;
   BlockBasedTableOptions bbto;
-  bbto.index_type = GetParam();
+  bbto.index_type = std::get<0>(GetParam());
   bbto.block_size = 100;
   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
   options.env = env_;
   options.create_if_missing = true;
-  const size_t kTimestampSize = Timestamp(0, 0).size();
-  TestComparator test_cmp(kTimestampSize);
-  options.comparator = &test_cmp;
+  bool persist_udt = test::ShouldPersistUDT(std::get<1>(GetParam()));
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options.persist_user_defined_timestamps = persist_udt;
+  // UDT in memtables only not compatible with concurrent memtable writes.
+  options.allow_concurrent_memtable_write = persist_udt;
   options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
   DestroyAndReopen(options);

   // Write half of the keys before the tombstone and half after the tombstone.
   // Only covered keys (i.e., within the range and older than the tombstone)
   // should be deleted.
+  std::string full_history_ts_low;
+  int cutoff_ts = 0;
   for (int i = 0; i < kNum; ++i) {
+    std::string write_ts;
+    PutFixed64(&write_ts, i);
     if (i == kNum / 2) {
       ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
-                                 Key1(kRangeBegin), Key1(kRangeEnd),
-                                 Timestamp(i, 0)));
+                                 Key1(kRangeBegin), Key1(kRangeEnd), write_ts));
     }
-    ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
-                       "val" + std::to_string(i)));
+    ASSERT_OK(
+        db_->Put(WriteOptions(), Key1(i), write_ts, "val" + std::to_string(i)));
     if (i == kNum - kNumPerFile) {
+      if (!persist_udt) {
+        // When UDTs are not persisted, mark the timestamps in the Memtables as
+        // all expired so the followed flush can go through.
+        cutoff_ts = i + 1;
+        PutFixed64(&full_history_ts_low, cutoff_ts);
+        ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
+                                                full_history_ts_low));
+      }
       ASSERT_OK(Flush());
     }
   }

   ReadOptions read_opts;
   read_opts.total_order_seek = true;
-  std::string read_ts = Timestamp(kNum, 0);
+  std::string read_ts;
+  PutFixed64(&read_ts, kNum);
   Slice read_ts_slice = read_ts;
   read_opts.timestamp = &read_ts_slice;
   {
@@ -4076,7 +4169,10 @@ TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
   ASSERT_OK(iter->status());
   ASSERT_EQ(-1, expected);

-  read_ts = Timestamp(0, 0);
+  // Cannot read below the cutoff timestamp when timestamps are not persisted.
+  if (persist_udt) {
+    read_ts.clear();
+    PutFixed64(&read_ts, 0);
     read_ts_slice = read_ts;
     read_opts.timestamp = &read_ts_slice;
     iter.reset(db_->NewIterator(read_opts));
@@ -4087,22 +4183,29 @@ TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
     ASSERT_FALSE(iter->Valid());
     ASSERT_OK(iter->status());
   }
+  }

-  read_ts = Timestamp(kNum, 0);
+  read_ts.clear();
+  PutFixed64(&read_ts, kNum);
   read_ts_slice = read_ts;
   read_opts.timestamp = &read_ts_slice;
   std::string value, timestamp;
   Status s;
+  std::string expected_ts;
+  int int_expected_ts;
   for (int i = 0; i < kNum; ++i) {
     s = db_->Get(read_opts, Key1(i), &value, &timestamp);
     if (i >= kRangeBegin && i < kNum / 2) {
       ASSERT_TRUE(s.IsNotFound());
-      ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0));
+      int_expected_ts = (persist_udt || kNum / 2 >= cutoff_ts) ? kNum / 2 : 0;
     } else {
       ASSERT_OK(s);
       ASSERT_EQ(value, "val" + std::to_string(i));
-      ASSERT_EQ(timestamp, Timestamp(i, 0));
+      int_expected_ts = (persist_udt || i >= cutoff_ts) ? i : 0;
     }
+    expected_ts.clear();
+    PutFixed64(&expected_ts, int_expected_ts);
+    ASSERT_EQ(timestamp, expected_ts);
   }

   size_t batch_size = kNum;
@@ -4121,11 +4224,41 @@ TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
   for (int i = 0; i < kNum; ++i) {
     if (i >= kRangeBegin && i < kNum / 2) {
       ASSERT_TRUE(statuses[i].IsNotFound());
-      ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0));
+      int_expected_ts = (persist_udt || kNum / 2 >= cutoff_ts) ? kNum / 2 : 0;
     } else {
       ASSERT_OK(statuses[i]);
       ASSERT_EQ(values[i], "val" + std::to_string(i));
-      ASSERT_EQ(timestamps[i], Timestamp(i, 0));
+      int_expected_ts = (persist_udt || i >= cutoff_ts) ? i : 0;
     }
+    expected_ts.clear();
+    PutFixed64(&expected_ts, int_expected_ts);
+    ASSERT_EQ(timestamps[i], expected_ts);
   }
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  if (!persist_udt) {
+    // Mark everything expired so manual compaction can go through
+    full_history_ts_low.clear();
+    PutFixed64(&full_history_ts_low, kNum);
+    ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
+                                            full_history_ts_low));
+  }
+  Slice compaction_ts = full_history_ts_low;
+  cro.full_history_ts_low = &compaction_ts;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  for (int i = kRangeBegin; i < kNum / 2; ++i) {
+    s = db_->Get(read_opts, Key1(i), &value, &timestamp);
+    ASSERT_TRUE(s.IsNotFound());
+    if (persist_udt) {
+      expected_ts.clear();
+      PutFixed64(&expected_ts, kNum / 2);
+      ASSERT_EQ(timestamp, expected_ts);
+    } else {
+      // When timestamps are not persisted, data in SST files all logically have
+      // min timestamp. A compaction to the last level will drop the range
+      // tombstone.
+      ASSERT_TRUE(timestamp.empty());
+    }
+  }
   Close();
@@ -66,6 +66,13 @@ void AppendInternalKeyWithDifferentTimestamp(std::string* result,
   PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
 }

+void AppendUserKeyWithDifferentTimestamp(std::string* result, const Slice& key,
+                                         const Slice& ts) {
+  assert(key.size() >= ts.size());
+  result->append(key.data(), key.size() - ts.size());
+  result->append(ts.data(), ts.size());
+}
+
 void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
                              ValueType t) {
   PutFixed64(result, PackSequenceAndType(s, t));
@@ -110,6 +117,7 @@ void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
 void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
                                     size_t ts_sz) {
+  assert(ts_sz > 0);
   assert(key.size() >= kNumInternalBytes);
   size_t user_key_size = key.size() - kNumInternalBytes;
   result->reserve(key.size() + ts_sz);
   result->append(key.data(), user_key_size);
@@ -117,6 +125,17 @@ void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
   result->append(key.data() + user_key_size, kNumInternalBytes);
 }

+void PadInternalKeyWithMaxTimestamp(std::string* result, const Slice& key,
+                                    size_t ts_sz) {
+  assert(ts_sz > 0);
+  assert(key.size() >= kNumInternalBytes);
+  size_t user_key_size = key.size() - kNumInternalBytes;
+  result->reserve(key.size() + ts_sz);
+  result->append(key.data(), user_key_size);
+  result->append(std::string(ts_sz, '\xff'));
+  result->append(key.data() + user_key_size, kNumInternalBytes);
+}
+
 void StripTimestampFromInternalKey(std::string* result, const Slice& key,
                                    size_t ts_sz) {
   assert(key.size() >= ts_sz + kNumInternalBytes);
@@ -165,6 +165,9 @@ inline void UnPackSequenceAndType(uint64_t packed, uint64_t* seq,
   // assert(IsExtendedValueType(*t));
 }

+const uint64_t kRangeTombstoneSentinel =
+    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
+
 EntryType GetEntryType(ValueType value_type);

 // Append the serialization of "key" to *result.
@@ -184,6 +187,15 @@ void AppendInternalKeyWithDifferentTimestamp(std::string* result,
                                              const ParsedInternalKey& key,
                                              const Slice& ts);

+// Append the user key to *result, replacing the original timestamp with
+// argument ts.
+//
+// input [user key]: <user_provided_key | original_ts>
+// output before: empty
+// output after: <user_provided_key | ts>
+void AppendUserKeyWithDifferentTimestamp(std::string* result, const Slice& key,
+                                         const Slice& ts);
+
 // Serialized internal key consists of user key followed by footer.
 // This function appends the footer to *result, assuming that *result already
 // contains the user key at the end.
@@ -237,6 +249,16 @@ void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
 void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
                                     size_t ts_sz);

+// `key` is an internal key containing a user key without timestamp. Create a
+// new key in *result by padding a max timestamp of size `ts_sz` to the user key
+// and copying the remaining internal key bytes.
+//
+// input [internal key]: <user_provided_key | seqno + type>
+// output before: empty
+// output after: <user_provided_key | max_ts | seqno + type>
+void PadInternalKeyWithMaxTimestamp(std::string* result, const Slice& key,
+                                    size_t ts_sz);
+
 // `key` is an internal key containing a user key with timestamp of size
 // `ts_sz`. Create a new internal key in *result by stripping the timestamp from
 // the user key and copying the remaining internal key bytes.
@@ -883,17 +905,25 @@ struct RangeTombstone {
   // User-defined timestamp is enabled, `sk` and `ek` should be user key
   // with timestamp, `ts` will replace the timestamps in `sk` and
   // `ek`.
-  RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts)
-      : seq_(sn), ts_(ts) {
-    assert(!ts.empty());
+  // When `logical_strip_timestamp` is true, the timestamps in `sk` and `ek`
+  // will be replaced with min timestamp.
+  RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts,
+                 bool logical_strip_timestamp)
+      : seq_(sn) {
+    const size_t ts_sz = ts.size();
+    assert(ts_sz > 0);
     pinned_start_key_.reserve(sk.size());
-    pinned_start_key_.append(sk.data(), sk.size() - ts.size());
-    pinned_start_key_.append(ts.data(), ts.size());
     pinned_end_key_.reserve(ek.size());
-    pinned_end_key_.append(ek.data(), ek.size() - ts.size());
-    pinned_end_key_.append(ts.data(), ts.size());
+    if (logical_strip_timestamp) {
+      AppendUserKeyWithMinTimestamp(&pinned_start_key_, sk, ts_sz);
+      AppendUserKeyWithMinTimestamp(&pinned_end_key_, ek, ts_sz);
+    } else {
+      AppendUserKeyWithDifferentTimestamp(&pinned_start_key_, sk, ts);
+      AppendUserKeyWithDifferentTimestamp(&pinned_end_key_, ek, ts);
+    }
     start_key_ = pinned_start_key_;
     end_key_ = pinned_end_key_;
+    ts_ = Slice(pinned_start_key_.data() + sk.size() - ts_sz, ts_sz);
   }

   RangeTombstone(ParsedInternalKey parsed_key, Slice value) {
@@ -20,7 +20,8 @@ namespace ROCKSDB_NAMESPACE {
 FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
     std::unique_ptr<InternalIterator> unfragmented_tombstones,
     const InternalKeyComparator& icmp, bool for_compaction,
-    const std::vector<SequenceNumber>& snapshots) {
+    const std::vector<SequenceNumber>& snapshots,
+    const bool tombstone_end_include_ts) {
   if (unfragmented_tombstones == nullptr) {
     return;
   }
@@ -45,7 +46,12 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
       last_start_key = pinned_last_start_key.Encode();
     }
   }
-  if (is_sorted) {
+
+  auto ucmp = icmp.user_comparator();
+  assert(ucmp);
+  const size_t ts_sz = ucmp->timestamp_size();
+  bool pad_min_ts_for_end = ts_sz > 0 && !tombstone_end_include_ts;
+  if (is_sorted && !pad_min_ts_for_end) {
     FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
                        snapshots);
     return;
@@ -63,8 +69,15 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
                                         unfragmented_tombstones->value().size();
     keys.emplace_back(unfragmented_tombstones->key().data(),
                       unfragmented_tombstones->key().size());
-    values.emplace_back(unfragmented_tombstones->value().data(),
-                        unfragmented_tombstones->value().size());
+    Slice value = unfragmented_tombstones->value();
+    if (pad_min_ts_for_end) {
+      AppendKeyWithMinTimestamp(&values.emplace_back(), value, ts_sz);
+    } else {
+      values.emplace_back(value.data(), value.size());
+    }
   }
+  if (pad_min_ts_for_end) {
+    total_tombstone_payload_bytes_ += num_unfragmented_tombstones_ * ts_sz;
+  }
   // VectorIterator implicitly sorts by key during construction.
   auto iter = std::make_unique<VectorIterator>(std::move(keys),
@@ -54,7 +54,8 @@ struct FragmentedRangeTombstoneList {
   FragmentedRangeTombstoneList(
       std::unique_ptr<InternalIterator> unfragmented_tombstones,
       const InternalKeyComparator& icmp, bool for_compaction = false,
-      const std::vector<SequenceNumber>& snapshots = {});
+      const std::vector<SequenceNumber>& snapshots = {},
+      const bool tombstone_end_include_ts = true);

   std::vector<RangeTombstoneStack>::const_iterator begin() const {
     return tombstones_.begin();
@@ -198,13 +199,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
     pinned_seq_pos_ = tombstones_->seq_end();
   }

-  RangeTombstone Tombstone() const {
+  RangeTombstone Tombstone(bool logical_strip_timestamp = false) const {
+    assert(Valid());
     if (icmp_->user_comparator()->timestamp_size()) {
-      return RangeTombstone(start_key(), end_key(), seq(), timestamp());
+      return RangeTombstone(start_key(), end_key(), seq(), timestamp(),
+                            logical_strip_timestamp);
     }
     return RangeTombstone(start_key(), end_key(), seq());
   }

   // Note that start_key() and end_key() are not guaranteed to have the
   // correct timestamp. User can call timestamp() to get the correct
   // timestamp().
@@ -709,9 +709,15 @@ Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles(
     }
     std::string smallest_buf;
     std::string largest_buf;
+    Slice largest_slice = meta.largest.Encode();
     PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(),
                                    ts_sz);
-    PadInternalKeyWithMinTimestamp(&largest_buf, meta.largest.Encode(), ts_sz);
+    auto largest_footer = ExtractInternalKeyFooter(largest_slice);
+    if (largest_footer == kRangeTombstoneSentinel) {
+      PadInternalKeyWithMaxTimestamp(&largest_buf, largest_slice, ts_sz);
+    } else {
+      PadInternalKeyWithMinTimestamp(&largest_buf, largest_slice, ts_sz);
+    }
     meta.smallest.DecodeFrom(smallest_buf);
     meta.largest.DecodeFrom(largest_buf);
   }
@@ -1072,8 +1072,16 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
                                       r->ioptions.logger);

   } else if (value_type == kTypeRangeDeletion) {
-    // TODO(yuzhangyu): handle range deletion entries for UDT in memtable only.
-    r->range_del_block.Add(key, value);
+    Slice persisted_end = value;
+    // When timestamps should not be persisted, we physically strip away range
+    // tombstone end key's user timestamp before passing it along to block
+    // builder. Physically stripping away start key's user timestamp is
+    // handled at the block builder level in the same way as the other data
+    // blocks.
+    if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
+      persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
+    }
+    r->range_del_block.Add(key, persisted_end);
     // TODO offset passed in is not accurate for parallel compression case
     NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
                                       r->table_properties_collectors,
@@ -773,7 +773,6 @@ Status BlockBasedTable::Open(
       PersistentCacheOptions(rep->table_options.persistent_cache,
                              rep->base_cache_key, rep->ioptions.stats);

-  // TODO(yuzhangyu): handle range deletion entries for UDT in memtable only.
   s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(),
                                    metaindex_iter.get(), internal_comparator,
                                    &lookup_context);
@@ -1006,9 +1005,16 @@ Status BlockBasedTable::ReadRangeDelBlock(
                      s.ToString().c_str());
       IGNORE_STATUS_IF_ERROR(s);
     } else {
+      std::vector<SequenceNumber> snapshots;
+      // When user defined timestamps are not persisted, the range tombstone end
+      // key read from the data block doesn't include user timestamp.
+      // The range tombstone start key should already include user timestamp as
+      // it's handled at block parsing level in the same way as the other data
+      // blocks.
       rep_->fragmented_range_dels =
-          std::make_shared<FragmentedRangeTombstoneList>(std::move(iter),
-                                                         internal_comparator);
+          std::make_shared<FragmentedRangeTombstoneList>(
+              std::move(iter), internal_comparator, false /*for_compaction=*/,
+              snapshots, rep_->user_defined_timestamps_persisted);
     }
   }
   return s;
@@ -714,8 +714,6 @@ def finalize_and_sanitize(src_params):
         dest_params["use_write_buffer_manager"] = 0
     if dest_params["user_timestamp_size"] > 0 and dest_params["persist_user_defined_timestamps"] == 0:
         # Features that are not compatible with UDT in memtable only feature.
-        dest_params["delpercent"] += dest_params["delrangepercent"]
-        dest_params["delrangepercent"] = 0
         dest_params["enable_blob_files"] = 0
         dest_params["allow_setting_blob_options_dynamically"] = 0
         dest_params["atomic_flush"] = 0